Powertools for AWS Lambda (Python)
Tracer
Tracer is an opinionated thin wrapper for AWS X-Ray Python SDK.
Key features
- Auto capture cold start as annotation, and responses or full exceptions as metadata
- Auto-disable when not running in AWS Lambda environment
- Support tracing async methods, generators, and context managers
- Auto-patch modules supported by AWS X-Ray
Getting started
Tip
All examples shared in this documentation are available within the project repository.
Install
This is not necessary if you're installing Powertools for AWS Lambda (Python) via Lambda Layer/SAR.
Add `aws-lambda-powertools[tracer]` as a dependency in your preferred tool, e.g., requirements.txt or pyproject.toml. This ensures you have the required dependencies before using Tracer.
Permissions
Before you use this utility, your AWS Lambda function must have permissions to send traces to AWS X-Ray.
AWS Serverless Application Model (SAM) example

```yaml
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: Powertools for AWS Lambda (Python) version

Globals:
  Function:
    Timeout: 5
    Runtime: python3.12
    Tracing: Active
    Environment:
      Variables:
        POWERTOOLS_SERVICE_NAME: payment
    Layers:
      # Find the latest Layer version in the official documentation
      # https://docs.powertools.aws.dev/lambda/python/latest/#lambda-layer
      - !Sub arn:aws:lambda:${AWS::Region}:017000801446:layer:AWSLambdaPowertoolsPythonV3-python312-x86_64:16

Resources:
  CaptureLambdaHandlerExample:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../src
      Handler: capture_lambda_handler.handler
```
Lambda handler
You can quickly start by initializing `Tracer` and using the `capture_lambda_handler` decorator for your Lambda handler.

Tracing Lambda handler with capture_lambda_handler

```python
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()  # Sets service via POWERTOOLS_SERVICE_NAME env var
# OR tracer = Tracer(service="example")


def collect_payment(charge_id: str) -> str:
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return collect_payment(charge_id=charge_id)
```
`capture_lambda_handler` performs these additional tasks to ease operations:
- Creates a `ColdStart` annotation to easily filter traces that have had an initialization overhead
- Creates a `Service` annotation if the `service` parameter or `POWERTOOLS_SERVICE_NAME` is set
- Captures any response, or full exceptions generated by the handler, and includes them as tracing metadata
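To make the `ColdStart` annotation more concrete, here is a minimal, library-free sketch of how a cold-start flag can be tracked with a module-level variable. Module state survives between invocations served by the same execution environment, so only the first call sees `True`. The names `_is_cold_start` and `is_cold_start` are hypothetical; this illustrates the idea and is not Tracer's actual implementation.

```python
# Module-level state is initialised once per execution environment,
# so it survives across invocations that reuse the same environment.
_is_cold_start = True


def is_cold_start() -> bool:
    """Return True on the first call only, False on every later call."""
    global _is_cold_start
    was_cold = _is_cold_start
    _is_cold_start = False
    return was_cold


def lambda_handler(event: dict, context: object) -> dict:
    # A tracer could record this value as an annotation for filtering.
    return {"ColdStart": is_cold_start()}


first = lambda_handler({}, None)
second = lambda_handler({}, None)
```

Here `first` reports a cold start and `second` does not, which is the distinction the annotation lets you filter on.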
Annotations & Metadata
Annotations are key-values associated with traces and indexed by AWS X-Ray. You can use them to filter traces and to create Trace Groups to slice and dice your transactions.
Adding annotations with put_annotation method

```python
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


def collect_payment(charge_id: str) -> str:
    tracer.put_annotation(key="PaymentId", value=charge_id)
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return collect_payment(charge_id=charge_id)
```
Metadata are key-values also associated with traces but not indexed by AWS X-Ray. You can use them to add additional context for an operation using any native object.
Adding arbitrary metadata with put_metadata method

```python
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


def collect_payment(charge_id: str) -> str:
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
    payment_context = {
        "charge_id": event.get("charge_id", ""),
        "merchant_id": event.get("merchant_id", ""),
        "request_id": context.aws_request_id,
    }
    payment_context["receipt_id"] = collect_payment(charge_id=payment_context["charge_id"])
    tracer.put_metadata(key="payment_response", value=payment_context)

    return payment_context["receipt_id"]
```
Synchronous functions
You can trace synchronous functions using the `capture_method` decorator.

Tracing an arbitrary function with capture_method

```python
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


@tracer.capture_method
def collect_payment(charge_id: str) -> str:
    tracer.put_annotation(key="PaymentId", value=charge_id)
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return collect_payment(charge_id=charge_id)
```
Note: Function responses are auto-captured and stored as JSON by default.
Use the `capture_response` parameter to override this behaviour.
The serialization is performed by aws-xray-sdk via the `jsonpickle` module. This can cause side effects for file-like objects such as the boto S3 `StreamingBody`, whose content can be read only once and is consumed during serialization.
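The read-once side effect can be illustrated without AWS at all. The sketch below uses `io.BytesIO` as a stand-in for a streaming body (an assumption for illustration; it is not `StreamingBody` itself) and a hypothetical `serialize_for_trace` helper that reads the stream the way a serializer might.

```python
import io


def serialize_for_trace(body: io.BufferedIOBase) -> str:
    """Hypothetical serializer: reading the stream consumes it."""
    return body.read().decode()


stream = io.BytesIO(b"payment report")

captured = serialize_for_trace(stream)  # first read gets the content
leftover = stream.read()                # the stream is now exhausted
```

After serialization, the caller that actually needs the data receives an empty read, which is why functions returning such objects are good candidates for turning response capture off.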
Asynchronous and generator functions
Warning
We do not support asynchronous Lambda handlers.
You can trace asynchronous functions and generator functions (including context managers) using `capture_method`.
capture_method_async.py

```python
import asyncio

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


@tracer.capture_method
async def collect_payment(charge_id: str) -> str:
    tracer.put_annotation(key="PaymentId", value=charge_id)
    await asyncio.sleep(0.5)
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return asyncio.run(collect_payment(charge_id=charge_id))
```

capture_method_context_manager.py

```python
import contextlib
from collections.abc import Generator

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()
logger = Logger()


@contextlib.contextmanager
@tracer.capture_method
def collect_payment(charge_id: str) -> Generator[str, None, None]:
    try:
        yield f"dummy payment collected for charge: {charge_id}"
    finally:
        tracer.put_annotation(key="PaymentId", value=charge_id)


@tracer.capture_lambda_handler
@logger.inject_lambda_context
def lambda_handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    with collect_payment(charge_id=charge_id) as receipt_id:
        logger.info(f"Processing payment collection for charge {charge_id} with receipt {receipt_id}")

    return receipt_id
```

capture_method_generators.py

```python
from collections.abc import Generator

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


@tracer.capture_method
def collect_payment(charge_id: str) -> Generator[str, None, None]:
    yield f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return next(collect_payment(charge_id=charge_id))
```
Environment variables
The following environment variables are available to configure Tracer at a global scope:
Setting | Description | Environment variable | Default |
---|---|---|---|
Disable Tracing | Explicitly disables all tracing. | POWERTOOLS_TRACE_DISABLED | false |
Response Capture | Captures Lambda or method return as metadata. | POWERTOOLS_TRACER_CAPTURE_RESPONSE | true |
Exception Capture | Captures Lambda or method exception as metadata. | POWERTOOLS_TRACER_CAPTURE_ERROR | true |
Both POWERTOOLS_TRACER_CAPTURE_RESPONSE and POWERTOOLS_TRACER_CAPTURE_ERROR can be set on a per-method basis, consequently overriding the environment variable value.
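The override order described above can be sketched in plain Python: a value passed explicitly for one method wins, otherwise the environment variable applies, otherwise the default from the table is used. The helper name `resolve_capture_flag` and the set of accepted truthy strings are assumptions made for illustration, not the library's own code.

```python
import os
from typing import Optional


def resolve_capture_flag(env_name: str, explicit: Optional[bool] = None, default: bool = True) -> bool:
    """Explicit per-method value wins; otherwise use the env var, then the default."""
    if explicit is not None:
        return explicit
    raw = os.getenv(env_name)
    if raw is None:
        return default
    # Assumed set of truthy spellings, for illustration only.
    return raw.strip().lower() in ("1", "true", "yes", "on")


os.environ["POWERTOOLS_TRACER_CAPTURE_RESPONSE"] = "false"

# No per-method value: the environment variable applies.
from_env = resolve_capture_flag("POWERTOOLS_TRACER_CAPTURE_RESPONSE")
# Per-method value given: it overrides the environment variable.
overridden = resolve_capture_flag("POWERTOOLS_TRACER_CAPTURE_RESPONSE", explicit=True)
```

With the variable set to `false`, `from_env` is `False` while `overridden` is `True`, mirroring a function that opts back in to response capture.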
Advanced
Patching modules
By default, Tracer automatically patches all libraries supported by X-Ray during initialization. Underneath, AWS X-Ray SDK checks whether a supported library has been imported before patching.
If you're looking to shave a few microseconds, or milliseconds depending on your function memory configuration, you can patch specific modules using the `patch_modules` param:
Example of explicitly patching requests only

```python
import requests

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

MODULES = ["requests"]

tracer = Tracer(patch_modules=MODULES)


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    ret = requests.get("https://httpbin.org/get")
    ret.raise_for_status()

    return ret.json()
```
Disabling response auto-capture
Use the `capture_response=False` parameter in both the `capture_lambda_handler` and `capture_method` decorators to instruct Tracer not to serialize function responses as metadata.
Info: This is useful in three common scenarios
- You might return sensitive information that you don't want added to your traces
- You might manipulate streaming objects that can be read only once; this prevents subsequent calls from being empty
- You might return more than 64K of data, e.g., a `message too long` error
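For the third scenario, a rough size check can help decide whether a function's response is a candidate for `capture_response=False`. The sketch below is an approximation under stated assumptions: the `fits_in_trace` helper is hypothetical, the limit is simply the 64K figure quoted above, and plain `json` is used for the estimate even though the actual serialization goes through `jsonpickle` and the trace carries other data too.

```python
import json

# Assumed limit, taken from the "64K" figure mentioned above.
MAX_TRACE_BYTES = 64 * 1024


def fits_in_trace(response: object, limit: int = MAX_TRACE_BYTES) -> bool:
    """Return True if the JSON-encoded response is within the limit."""
    encoded = json.dumps(response, default=str).encode("utf-8")
    return len(encoded) <= limit


small = {"receipt_id": "abc-123"}
large = {"report": "x" * (70 * 1024)}

small_fits = fits_in_trace(small)
large_fits = fits_in_trace(large)
```

A check like this is best run in tests or during development with representative payloads, since the decorator parameter itself is fixed at definition time.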
disable_capture_response.py

```python
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()
logger = Logger()


@tracer.capture_method(capture_response=False)
def collect_payment(charge_id: str) -> str:
    tracer.put_annotation(key="PaymentId", value=charge_id)
    logger.debug("Returning sensitive information....")
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler(capture_response=False)
def lambda_handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return collect_payment(charge_id=charge_id)
```

disable_capture_response_streaming_body.py

```python
import os

import boto3
from botocore.response import StreamingBody

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

BUCKET = os.getenv("BUCKET_NAME", "")
REPORT_KEY = os.getenv("REPORT_KEY", "")

tracer = Tracer()
logger = Logger()

session = boto3.session.Session()
s3 = session.client("s3")


@tracer.capture_method(capture_response=False)
def fetch_payment_report(payment_id: str) -> StreamingBody:
    ret = s3.get_object(Bucket=BUCKET, Key=f"{REPORT_KEY}/{payment_id}")
    logger.debug("Returning streaming body from S3 object....")
    return ret["Body"]


@tracer.capture_lambda_handler(capture_response=False)
def lambda_handler(event: dict, context: LambdaContext) -> str:
    payment_id = event.get("payment_id", "")
    report = fetch_payment_report(payment_id=payment_id)
    return report.read().decode()
```
Disabling exception auto-capture
Use the `capture_error=False` parameter in both the `capture_lambda_handler` and `capture_method` decorators to instruct Tracer not to serialize exceptions as metadata.
Info
This is useful when exceptions or stack traces you don't control might contain sensitive information.
Disabling exception auto-capture for tracing metadata

```python
import os

import requests

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()
ENDPOINT = os.getenv("PAYMENT_API", "")


class PaymentError(Exception): ...


@tracer.capture_method(capture_error=False)
def collect_payment(charge_id: str) -> dict:
    try:
        ret = requests.post(url=f"{ENDPOINT}/collect", data={"charge_id": charge_id})
        ret.raise_for_status()
        return ret.json()
    except requests.HTTPError as e:
        raise PaymentError(f"Unable to collect payment for charge {charge_id}") from e


@tracer.capture_lambda_handler(capture_error=False)
def lambda_handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    ret = collect_payment(charge_id=charge_id)

    return ret.get("receipt_id", "")
```
Ignoring certain HTTP endpoints
You might have endpoints whose requests you don't want traced, perhaps due to the volume of calls or sensitive URLs.
You can use the `ignore_endpoint` method with the hostname and/or URLs you'd like to be ignored; globs (`*`) are allowed.
Ignoring certain HTTP endpoints from being traced

```python
import os

import requests

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

ENDPOINT = os.getenv("PAYMENT_API", "")
IGNORE_URLS = ["/collect", "/refund"]

tracer = Tracer()
tracer.ignore_endpoint(hostname=ENDPOINT, urls=IGNORE_URLS)
tracer.ignore_endpoint(hostname=f"*.{ENDPOINT}", urls=IGNORE_URLS)  # any subdomain of ENDPOINT


class PaymentError(Exception): ...


@tracer.capture_method(capture_error=False)
def collect_payment(charge_id: str) -> dict:
    try:
        ret = requests.post(url=f"{ENDPOINT}/collect", data={"charge_id": charge_id})
        ret.raise_for_status()
        return ret.json()
    except requests.HTTPError as e:
        raise PaymentError(f"Unable to collect payment for charge {charge_id}") from e


@tracer.capture_lambda_handler(capture_error=False)
def lambda_handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    ret = collect_payment(charge_id=charge_id)

    return ret.get("receipt_id", "")
```
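To get a feel for which hostnames a glob such as `*.ENDPOINT` would cover, the sketch below uses Python's standard `fnmatch` module. It assumes shell-style glob semantics; the exact matching rules used by the X-Ray SDK may differ, and the hostnames and the `is_ignored` helper are made up for illustration.

```python
from fnmatch import fnmatch

# Hypothetical ignore patterns: the bare host plus any subdomain of it.
patterns = ["payments.example.com", "*.payments.example.com"]


def is_ignored(hostname: str) -> bool:
    """Return True if the hostname matches any ignore pattern."""
    return any(fnmatch(hostname, pattern) for pattern in patterns)


exact = is_ignored("payments.example.com")
staged = is_ignored("dev.payments.example.com")
other = is_ignored("api.example.com")
```

Note that the wildcard pattern alone does not match the bare hostname, which is why the example above registers both the plain `ENDPOINT` and the `*.` variant.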
Tracing aiohttp requests
Info
This snippet assumes you have aiohttp as a dependency.
You can use the `aiohttp_trace_config` function to create a valid aiohttp `trace_config` object. This is necessary since X-Ray utilizes aiohttp trace hooks to capture requests end-to-end.
Tracing aiohttp requests

```python
import asyncio
import os

import aiohttp

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.tracing import aiohttp_trace_config
from aws_lambda_powertools.utilities.typing import LambdaContext

ENDPOINT = os.getenv("PAYMENT_API", "")

tracer = Tracer()


@tracer.capture_method
async def collect_payment(charge_id: str) -> dict:
    async with aiohttp.ClientSession(trace_configs=[aiohttp_trace_config()]) as session:
        async with session.get(f"{ENDPOINT}/collect") as resp:
            return await resp.json()


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    charge_id = event.get("charge_id", "")
    return asyncio.run(collect_payment(charge_id=charge_id))
```
Escape hatch mechanism
You can use the `tracer.provider` attribute to access all methods provided by the AWS X-Ray `xray_recorder` object.
This is useful when you need a feature available in X-Ray that is not available in the Tracer utility, such as thread safety or context managers.
Tracing a code block with in_subsegment escape hatch

```python
from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


def collect_payment(charge_id: str) -> str:
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    with tracer.provider.in_subsegment("## collect_payment") as subsegment:
        subsegment.put_annotation(key="PaymentId", value=charge_id)
        ret = collect_payment(charge_id=charge_id)
        subsegment.put_metadata(key="payment_response", value=ret)

    return ret
```
Concurrent asynchronous functions
Warning
X-Ray SDK will raise an exception when async functions are run and traced concurrently.
A safe workaround is to use `in_subsegment_async`, available via the Tracer escape hatch (`tracer.provider`).
Workaround to safely trace async concurrent functions

```python
import asyncio

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


async def another_async_task():
    async with tracer.provider.in_subsegment_async("## another_async_task") as subsegment:
        subsegment.put_annotation(key="key", value="value")
        subsegment.put_metadata(key="key", value="value", namespace="namespace")
        ...


async def another_async_task_2():
    async with tracer.provider.in_subsegment_async("## another_async_task_2") as subsegment:
        subsegment.put_annotation(key="key", value="value")
        subsegment.put_metadata(key="key", value="value", namespace="namespace")
        ...


async def collect_payment(charge_id: str) -> str:
    await asyncio.gather(another_async_task(), another_async_task_2())
    return f"dummy payment collected for charge: {charge_id}"


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return asyncio.run(collect_payment(charge_id=charge_id))
```
Reusing Tracer across your code
Tracer keeps a copy of its configuration after the first initialization. This is useful for scenarios where you want to use Tracer in more than one location across your code base.
Warning: Import order matters when using Lambda Layers or multiple modules
Do not set `auto_patch=False` when reusing Tracer in Lambda Layers, or in multiple modules.
This can result in the first Tracer config being inherited by new instances, and their modules not being patched.
Tracer will automatically ignore imported modules that have been patched.
tracer_reuse.py

```python
from tracer_reuse_module import collect_payment

from aws_lambda_powertools import Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()


@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> str:
    charge_id = event.get("charge_id", "")
    return collect_payment(charge_id=charge_id)
```

tracer_reuse_module.py

A new instance of Tracer will be created but will reuse the previous Tracer instance configuration, similar to a Singleton.

```python
from aws_lambda_powertools import Tracer

tracer = Tracer()


@tracer.capture_method
def collect_payment(charge_id: str) -> str:
    return f"dummy payment collected for charge: {charge_id}"
```
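The "new instance, shared configuration" behaviour can be pictured with a small stand-alone class. This is a conceptual sketch only: `SharedConfigTracer` is a hypothetical name, and the class-level dictionary is one simple way to share settings between instances, not a description of how Tracer is implemented internally.

```python
from typing import Any, Dict


class SharedConfigTracer:
    """Hypothetical class: instances are distinct, configuration is shared."""

    _config: Dict[str, Any] = {}

    def __init__(self, **options: Any) -> None:
        # Only explicitly provided options update the shared configuration.
        provided = {k: v for k, v in options.items() if v is not None}
        type(self)._config.update(provided)

    @property
    def service(self) -> Any:
        return type(self)._config.get("service")


first = SharedConfigTracer(service="payment")
second = SharedConfigTracer()  # no arguments: inherits the earlier config
```

`first` and `second` are different objects, yet `second.service` still resolves to the value set by the first initialization. The same property explains the import-order warning: whichever module initializes first decides the configuration later instances inherit.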
Testing your code
Tracer is disabled by default when not running in the AWS Lambda environment; this also applies when running under AWS SAM CLI or Chalice. This means you don't need to change any code or set any environment variables to run your tests.
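A rough way to reason about this auto-disable behaviour is as a set of environment checks. The sketch below is an assumption-laden illustration: `POWERTOOLS_TRACE_DISABLED` comes from the table above, while `LAMBDA_TASK_ROOT`, `AWS_SAM_LOCAL`, and `AWS_CHALICE_CLI_MODE` are assumed marker variables, and the `tracing_disabled` helper is hypothetical rather than part of the library.

```python
import os
from typing import Mapping


def tracing_disabled(env: Mapping[str, str] = os.environ) -> bool:
    """Sketch of env-based detection; marker variable names are assumptions."""
    if env.get("POWERTOOLS_TRACE_DISABLED", "").lower() == "true":
        return True  # explicitly disabled
    if not env.get("LAMBDA_TASK_ROOT"):
        return True  # not running inside AWS Lambda
    # Local emulators are assumed to set their own markers.
    return bool(env.get("AWS_SAM_LOCAL") or env.get("AWS_CHALICE_CLI_MODE"))


local = tracing_disabled({})
in_lambda = tracing_disabled({"LAMBDA_TASK_ROOT": "/var/task"})
sam_local = tracing_disabled({"LAMBDA_TASK_ROOT": "/var/task", "AWS_SAM_LOCAL": "true"})
```

Under these assumptions, a plain local test run and a SAM CLI run both end up with tracing off, while a real Lambda environment keeps it on.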
Tips
- Use annotations on key operations to slice and dice traces, create unique views, and create metrics from them via Trace Groups
- Use a namespace when adding metadata to group data more easily
- Annotations and metadata are added to the current subsegment opened. If you want them in a specific subsegment, use a context manager via the escape hatch mechanism