Learning asyncio: how to use it correctly with other context managers
September 7, 2023, 9:10am 1
I am writing a small piece of code to learn asyncio together with the Azure SDK for Python and its Blob Storage client. So far I have created a simple class that manages Azure resources and access, and that uses asyncio to download multiple blobs (files) faster.
My concern is that the container_client is created when I instantiate the class and is then managed in main with the line async with BSA.container_client:, i.e. the resource is closed once that block exits. What if I want to call another function that depends on that resource afterwards?
How can I move both the creation of the resource (currently in __init__) and its management with the async with statement (currently in main) into the download_blobs_async function?
import asyncio
import logging
from io import BytesIO
from typing import Optional

from azure.storage.blob.aio import ContainerClient
from azure.core.exceptions import ResourceNotFoundError

logger = logging.getLogger('app')
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)


class BlobStorageAsync:
    def __init__(self, connection_string, container_name):
        self.connection_string = connection_string
        self.container_name = container_name
        # The client is created here but only entered (and closed) in main().
        container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
        )
        self.container_client = container_client

    async def download_blob_async(self, blob_name: str) -> Optional[BytesIO]:
        # Returns the blob content wrapped in BytesIO, or None if it does not exist.
        try:
            blob_client = self.container_client.get_blob_client(blob=blob_name)
            async with blob_client:
                stream = await blob_client.download_blob()
                data = await stream.readall()
            logger.info("The file %s was downloaded", blob_name)
            return BytesIO(data)
        except ResourceNotFoundError:
            logger.info("The file %s was not found", blob_name)
            return None

    async def download_blobs_async(self, blobs_list):
        #task = tg.create_task(self.download_blob_async(blob_name))
        tasks = []
        for blob_name in blobs_list:
            task = asyncio.create_task(self.download_blob_async(blob_name))
            tasks.append(task)
        # gather() returns the results in the same order as blobs_list.
        results = await asyncio.gather(*tasks)
        return results


async def main():
    connection_string = ...
    container_name = ...
    blobs_list = ['one.parquet', 'two.parquet', 'three.parquet']
    BSA = BlobStorageAsync(connection_string, container_name)
    async with BSA.container_client:
        results = await BSA.download_blobs_async(blobs_list)
    results = [result for result in results if result is not None]  # keep only the blobs that exist


if __name__ == '__main__':
    asyncio.run(main())
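To make the concern concrete, here is a minimal sketch of the async context manager lifecycle. FakeClient and its fetch method are hypothetical stand-ins so that the example runs without Azure credentials; it only illustrates the general pattern and does not claim how the real ContainerClient reacts to calls made after its block has exited.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a client that owns a network session."""

    def __init__(self):
        self.is_open = False

    async def __aenter__(self):
        self.is_open = True  # acquire the resource on entry
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.is_open = False  # release it on exit, even if the body raised

    async def fetch(self, name: str) -> bytes:
        if not self.is_open:
            raise RuntimeError("client is closed")
        await asyncio.sleep(0)  # pretend to do some I/O
        return name.encode()


async def demo():
    client = FakeClient()
    async with client:
        # Any number of calls can share the open client inside the block.
        first = await client.fetch("one.parquet")
        second = await client.fetch("two.parquet")
    try:
        # The block has exited, so this stand-in refuses further work.
        await client.fetch("three.parquet")
    except RuntimeError as error:
        after = str(error)
    return first, second, after


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the sketch is that everything that needs the resource has to run inside the async with block, or the resource has to be opened again.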
Eneg (Eneg) February 29, 2024, 12:23am 2
Make BlobStorageAsync accept a ContainerClient from outside.
Instantiate the ContainerClient in main and enter its context.
Then, so long as you’re within the context, you can instantiate BSA and call its methods at will.
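The suggestion above could be sketched roughly as follows. FakeContainerClient and its read_blob method are hypothetical stand-ins (not part of the Azure SDK) so that the example runs offline; with the real SDK one would presumably pass in the object returned by ContainerClient.from_connection_string(...) and keep the get_blob_client / download_blob / readall calls from the original post.

```python
import asyncio
from io import BytesIO


class FakeContainerClient:
    """Hypothetical stand-in for azure.storage.blob.aio.ContainerClient."""

    def __init__(self, blobs):
        self._blobs = blobs
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True

    async def read_blob(self, name):  # hypothetical helper, not an Azure method
        await asyncio.sleep(0)  # pretend to do some I/O
        return self._blobs.get(name)


class BlobStorageAsync:
    def __init__(self, container_client):
        # The client is injected; this class never opens or closes it.
        self.container_client = container_client

    async def download_blob_async(self, blob_name):
        data = await self.container_client.read_blob(blob_name)
        return BytesIO(data) if data is not None else None

    async def download_blobs_async(self, blobs_list):
        downloads = (self.download_blob_async(name) for name in blobs_list)
        return await asyncio.gather(*downloads)


async def main():
    client = FakeContainerClient({"one.parquet": b"1", "two.parquet": b"2"})
    async with client:  # main owns the client's lifetime
        bsa = BlobStorageAsync(client)
        first = await bsa.download_blobs_async(["one.parquet", "missing.parquet"])
        # A second call reuses the same open client.
        second = await bsa.download_blobs_async(["two.parquet"])
    return first, second, client.closed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With this layout the class no longer decides when the client is opened or closed; whoever enters the context does, and every call made inside the block shares the same client.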
lmaurelli (Luca Maurelli) February 29, 2024, 7:22am 3
In the end I chose to store the container names and their URL/connection details in a data structure, and to instantiate container clients on demand inside the async functions, which look up those inputs when needed.
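A minimal sketch of what that on-demand approach might look like. The CONTAINERS mapping, the client_factory parameter, and FakeContainerClient with its read_blob method are all assumptions made for illustration and are not taken from the actual solution; with the real SDK the factory would presumably be something like ContainerClient.from_connection_string.

```python
import asyncio

# Hypothetical registry: container name -> connection details.
CONTAINERS = {
    "raw": "connection-string-for-raw",
    "curated": "connection-string-for-curated",
}


class FakeContainerClient:
    """Hypothetical stand-in for azure.storage.blob.aio.ContainerClient."""

    def __init__(self, conn_str, container_name):
        self.conn_str = conn_str
        self.container_name = container_name
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True

    async def read_blob(self, name):  # hypothetical helper, not an Azure method
        await asyncio.sleep(0)  # pretend to do some I/O
        return f"{self.container_name}/{name}".encode()


async def download_blobs_async(container_name, blobs_list, client_factory):
    # Look up the connection details, then open a client just for this call.
    conn_str = CONTAINERS[container_name]
    async with client_factory(conn_str, container_name) as client:
        return await asyncio.gather(*(client.read_blob(n) for n in blobs_list))


async def main():
    created = []

    def factory(conn_str, container_name):
        client = FakeContainerClient(conn_str, container_name)
        created.append(client)  # remember each client so we can inspect it
        return client

    raw = await download_blobs_async("raw", ["one.parquet", "two.parquet"], factory)
    curated = await download_blobs_async("curated", ["three.parquet"], factory)
    return raw, curated, [client.closed for client in created]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The trade-off is that every call pays the cost of opening and closing its own client, in exchange for not having to keep a context open around all the calls.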