Resource provider — Dependency Injector 4.47.0a3 documentation

Resource provider provides a component with initialization and shutdown.

import sys
import logging
from concurrent.futures import ThreadPoolExecutor

from dependency_injector import containers, providers


def init_thread_pool(max_workers: int):
    thread_pool = ThreadPoolExecutor(max_workers=max_workers)
    yield thread_pool
    thread_pool.shutdown(wait=True)


class Container(containers.DeclarativeContainer):

    config = providers.Configuration()

    thread_pool = providers.Resource(
        init_thread_pool,
        max_workers=config.max_workers,
    )

    logging = providers.Resource(
        logging.basicConfig,
        level=logging.INFO,
        stream=sys.stdout,
    )


if __name__ == "__main__":
    container = Container(config={"max_workers": 4})

    container.init_resources()

    logging.info("Resources are initialized")
    thread_pool = container.thread_pool()
    thread_pool.map(print, range(10))

    container.shutdown_resources()

Resource providers help initialize and configure logging, an event loop, a thread or process pool, and similar components.

Resource provider is similar to Singleton. Resource initialization happens only once. You can make injections and use the provided instance the same way as you do with any other provider.

class Container(containers.DeclarativeContainer):

    config = providers.Configuration()

    thread_pool = providers.Resource(
        init_thread_pool,
        max_workers=config.max_workers,
    )

    dispatcher = providers.Factory(
        TaskDispatcher,
        executor=thread_pool,
    )

Container has an interface to initialize and shut down all resources at once:

container = Container()
container.init_resources()
container.shutdown_resources()

You can also initialize and shut down resources one by one using the init() and shutdown() methods of the provider:

container = Container()
container.thread_pool.init()
container.thread_pool.shutdown()

When you call the .shutdown() method on a resource provider, it removes the reference to the initialized resource, if any, and switches to the uninitialized state. Some resource initializer types support specifying a custom resource shutdown.
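The lifecycle described above can be modelled in a few lines of plain Python. The sketch below is not the library's implementation: MiniResource, init_numbers and calls are hypothetical names, and the class only mimics the behavior stated in this section. The initializer runs once, repeated calls return the same object, and shutdown() drops the reference so that the next call initializes again.

```python
class MiniResource:
    """Hypothetical stand-in that mimics the lifecycle of a resource provider."""

    def __init__(self, initializer, **kwargs):
        self._initializer = initializer
        self._kwargs = kwargs
        self._resource = None
        self.initialized = False

    def init(self):
        # Run the initializer only if the resource is not initialized yet.
        if not self.initialized:
            self._resource = self._initializer(**self._kwargs)
            self.initialized = True
        return self._resource

    # Calling the object behaves like init(): it returns the cached resource.
    __call__ = init

    def shutdown(self):
        # Drop the reference and switch back to the uninitialized state.
        self._resource = None
        self.initialized = False


calls = []


def init_numbers(size):
    calls.append(size)  # record every real initialization
    return list(range(size))


numbers = MiniResource(init_numbers, size=3)
first = numbers()
second = numbers()   # served from the cached reference, no new initialization
numbers.shutdown()
third = numbers()    # initialized again after shutdown
```

Here first and second are the same object, while third is a new list created by a second run of the initializer.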

Resource provider supports 3 types of initializers:

Function initializer

Function is the most common way to specify resource initialization:

def init_resource(argument1=..., argument2=...):
    return SomeResource()


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        init_resource,
        argument1=...,
        argument2=...,
    )

A function initializer does not have to return a value. This is common when you configure a global resource:

import logging.config


class Container(containers.DeclarativeContainer):

    configure_logging = providers.Resource(
        logging.config.fileConfig,
        fname="logging.ini",
    )

Function initializer does not provide a way to specify custom resource shutdown.

Generator initializer

Resource provider can use 2-step generators:

def init_resource(argument1=..., argument2=...):
    resource = SomeResource()  # initialization

    yield resource

    # shutdown
    ...


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        init_resource,
        argument1=...,
        argument2=...,
    )

The initialization phase of the generator ends at the first yield statement. You can return a resource object with yield resource, as in the example above. Returning an object is not mandatory; you can leave the yield statement empty:

def init_resource(argument1=..., argument2=...):
    # initialization
    ...

    yield

    # shutdown
    ...


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        init_resource,
        argument1=...,
        argument2=...,
    )
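To see why a single yield is enough to separate the two phases, the following library-independent sketch drives such a generator by hand with next(). How the provider drives the generator internally is not described here, so treat this as an illustration of plain generator semantics only. The log list and the "resource" string are hypothetical helpers for recording the order of events.

```python
def init_resource(log):
    log.append("init")        # initialization phase
    yield "resource"          # the yielded value plays the role of the resource
    log.append("shutdown")    # shutdown phase, runs when the generator is resumed


log = []
generator = init_resource(log)

resource = next(generator)    # runs the body up to the first yield
log.append("in use")

# Resuming the generator runs the code after the yield and then exhausts it;
# the default value avoids a StopIteration exception.
finished = next(generator, None) is None
```

The code before the yield runs on the first next() call, and the code after it runs only when the generator is resumed, which is what makes a two-step generator a natural fit for paired initialization and shutdown.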

Subclass initializer

You can create a resource initializer by implementing a subclass of resources.Resource:

from dependency_injector import resources


class MyResource(resources.Resource):

    def init(self, argument1=..., argument2=...) -> SomeResource:
        return SomeResource()

    def shutdown(self, resource: SomeResource) -> None:
        # shutdown
        ...


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        MyResource,
        argument1=...,
        argument2=...,
    )

Subclass must implement two methods: init() and shutdown().

Method init() receives the arguments specified in the resource provider. It performs initialization and returns the resource object. Returning an object is not mandatory.

Method shutdown() receives the resource object returned from init(). If init() didn't return an object, shutdown() is called anyway with None as the first argument.

from dependency_injector import resources


class MyResource(resources.Resource):

    def init(self, argument1=..., argument2=...) -> None:
        # initialization
        ...

    def shutdown(self, _: None) -> None:
        # shutdown
        ...
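The contract between init() and shutdown() can be illustrated without the library. In the sketch below, FileResource, GlobalSetup and run_lifecycle are hypothetical names: the two classes are plain Python classes with the same two methods, and run_lifecycle is an assumed driver that simply passes whatever init() returned to shutdown(). It is a model of the behavior described above, not the provider's actual code.

```python
events = []


class FileResource:
    """Plain class with the same two methods a Resource subclass implements."""

    def init(self, name):
        events.append(f"open {name}")
        return name  # the resource object

    def shutdown(self, resource):
        events.append(f"close {resource}")


class GlobalSetup:
    """Initializer that configures something global and returns nothing."""

    def init(self):
        events.append("configure")

    def shutdown(self, resource):
        events.append(f"reset {resource}")


def run_lifecycle(initializer_cls, **kwargs):
    """Hypothetical driver: pass whatever init() returned to shutdown()."""
    initializer = initializer_cls()
    resource = initializer.init(**kwargs)
    initializer.shutdown(resource)
    return resource


run_lifecycle(FileResource, name="data.txt")
run_lifecycle(GlobalSetup)
```

For GlobalSetup, init() returns nothing, so shutdown() still runs and receives None.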

Resources, wiring, and per-function execution scope

You can combine the Resource provider with Wiring to implement a per-function execution scope. To do this, use the additional Closing marker from the wiring module.

from dependency_injector import containers, providers
from dependency_injector.wiring import Closing, Provide, inject
from flask import Flask, current_app


class Service:
    ...


def init_service() -> Service:
    print("Init service")
    yield Service()
    print("Shutdown service")


class Container(containers.DeclarativeContainer):

    service = providers.Resource(init_service)


@inject
def index_view(service: Service = Closing[Provide[Container.service]]):
    assert service is current_app.container.service()
    return "Hello World!"


container = Container()
container.wire(modules=[__name__])

app = Flask(__name__)
app.container = container
app.add_url_rule("/", "index", view_func=index_view)


if __name__ == "__main__":
    app.run()

The framework initializes the resource and injects it into the function. With the Closing marker, the framework calls the resource's shutdown() method when the function finishes executing.

The example above produces the following output:

Init service
Shutdown service
127.0.0.1 - - [29/Oct/2020 22:39:40] "GET / HTTP/1.1" 200 -
Init service
Shutdown service
127.0.0.1 - - [29/Oct/2020 22:39:41] "GET / HTTP/1.1" 200 -
Init service
Shutdown service
127.0.0.1 - - [29/Oct/2020 22:39:41] "GET / HTTP/1.1" 200 -
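The repeating "Init service" / "Shutdown service" pairs in this output reflect a per-call scope: the resource lives only for the duration of one function call. A rough, library-free way to picture that behavior is a decorator that runs the initialization phase before the call and the shutdown phase after it. The per_call_resource decorator and the events list below are hypothetical; this is a sketch of the idea, not how Closing and Provide are implemented.

```python
import functools

events = []


def init_service():
    events.append("Init service")
    yield object()  # stands in for a Service instance
    events.append("Shutdown service")


def per_call_resource(initializer):
    """Hypothetical decorator: initialize before the call, shut down after it."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            generator = initializer()
            service = next(generator)  # initialization phase
            try:
                return func(*args, service=service, **kwargs)
            finally:
                next(generator, None)  # shutdown phase, runs even on errors
        return wrapper
    return decorator


@per_call_resource(init_service)
def index_view(service):
    events.append("Handle request")
    return "Hello World!"


responses = [index_view(), index_view()]
```

Each call to index_view gets a freshly initialized service that is shut down as soon as the call returns, so two calls record two full init and shutdown cycles.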

Asynchronous initializers

When you write an asynchronous application, you might need to initialize resources asynchronously. Resource provider supports asynchronous initialization and shutdown.

Asynchronous function initializer:

async def init_async_resource(argument1=..., argument2=...):
    return await connect()


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        init_async_resource,
        argument1=...,
        argument2=...,
    )

Asynchronous generator initializer:

async def init_async_resource(argument1=..., argument2=...):
    connection = await connect()
    yield connection
    await connection.close()


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        init_async_resource,
        argument1=...,
        argument2=...,
    )
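An asynchronous generator splits into the same two phases as a synchronous one; the difference is that each step has to be awaited. The sketch below drives one by hand with the standard asyncio module to show the order of events. The connect() coroutine and the events list are hypothetical stand-ins, and the manual __anext__() calls illustrate async generator semantics rather than the provider's internals.

```python
import asyncio

events = []


async def connect():
    # Hypothetical stand-in for a real asynchronous connect() call.
    await asyncio.sleep(0)
    events.append("connected")
    return "connection"


async def init_async_resource():
    connection = await connect()
    yield connection
    events.append("closed")  # stands in for `await connection.close()`


async def main():
    generator = init_async_resource()
    connection = await generator.__anext__()  # initialization phase
    events.append(f"using {connection}")
    try:
        await generator.__anext__()           # shutdown phase
    except StopAsyncIteration:
        pass                                  # the generator is exhausted
    return connection


result = asyncio.run(main())
```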

Asynchronous subclass initializer:

from dependency_injector import resources


class AsyncConnection(resources.AsyncResource):

    async def init(self, argument1=..., argument2=...):
        return await connect()

    async def shutdown(self, connection):
        await connection.close()


class Container(containers.DeclarativeContainer):

    resource = providers.Resource(
        AsyncConnection,
        argument1=...,
        argument2=...,
    )

When you use a resource provider with an asynchronous initializer, you need to call its __call__(), init(), and shutdown() methods asynchronously:

import asyncio


class Container(containers.DeclarativeContainer):

    connection = providers.Resource(init_async_connection)


async def main():
    container = Container()
    connection = await container.connection()
    connection = await container.connection.init()
    connection = await container.connection.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Container init_resources() and shutdown_resources() methods should be used asynchronously if there is at least one asynchronous resource provider:

import asyncio


class Container(containers.DeclarativeContainer):

    connection1 = providers.Resource(init_async_connection)

    connection2 = providers.Resource(init_sync_connection)


async def main():
    container = Container()
    await container.init_resources()
    await container.shutdown_resources()


if __name__ == "__main__":
    asyncio.run(main())
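One way to picture why a single asynchronous resource makes the container-level calls asynchronous: code that initializes a mix of initializers has to await any result that turns out to be awaitable, so the whole operation becomes a coroutine. The init_all helper below is hypothetical and uses only the standard library. It is a sketch of that reasoning, not the implementation of init_resources(), and the two initializers are stand-ins that return plain strings.

```python
import asyncio
import inspect

initialized = []


def init_sync_connection():
    initialized.append("sync")
    return "sync connection"


async def init_async_connection():
    await asyncio.sleep(0)
    initialized.append("async")
    return "async connection"


async def init_all(initializers):
    """Hypothetical helper: call every initializer, await only awaitable results."""
    resources = []
    for initializer in initializers:
        result = initializer()
        if inspect.isawaitable(result):
            result = await result
        resources.append(result)
    return resources


resources = asyncio.run(init_all([init_async_connection, init_sync_connection]))
```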

See also: