Learning asyncio: how to use it correctly with other context managers

September 7, 2023, 9:10am 1

I am writing a small piece of code to learn asyncio together with the Azure SDK for Python, specifically its Blob Storage client. So far I have created a simple class that manages Azure resources and access, and that uses asyncio to download multiple blobs (files) concurrently, and therefore faster.

My concern is that when I create an instance of this class, the container_client is created in __init__ and is then managed in main by the line "async with BSA.container_client:", i.e. the resource is closed as soon as that block exits. What if I want to call another function that depends on that resource afterwards?

How can I move both the creation of the resource (currently in __init__) and its management with async with (currently in main) into the download_blobs_async function?

import asyncio
import logging
from io import BytesIO
from azure.storage.blob.aio import ContainerClient
from azure.core.exceptions import ResourceNotFoundError

logger = logging.getLogger('app')
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)

class BlobStorageAsync:
    def __init__(self, connection_string, container_name):
        self.connection_string = connection_string
        self.container_name = container_name
        container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
        )
        self.container_client = container_client

    async def download_blob_async(self, blob_name: str) -> "BytesIO | None":
        """Return the blob content wrapped in BytesIO, or None if the blob is missing."""
        try:
            blob_client = self.container_client.get_blob_client(blob=blob_name)
            async with blob_client:
                stream = await blob_client.download_blob()
                data = await stream.readall()
            logger.info("The file %s was downloaded", blob_name)
            return BytesIO(data)
        except ResourceNotFoundError:
            logger.info("The file %s was not found", blob_name)
            return None

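    async def download_blobs_async(self, blobs_list):
        # Schedule one download task per blob so that they run concurrently.
        # Alternative: inside an asyncio.TaskGroup (Python 3.11+), use
        # tg.create_task(self.download_blob_async(blob_name)) instead.
        tasks = []
        for blob_name in blobs_list:
            task = asyncio.create_task(self.download_blob_async(blob_name))
            tasks.append(task)
        # gather returns the results in the same order as blobs_list.
        results = await asyncio.gather(*tasks)
        return results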

async def main():
    connection_string = ...
    container_name = ...
    blobs_list = ['one.parquet', 'two.parquet', 'three.parquet']
    BSA = BlobStorageAsync(connection_string, container_name)
    async with BSA.container_client:
        results = await BSA.download_blobs_async(blobs_list)
    results = [result for result in results if result is not None] # filter only for data that exist

if __name__ == '__main__':
    asyncio.run(main())

Eneg (Eneg) February 29, 2024, 12:23am 2

1. Make BlobStorageAsync accept a ContainerClient from outside.
2. Instantiate the ContainerClient in main and enter its context.
3. Then, as long as you are within that context, you can instantiate BSA and call its methods at will.
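
The three steps above could look roughly like the sketch below. To keep it runnable without Azure credentials, it uses FakeContainerClient, a hypothetical stand-in for azure.storage.blob.aio.ContainerClient; its download method is invented for the illustration and is not part of the SDK.

```python
import asyncio


class FakeContainerClient:
    """Hypothetical stand-in for azure.storage.blob.aio.ContainerClient."""

    def __init__(self, blobs):
        self._blobs = blobs
        self.is_open = False

    async def __aenter__(self):
        self.is_open = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.is_open = False

    async def download(self, blob_name):
        # Invented method (not an SDK call); simulates a network read.
        if not self.is_open:
            raise RuntimeError("client is closed")
        await asyncio.sleep(0)
        return self._blobs.get(blob_name)  # None when the blob is missing


class BlobStorageAsync:
    def __init__(self, container_client):
        # The client is injected; this class never opens or closes it.
        self.container_client = container_client

    async def download_blobs_async(self, blob_names):
        return await asyncio.gather(
            *(self.container_client.download(name) for name in blob_names)
        )


async def main():
    client = FakeContainerClient({"one.parquet": b"1", "two.parquet": b"2"})
    # The caller owns the client's lifetime, so several calls can share it.
    async with client:
        bsa = BlobStorageAsync(client)
        first = await bsa.download_blobs_async(["one.parquet", "missing.parquet"])
        second = await bsa.download_blobs_async(["two.parquet"])
    return first, second


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With the real SDK, the client would instead be built with ContainerClient.from_connection_string(...) as in the original post, and download would be replaced by the get_blob_client / download_blob / readall calls shown there.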

lmaurelli (Luca Maurelli) February 29, 2024, 7:22am 3

In the end I chose to store the container names and their “url/connection” details in a data structure, and to instantiate the container clients on demand inside the async functions, retrieving those inputs when needed.
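
A minimal sketch of that on-demand approach, not the actual code I ended up with: CONTAINERS, make_client and FakeContainerClient are hypothetical names, and the fake client only stands in for azure.storage.blob.aio.ContainerClient so that the example runs without Azure access.

```python
import asyncio


class FakeContainerClient:
    """Hypothetical stand-in for azure.storage.blob.aio.ContainerClient."""

    opened = 0  # counts how many client contexts were entered

    def __init__(self, blobs):
        self._blobs = blobs

    async def __aenter__(self):
        FakeContainerClient.opened += 1
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return None

    async def download(self, blob_name):
        # Invented method (not an SDK call); simulates a network read.
        await asyncio.sleep(0)
        return self._blobs.get(blob_name)  # None when the blob is missing


# Hypothetical registry: container name -> inputs needed to build a client.
# With the real SDK this would hold connection strings or URLs instead.
CONTAINERS = {
    "raw": {"a.parquet": b"a"},
    "clean": {"b.parquet": b"b"},
}


def make_client(container_name):
    # With the real SDK: ContainerClient.from_connection_string(...)
    return FakeContainerClient(CONTAINERS[container_name])


async def download_blobs_async(container_name, blob_names):
    # The client is created, used and closed within a single call.
    async with make_client(container_name) as client:
        return await asyncio.gather(
            *(client.download(name) for name in blob_names)
        )


async def main():
    raw = await download_blobs_async("raw", ["a.parquet", "zzz.parquet"])
    clean = await download_blobs_async("clean", ["b.parquet"])
    return raw, clean


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The trade-off is that every call opens and closes its own client, which keeps the resource handling local to the function at the cost of not reusing a connection across calls.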