Building a Modern Python API with Azure Cosmos DB: A 5-Part Video Series
I’m excited to share our new video series where I walk through building a production-ready inventory management API using Python, FastAPI, and Azure Cosmos DB NoSQL. This project demonstrates modern async patterns, clean architecture, and enterprise-grade features like batch operations and optimistic concurrency control.
This builds on our Getting Started Series; check out those videos first to grasp the fundamentals.
Video 1: Data Modeling with Pydantic
In the first video, I dive deep into data modeling using Pydantic v2, showing how to create robust, type-safe models with built-in validation. The key insight is using model inheritance to separate concerns between what clients send and what the database returns.

from datetime import datetime
from decimal import Decimal
from typing import Annotated, Optional

from pydantic import BaseModel, ConfigDict, Field

# ProductStatus (with an ACTIVE member) is defined elsewhere in the project.


class Product(BaseModel):
    """Fields that are intrinsic to a product."""

    name: Annotated[str, Field(min_length=1, max_length=255)]
    # The default sits outside Annotated, which is the documented Pydantic idiom.
    description: Annotated[Optional[str], Field(max_length=1000)] = None
    category: Annotated[str, Field(min_length=1, max_length=100)]
    price: Annotated[Decimal, Field(gt=0, decimal_places=2)]
    sku: Annotated[str, Field(min_length=1, max_length=50, pattern=r"^[A-Z0-9-]+$")]
    quantity: Annotated[int, Field(ge=0)] = 0
    status: ProductStatus = ProductStatus.ACTIVE
    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)


class ProductResponse(Product):
    """All product fields plus system-generated fields."""

    id: str
    etag: str = Field(alias="_etag")  # Cosmos DB concurrency control token
    last_updated: datetime

The model hierarchy ensures that input validation happens automatically, while response models include system-generated fields like ETags for optimistic concurrency control.
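
To see the input/response split in action, here is a minimal sketch using trimmed-down, hypothetical models (`Item`, `ItemResponse`, and the `validation_error_fields` helper are illustrative names, not part of the project). It assumes Pydantic v2. One assumption worth flagging: the response model relaxes `extra` to `"ignore"`, because a subclass inherits `extra="forbid"` and documents read from Cosmos DB also carry system properties such as `_rid` and `_ts`.

```python
from decimal import Decimal
from typing import Annotated

from pydantic import BaseModel, ConfigDict, Field, ValidationError


class Item(BaseModel):
    """Hypothetical, trimmed-down input model: only what a client may send."""

    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)

    name: Annotated[str, Field(min_length=1, max_length=255)]
    price: Annotated[Decimal, Field(gt=0, decimal_places=2)]


class ItemResponse(Item):
    """Input fields plus system-generated ones."""

    # Assumption: tolerate extra system properties on stored documents.
    # Config is merged with the parent's, so whitespace stripping still applies.
    model_config = ConfigDict(extra="ignore")

    id: str
    etag: str = Field(alias="_etag")


def validation_error_fields(payload: dict) -> list[str]:
    """Return the names of the fields that failed validation (empty if valid)."""
    try:
        Item.model_validate(payload)
    except ValidationError as exc:
        return sorted(str(err["loc"][0]) for err in exc.errors())
    return []
```

Here a client payload that tries to set `id` is rejected by `Item`, while a stored document containing `_etag` and other system properties parses cleanly into `ItemResponse`.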
Video 2: Client Configuration and Connection Management
The second video covers setting up a production-ready Azure Cosmos DB client with proper authentication, connection pooling, and singleton pattern implementation. I show how to use Azure’s managed identity for secure, credential-free authentication.

# Module-level singletons shared by the whole application
_client = None
_credential = None


async def _ensure_client() -> CosmosClient:
    """
    Ensures a single CosmosClient instance exists for the lifetime of the application.
    """
    global _client, _credential
    if _client is None:
        try:
            logger.info("Initializing Cosmos DB client with DefaultAzureCredential")
            _credential = DefaultAzureCredential()
            client_options = {
                "connection_timeout": 60,
            }
            _client = CosmosClient(COSMOSDB_ENDPOINT, _credential, **client_options)
            logger.info(
                "Cosmos DB client initialized successfully",
                extra={
                    "endpoint": COSMOSDB_ENDPOINT,
                    "auth_method": "DefaultAzureCredential",
                },
            )
        except Exception as e:
            logger.error(f"Failed to initialize Cosmos DB client: {e}")
            raise
    return _client

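This pattern reuses one client for the lifetime of the application, which keeps resource usage efficient. It is safe under asyncio because nothing is awaited between the `None` check and the assignment, so two coroutines on the same event loop cannot both create a client. It does not, by itself, guard against initialization from multiple threads.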
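
If client setup ever needs to await something (for example, warming up a connection), control can pass to another coroutine partway through initialization. A minimal sketch of guarding that case with an `asyncio.Lock` follows. `FakeClient`, `warm_up`, `get_client`, and `demo` are hypothetical names used only for illustration; they are not part of the Azure SDK or the project.

```python
import asyncio
from typing import Optional


class FakeClient:
    """Hypothetical stand-in for a database client; counts how many are created."""

    instances = 0

    def __init__(self) -> None:
        FakeClient.instances += 1

    async def warm_up(self) -> None:
        # Simulates awaitable setup work, which yields control to other coroutines.
        await asyncio.sleep(0)


_client: Optional[FakeClient] = None
_lock: Optional[asyncio.Lock] = None


async def get_client() -> FakeClient:
    """Return the shared client, creating it at most once."""
    global _client, _lock
    if _client is not None:  # fast path: already initialized
        return _client
    if _lock is None:  # created lazily, inside a running event loop
        _lock = asyncio.Lock()
    async with _lock:
        if _client is None:  # re-check: another coroutine may have finished first
            client = FakeClient()
            await client.warm_up()
            _client = client
    return _client


async def demo() -> int:
    """Request the client from ten coroutines at once; report how many were built."""
    clients = await asyncio.gather(*(get_client() for _ in range(10)))
    assert all(c is clients[0] for c in clients)
    return FakeClient.instances
```

The second `None` check inside the lock is what prevents duplicate clients: coroutines that were waiting on the lock find the client already set and skip creation.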
Video 3: Async Operations Done Right
In the third video, I demonstrate how to leverage FastAPI’s async capabilities with Azure Cosmos DB’s async SDK. The combination of async/await patterns with dependency injection creates clean, testable code.

async def update_product(
    container: ContainerProxy,
    identifier: VersionedProductIdentifier,
    updates: ProductUpdate
) -> ProductResponse:
    """Update an existing product with optimistic concurrency control."""
    update_dict = updates.model_dump(exclude_unset=True)
    normalized_category = normalize_category(identifier.category)
    update_dict["last_updated"] = datetime.now(timezone.utc).isoformat()

    # Create patch operations for partial updates
    patch_operations = []
    for key, value in update_dict.items():
        if key not in ["id", "category", "_etag"]:
            patch_operations.append(
                {"op": "set", "path": f"/{key}", "value": value}
            )

    try:
        result = await container.patch_item(
            item=identifier.id,
            partition_key=normalized_category,
            patch_operations=patch_operations,
            headers={"if-match": identifier.etag},  # ETag for concurrency control
        )
        return ProductResponse.model_validate(result)
    except Exception as e:
        handle_cosmos_error(e, operation="update", product_id=identifier.id)

Patch operations send only the changed fields, and the ETag precondition makes the service reject an update that is based on a stale read instead of silently overwriting a newer version.
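
The ETag check itself is easy to reason about with a toy model. The sketch below is an in-memory simulation, not the Cosmos DB SDK: `InMemoryContainer`, `PreconditionFailed`, and `adjust_quantity` are hypothetical names. It mirrors the service behavior of rejecting a write whose ETag no longer matches (HTTP 412), and shows one common client-side response, which is to re-read and retry.

```python
import uuid


class PreconditionFailed(Exception):
    """Hypothetical error mirroring an HTTP 412 response on ETag mismatch."""


class InMemoryContainer:
    """Toy store illustrating the if-match check; not the Cosmos DB SDK."""

    def __init__(self) -> None:
        self._items: dict[str, dict] = {}

    def upsert(self, item: dict) -> dict:
        # Every successful write gets a fresh version token.
        stored = {**item, "_etag": uuid.uuid4().hex}
        self._items[item["id"]] = stored
        return dict(stored)

    def read(self, item_id: str) -> dict:
        return dict(self._items[item_id])

    def patch(self, item_id: str, changes: dict, if_match: str) -> dict:
        current = self._items[item_id]
        if current["_etag"] != if_match:
            # Someone else wrote since the caller read: reject the stale update.
            raise PreconditionFailed(item_id)
        return self.upsert({**current, **changes})


def adjust_quantity(
    container: InMemoryContainer, item_id: str, delta: int, retries: int = 3
) -> dict:
    """Read-modify-write loop that re-reads and retries when the ETag is stale."""
    for _ in range(retries):
        doc = container.read(item_id)
        try:
            return container.patch(
                item_id, {"quantity": doc["quantity"] + delta}, doc["_etag"]
            )
        except PreconditionFailed:
            continue
    raise PreconditionFailed(f"gave up on {item_id} after {retries} attempts")
```

Whether to retry automatically or surface the conflict to the caller is a design choice; an API that exposes ETags to clients, as this one does, would typically return the conflict and let the client decide.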
Video 4: Batch Operations for Performance
The fourth video explores batch operations, showing how to process multiple items efficiently using Azure Cosmos DB’s batch API and Python’s asyncio for concurrent execution across partitions.

async def _execute_batch_by_category(
    items_by_category: dict[str, list[T]],
    process_func: Callable[[str, list[T]], Any],
    result_extractor: Callable[[Any], list[R]] | None = None
) -> list[R] | list[Any]:
    """
    Generic function to execute batch operations grouped by category
    with concurrent processing.
    """
    # Schedule tasks to run concurrently for each category
    tasks = [
        asyncio.create_task(process_func(category, items))
        for category, items in items_by_category.items()
    ]

    # Wait for all tasks to complete and gather results
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Check for exceptions and collect successful results
    all_results = []
    exceptions = []
    for result in results:
        if isinstance(result, Exception):
            exceptions.append(result)
        else:
            if result_extractor and result is not None:
                all_results.extend(result_extractor(result))

    if exceptions:
        raise exceptions[0]
    return all_results

This generic approach runs one batch per category concurrently, which respects Azure Cosmos DB’s requirement that every operation in a batch target the same partition key (here, the category).
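
The grouping step that feeds a function like this can be sketched as follows. This is an illustration under stated assumptions, not the project's code: `group_by_category`, `chunked`, `run_batches`, and the `send_batch` callback are hypothetical names, and the concurrency cap of 5 is arbitrary. The chunk size reflects the Cosmos DB limit of 100 operations per transactional batch.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, Iterator

MAX_BATCH_OPS = 100  # Cosmos DB limit on operations in one transactional batch


def group_by_category(items: list[dict]) -> dict[str, list[dict]]:
    """Bucket items by partition key so each batch targets one partition."""
    grouped: dict[str, list[dict]] = defaultdict(list)
    for item in items:
        grouped[item["category"]].append(item)
    return dict(grouped)


def chunked(items: list[dict], size: int = MAX_BATCH_OPS) -> Iterator[list[dict]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def run_batches(
    items: list[dict],
    send_batch: Callable[[str, list[dict]], Awaitable[int]],
    max_concurrency: int = 5,
) -> int:
    """Send every chunk concurrently (bounded) and return the total item count."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(category: str, chunk: list[dict]) -> int:
        async with semaphore:  # cap the number of in-flight requests
            return await send_batch(category, chunk)

    tasks = [
        guarded(category, chunk)
        for category, group in group_by_category(items).items()
        for chunk in chunked(group)
    ]
    return sum(await asyncio.gather(*tasks))
```

Note the trade-off: splitting one category across several chunks keeps each request within the limit, but the all-or-nothing guarantee then applies per chunk rather than to the category as a whole.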
Video 5: Centralized Error Handling and Logging
The final video demonstrates how to implement centralized error handling that transforms low-level database errors into meaningful application exceptions, complete with structured logging for observability.

def handle_cosmos_error(e: Exception, operation: str, **context: Any) -> None:
    """Convert Cosmos DB exceptions to application-specific exceptions."""
    if isinstance(e, CosmosBatchOperationError):
        status_code = getattr(e, 'status_code', None)
        if status_code == 409:
            error_message = str(e).lower()
            if "unique index constraint" in error_message and "/sku" in error_message:
                sku = context.get("sku", "unknown")
                raise ProductDuplicateSKUError(
                    f"Product with SKU '{sku}' already exists"
                ) from e
            else:
                # Handle ID conflicts - prefer showing SKU if available
                sku = context.get("sku")
                if sku:
                    raise ProductAlreadyExistsError(
                        f"Product with SKU '{sku}' already exists"
                    ) from e


@app.exception_handler(ApplicationError)
async def application_error_handler(request: Request, exc: ApplicationError) -> JSONResponse:
    """Handle application-specific errors with structured logging."""
    logger = logging.getLogger(request.scope.get("route").endpoint.__module__)

    # Log at appropriate level based on status code
    log_level = "warning" if exc.status_code < 500 else "error"
    extra = {"error_type": type(exc).__name__}
    getattr(logger, log_level)(str(exc), extra=extra, exc_info=exc)

This approach ensures that errors are handled consistently across the application while providing meaningful feedback to API consumers.
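
The core idea, translating a status code into a typed application exception and then picking a log level from it, can be shown without any database or web framework. In the sketch below, the exception classes other than the name `ApplicationError`, along with `translate_status`, `log_level_for`, and `log_error`, are hypothetical and chosen for illustration; the project's real hierarchy may differ.

```python
import logging


class ApplicationError(Exception):
    """Base class; each subclass carries the HTTP status it maps to."""

    status_code = 500


class NotFoundError(ApplicationError):
    status_code = 404


class ConflictError(ApplicationError):
    status_code = 409


class PreconditionFailedError(ApplicationError):
    status_code = 412


# Assumed mapping from database status codes to application exceptions.
_STATUS_TO_ERROR = {
    404: NotFoundError,
    409: ConflictError,
    412: PreconditionFailedError,
}


def translate_status(status_code: int, message: str) -> ApplicationError:
    """Build the application exception for a status code (generic on no match)."""
    error_cls = _STATUS_TO_ERROR.get(status_code, ApplicationError)
    return error_cls(message)


def log_level_for(exc: ApplicationError) -> int:
    """Client errors (4xx) are warnings; server errors (5xx) are errors."""
    return logging.WARNING if exc.status_code < 500 else logging.ERROR


def log_error(logger: logging.Logger, exc: ApplicationError) -> None:
    """Emit one structured record carrying the exception type as an extra field."""
    logger.log(
        log_level_for(exc),
        str(exc),
        extra={"error_type": type(exc).__name__},
        exc_info=exc,
    )
```

Keeping the status code on the exception class means the HTTP layer and the logging layer can both read it without knowing anything about the database error that caused it.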
Conclusion
This project demonstrates how modern Python features combined with Azure Cosmos DB can create a robust, scalable API. Key takeaways include:
- Leveraging Pydantic for strong typing and validation
- Implementing proper async patterns for optimal performance
- Using batch operations to minimize round trips to the database
- Building comprehensive error handling for production reliability
- Maintaining clean architecture with clear separation of concerns
The complete source code is available on GitHub, and I encourage you to watch the video series for detailed explanations. Whether you’re building a new API or modernizing an existing one, these patterns will help you create more maintainable and performant applications. Happy coding!