Event Source Data Classes - Powertools for AWS Lambda (Python)

Event Source Data Classes provides self-describing and strongly-typed classes for various AWS Lambda event sources.

Key features

Getting started

Tip

All examples shared in this documentation are available within the project repository.

There are two ways to use Event Source Data Classes in your Lambda functions.

Method 1: Direct Initialization

You can initialize the appropriate data class by passing the Lambda event object to its constructor.

app.py

    from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent


    def lambda_handler(event: dict, context):
        api_event = APIGatewayProxyEvent(event)
        if "hello" in api_event.path and api_event.http_method == "GET":
            return {"statusCode": 200, "body": f"Hello from path: {api_event.path}"}
        else:
            return {"statusCode": 400, "body": "No Hello from path"}

API Gateway Proxy Example Event

    {
      "resource": "/helloworld",
      "path": "/hello",
      "httpMethod": "GET",
      "headers": {
        "Accept": "*/*",
        "Host": "api.example.com"
      },
      "queryStringParameters": {
        "name": "John"
      },
      "pathParameters": null,
      "stageVariables": null,
      "requestContext": {
        "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
        "stage": "prod"
      },
      "body": null,
      "isBase64Encoded": false
    }
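To make the idea behind direct initialization more concrete, the sketch below shows how a thin wrapper can expose dictionary keys as typed, read-only properties. `ProxyEventSketch` is a hypothetical stand-in written with the standard library only. It is not the library's `APIGatewayProxyEvent` implementation and covers just a few fields of the example event.

```python
from typing import Any, Dict


class ProxyEventSketch:
    """Hypothetical, simplified stand-in for a data class (illustration only)."""

    def __init__(self, data: Dict[str, Any]):
        # Keep the raw event; properties read from it on access
        self._data = data

    @property
    def path(self) -> str:
        return self._data["path"]

    @property
    def http_method(self) -> str:
        # camelCase key in the payload, snake_case attribute for callers
        return self._data["httpMethod"]

    @property
    def query_string_parameters(self) -> Dict[str, str]:
        # The payload may carry null here, so normalise to an empty dict
        return self._data.get("queryStringParameters") or {}


# Trimmed copy of the example event
sample_event = {
    "path": "/hello",
    "httpMethod": "GET",
    "queryStringParameters": {"name": "John"},
}

api_event = ProxyEventSketch(sample_event)
print(api_event.path, api_event.http_method, api_event.query_string_parameters)
```

Because every field is a property with a return annotation, an IDE can list the available attributes and their types without the reader having to remember the payload's key names.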

Method 2: Using the event_source Decorator

Alternatively, you can use the event_source decorator to automatically parse the event.

app.py

    from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent, event_source


    @event_source(data_class=APIGatewayProxyEvent)
    def lambda_handler(event: APIGatewayProxyEvent, context):
        if "hello" in event.path and event.http_method == "GET":
            return {"statusCode": 200, "body": f"Hello from path: {event.path}"}
        else:
            return {"statusCode": 400, "body": "No Hello from path"}

API Gateway Proxy Example Event

    {
      "resource": "/helloworld",
      "path": "/hello",
      "httpMethod": "GET",
      "headers": {
        "Accept": "*/*",
        "Host": "api.example.com"
      },
      "queryStringParameters": {
        "name": "John"
      },
      "pathParameters": null,
      "stageVariables": null,
      "requestContext": {
        "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
        "stage": "prod"
      },
      "body": null,
      "isBase64Encoded": false
    }
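For intuition about what a decorator like this does, here is a minimal sketch that wraps the raw event in a data class before calling the handler. `event_source_sketch` and `EventSketch` are hypothetical names used for illustration and rely on the standard library only. The real `event_source` decorator may differ in its details.

```python
import functools
from typing import Any, Callable, Dict, Type


class EventSketch:
    """Hypothetical minimal data class, used only to exercise the decorator."""

    def __init__(self, data: Dict[str, Any]):
        self._data = data

    @property
    def path(self) -> str:
        return self._data["path"]


def event_source_sketch(data_class: Type[Any]) -> Callable:
    """Hypothetical decorator: build `data_class` from the raw event, then call the handler."""

    def decorator(handler: Callable) -> Callable:
        @functools.wraps(handler)  # keep the handler's name and docstring
        def wrapper(event: Dict[str, Any], context: Any):
            return handler(data_class(event), context)

        return wrapper

    return decorator


@event_source_sketch(data_class=EventSketch)
def lambda_handler(event: EventSketch, context):
    return {"statusCode": 200, "body": f"Hello from path: {event.path}"}


# Lambda still passes a plain dict; the handler body sees the wrapped object
result = lambda_handler({"path": "/hello"}, None)
print(result)
```

The benefit of this shape is that the handler signature already carries the data class type, so the conversion step never has to be repeated inside the function body.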

Autocomplete with self-documented properties and methods

Event Source Data Classes expose typed, documented properties and methods, so your IDE can offer autocompletion and inline documentation. When using the APIGatewayProxyEvent class, for example, the IDE will suggest its available properties and methods as you type.

Supported event sources

Each event source is linked to its corresponding GitHub file with the full set of properties, methods, and docstrings specific to each event type.

| Event Source | Data class | Properties |
| --- | --- | --- |
| Active MQ | ActiveMQEvent | GitHub |
| API Gateway Authorizer | APIGatewayAuthorizerRequestEvent | GitHub |
| API Gateway Authorizer V2 | APIGatewayAuthorizerEventV2 | GitHub |
| API Gateway Proxy | APIGatewayProxyEvent | GitHub |
| API Gateway Proxy V2 | APIGatewayProxyEventV2 | GitHub |
| Application Load Balancer | ALBEvent | GitHub |
| AppSync Authorizer | AppSyncAuthorizerEvent | GitHub |
| AppSync Resolver | AppSyncResolverEvent | GitHub |
| AWS Config Rule | AWSConfigRuleEvent | GitHub |
| Bedrock Agent - OpenAPI | BedrockAgentEvent | GitHub |
| Bedrock Agent - Function | BedrockAgentFunctionEvent | GitHub |
| CloudFormation Custom Resource | CloudFormationCustomResourceEvent | GitHub |
| CloudWatch Alarm State Change Action | CloudWatchAlarmEvent | GitHub |
| CloudWatch Dashboard Custom Widget | CloudWatchDashboardCustomWidgetEvent | GitHub |
| CloudWatch Logs | CloudWatchLogsEvent | GitHub |
| CodeDeploy Lifecycle Hook | CodeDeployLifecycleHookEvent | GitHub |
| CodePipeline Job Event | CodePipelineJobEvent | GitHub |
| Cognito User Pool | Multiple available under cognito_user_pool_event | GitHub |
| Connect Contact Flow | ConnectContactFlowEvent | GitHub |
| DynamoDB streams | DynamoDBStreamEvent, DynamoDBRecordEventName | GitHub |
| EventBridge | EventBridgeEvent | GitHub |
| Kafka | KafkaEvent | GitHub |
| Kinesis Data Stream | KinesisStreamEvent | GitHub |
| Kinesis Firehose Delivery Stream | KinesisFirehoseEvent | GitHub |
| Lambda Function URL | LambdaFunctionUrlEvent | GitHub |
| Rabbit MQ | RabbitMQEvent | GitHub |
| S3 | S3Event | GitHub |
| S3 Batch Operations | S3BatchOperationEvent | GitHub |
| S3 Object Lambda | S3ObjectLambdaEvent | GitHub |
| S3 EventBridge Notification | S3EventBridgeNotificationEvent | GitHub |
| SES | SESEvent | GitHub |
| SNS | SNSEvent | GitHub |
| SQS | SQSEvent | GitHub |
| TransferFamilyAuthorizer | TransferFamilyAuthorizer | GitHub |
| TransferFamilyAuthorizerResponse | TransferFamilyAuthorizerResponse | GitHub |
| VPC Lattice V2 | VPCLatticeV2Event | GitHub |
| VPC Lattice V1 | VPCLatticeEvent | GitHub |
| IoT Core Thing Created/Updated/Deleted | IoTCoreThingEvent | GitHub |
| IoT Core Thing Type Created/Updated/Deprecated/Undeprecated/Deleted | IoTCoreThingTypeEvent | GitHub |
| IoT Core Thing Type Associated/Disassociated with a Thing | IoTCoreThingTypeAssociationEvent | GitHub |
| IoT Core Thing Group Created/Updated/Deleted | IoTCoreThingGroupEvent | GitHub |
| IoT Thing Added/Removed from Thing Group | IoTCoreAddOrRemoveFromThingGroupEvent | GitHub |
| IoT Child Group Added/Deleted from Parent Group | IoTCoreAddOrDeleteFromThingGroupEvent | GitHub |

Info

The examples showcase a subset of Event Source Data Classes capabilities - for comprehensive details, leverage your IDE's autocompletion, refer to type hints and docstrings, and explore the full API reference for complete property listings of each event source.

Active MQ

It is used for Active MQ payloads. See also the AWS blog post for more details.

app.py

    import json

    from aws_lambda_powertools import Logger
    from aws_lambda_powertools.utilities.data_classes import event_source
    from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent

    logger = Logger()


    @event_source(data_class=ActiveMQEvent)
    def lambda_handler(event: ActiveMQEvent, context):
        for message in event.messages:
            msg = message.message_id
            msg_pn = message.destination_physicalname

            logger.info(f"Message ID: {msg} and physical name: {msg_pn}")

        return {"statusCode": 200, "body": json.dumps("Processing complete")}

Active MQ Example Event

    {
      "eventSource": "aws:amq",
      "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",
      "messages": [
        {
          "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
          "messageType": "jms/text-message",
          "data": "QUJDOkFBQUE=",
          "connectionId": "myJMSCoID",
          "redelivered": false,
          "destination": {
            "physicalName": "testQueue"
          },
          "timestamp": 1598827811958,
          "brokerInTime": 1598827811958,
          "brokerOutTime": 1598827811959,
          "properties": {
            "testKey": "testValue"
          }
        },
        {
          "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
          "messageType": "jms/text-message",
          "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==",
          "connectionId": "myJMSCoID2",
          "redelivered": false,
          "destination": {
            "physicalName": "testQueue"
          },
          "timestamp": 1598827811958,
          "brokerInTime": 1598827811958,
          "brokerOutTime": 1598827811959,
          "properties": {
            "testKey": "testValue"
          }
        },
        {
          "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
          "messageType": "jms/bytes-message",
          "data": "3DTOOW7crj51prgVLQaGQ82S48k=",
          "connectionId": "myJMSCoID1",
          "persistent": false,
          "destination": {
            "physicalName": "testQueue"
          },
          "timestamp": 1598827811958,
          "brokerInTime": 1598827811958,
          "brokerOutTime": 1598827811959,
          "properties": {
            "testKey": "testValue"
          }
        }
      ]
    }
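The `data` field of each message in the example event is base64-encoded. As a rough sketch of what decoding involves, the snippet below uses only the standard library on the first two `data` values from the example event. The helper names `decode_text` and `decode_json` are made up for this illustration and are not part of the library.

```python
import base64
import json
from typing import Any

# `data` values copied from the first two messages of the example event
text_payload = "QUJDOkFBQUE="
json_payload = "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="


def decode_text(data: str) -> str:
    """Base64-decode a message body and interpret the bytes as UTF-8 text."""
    return base64.b64decode(data).decode("utf-8")


def decode_json(data: str) -> Any:
    """Base64-decode a message body and parse the resulting text as JSON."""
    return json.loads(decode_text(data))


decoded_text = decode_text(text_payload)
decoded_json = decode_json(json_payload)

print(decoded_text)  # ABC:AAAA
print(decoded_json)  # {'timeout': 0, 'data': 'CZrmf0Gw8Ov4bqLQxD4E'}
```

The third message in the example event has the type jms/bytes-message, so its payload is better kept as raw bytes (the result of `base64.b64decode`) rather than decoded as text.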

API Gateway Authorizer

It is used for API Gateway REST API Lambda Authorizer payloads.

Use APIGatewayAuthorizerRequestEvent for type REQUEST and APIGatewayAuthorizerTokenEvent for type TOKEN.

Rest APIs

    from aws_lambda_powertools.utilities.data_classes import event_source
    from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
        APIGatewayAuthorizerRequestEvent,
        APIGatewayAuthorizerResponse,
    )


    @event_source(data_class=APIGatewayAuthorizerRequestEvent)
    def lambda_handler(event: APIGatewayAuthorizerRequestEvent, context):
        # Simple auth check (replace with your actual auth logic)
        is_authorized = event.headers.get("HeaderAuth1") == "headerValue1"

        if not is_authorized:
            return {"principalId": "", "policyDocument": {"Version": "2012-10-17", "Statement": []}}

        arn = event.parsed_arn

        policy = APIGatewayAuthorizerResponse(
            principal_id="user",
            context={"user": "example"},
            region=arn.region,
            aws_account_id=arn.aws_account_id,
            api_id=arn.api_id,
            stage=arn.stage,
        )

        policy.allow_all_routes()

        return policy.asdict()

WebSocket APIs

    from aws_lambda_powertools.utilities.data_classes import event_source
    from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
        APIGatewayAuthorizerRequestEvent,
        APIGatewayAuthorizerResponseWebSocket,
    )


    @event_source(data_class=APIGatewayAuthorizerRequestEvent)
    def lambda_handler(event: APIGatewayAuthorizerRequestEvent, context):
        # Simple auth check (replace with your actual auth logic)
        is_authorized = event.headers.get("HeaderAuth1") == "headerValue1"

        if not is_authorized:
            return {"principalId": "", "policyDocument": {"Version": "2012-10-17", "Statement": []}}

        arn = event.parsed_arn

        policy = APIGatewayAuthorizerResponseWebSocket(
            principal_id="user",
            context={"user": "example"},
            region=arn.region,
            aws_account_id=arn.aws_account_id,
            api_id=arn.api_id,
            stage=arn.stage,
        )

        policy.allow_all_routes()

        return policy.asdict()

API Gateway Authorizer Request Example Event

    {
      "version": "1.0",
      "type": "REQUEST",
      "methodArn": "arn:aws:execute-api:us-east-1:123456789012:abcdef123/test/GET/request",
      "identitySource": "user1,123",
      "authorizationToken": "user1,123",
      "resource": "/request",
      "path": "/request",
      "httpMethod": "GET",
      "headers": {
        "X-AMZ-Date": "20170718T062915Z",
        "Accept": "*/*",
        "HeaderAuth1": "headerValue1",
        "CloudFront-Viewer-Country": "US",
        "CloudFront-Forwarded-Proto": "https",
        "CloudFront-Is-Tablet-Viewer": "false",
        "CloudFront-Is-Mobile-Viewer": "false",
        "User-Agent": "..."
      },
      "multiValueHeaders": {
        "Header1": [
          "value1"
        ],
        "Origin": [
          "https://aws.amazon.com"
        ],
        "Header2": [
          "value1",
          "value2"
        ]
      },
      "queryStringParameters": {
        "QueryString1": "queryValue1"
      },
      "pathParameters": {},
      "stageVariables": {
        "StageVar1": "stageValue1"
      },
      "requestContext": {
        "accountId": "123456789012",
        "apiId": "abcdef123",
        "domainName": "3npb9j1tlk.execute-api.us-west-1.amazonaws.com",
        "domainPrefix": "3npb9j1tlk",
        "extendedRequestId": "EXqgWgXxSK4EJug=",
        "httpMethod": "GET",
        "identity": {
          "accessKey": null,
          "accountId": null,
          "caller": null,
          "cognitoAmr": null,
          "cognitoAuthenticationProvider": null,
          "cognitoAuthenticationType": null,
          "cognitoIdentityId": null,
          "cognitoIdentityPoolId": null,
          "principalOrgId": null,
          "apiKey": "...",
          "sourceIp": "test-invoke-source-ip",
          "user": null,
          "userAgent": "PostmanRuntime/7.28.3",
          "userArn": null,
          "clientCert": {
            "clientCertPem": "CERT_CONTENT",
            "subjectDN": "www.example.com",
            "issuerDN": "Example issuer",
            "serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1",
            "validity": {
              "notBefore": "May 28 12:30:02 2019 GMT",
              "notAfter": "Aug 5 09:36:04 2021 GMT"
            }
          }
        },
        "path": "/request",
        "protocol": "HTTP/1.1",
        "requestId": "EXqgWgXxSK4EJug=",
        "requestTime": "20/Aug/2021:14:36:50 +0000",
        "requestTimeEpoch": 1629470210043,
        "resourceId": "ANY /request",
        "resourcePath": "/request",
        "stage": "test"
      }
    }

app_token.py

    from aws_lambda_powertools.utilities.data_classes import event_source
    from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
        APIGatewayAuthorizerResponse,
        APIGatewayAuthorizerTokenEvent,
    )


    @event_source(data_class=APIGatewayAuthorizerTokenEvent)
    def lambda_handler(event: APIGatewayAuthorizerTokenEvent, context):
        # Simple token check (replace with your actual token validation logic)
        is_valid_token = event.authorization_token == "allow"

        if not is_valid_token:
            return {"principalId": "", "policyDocument": {"Version": "2012-10-17", "Statement": []}}

        arn = event.parsed_arn

        policy = APIGatewayAuthorizerResponse(
            principal_id="user",
            context={"user": "example"},
            region=arn.region,
            aws_account_id=arn.aws_account_id,
            api_id=arn.api_id,
            stage=arn.stage,
        )

        policy.allow_all_routes()

        return policy.asdict()

API Gateway Authorizer Token Example Event

    {
      "type": "TOKEN",
      "authorizationToken": "allow",
      "methodArn": "arn:aws:execute-api:us-west-2:123456789012:ymy8tbxw7b/*/GET/"
    }
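The handlers above read `region`, `aws_account_id`, `api_id` and `stage` from `event.parsed_arn`. As an illustration of where those values come from, the sketch below splits the `methodArn` strings from the two example events by hand. `ParsedArnSketch` and `parse_method_arn` are hypothetical names for this example and do not reproduce the library's own parsing code.

```python
from typing import NamedTuple


class ParsedArnSketch(NamedTuple):
    """Hypothetical container for the pieces of an execute-api method ARN."""

    region: str
    aws_account_id: str
    api_id: str
    stage: str
    http_method: str
    resource: str


def parse_method_arn(method_arn: str) -> ParsedArnSketch:
    # Layout: arn:aws:execute-api:{region}:{account}:{api_id}/{stage}/{method}/{resource}
    arn_parts = method_arn.split(":")
    api_parts = arn_parts[5].split("/")
    return ParsedArnSketch(
        region=arn_parts[3],
        aws_account_id=arn_parts[4],
        api_id=api_parts[0],
        stage=api_parts[1],
        http_method=api_parts[2],
        # The resource itself may contain slashes, so re-join the remainder
        resource="/".join(api_parts[3:]),
    )


request_arn = parse_method_arn("arn:aws:execute-api:us-east-1:123456789012:abcdef123/test/GET/request")
token_arn = parse_method_arn("arn:aws:execute-api:us-west-2:123456789012:ymy8tbxw7b/*/GET/")

print(request_arn)
print(token_arn)
```

Note that the TOKEN example uses `*` as the stage and an empty resource, which is why the sketch does not assume a fixed number of path segments.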

API Gateway Authorizer V2

It is used for API Gateway HTTP API Lambda Authorizer payload version 2. See also this blog post for more details.

app.py

    from secrets import compare_digest

    from aws_lambda_powertools.utilities.data_classes import event_source
    from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
        APIGatewayAuthorizerEventV2,
        APIGatewayAuthorizerResponseV2,
    )


    def get_user_by_token(token):
        # A missing Authorization header yields None, which compare_digest rejects with a TypeError
        if token is not None and compare_digest(token, "value"):
            return {"name": "Foo"}
        return None


    @event_source(data_class=APIGatewayAuthorizerEventV2)
    def lambda_handler(event: APIGatewayAuthorizerEventV2, context):
        user = get_user_by_token(event.headers.get("Authorization"))

        if user is None:
            # No user was found, so we return not authorized
            return APIGatewayAuthorizerResponseV2(authorize=False).asdict()

        # Found the user and setting the details in the context
        response = APIGatewayAuthorizerResponseV2(
            authorize=True,
            context=user,
        )

        return response.asdict()

API Gateway Authorizer V2 Example Event

    {
      "version": "2.0",
      "type": "REQUEST",
      "routeArn": "arn:aws:execute-api:us-east-1:123456789012:abcdef123/test/GET/request",
      "identitySource": ["user1", "123"],
      "routeKey": "GET /merchants",
      "rawPath": "/merchants",
      "rawQueryString": "parameter1=value1&parameter1=value2&parameter2=value",
      "cookies": ["cookie1", "cookie2"],
      "headers": {
        "x-amzn-trace-id": "Root=1-611cc4a7-0746ebee281cfd967db97b64",
        "Header1": "value1",
        "Header2": "value2",
        "Authorization": "value"
      },
      "queryStringParameters": {
        "parameter1": "value1,value2",
        "parameter2": "value"
      },
      "requestContext": {
        "accountId": "123456789012",
        "apiId": "api-id",
        "authentication": {
          "clientCert": {
            "clientCertPem": "CERT_CONTENT",
            "subjectDN": "www.example.com",
            "issuerDN": "Example issuer",
            "serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1",
            "validity": {
              "notBefore": "May 28 12:30:02 2019 GMT",
              "notAfter": "Aug 5 09:36:04 2021 GMT"
            }
          }
        },
        "domainName": "id.execute-api.us-east-1.amazonaws.com",
        "domainPrefix": "id",
        "http": {
          "method": "POST",
          "path": "/merchants",
          "protocol": "HTTP/1.1",
          "sourceIp": "10.10.10.10",
          "userAgent": "agent"
        },
        "requestId": "id",
        "routeKey": "GET /merchants",
        "stage": "$default",
        "time": "12/Mar/2020:19:03:58 +0000",
        "timeEpoch": 1583348638390
      },
      "pathParameters": {
        "parameter1": "value1"
      },
      "stageVariables": {
        "stageVariable1": "value1",
        "stageVariable2": "value2"
      }
    }
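To show the kind of dictionary an HTTP API authorizer returns, here is a standard-library sketch that follows the simple response format API Gateway documents for payload version 2.0 (`isAuthorized` plus an optional `context`). The functions `simple_response` and `authorize` are hypothetical, and the exact output of `APIGatewayAuthorizerResponseV2.asdict()` is not reproduced here.

```python
from secrets import compare_digest
from typing import Any, Dict, Optional


def simple_response(authorize: bool, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Hypothetical helper: build a simple authorizer response for an HTTP API."""
    response: Dict[str, Any] = {"isAuthorized": authorize}
    if context:
        response["context"] = context
    return response


def authorize(headers: Dict[str, str]) -> Dict[str, Any]:
    token = headers.get("Authorization")
    # compare_digest needs two strings, so handle the missing header first
    if token is None or not compare_digest(token, "value"):
        return simple_response(authorize=False)
    return simple_response(authorize=True, context={"name": "Foo"})


allowed = authorize({"Authorization": "value"})
denied = authorize({})

print(allowed)
print(denied)
```

Using `compare_digest` instead of `==` keeps the comparison time independent of how many leading characters match, which is the reason the original example imports it.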

API Gateway Proxy

It is used for either API Gateway REST API or HTTP API using v1 proxy event.

app.py

    from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent, event_source


    @event_source(data_class=APIGatewayProxyEvent)
    def lambda_handler(event: APIGatewayProxyEvent, context):
        if "hello" in event.path and event.http_method == "GET":
            return {"statusCode": 200, "body": f"Hello from path: {event.path}"}
        else:
            return {"statusCode": 400, "body": "No Hello from path"}

API Gateway Proxy Example Event

    {
      "resource": "/helloworld",
      "path": "/hello",
      "httpMethod": "GET",
      "headers": {
        "Accept": "*/*",
        "Host": "api.example.com"
      },
      "queryStringParameters": {
        "name": "John"
      },
      "pathParameters": null,
      "stageVariables": null,
      "requestContext": {
        "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
        "stage": "prod"
      },
      "body": null,
      "isBase64Encoded": false
    }

API Gateway Proxy V2

It is used for HTTP API using v2 proxy event.

app.py

    from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEventV2, event_source


    @event_source(data_class=APIGatewayProxyEventV2)
    def lambda_handler(event: APIGatewayProxyEventV2, context):
        if "hello" in event.path and event.http_method == "POST":
            return {"statusCode": 200, "body": f"Hello from path: {event.path}"}
        else:
            return {"statusCode": 400, "body": "No Hello from path"}

API Gateway Proxy V2 Example Event

    {
      "version": "2.0",
      "routeKey": "$default",
      "rawPath": "/my/path",
      "rawQueryString": "parameter1=value1&parameter1=value2&parameter2=value",
      "cookies": [
        "cookie1",
        "cookie2"
      ],
      "headers": {
        "Header1": "value1",
        "Header2": "value1,value2"
      },
      "queryStringParameters": {
        "parameter1": "value1,value2",
        "parameter2": "value"
      },
      "requestContext": {
        "accountId": "123456789012",
        "apiId": "api-id",
        "authentication": {
          "clientCert": {
            "clientCertPem": "CERT_CONTENT",
            "subjectDN": "www.example.com",
            "issuerDN": "Example issuer",
            "serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1",
            "validity": {
              "notBefore": "May 28 12:30:02 2019 GMT",
              "notAfter": "Aug 5 09:36:04 2021 GMT"
            }
          }
        },
        "authorizer": {
          "jwt": {
            "claims": {
              "claim1": "value1",
              "claim2": "value2"
            },
            "scopes": [
              "scope1",
              "scope2"
            ]
          }
        },
        "domainName": "id.execute-api.us-east-1.amazonaws.com",
        "domainPrefix": "id",
        "http": {
          "method": "POST",
          "path": "/my/path",
          "protocol": "HTTP/1.1",
          "sourceIp": "192.168.0.1/32",
          "userAgent": "agent"
        },
        "requestId": "id",
        "routeKey": "$default",
        "stage": "$default",
        "time": "12/Mar/2020:19:03:58 +0000",
        "timeEpoch": 1583348638390
      },
      "body": "{\"message\": \"hello world\", \"username\": \"tom\"}",
      "pathParameters": {
        "parameter1": "value1"
      },
      "isBase64Encoded": false,
      "stageVariables": {
        "stageVariable1": "value1",
        "stageVariable2": "value2"
      }
    }
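The v2 payload is laid out differently from v1: the HTTP method sits under `requestContext.http.method` and the path under `rawPath`. The sketch below reads those fields from a trimmed copy of the example event using plain dictionaries, purely to illustrate the layout that the data class hides behind `event.http_method` and `event.path`. It makes no claim about how the library derives those properties.

```python
import json

# Trimmed copy of the example event, keeping only the fields used below
event = {
    "version": "2.0",
    "rawPath": "/my/path",
    "requestContext": {"http": {"method": "POST", "path": "/my/path"}},
    "body": '{"message": "hello world", "username": "tom"}',
    "isBase64Encoded": False,
}

method = event["requestContext"]["http"]["method"]
raw_path = event["rawPath"]
body = json.loads(event["body"])  # the body arrives as a JSON string

# Same condition as the example handler, applied to the raw fields
matches_hello = "hello" in raw_path and method == "POST"

print(method, raw_path, body["username"], matches_hello)
```

With this particular event the path is /my/path, so a check for "hello" in the path does not match even though the method is POST.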

Application Load Balancer

It is used for Application Load Balancer events.

app.py

    from aws_lambda_powertools.utilities.data_classes import ALBEvent, event_source


    @event_source(data_class=ALBEvent)
    def lambda_handler(event: ALBEvent, context):
        if "lambda" in event.path and event.http_method == "GET":
            return {"statusCode": 200, "body": f"Hello from path: {event.path}"}
        else:
            return {"statusCode": 400, "body": "No Hello from path"}

Application Load Balancer Example Event

    {
      "requestContext": {
        "elb": {
          "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/lambda-279XGJDqGZ5rsrHC2Fjr/49e9d65c45c6791a"
        }
      },
      "httpMethod": "GET",
      "path": "/lambda",
      "queryStringParameters": {
        "query": "1234ABCD"
      },
      "headers": {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "accept-encoding": "gzip",
        "accept-language": "en-US,en;q=0.9",
        "connection": "keep-alive",
        "host": "lambda-alb-123578498.us-east-2.elb.amazonaws.com",
        "upgrade-insecure-requests": "1",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",
        "x-amzn-trace-id": "Root=1-5c536348-3d683b8b04734faae651f476",
        "x-forwarded-for": "72.12.164.125",
        "x-forwarded-port": "80",
        "x-forwarded-proto": "http",
        "x-imforwards": "20"
      },
      "body": "Test",
      "isBase64Encoded": false
    }
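The example event carries a plain-text `body` with `isBase64Encoded` set to false. When that flag is true, the body has to be base64-decoded before use. The helper below is a hypothetical, standard-library sketch of that branch and assumes the decoded bytes are UTF-8 text.

```python
import base64
from typing import Any, Dict, Optional


def decoded_body(event: Dict[str, Any]) -> Optional[str]:
    """Hypothetical helper: return the request body as text, decoding base64 when flagged."""
    body = event.get("body")
    if body is None:
        return None
    if event.get("isBase64Encoded"):
        # Assumption: the payload is UTF-8 text once decoded
        return base64.b64decode(body).decode("utf-8")
    return body


plain = decoded_body({"body": "Test", "isBase64Encoded": False})
encoded = decoded_body({"body": "VGVzdA==", "isBase64Encoded": True})

print(plain, encoded)
```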

AppSync Authorizer

Used when building AWS_LAMBDA authorization with AppSync. See the blog post Introducing Lambda authorization for AWS AppSync GraphQL APIs, or read the Amplify documentation on using AWS Lambda for authorization with AppSync.

app.py

    from typing import Dict

    from aws_lambda_powertools.logging import correlation_paths
    from aws_lambda_powertools.logging.logger import Logger
    from aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event import (
        AppSyncAuthorizerEvent,
        AppSyncAuthorizerResponse,
    )
    from aws_lambda_powertools.utilities.data_classes.event_source import event_source

    logger = Logger()


    def get_user_by_token(token: str):
        """Look up a user by token"""
        ...


    @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_AUTHORIZER)
    @event_source(data_class=AppSyncAuthorizerEvent)
    def lambda_handler(event: AppSyncAuthorizerEvent, context) -> Dict:
        user = get_user_by_token(event.authorization_token)

        if not user:
            # No user found, return not authorized
            return AppSyncAuthorizerResponse().asdict()

        return AppSyncAuthorizerResponse(
            authorize=True,
            resolver_context={"id": user.id},
            # Only allow admins to delete events
            deny_fields=None if user.is_admin else ["Mutation.deleteEvent"],
        ).asdict()

AppSync Authorizer Example Event

    {
      "authorizationToken": "BE9DC5E3-D410-4733-AF76-70178092E681",
      "requestContext": {
        "apiId": "giy7kumfmvcqvbedntjwjvagii",
        "accountId": "254688921111",
        "requestId": "b80ed838-14c6-4500-b4c3-b694c7bef086",
        "queryString": "mutation MyNewTask($desc: String!) {\n  createTask(description: $desc, owner: \"ccc\", taskStatus: \"cc\", title: \"ccc\") {\n    id\n  }\n}\n",
        "operationName": "MyNewTask",
        "variables": {
          "desc": "Foo"
        }
      }
    }
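For orientation, the sketch below builds the kind of response dictionary an AppSync Lambda authorizer returns. The key names (`isAuthorized`, `resolverContext`, `deniedFields`, `ttlOverride`) are taken from the AppSync documentation for Lambda authorization as an assumption. The function `authorizer_response` is hypothetical and is not a copy of `AppSyncAuthorizerResponse.asdict()`.

```python
from typing import Any, Dict, List, Optional


def authorizer_response(
    authorize: bool = False,
    resolver_context: Optional[Dict[str, Any]] = None,
    deny_fields: Optional[List[str]] = None,
    max_age: Optional[int] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: assemble an AppSync Lambda authorizer response."""
    response: Dict[str, Any] = {"isAuthorized": authorize}
    if max_age is not None:
        response["ttlOverride"] = max_age  # cache lifetime in seconds
    if deny_fields:
        response["deniedFields"] = deny_fields
    if resolver_context:
        response["resolverContext"] = resolver_context
    return response


not_authorized = authorizer_response()
non_admin = authorizer_response(
    authorize=True,
    resolver_context={"id": "user-1"},
    deny_fields=["Mutation.deleteEvent"],
)

print(not_authorized)
print(non_admin)
```

Defaulting `authorize` to False means a response built without arguments denies access, which mirrors the "no user found" branch in the example handler.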

AppSync Resolver

Used when building Lambda GraphQL Resolvers with Amplify GraphQL Transform Library (@function), and AppSync Direct Lambda Resolvers.

The example serves as an AppSync resolver for the locations field of the Merchant type. It uses the @event_source decorator to parse the AppSync event, handles pagination and filtering for locations, and demonstrates AppSyncIdentityCognito.

app.py

    from aws_lambda_powertools.utilities.data_classes import event_source
    from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import (
        AppSyncIdentityCognito,
        AppSyncResolverEvent,
    )
    from aws_lambda_powertools.utilities.typing import LambdaContext


    @event_source(data_class=AppSyncResolverEvent)
    def lambda_handler(event: AppSyncResolverEvent, context: LambdaContext):
        # Access the AppSync event details
        type_name = event.type_name
        field_name = event.field_name
        arguments = event.arguments
        source = event.source

        print(f"Resolving field '{field_name}' for type '{type_name}'")
        print(f"Arguments: {arguments}")
        print(f"Source: {source}")

        # Check if the identity is Cognito-based
        if isinstance(event.identity, AppSyncIdentityCognito):
            user_id = event.identity.sub
            username = event.identity.username
            print(f"Request from Cognito user: {username} (ID: {user_id})")
        else:
            print("Request is not from a Cognito-authenticated user")

        if type_name == "Merchant" and field_name == "locations":
            page = arguments.get("page", 1)
            size = arguments.get("size", 10)
            name_filter = arguments.get("name")

            # Here you would typically fetch locations from a database
            # This is a mock implementation
            locations = [
                {"id": "1", "name": "Location 1", "address": "123 Main St"},
                {"id": "2", "name": "Location 2", "address": "456 Elm St"},
                {"id": "3", "name": "Location 3", "address": "789 Oak St"},
            ]

            # Apply name filter if provided
            if name_filter:
                locations = [loc for loc in locations if name_filter.lower() in loc["name"].lower()]

            # Apply pagination
            start = (page - 1) * size
            end = start + size
            paginated_locations = locations[start:end]

            return {
                "items": paginated_locations,
                "totalCount": len(locations),
                "nextToken": str(page + 1) if end < len(locations) else None,
            }
        else:
            raise Exception(f"Unhandled field: {field_name} for type: {type_name}")

AppSync Resolver Example Event

    {
      "typeName": "Merchant",
      "fieldName": "locations",
      "arguments": {
        "page": 2,
        "size": 1,
        "name": "value"
      },
      "identity": {
        "claims": {
          "sub": "07920713-4526-4642-9c88-2953512de441",
          "iss": "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_POOL_ID",
          "aud": "58rc9bf5kkti90ctmvioppukm9",
          "event_id": "7f4c9383-abf6-48b7-b821-91643968b755",
          "token_use": "id",
          "auth_time": 1615366261,
          "name": "Michael Brewer",
          "exp": 1615369861,
          "iat": 1615366261
        },
        "defaultAuthStrategy": "ALLOW",
        "groups": null,
        "issuer": "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_POOL_ID",
        "sourceIp": [
          "11.215.2.22"
        ],
        "sub": "07920713-4526-4642-9c88-2953512de441",
        "username": "mike"
      },
      "source": {
        "name": "Value",
        "nested": {
          "name": "value",
          "list": []
        }
      },
      "request": {
        "headers": {
          "x-forwarded-for": "11.215.2.22, 64.44.173.11",
          "cloudfront-viewer-country": "US",
          "cloudfront-is-tablet-viewer": "false",
          "via": "2.0 SOMETHING.cloudfront.net (CloudFront)",
          "cloudfront-forwarded-proto": "https",
          "origin": "https://console.aws.amazon.com",
          "content-length": "156",
          "accept-language": "en-US,en;q=0.9",
          "host": "SOMETHING.appsync-api.us-east-1.amazonaws.com",
          "x-forwarded-proto": "https",
          "sec-gpc": "1",
          "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) etc.",
          "accept": "*/*",
          "cloudfront-is-mobile-viewer": "false",
          "cloudfront-is-smarttv-viewer": "false",
          "accept-encoding": "gzip, deflate, br",
          "referer": "https://console.aws.amazon.com/",
          "content-type": "application/json",
          "sec-fetch-mode": "cors",
          "x-amz-cf-id": "Fo5VIuvP6V6anIEt62WzFDCK45mzM4yEdpt5BYxOl9OFqafd-WR0cA==",
          "x-amzn-trace-id": "Root=1-60488877-0b0c4e6727ab2a1c545babd0",
          "authorization": "AUTH-HEADER",
          "sec-fetch-dest": "empty",
          "x-amz-user-agent": "AWS-Console-AppSync/",
          "cloudfront-is-desktop-viewer": "true",
          "sec-fetch-site": "cross-site",
          "x-forwarded-port": "443"
        }
      },
      "prev": {
        "result": {}
      }
    }

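The filtering and pagination arithmetic in the AppSync Resolver example can be traced in isolation. The sketch below lifts that logic into a standalone function, here called `paginate` (a name chosen for this illustration), and runs it with the `page`, `size` and `name` arguments from the example event against the mock locations.

```python
from typing import Any, Dict, List, Optional

LOCATIONS = [
    {"id": "1", "name": "Location 1", "address": "123 Main St"},
    {"id": "2", "name": "Location 2", "address": "456 Elm St"},
    {"id": "3", "name": "Location 3", "address": "789 Oak St"},
]


def paginate(
    locations: List[Dict[str, str]],
    page: int = 1,
    size: int = 10,
    name_filter: Optional[str] = None,
) -> Dict[str, Any]:
    # Case-insensitive substring match on the location name
    if name_filter:
        locations = [loc for loc in locations if name_filter.lower() in loc["name"].lower()]

    # Pages are 1-based, so page 1 starts at index 0
    start = (page - 1) * size
    end = start + size

    return {
        "items": locations[start:end],
        "totalCount": len(locations),
        "nextToken": str(page + 1) if end < len(locations) else None,
    }


# Arguments from the example event: page=2, size=1, name="value"
filtered = paginate(LOCATIONS, page=2, size=1, name_filter="value")
# Same page and size without the name filter
unfiltered = paginate(LOCATIONS, page=2, size=1)

print(filtered)
print(unfiltered)
```

With the example event's arguments, no mock location name contains "value", so the filtered result is empty with a total count of 0. Without the filter, page 2 of size 1 returns "Location 2" and a next token of "3".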
AWS Config Rule

The example uses AWSConfigRuleEvent to parse the incoming event. The function logs the message type of the invoking event and returns a simple success response. The example event carries a Scheduled Event Notification, but the invoking event could also be ItemChanged or Oversized.

app.py

    from aws_lambda_powertools import Logger
    from aws_lambda_powertools.utilities.data_classes import (
        AWSConfigRuleEvent,
        event_source,
    )

    logger = Logger()


    @event_source(data_class=AWSConfigRuleEvent)
    def lambda_handler(event: AWSConfigRuleEvent, context):
        message_type = event.invoking_event.message_type

        logger.info(f"Logging {message_type} event rule", invoke_event=event.raw_invoking_event)

        return {"Success": "OK"}

ScheduledNotification Example Event

1 2 3 4 5 6 7 8 9 10 11 12 13 { "version":"1.0", "invokingEvent":"{\"awsAccountId\":\"0123456789012\",\"notificationCreationTime\":\"2023-04-27T13:26:17.741Z\",\"messageType\":\"ScheduledNotification\",\"recordVersion\":\"1.0\"}", "ruleParameters":"{\"test\":\"x\"}", "resultToken":"eyJlbmNyeXB0ZWREYXRhIjpbLTQyLDEyNiw1MiwtMzcsLTI5LDExNCwxMjYsLTk3LDcxLDIyLC0xMTAsMTEyLC0zMSwtOTMsLTQ5LC0xMDEsODIsMyw1NCw0OSwzLC02OSwtNzEsLTcyLDYyLDgxLC03MiwtODEsNTAsMzUsLTUwLC03NSwtMTE4LC0xMTgsNzcsMTIsLTEsMTQsMTIwLC03MCwxMTAsLTMsNTAsLTYwLDEwNSwtNTcsNDUsMTAyLC0xMDksLTYxLC0xMDEsLTYxLDQsNDcsLTg0LC0yNSwxMTIsNTQsLTcxLC0xMDksNDUsMTksMTIzLC0yNiwxMiwtOTYsLTczLDU0LC0xMDksOTIsNDgsLTU5LC04MywtMzIsODIsLTM2LC05MCwxOSw5OCw3Nyw3OCw0MCw4MCw3OCwtMTA1LDg3LC0xMTMsLTExNiwtNzIsMzAsLTY4LC00MCwtODksMTA5LC0xMDgsLTEyOCwyMiw3Miw3NywtMjEsNzYsODksOTQsLTU5LDgxLC0xMjEsLTEwNywtNjcsNjMsLTcsODIsLTg5LC00NiwtMzQsLTkyLDEyMiwtOTAsMTcsLTEyMywyMCwtODUsLTU5LC03MCw4MSwyNyw2Miw3NCwtODAsODAsMzcsNDAsMTE2LDkxLC0yNCw1MSwtNDEsLTc5LDI4LDEyMCw1MywtMTIyLC04MywxMjYsLTc4LDI1LC05OCwtMzYsMTMsMzIsODYsLTI1LDQ4LDMsLTEwMiwtMTYsMjQsLTMsODUsNDQsLTI4LDE0LDIyLDI3LC0xMjIsMTE4LDEwMSw3Myw1LDE4LDU4LC02NCwyMywtODYsLTExNCwyNCwwLDEwMCwyLDExNywtNjIsLTExOSwtMTI4LDE4LDY1LDkwLDE0LC0xMDIsMjEsODUsMTAwLDExNyw1NSwyOSwxMjcsNTQsNzcsNzIsNzQsMzIsNzgsMywtMTExLDExOCwtNzAsLTg2LDEyNywtNzQsNjAsMjIsNDgsMzcsODcsMTMsMCwtMTA1LDUsLTEyMiwtNzEsLTEwMCwxMDQsLTEyNiwtMTYsNzksLTMwLDEyMCw3NywtNzYsLTQxLC0xMDksMiw5NywtMTAxLC0xLDE1LDEyMywxMTksMTA4LDkxLC0yMCwtMTI1LC05NiwyLC05MiwtMTUsLTY3LC03NiwxMjEsMTA0LDEwNSw2NCwtNjIsMTAyLDgsNCwxMjEsLTQ1LC04MCwtODEsLTgsMTE4LDQ0LC04MiwtNDEsLTg0LDczLC0zNiwxMTcsODAsLTY5LC03MywxNCwtMTgsNzIsMzEsLTUsLTExMSwtMTI3LC00MywzNCwtOCw1NywxMDMsLTQyLDE4LC0zMywxMTcsLTI2LC0xMjQsLTEyNCwxNSw4OCwyMywxNiwtNTcsNTQsLTYsLTEwMiwxMTYsLTk5LC00NSwxMDAsLTM1LDg3LDM3LDYsOTgsMiwxMTIsNjAsLTMzLDE3LDI2LDk5LC0xMDUsNDgsLTEwNCwtMTE5LDc4LDYsLTU4LDk1LDksNDEsLTE2LDk2LDQxLC0yMiw5Niw3MiwxMTYsLTk1LC0xMDUsLTM2LC0xMjMsLTU1LDkxLC00NiwtNywtOTIsMzksNDUsODQsMTYsLTEyNCwtMTIyLC02OCwxLC0yOCwxMjIsLTYwLDgyLDEwM
ywtNTQsLTkyLDI3LC05OSwtMTI4LDY1LDcsLTcyLC0xMjcsNjIsLTIyLDIsLTExLDE4LC04OSwtMTA2LC03NCw3MSw4NiwtMTE2LC0yNSwtMTE1LC05Niw1NywtMzQsMjIsLTEyNCwtMTI1LC00LC00MSw0MiwtNTcsLTEwMyw0NSw3OCwxNCwtMTA2LDExMSw5OCwtOTQsLTcxLDUsNzUsMTksLTEyNCwtMzAsMzQsLTUwLDc1LC04NCwtNTAsLTU2LDUxLC0xNSwtMzYsNjEsLTk0LC03OSwtNDUsMTI2LC03NywtMTA1LC0yLC05MywtNiw4LC0zLDYsLTQyLDQ2LDEyNSw1LC05OCwxMyw2NywtMTAsLTEzLC05NCwtNzgsLTEyNywxMjEsLTI2LC04LC0xMDEsLTkxLDEyMSwtNDAsLTEyNCwtNjQsODQsLTcyLDYzLDE5LC04NF0sIm1hdGVyaWFsU2V0U2VyaWFsTnVtYmVyIjoxLCJpdlBhcmFtZXRlclNwZWMiOnsiaXYiOlszLC0xMCwtODUsMTE0LC05MCwxMTUsNzcsNTUsNTQsMTUsMzgsODQsLTExNiwxNCwtNDAsMjhdfX0=", "eventLeftScope":false, "executionRoleArn":"arn:aws:iam::0123456789012:role/aws-service-role/config.amazonaws.com/AWSServiceRoleForConfig", "configRuleArn":"arn:aws:config:us-east-1:0123456789012:config-rule/config-rule-pdmyw1", "configRuleName":"rule-ec2-test", "configRuleId":"config-rule-pdmyw1", "accountId":"0123456789012", "evaluationMode":"DETECTIVE" }
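In the example event, `invokingEvent` and `ruleParameters` are JSON documents serialized as strings, so they need a second parsing step before their fields can be read. The sketch below shows that step with the standard library on a trimmed copy of the example event (the long `resultToken` is left out). It illustrates the payload shape only and is not the library's implementation.

```python
import json

# Trimmed copy of the example event; both values are JSON encoded as strings
event = {
    "invokingEvent": (
        '{"awsAccountId":"0123456789012",'
        '"notificationCreationTime":"2023-04-27T13:26:17.741Z",'
        '"messageType":"ScheduledNotification",'
        '"recordVersion":"1.0"}'
    ),
    "ruleParameters": '{"test":"x"}',
    "configRuleName": "rule-ec2-test",
}

# Second parsing step: string -> dict
invoking_event = json.loads(event["invokingEvent"])
rule_parameters = json.loads(event["ruleParameters"])

message_type = invoking_event["messageType"]
print(message_type, rule_parameters)
```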

Bedrock Agent

The example uses BedrockAgentEvent to parse an incoming Bedrock Agent event. The function logs the action group and input text, then returns a structured response compatible with Bedrock Agent's expected format, including a mock response body.

app.py

    from aws_lambda_powertools import Logger
    from aws_lambda_powertools.utilities.data_classes import BedrockAgentEvent, event_source

    logger = Logger()


    @event_source(data_class=BedrockAgentEvent)
    def lambda_handler(event: BedrockAgentEvent, context) -> dict:
        input_text = event.input_text

        logger.info(f"Bedrock Agent {event.action_group} invoked with input", input_text=input_text)

        return {
            "message_version": "1.0",
            "responses": [
                {
                    "action_group": event.action_group,
                    "api_path": event.api_path,
                    "http_method": event.http_method,
                    "http_status_code": 200,
                    "response_body": {"application/json": {"body": "This is the response"}},
                },
            ],
        }

Bedrock Agent Example Event

    {
      "actionGroup": "ClaimManagementActionGroup",
      "messageVersion": "1.0",
      "sessionId": "12345678912345",
      "sessionAttributes": {},
      "promptSessionAttributes": {},
      "inputText": "I want to claim my insurance",
      "agent": {
        "alias": "TSTALIASID",
        "name": "test",
        "version": "DRAFT",
        "id": "8ZXY0W8P1H"
      },
      "httpMethod": "GET",
      "apiPath": "/claims"
    }

CloudFormation Custom Resource

The example focuses on the Create request type, generating a unique physical resource ID and logging the process. The function is structured to potentially handle Update and Delete operations as well.

app.pyCloudFormation Custom Resource Example Event

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import (
    CloudFormationCustomResourceEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=CloudFormationCustomResourceEvent)
def lambda_handler(event: CloudFormationCustomResourceEvent, context: LambdaContext):
    request_type = event.request_type

    if request_type == "Create":
        return on_create(event, context)
    else:
        raise ValueError(f"Invalid request type: {request_type}")


def on_create(event: CloudFormationCustomResourceEvent, context: LambdaContext):
    props = event.resource_properties
    logger.info(f"Create new resource with props {props}.")

    physical_id = f"MyResource-{context.aws_request_id}"

    return {"PhysicalResourceId": physical_id, "Data": {"Message": "Resource created successfully"}}

{
  "RequestType": "Create",
  "ServiceToken": "arn:aws:lambda:us-east-1:xxx:function:xxxx-CrbuiltinfunctionidProvi-2vKAalSppmKe",
  "ResponseURL": "https://cloudformation-custom-resource-response-useast1.s3.amazonaws.com/7F%7Cb1f50fdfc25f3b",
  "StackId": "arn:aws:cloudformation:us-east-1:xxxx:stack/xxxx/271845b0-f2e8-11ed-90ac-0eeb25b8ae21",
  "RequestId": "xxxxx-d2a0-4dfb-ab1f-xxxxxx",
  "LogicalResourceId": "xxxxxxxxx",
  "ResourceType": "Custom::MyType",
  "ResourceProperties": {
    "ServiceToken": "arn:aws:lambda:us-east-1:xxxxx:function:xxxxx",
    "MyProps": "ss"
  }
}
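One way to extend the handler to Update and Delete is a dispatch table keyed by request type. The sketch below is an assumption about how that could look, not part of Powertools: it works on the raw event dict with the standard library only, and the names HANDLERS, dispatch, on_update and on_delete are hypothetical.

```python
from typing import Any, Callable, Dict


def on_create(event: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical: derive a physical ID from the CloudFormation request ID.
    return {"PhysicalResourceId": f"MyResource-{event['RequestId']}"}


def on_update(event: Dict[str, Any]) -> Dict[str, Any]:
    # Returning the existing physical ID signals an in-place update.
    return {"PhysicalResourceId": event["PhysicalResourceId"]}


def on_delete(event: Dict[str, Any]) -> Dict[str, Any]:
    # Clean-up logic would go here; the physical ID is echoed back.
    return {"PhysicalResourceId": event["PhysicalResourceId"]}


HANDLERS: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "Create": on_create,
    "Update": on_update,
    "Delete": on_delete,
}


def dispatch(event: Dict[str, Any]) -> Dict[str, Any]:
    """Route a custom resource event to the handler for its RequestType."""
    request_type = event["RequestType"]
    try:
        handler = HANDLERS[request_type]
    except KeyError:
        raise ValueError(f"Invalid request type: {request_type}") from None
    return handler(event)
```

Update and Delete requests carry the PhysicalResourceId returned by the earlier Create, which is why those two handlers read it from the event instead of generating a new one.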

CloudWatch Dashboard Custom Widget

This example for CloudWatchDashboardCustomWidgetEvent logs the dashboard name, extracts key information such as the widget ID and time range, and returns a formatted response with a title and markdown content. Read more about custom widgets for CloudWatch dashboards.

app.pyCloudWatch Dashboard Example Event

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import CloudWatchDashboardCustomWidgetEvent, event_source

logger = Logger()


@event_source(data_class=CloudWatchDashboardCustomWidgetEvent)
def lambda_handler(event: CloudWatchDashboardCustomWidgetEvent, context):
    if event.widget_context is None:
        logger.warning("No widget context provided")
        return {"title": "Error", "markdown": "Widget context is missing"}

    logger.info(f"Processing custom widget for dashboard: {event.widget_context.dashboard_name}")

    # Access specific event properties
    widget_id = event.widget_context.widget_id
    time_range = event.widget_context.time_range

    if time_range is None:
        logger.warning("No time range provided")
        return {"title": f"Custom Widget {widget_id}", "markdown": "Time range is missing"}

    # Your custom widget logic here
    return {
        "title": f"Custom Widget {widget_id}",
        "markdown": f"""
        Dashboard: {event.widget_context.dashboard_name}
        Time Range: {time_range.start} to {time_range.end}
        Theme: {event.widget_context.theme or "default"}
        """,
    }

{
  "original": "param-to-widget",
  "widgetContext": {
    "dashboardName": "Name-of-current-dashboard",
    "widgetId": "widget-16",
    "domain": "https://us-east-1.console.aws.amazon.com",
    "accountId": "123456789123",
    "locale": "en",
    "timezone": {
      "label": "UTC",
      "offsetISO": "+00:00",
      "offsetInMinutes": 0
    },
    "period": 300,
    "isAutoPeriod": true,
    "timeRange": {
      "mode": "relative",
      "start": 1627236199729,
      "end": 1627322599729,
      "relativeStart": 86400012,
      "zoom": {
        "start": 1627276030434,
        "end": 1627282956521
      }
    },
    "theme": "light",
    "linkCharts": true,
    "title": "Tweets for Amazon website problem",
    "forms": {
      "all": {}
    },
    "params": {
      "original": "param-to-widget"
    },
    "width": 588,
    "height": 369
  }
}
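The start and end values of timeRange in the sample event look like Unix epoch timestamps in milliseconds; that reading is an assumption based on their magnitude. Under that assumption, a minimal standard-library sketch for turning them into timezone-aware datetimes before rendering could look like this. The helper from_epoch_millis is hypothetical and not part of Powertools.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def from_epoch_millis(millis: int) -> datetime:
    """Convert epoch milliseconds to a UTC datetime without float rounding."""
    return EPOCH + timedelta(milliseconds=millis)


# Values copied from the example event above.
time_range = {"start": 1627236199729, "end": 1627322599729}

start = from_epoch_millis(time_range["start"])
end = from_epoch_millis(time_range["end"])
window = end - start  # length of the dashboard time range

print(f"Time Range: {start.isoformat()} to {end.isoformat()} ({window})")
```

Adding a timedelta to a fixed epoch keeps the arithmetic in integers, which avoids the sub-millisecond drift that dividing by 1000 and calling fromtimestamp can introduce.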

CloudWatch Alarm State Change Action

CloudWatch supports Lambda as an alarm state change action. You can use the CloudWatchAlarmEvent data class to access fields such as the alarm information, the current state, and the previous state.

app.pyCloudWatch Alarm Example Event

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import CloudWatchAlarmEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=CloudWatchAlarmEvent)
def lambda_handler(event: CloudWatchAlarmEvent, context: LambdaContext) -> dict:
    logger.info(f"Alarm {event.alarm_data.alarm_name} state is {event.alarm_data.state.value}")

    # You can now work with event. For example, you can enrich the received data, and
    # decide on how you want to route the alarm.

    return {
        "name": event.alarm_data.alarm_name,
        "arn": event.alarm_arn,
        "urgent": "Priority: P1" in (event.alarm_data.configuration.description or ""),
    }

{
  "source": "aws.cloudwatch",
  "alarmArn": "arn:aws:cloudwatch:eu-west-1:912397435824:alarm:test_alarm",
  "accountId": "123456789012",
  "time": "2024-02-17T11:53:08.431+0000",
  "region": "eu-west-1",
  "alarmData": {
    "alarmName": "Test alert",
    "state": {
      "value": "ALARM",
      "reason": "Threshold Crossed: 1 out of the last 1 datapoints [1.0 (17/02/24 11:51:00)] was less than the threshold (10.0) (minimum 1 datapoint for OK -> ALARM transition).",
      "reasonData": "{\"version\":\"1.0\",\"queryDate\":\"2024-02-17T11:53:08.423+0000\",\"startDate\":\"2024-02-17T11:51:00.000+0000\",\"statistic\":\"SampleCount\",\"period\":60,\"recentDatapoints\":[1.0],\"threshold\":10.0,\"evaluatedDatapoints\":[{\"timestamp\":\"2024-02-17T11:51:00.000+0000\",\"sampleCount\":1.0,\"value\":1.0}]}",
      "timestamp": "2024-02-17T11:53:08.431+0000"
    },
    "previousState": {
      "value": "OK",
      "reason": "Threshold Crossed: 1 out of the last 1 datapoints [1.0 (17/02/24 11:50:00)] was not greater than the threshold (10.0) (minimum 1 datapoint for ALARM -> OK transition).",
      "reasonData": "{\"version\":\"1.0\",\"queryDate\":\"2024-02-17T11:51:31.460+0000\",\"startDate\":\"2024-02-17T11:50:00.000+0000\",\"statistic\":\"SampleCount\",\"period\":60,\"recentDatapoints\":[1.0],\"threshold\":10.0,\"evaluatedDatapoints\":[{\"timestamp\":\"2024-02-17T11:50:00.000+0000\",\"sampleCount\":1.0,\"value\":1.0}]}",
      "timestamp": "2024-02-17T11:51:31.462+0000"
    },
    "configuration": {
      "description": "This is description **here**",
      "metrics": [
        {
          "id": "e1",
          "expression": "m1/m2",
          "label": "Expression1",
          "returnData": true
        },
        {
          "id": "m1",
          "metricStat": {
            "metric": {
              "namespace": "AWS/Lambda",
              "name": "Invocations",
              "dimensions": {}
            },
            "period": 60,
            "stat": "SampleCount"
          },
          "returnData": false
        },
        {
          "id": "m2",
          "metricStat": {
            "metric": {
              "namespace": "AWS/Lambda",
              "name": "Duration",
              "dimensions": {}
            },
            "period": 60,
            "stat": "SampleCount"
          },
          "returnData": false
        }
      ]
    }
  }
}

CloudWatch Logs

CloudWatch Logs events are gzip-compressed and base64-encoded by default. You can use the parse_logs_data() helper to decode, decompress, and parse the JSON data from the event.

app.pyCloudWatch Logs Example Event

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import CloudWatchLogsEvent, event_source
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData

logger = Logger()


@event_source(data_class=CloudWatchLogsEvent)
def lambda_handler(event: CloudWatchLogsEvent, context):
    decompressed_log: CloudWatchLogsDecodedData = event.parse_logs_data()

    logger.info(f"Log group: {decompressed_log.log_group}")
    logger.info(f"Log stream: {decompressed_log.log_stream}")

    for log_event in decompressed_log.log_events:
        logger.info(f"Timestamp: {log_event.timestamp}, Message: {log_event.message}")

    return {"statusCode": 200, "body": f"Processed {len(decompressed_log.log_events)} log events"}
{ "awslogs": { "data": "H4sIAAAAAAAAAHWPwQqCQBCGX0Xm7EFtK+smZBEUgXoLCdMhFtKV3akI8d0bLYmibvPPN3wz00CJxmQnTO41whwWQRIctmEcB6sQbFC3CjW3XW8kxpOpP+OC22d1Wml1qZkQGtoMsScxaczKN3plG8zlaHIta5KqWsozoTYw3/djzwhpLwivWFGHGpAFe7DL68JlBUk+l7KSN7tCOEJ4M3/qOI49vMHj+zCKdlFqLaU2ZHV2a4Ct/an0/ivdX8oYc1UVX860fQDQiMdxRQEAAA==" } }
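To make the decode, decompress, and parse steps concrete, here is a rough standard-library equivalent. It is a sketch of the idea only, not the library's implementation; decode_cloudwatch_logs is a hypothetical helper, and the sample payload is made up and round-tripped locally so the snippet runs without a real event.

```python
import base64
import gzip
import json
from typing import Any, Dict


def decode_cloudwatch_logs(event: Dict[str, Any]) -> Dict[str, Any]:
    """Base64-decode, gunzip, and JSON-parse the awslogs.data field."""
    compressed = base64.b64decode(event["awslogs"]["data"])
    return json.loads(gzip.decompress(compressed))


# Hypothetical payload, encoded the same way CloudWatch Logs delivers it.
sample = {
    "messageType": "DATA_MESSAGE",
    "logGroup": "testLogGroup",
    "logStream": "testLogStream",
    "logEvents": [
        {"id": "1", "timestamp": 1440442987000, "message": "[ERROR] First test message"},
    ],
}
encoded = base64.b64encode(gzip.compress(json.dumps(sample).encode("utf-8"))).decode("ascii")

decoded = decode_cloudwatch_logs({"awslogs": {"data": encoded}})
for log_event in decoded["logEvents"]:
    print(log_event["timestamp"], log_event["message"])
```

The data class adds typed attributes such as log_group and log_events on top of this, so handlers do not need to index into the raw dict.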

Kinesis integration

When streaming CloudWatch Logs to a Kinesis Data Stream (cross-account or not), you can use extract_cloudwatch_logs_from_event to decode, decompress and extract logs as CloudWatchLogsDecodedData to ease log processing.

app.pyKinesis Stream CloudWatch Logs Example Event

from typing import List

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
    KinesisStreamEvent,
    extract_cloudwatch_logs_from_event,
)


@event_source(data_class=KinesisStreamEvent)
def lambda_handler(event: KinesisStreamEvent, context):
    logs: List[CloudWatchLogsDecodedData] = extract_cloudwatch_logs_from_event(event)
    for log in logs:
        if log.message_type == "DATA_MESSAGE":
            return "success"
    return "nothing to be processed"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "da10bf66b1f54bff5d96eae99149ad1f", "sequenceNumber": "49635052289529725553291405521504870233219489715332317186", "data": "H4sIAAAAAAAAAK2Sa2vbMBSG/4ox+xg3Oror39IlvaztVmJv7WjCUGwl8+ZLZstts5L/vuOsZYUyWGEgJHiP9J7nvOghLF3b2rVLthsXjsLJOBl/uZjG8fh4Gg7C+q5yDcqUAWcSONHEoFzU6+Om7jZYGdq7dljYcpnZ4cZHwLWOJl1Zbs/r9cR6e9RVqc/rKlpXV9eXt+fy27vt8W+L2DfOlr07oXQIMAQyvHlzPk6mcbKgciktF5lQfMU5dZZqzrShLF2uFC60aLtlmzb5prc/ygvvmjYc3YRPFG+LusuurE+/Ikqb1Gd55dq8jV+8isT6+317Rk42J5PTcLFnm966yvd2D2GeISJTYIwCJSQ1BE9OtWZCABWaKMIJAMdDMyU5MYZLhmkxBhQxfY4Re1tiWiAlBsgIVQTE4Cl6tI+T8SwJZu5Hh1dPs1FApOMSDI9WVKmIC+4irTMWQZYpx7QkztrgE06MU4yCx9DmVbgbvABmQJTGtkYAB0NwEwyYQUBpqEFuSbkGrThTRKi/AlP+HHj6fvJa3P9Ap/+Rbja9/PD6POd+0jXW7xM1B8CDsp37w7woXBb8qQDZ6xeurJttEOc/HWpUBxeHKNr74LHwsXXYlsm9flrl/rmFIQeS7m3m1fVs/DlIGpu6nhMiyWQGXNKIMbcCIgkhElKbaZnZpYJUz33s1iV+z/6+StMlR3yphHNcCyxiNEXf2zed6xuEu8XuF2wb6krnAwAA", "approximateArrivalTimestamp": 1668093033.744 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000000:49635052289529725553291405521504870233219489715332317186", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49", "awsRegion": "eu-west-1", "eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "cf4c4c2c9a49bdfaf58d7dbbc2b06081", "sequenceNumber": "49635052289529725553291405520881064510298312199003701250", "data": 
"H4sIAAAAAAAAAK2SW2/TQBCF/4pl8ViTvc7u5i0laVraQhUbWtREaG1PgsGXYK/bhqr/nXVoBRIgUYnXc2bPfHO092GFXWc3mOy2GI7D6SSZfDyfxfFkPgsPwua2xtbLjFPBgQqiifFy2WzmbdNvvTOyt92otFWa29HWRVRoHU37qtqdNZupdfaorzNXNHW0qS+vLm7O4PPr3fxHROxatNWQThgbUTqiZHT94mySzOJkBUqYLOWY8ZQLbaTRkEvDciUYzWzKfETXp13WFtsh/qgoHbZdOL4OnyhelU2fX1qXffIoXdKcFjV2RRf/9iqSmy933Sk53h5PT8LVnm12g7Ub4u7DIveIXFFjFNGUKUlAaMY0EUJKLjkQbxhKGCWeknMKoAGUkYoJ7TFd4St2tvJtDRYxDAg3VB08Ve/j42SySIIFfu396Ek+DkS+xkwAiYhM00isgUV6jXmEMrM5EmMsh+C9v9hfMQ4eS1vW4cPBH4CZVpoTJkEIAp5RUMo8vGFae3JNCCdUccMVgPw7sP4VePZm+lzc/0AH/0i3mF28fX6fSzftW+v2jZKXRgVVt3SHRVliHvx06F4+x6ppd0FcfEMvMR2cH3rR3gWPxrsO/Vau9vqyvlpMPgRJazMcYGgEHHLKBhLGJaBA0JLxNc0JppoS9Cwxbir/B4d5QDBAQSnfFFGp8aa/vxw2uLbHYUH4sHr4Dj5RJxfMAwAA", "approximateArrivalTimestamp": 1668092612.992 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000000:49635052289529725553291405520881064510298312199003701250", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49", "awsRegion": "eu-west-1", "eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG" } ] }

Alternatively, you can use extract_cloudwatch_logs_from_record to integrate seamlessly with the Batch utility for more robust log processing.

app.pyKinesis Stream CloudWatch Logs Example Event

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
    KinesisStreamRecord,
    extract_cloudwatch_logs_from_record,
)

logger = Logger()

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)


def record_handler(record: KinesisStreamRecord):
    log = extract_cloudwatch_logs_from_record(record)
    logger.info(f"Message type: {log.message_type}")
    return log.message_type == "DATA_MESSAGE"


def lambda_handler(event, context):
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
The example event is the same two-record Kinesis payload shown for the previous example.
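The value of pairing the record-level helper with batch processing is that one bad record does not fail the whole invocation. The sketch below illustrates the partial batch response contract with the standard library only; it is a simplified stand-in, not how the Batch utility is implemented, and process_batch, fail_on_empty_data and sample_event are hypothetical.

```python
from typing import Any, Callable, Dict, List


def process_batch(
    event: Dict[str, Any],
    record_handler: Callable[[Dict[str, Any]], Any],
) -> Dict[str, List[Dict[str, str]]]:
    """Run record_handler on each Kinesis record and report the ones that raised."""
    failures: List[Dict[str, str]] = []
    for record in event["Records"]:
        try:
            record_handler(record)
        except Exception:
            # Kinesis partial batch responses identify a record by its sequence number.
            failures.append({"itemIdentifier": record["kinesis"]["sequenceNumber"]})
    return {"batchItemFailures": failures}


def fail_on_empty_data(record: Dict[str, Any]) -> None:
    # Hypothetical handler: treat an empty payload as a processing failure.
    if not record["kinesis"]["data"]:
        raise ValueError("empty payload")


sample_event = {
    "Records": [
        {"kinesis": {"sequenceNumber": "1", "data": "SGVsbG8="}},
        {"kinesis": {"sequenceNumber": "2", "data": ""}},
    ]
}
response = process_batch(sample_event, fail_on_empty_data)
```

Note that in this sketch a failure is signalled by raising an exception from the handler; a falsy return value is not treated as a failure.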

CodeDeploy LifeCycle Hook

CodeDeploy triggers Lambda functions with this event when they are defined in AppSpec definitions to test applications at different stages of deployment.

app.pyCodeDeploy LifeCycle Hook Example Event

from aws_lambda_powertools.utilities.data_classes import CodeDeployLifecycleHookEvent, event_source


@event_source(data_class=CodeDeployLifecycleHookEvent)
def lambda_handler(event: CodeDeployLifecycleHookEvent, context):
    deployment_id = event.deployment_id
    lifecycle_event_hook_execution_id = event.lifecycle_event_hook_execution_id

    return {"deployment_id": deployment_id, "lifecycle_event_hook_execution_id": lifecycle_event_hook_execution_id}
{ "DeploymentId": "d-ABCDEF", "LifecycleEventHookExecutionId": "xxxxxxxxxxxxxxxxxxxxxxxx" }

CodePipeline Job

Data classes and utility functions to help create continuous delivery pipeline tasks with AWS Lambda.

app.pyCodePipeline Job Example Event

from aws_lambda_powertools.utilities.data_classes import CodePipelineJobEvent, event_source


@event_source(data_class=CodePipelineJobEvent)
def lambda_handler(event: CodePipelineJobEvent, context):
    job_id = event.get_id

    input_bucket = event.input_bucket_name

    return {"statusCode": 200, "body": f"Processed job {job_id} from bucket {input_bucket}"}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 { "CodePipeline.job": { "id": "11111111-abcd-1111-abcd-111111abcdef", "accountId": "111111111111", "data": { "actionConfiguration": { "configuration": { "FunctionName": "MyLambdaFunctionForAWSCodePipeline", "UserParameters": "some-input-such-as-a-URL" } }, "inputArtifacts": [ { "name": "ArtifactName", "revision": null, "location": { "type": "S3", "s3Location": { "bucketName": "the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890", "objectKey": "the name of the application, for example CodePipelineDemoApplication.zip" } } } ], "outputArtifacts": [], "artifactCredentials": { "accessKeyId": "AKIAIOSFODNN7EXAMPLE", "secretAccessKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "sessionToken": "MIICiTCCAfICCQD6m7oRw0uXOjANBgkqhkiG9w0BAQUFADCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wHhcNMTEwNDI1MjA0NTIxWhcNMTIwNDI0MjA0NTIxWjCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMaK0dn+a4GmWIWJ21uUSfwfEvySWtC2XADZ4nB+BLYgVIk60CpiwsZ3G93vUEIO3IyNoH/f0wYK8m9TrDHudUZg3qX4waLG5M43q7Wgc/MbQITxOUSQv7c7ugFFDzQGBzZswY6786m86gpEIbb3OhjZnzcvQAaRHhdlQWIMm2nrAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAtCu4nUhVVxYUntneD9+h8Mg9q6q+auNKyExzyLwaxlAoo7TJHidbtS4J5iNmZgXL0FkbFFBjvSfpJIlJ00zbhNYS5f6GuoEDmFJl0ZxBHjJnyp378OD8uTs7fLvjx79LjSTbNYiytVbZPQUQ5Yaxu2jXnimvw3rrszlaEXAMPLE=" }, "continuationToken": "A continuation token if continuing job" } } }

Cognito User Pool

Cognito User Pools have several Lambda trigger sources, each of which maps to its own data class. All of them can be imported from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event:

Trigger/Event Source | Data Class
Custom message event | data_classes.cognito_user_pool_event.CustomMessageTriggerEvent
Post authentication | data_classes.cognito_user_pool_event.PostAuthenticationTriggerEvent
Post confirmation | data_classes.cognito_user_pool_event.PostConfirmationTriggerEvent
Pre authentication | data_classes.cognito_user_pool_event.PreAuthenticationTriggerEvent
Pre sign-up | data_classes.cognito_user_pool_event.PreSignUpTriggerEvent
Pre token generation | data_classes.cognito_user_pool_event.PreTokenGenerationTriggerEvent
Pre token generation V2 | data_classes.cognito_user_pool_event.PreTokenGenerationV2TriggerEvent
User migration | data_classes.cognito_user_pool_event.UserMigrationTriggerEvent
Define Auth Challenge | data_classes.cognito_user_pool_event.DefineAuthChallengeTriggerEvent
Create Auth Challenge | data_classes.cognito_user_pool_event.CreateAuthChallengeTriggerEvent
Verify Auth Challenge | data_classes.cognito_user_pool_event.VerifyAuthChallengeResponseTriggerEvent
Custom Email Sender | data_classes.cognito_user_pool_event.CustomEmailSenderTriggerEvent
Custom SMS Sender | data_classes.cognito_user_pool_event.CustomSMSSenderTriggerEvent
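If one function serves several triggers, it needs to work out which data class applies before wrapping the event. One possible approach, sketched here with the standard library only, is to key off the prefix of the triggerSource field. The mapping below covers only the triggers used in the examples in this section, and both TRIGGER_PREFIX_TO_DATA_CLASS and data_class_name_for are hypothetical names, not part of Powertools.

```python
from typing import Dict, Optional

# Prefix of triggerSource (the text before the first underscore) mapped to
# the name of the matching data class. Deliberately partial.
TRIGGER_PREFIX_TO_DATA_CLASS: Dict[str, str] = {
    "PostConfirmation": "PostConfirmationTriggerEvent",
    "DefineAuthChallenge": "DefineAuthChallengeTriggerEvent",
    "CreateAuthChallenge": "CreateAuthChallengeTriggerEvent",
    "VerifyAuthChallengeResponse": "VerifyAuthChallengeResponseTriggerEvent",
}


def data_class_name_for(trigger_source: str) -> Optional[str]:
    """Return the data class name for a triggerSource, or None if unmapped."""
    prefix = trigger_source.split("_", 1)[0]
    return TRIGGER_PREFIX_TO_DATA_CLASS.get(prefix)


print(data_class_name_for("PostConfirmation_ConfirmSignUp"))
```

In a real handler the dict values would be the imported classes themselves, so the lookup result could be called directly with the event.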

Some examples for the Cognito User Pools Lambda trigger sources:

Post Confirmation Example

app.pyCognito Post Confirmation Example Event

from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import PostConfirmationTriggerEvent


def lambda_handler(event, context):
    event: PostConfirmationTriggerEvent = PostConfirmationTriggerEvent(event)

    user_attributes = event.request.user_attributes

    return {"statusCode": 200, "body": f"User attributes: {user_attributes}"}

{
  "version": "string",
  "triggerSource": "PostConfirmation_ConfirmSignUp",
  "region": "us-east-1",
  "userPoolId": "string",
  "userName": "userName",
  "callerContext": {
    "awsSdkVersion": "awsSdkVersion",
    "clientId": "clientId"
  },
  "request": {
    "userAttributes": {
      "email": "user@example.com",
      "email_verified": true
    }
  },
  "response": {}
}

Define Auth Challenge Example

Note

This example modifies the response fields of the wrapped dict, so the handler must return the JSON-serializable wrapped event, which is available as raw_event.

This example is based on the AWS Cognito docs for Define Auth Challenge Lambda Trigger.

app.pyCognito Define Auth Challenge Example Event

from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import DefineAuthChallengeTriggerEvent


def lambda_handler(event, context) -> dict:
    event_obj: DefineAuthChallengeTriggerEvent = DefineAuthChallengeTriggerEvent(event)

    if len(event_obj.request.session) == 1 and event_obj.request.session[0].challenge_name == "SRP_A":
        event_obj.response.issue_tokens = False
        event_obj.response.fail_authentication = False
        event_obj.response.challenge_name = "PASSWORD_VERIFIER"
    elif (
        len(event_obj.request.session) == 2
        and event_obj.request.session[1].challenge_name == "PASSWORD_VERIFIER"
        and event_obj.request.session[1].challenge_result
    ):
        event_obj.response.issue_tokens = False
        event_obj.response.fail_authentication = False
        event_obj.response.challenge_name = "CUSTOM_CHALLENGE"
    elif (
        len(event_obj.request.session) == 3
        and event_obj.request.session[2].challenge_name == "CUSTOM_CHALLENGE"
        and event_obj.request.session[2].challenge_result
    ):
        event_obj.response.issue_tokens = True
        event_obj.response.fail_authentication = False
    else:
        event_obj.response.issue_tokens = False
        event_obj.response.fail_authentication = True

    return event_obj.raw_event

{
  "version": "1",
  "region": "us-east-1",
  "userPoolId": "us-east-1_example",
  "userName": "UserName",
  "callerContext": {
    "awsSdkVersion": "awsSdkVersion",
    "clientId": "clientId"
  },
  "triggerSource": "DefineAuthChallenge_Authentication",
  "request": {
    "userAttributes": {
      "sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
      "email_verified": "true",
      "name": "First Last",
      "email": "define-auth@mail.com"
    },
    "session": [
      {
        "challengeName": "PASSWORD_VERIFIER",
        "challengeResult": true
      },
      {
        "challengeName": "CUSTOM_CHALLENGE",
        "challengeResult": true,
        "challengeMetadata": "CAPTCHA_CHALLENGE"
      }
    ],
    "userNotFound": true
  },
  "response": {}
}
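Why returning raw_event carries the changes back to Cognito can be shown with a small stand-in. This is a simplified sketch under the assumption that the data class is a thin view over the original dict; EventView and ResponseView are hypothetical classes written for illustration and are not the Powertools implementation.

```python
from typing import Any, Dict


class ResponseView:
    """Writes go straight through to the 'response' key of the wrapped dict."""

    def __init__(self, data: Dict[str, Any]):
        self._data = data

    @property
    def issue_tokens(self) -> bool:
        return bool(self._data["response"].get("issueTokens"))

    @issue_tokens.setter
    def issue_tokens(self, value: bool) -> None:
        self._data["response"]["issueTokens"] = value


class EventView:
    """Minimal wrapper that keeps a reference to the original event dict."""

    def __init__(self, data: Dict[str, Any]):
        self._data = data

    @property
    def raw_event(self) -> Dict[str, Any]:
        return self._data

    @property
    def response(self) -> ResponseView:
        return ResponseView(self._data)


event = {"request": {"session": []}, "response": {}}
view = EventView(event)
view.response.issue_tokens = True

# The setter mutated the wrapped dict, so raw_event is ready to return as-is.
print(view.raw_event)
```

Because the wrapper object itself is not JSON-serializable, the handler returns the underlying dict rather than the wrapper.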

Create Auth Challenge Example

This example is based on the AWS Cognito docs for Create Auth Challenge Lambda Trigger.

app.pyCognito Create Auth Challenge Example Event

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import CreateAuthChallengeTriggerEvent


@event_source(data_class=CreateAuthChallengeTriggerEvent)
def handler(event: CreateAuthChallengeTriggerEvent, context) -> dict:
    if event.request.challenge_name == "CUSTOM_CHALLENGE":
        event.response.public_challenge_parameters = {"captchaUrl": "url/123.jpg"}
        event.response.private_challenge_parameters = {"answer": "5"}
        event.response.challenge_metadata = "CAPTCHA_CHALLENGE"
    return event.raw_event

{
  "version": "1",
  "region": "us-east-1",
  "userPoolId": "us-east-1_example",
  "userName": "UserName",
  "callerContext": {
    "awsSdkVersion": "awsSdkVersion",
    "clientId": "clientId"
  },
  "triggerSource": "CreateAuthChallenge_Authentication",
  "request": {
    "userAttributes": {
      "sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
      "email_verified": "true",
      "name": "First Last",
      "email": "create-auth@mail.com"
    },
    "challengeName": "PASSWORD_VERIFIER",
    "session": [
      {
        "challengeName": "CUSTOM_CHALLENGE",
        "challengeResult": true,
        "challengeMetadata": "CAPTCHA_CHALLENGE"
      }
    ],
    "userNotFound": false
  },
  "response": {}
}

Verify Auth Challenge Response Example

This example is based on the AWS Cognito docs for Verify Auth Challenge Response Lambda Trigger.

app.pyCognito Verify Auth Challenge Example Event

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import VerifyAuthChallengeResponseTriggerEvent


@event_source(data_class=VerifyAuthChallengeResponseTriggerEvent)
def lambda_handler(event: VerifyAuthChallengeResponseTriggerEvent, context) -> dict:
    event.response.answer_correct = (
        event.request.private_challenge_parameters.get("answer") == event.request.challenge_answer
    )
    return event.raw_event

{
  "version": "1",
  "region": "us-east-1",
  "userPoolId": "us-east-1_example",
  "userName": "UserName",
  "callerContext": {
    "awsSdkVersion": "awsSdkVersion",
    "clientId": "clientId"
  },
  "triggerSource": "VerifyAuthChallengeResponse_Authentication",
  "request": {
    "userAttributes": {
      "sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
      "email_verified": "true",
      "name": "First Last",
      "email": "verify-auth@mail.com"
    },
    "privateChallengeParameters": {
      "answer": "challengeAnswer"
    },
    "clientMetadata": {
      "foo": "value"
    },
    "challengeAnswer": "challengeAnswer",
    "userNotFound": true
  },
  "response": {}
}

Connect Contact Flow

The example integrates with Amazon Connect by handling contact flow events. The function converts the event into a ConnectContactFlowEvent object, providing a structured representation of the contact flow data.

app.pyConnect Contact Flow Example Event

from aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event import (
    ConnectContactFlowChannel,
    ConnectContactFlowEndpointType,
    ConnectContactFlowEvent,
    ConnectContactFlowInitiationMethod,
)


def lambda_handler(event, context):
    event: ConnectContactFlowEvent = ConnectContactFlowEvent(event)
    assert event.contact_data.attributes == {"Language": "en-US"}
    assert event.contact_data.channel == ConnectContactFlowChannel.VOICE
    assert event.contact_data.customer_endpoint.endpoint_type == ConnectContactFlowEndpointType.TELEPHONE_NUMBER
    assert event.contact_data.initiation_method == ConnectContactFlowInitiationMethod.API

{
  "Name": "ContactFlowEvent",
  "Details": {
    "ContactData": {
      "Attributes": {
        "Language": "en-US"
      },
      "Channel": "VOICE",
      "ContactId": "5ca32fbd-8f92-46af-92a5-6b0f970f0efe",
      "CustomerEndpoint": {
        "Address": "+11234567890",
        "Type": "TELEPHONE_NUMBER"
      },
      "InitialContactId": "5ca32fbd-8f92-46af-92a5-6b0f970f0efe",
      "InitiationMethod": "API",
      "InstanceARN": "arn:aws:connect:eu-central-1:123456789012:instance/9308c2a1-9bc6-4cea-8290-6c0b4a6d38fa",
      "MediaStreams": {
        "Customer": {
          "Audio": {
            "StartFragmentNumber": "91343852333181432392682062622220590765191907586",
            "StartTimestamp": "1565781909613",
            "StreamARN": "arn:aws:kinesisvideo:eu-central-1:123456789012:stream/connect-contact-a3d73b84-ce0e-479a-a9dc-5637c9d30ac9/1565272947806"
          }
        }
      },
      "PreviousContactId": "5ca32fbd-8f92-46af-92a5-6b0f970f0efe",
      "Queue": {
        "ARN": "arn:aws:connect:eu-central-1:123456789012:instance/9308c2a1-9bc6-4cea-8290-6c0b4a6d38fa/queue/5cba7cbf-1ecb-4b6d-b8bd-fe91079b3fc8",
        "Name": "QueueOne"
      },
      "SystemEndpoint": {
        "Address": "+11234567890",
        "Type": "TELEPHONE_NUMBER"
      }
    },
    "Parameters": {
      "ParameterOne": "One",
      "ParameterTwo": "Two"
    }
  }
}

DynamoDB Streams

The DynamoDB data class utility provides the base class for DynamoDBStreamEvent, as well as enums for the stream view type (StreamViewType) and the event type (DynamoDBRecordEventName). The class automatically deserializes DynamoDB types into their equivalent Python types.

app.pyapp_multiple_records.pyDynamoDB Streams Example Event

from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
    DynamoDBRecordEventName,
    DynamoDBStreamEvent,
)


def lambda_handler(event, context):
    event: DynamoDBStreamEvent = DynamoDBStreamEvent(event)

    # Multiple records can be delivered in a single event
    for record in event.records:
        if record.event_name == DynamoDBRecordEventName.MODIFY:
            pass
        elif record.event_name == DynamoDBRecordEventName.INSERT:
            pass
    return "success"

from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
    processed_keys = []
    for record in event.records:
        if record.dynamodb and record.dynamodb.keys and "Id" in record.dynamodb.keys:
            key = record.dynamodb.keys["Id"]
            processed_keys.append(key)

    return {"statusCode": 200, "body": f"Processed keys: {processed_keys}"}

{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "ApproximateCreationDateTime": 1693997155.0,
        "Keys": {
          "Id": { "N": "101" }
        },
        "NewImage": {
          "Message": { "S": "New item!" },
          "Id": { "N": "101" }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "eventsource_arn",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": { "S": "New item!" },
          "Id": { "N": "101" }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": { "N": "101" }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": { "S": "This item has changed" },
          "Id": { "N": "101" }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "source_arn",
      "eventSource": "aws:dynamodb"
    }
  ]
}
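To illustrate what deserializing DynamoDB types means, the sketch below converts the typed attribute format used in stream records into plain Python values. It is a simplified stand-in written for this page, not the library's deserializer: it covers only the S, N, BOOL, NULL, L and M type tags, it assumes numbers are best represented as Decimal, and the function names are hypothetical.

```python
from decimal import Decimal
from typing import Any, Dict


def deserialize(attribute: Dict[str, Any]) -> Any:
    """Convert one DynamoDB-typed attribute, e.g. {"N": "101"}, to a Python value."""
    type_tag, value = next(iter(attribute.items()))
    if type_tag == "S":
        return value
    if type_tag == "N":
        # Numbers arrive as strings; Decimal avoids float rounding.
        return Decimal(value)
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [deserialize(item) for item in value]
    if type_tag == "M":
        return {key: deserialize(item) for key, item in value.items()}
    raise TypeError(f"Unsupported DynamoDB type: {type_tag}")


def deserialize_item(image: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Deserialize a whole Keys, NewImage, or OldImage mapping."""
    return {name: deserialize(attribute) for name, attribute in image.items()}


# NewImage of the first record in the example event above.
new_image = {"Message": {"S": "New item!"}, "Id": {"N": "101"}}
item = deserialize_item(new_image)
print(item)
```

With the data class, this conversion happens when keys, new_image, or old_image are accessed, so handler code works with ordinary Python values.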

EventBridge

When an event matching a defined rule occurs in EventBridge, it can automatically trigger a Lambda function, passing the event data as input.

app.pyEventBridge Example Event

from aws_lambda_powertools.utilities.data_classes import EventBridgeEvent, event_source


@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context):
    detail_type = event.detail_type
    state = event.detail.get("state")

    # Do something

    return {"detail_type": detail_type, "state": state}

{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "111122223333",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "resources": [
    "arn:aws:ec2:us-west-1:123456789012:instance/i-1234567890abcdef0"
  ],
  "detail": {
    "instance_id": "i-1234567890abcdef0",
    "state": "terminated"
  },
  "replay-name": "replay_archive"
}
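The data class mostly saves you from remembering the exact key spelling in the payload, such as the hyphenated detail-type. A stripped-down illustration of that idea using only the standard library follows; EventBridgeView is a hypothetical class for this sketch and exposes far fewer fields than the real data class.

```python
from typing import Any, Dict


class EventBridgeView:
    """Read-only view that maps payload keys to Python-style attribute names."""

    def __init__(self, data: Dict[str, Any]):
        self._data = data

    @property
    def detail_type(self) -> str:
        # The payload key is hyphenated, which is not a valid Python identifier.
        return self._data["detail-type"]

    @property
    def source(self) -> str:
        return self._data["source"]

    @property
    def detail(self) -> Dict[str, Any]:
        return self._data["detail"]


# Trimmed copy of the example event above.
raw = {
    "detail-type": "EC2 Instance State-change Notification",
    "source": "aws.ec2",
    "detail": {"instance_id": "i-1234567890abcdef0", "state": "terminated"},
}
view = EventBridgeView(raw)
print(view.detail_type, view.detail.get("state"))
```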

Kafka

This example is based on the AWS docs for Amazon MSK and self-managed Apache Kafka.

app.pyKafka Example Event

from aws_lambda_powertools.utilities.data_classes import KafkaEvent, event_source


def do_something_with(key: str, value: str):
    print(f"key: {key}, value: {value}")


@event_source(data_class=KafkaEvent)
def lambda_handler(event: KafkaEvent, context):
    for record in event.records:
        do_something_with(record.topic, record.value)
    return "success"

{
  "eventSource": "aws:kafka",
  "eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
  "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
  "records": {
    "mytopic-0": [
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 15,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "key": "cmVjb3JkS2V5",
        "value": "eyJrZXkiOiJ2YWx1ZSJ9",
        "headers": [
          {
            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
          }
        ]
      },
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 15,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "value": "eyJrZXkiOiJ2YWx1ZSJ9",
        "headers": [
          {
            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
          }
        ]
      },
      {
        "topic": "mytopic",
        "partition": 0,
        "offset": 15,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "key": null,
        "value": "eyJrZXkiOiJ2YWx1ZSJ9",
        "headers": [
          {
            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
          }
        ]
      }
    ]
  }
}
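In the example event, key and value are base64-encoded and each header value is a list of byte values. The following standard-library sketch decodes one record by hand to show what is inside; the helper names are hypothetical, and treating the value as JSON is an assumption that holds for this sample payload only.

```python
import base64
import json
from typing import Any, Dict, Optional

# First record of the example event above, trimmed to the encoded fields.
record = {
    "key": "cmVjb3JkS2V5",
    "value": "eyJrZXkiOiJ2YWx1ZSJ9",
    "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
}


def decode_key(rec: Dict[str, Any]) -> Optional[str]:
    """Base64-decode the key; it may be missing or null."""
    key = rec.get("key")
    return None if key is None else base64.b64decode(key).decode("utf-8")


def decode_json_value(rec: Dict[str, Any]) -> Any:
    """Base64-decode the value and parse it as JSON (sample-specific assumption)."""
    return json.loads(base64.b64decode(rec["value"]))


def decode_headers(rec: Dict[str, Any]) -> Dict[str, bytes]:
    """Flatten the list of single-entry header dicts into name -> bytes."""
    return {name: bytes(values) for header in rec["headers"] for name, values in header.items()}


print(decode_key(record), decode_json_value(record), decode_headers(record))
```

The second and third records in the example event show why the key needs the None check: one omits the key entirely and the other sets it to null.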

Kinesis streams

Kinesis events contain base64-encoded data by default. You can use the helper methods to access the data either as JSON or as plain text, depending on the original payload.

app.py

import json
from typing import Any, Dict, Union

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import KinesisStreamEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=KinesisStreamEvent)
def lambda_handler(event: KinesisStreamEvent, context: LambdaContext):
    for record in event.records:
        kinesis_record = record.kinesis

        payload: Union[Dict[str, Any], str]

        try:
            # Try to parse as JSON first
            payload = kinesis_record.data_as_json()
            logger.info("Received JSON data from Kinesis")
        except json.JSONDecodeError:
            # If JSON parsing fails, get as text
            payload = kinesis_record.data_as_text()
            logger.info("Received text data from Kinesis")

        process_data(payload)

    return {"statusCode": 200, "body": "Processed all records successfully"}


def process_data(data: Union[Dict[str, Any], str]) -> None:
    if isinstance(data, dict):
        # Handle JSON data
        logger.info(f"Processing JSON data: {data}")
        # Add your JSON processing logic here
    else:
        # Handle text data
        logger.info(f"Processing text data: {data}")
        # Add your text processing logic here

Kinesis streams Example Event

{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "approximateArrivalTimestamp": 1545084650.987
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    },
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
        "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
        "approximateArrivalTimestamp": 1545084711.166
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    }
  ],
  "window": {
    "start": "2020-12-09T07:04:00Z",
    "end": "2020-12-09T07:06:00Z"
  },
  "state": {
    "1": 282,
    "2": 715
  },
  "shardId": "shardId-000000000006",
  "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
  "isFinalInvokeForWindow": false,
  "isWindowTerminatedEarly": false
}
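
To make the "JSON first, then plain text" fallback concrete, here is a minimal standard-library sketch applied to the two data values from the sample event. It only mirrors the idea behind the handler above; decode_kinesis_data is a hypothetical helper, not a Powertools function.

```python
import base64
import json
from typing import Any


def decode_kinesis_data(data_b64: str) -> Any:
    """Base64-decode a Kinesis data field, returning parsed JSON when possible, else text."""
    text = base64.b64decode(data_b64).decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Not JSON: hand back the decoded text unchanged
        return text


# The two "data" values from the sample event
first = decode_kinesis_data("SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==")
second = decode_kinesis_data("VGhpcyBpcyBvbmx5IGEgdGVzdC4=")

# A JSON payload takes the other branch
as_json = decode_kinesis_data(base64.b64encode(b'{"a": 1}').decode("ascii"))
```

Both sample records hold plain sentences, so they take the text branch; only a payload such as the last one is returned as a dict.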

Kinesis Firehose delivery stream

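When using Kinesis Firehose, you can use a Lambda function to perform data transformation. For each transformed record, you can choose to either put it back into the delivery stream (result Ok, the default), drop it (result Dropped), or indicate that it failed processing (result ProcessingFailed).

To do that, you can use the KinesisFirehoseDataTransformationResponse class along with helper functions that make it easier to decode and encode base64 data in the stream.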

Transforming streaming records

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
    event_source,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    result = KinesisFirehoseDataTransformationResponse()

    for record in event.records:
        # get original data using data_as_text property
        data = record.data_as_text  # (1)

        # generate data to return
        transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data}

        processed_record = record.build_data_transformation_response(
            data=base64_from_json(transformed_data),  # (2)
        )

        result.add_record(processed_record)

    # return transformed records
    return result.asdict()

  1. Ingesting JSON payloads?
    Use record.data_as_json to easily deserialize them.
  2. For your convenience, base64_from_json serializes a dict to JSON, then encodes it as base64 data.

Dropping invalid records

from json import JSONDecodeError
from typing import Dict

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
    event_source,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    result = KinesisFirehoseDataTransformationResponse()

    for record in event.records:
        try:
            payload: Dict = record.data_as_json  # decodes and deserializes the base64 JSON string

            # generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}

            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )
        except JSONDecodeError:  # (1)
            # our producers ingest JSON payloads only; drop malformed records from the stream
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=record.data,
                result="Dropped",
            )

        result.add_record(processed_record)

    # return transformed records
    return result.asdict()

  1. This exception is raised by record.data_as_json when the payload is not valid JSON.

Indicating a processing failure

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
    event_source,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    # the decorator already wraps the raw event, so no second KinesisFirehoseEvent(...) call is needed
    result = KinesisFirehoseDataTransformationResponse()

    for record in event.records:
        try:
            payload = record.data_as_text  # base64 decoded data as str

            # generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}

            # Default result is Ok
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )
        except Exception:
            # mark the record as failed so Firehose treats it as unsuccessfully processed
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=record.data,
                result="ProcessingFailed",  # (1)
            )

        result.add_record(processed_record)

    # return transformed records
    return result.asdict()

  1. This record will now be sent to your S3 bucket in the processing-failed folder.

kinesisFirehoseEvent.json

{
  "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
  "sourceKinesisStreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source",
  "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
  "region": "us-east-2",
  "records": [
    {
      "data": "SGVsbG8gV29ybGQ=",
      "recordId": "record1",
      "approximateArrivalTimestamp": 1664028820148,
      "kinesisRecordMetadata": {
        "shardId": "shardId-000000000000",
        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
        "approximateArrivalTimestamp": 1664028820148,
        "sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
        "subsequenceNumber": 0
      }
    },
    {
      "data": "eyJIZWxsbyI6ICJXb3JsZCJ9",
      "recordId": "record2",
      "approximateArrivalTimestamp": 1664028793294,
      "kinesisRecordMetadata": {
        "shardId": "shardId-000000000001",
        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
        "approximateArrivalTimestamp": 1664028793294,
        "sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
        "subsequenceNumber": 0
      }
    }
  ]
}
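
For orientation, the response that Firehose expects back is a list of records, each carrying a recordId, a result, and base64-encoded data. The sketch below builds that structure by hand with the standard library, using the two data values from the sample event. It illustrates the shape only and is not how Powertools implements it; transform_records is a hypothetical name.

```python
import base64
import json
from typing import Any, Dict, List


def transform_records(event: Dict[str, Any]) -> Dict[str, List[Dict[str, str]]]:
    """Build a Firehose transformation response by hand (no Powertools)."""
    output: List[Dict[str, str]] = []
    for record in event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]))
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Not valid JSON: drop the record and return its data unchanged
            output.append({"recordId": record["recordId"], "result": "Dropped", "data": record["data"]})
            continue

        new_data = json.dumps({"original_payload": payload}).encode("utf-8")
        output.append(
            {
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(new_data).decode("utf-8"),
            }
        )
    return {"records": output}


sample_event = {
    "records": [
        {"recordId": "record1", "data": "SGVsbG8gV29ybGQ="},  # "Hello World" (not JSON)
        {"recordId": "record2", "data": "eyJIZWxsbyI6ICJXb3JsZCJ9"},  # {"Hello": "World"}
    ]
}
response = transform_records(sample_event)
```

Every incoming recordId appears exactly once in the output, whichever result it receives.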

Lambda Function URL

Lambda Function URLs provide a direct HTTP endpoint for invoking Lambda functions. This feature allows functions to receive and process HTTP requests without the need for additional services like API Gateway.

app.py

from aws_lambda_powertools.utilities.data_classes import LambdaFunctionUrlEvent, event_source


@event_source(data_class=LambdaFunctionUrlEvent)
def lambda_handler(event: LambdaFunctionUrlEvent, context):
    if event.request_context.http.method == "GET":
        return {"statusCode": 200, "body": "Hello World!"}

Lambda Function URL Example Event

{
  "version": "2.0",
  "routeKey": "$default",
  "rawPath": "/",
  "rawQueryString": "",
  "headers": {
    "sec-fetch-mode": "navigate",
    "x-amzn-tls-version": "TLSv1.2",
    "sec-fetch-site": "cross-site",
    "accept-language": "pt-BR,pt;q=0.9",
    "x-forwarded-proto": "https",
    "x-forwarded-port": "443",
    "x-forwarded-for": "123.123.123.123",
    "sec-fetch-user": "?1",
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "x-amzn-tls-cipher-suite": "ECDHE-RSA-AES128-GCM-SHA256",
    "sec-ch-ua": "\" Not A;Brand\";v=\"99\", \"Chromium\";v=\"102\", \"Google Chrome\";v=\"102\"",
    "sec-ch-ua-mobile": "?0",
    "x-amzn-trace-id": "Root=1-62ecd163-5f302e550dcde3b12402207d",
    "sec-ch-ua-platform": "\"Linux\"",
    "host": ".lambda-url.us-east-1.on.aws",
    "upgrade-insecure-requests": "1",
    "cache-control": "max-age=0",
    "accept-encoding": "gzip, deflate, br",
    "sec-fetch-dest": "document",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
  },
  "requestContext": {
    "accountId": "anonymous",
    "apiId": "",
    "domainName": ".lambda-url.us-east-1.on.aws",
    "domainPrefix": "",
    "http": {
      "method": "GET",
      "path": "/",
      "protocol": "HTTP/1.1",
      "sourceIp": "123.123.123.123",
      "userAgent": "agent"
    },
    "requestId": "id",
    "routeKey": "$default",
    "stage": "$default",
    "time": "05/Aug/2022:08:14:39 +0000",
    "timeEpoch": 1659687279885
  },
  "isBase64Encoded": false
}

Rabbit MQ

This data class is used for Rabbit MQ payloads. See the blog post for more details.

app.py

from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent

logger = Logger()


@event_source(data_class=RabbitMQEvent)
def lambda_handler(event: RabbitMQEvent, context):
    for queue_name, messages in event.rmq_messages_by_queue.items():
        logger.debug(f"Messages for queue: {queue_name}")
        for message in messages:
            logger.debug(f"MessageID: {message.basic_properties.message_id}")
            data: Dict = message.json_data
            logger.debug(f"Process json in base64 encoded data str {data}")
            return {
                "queue_name": queue_name,
                "message_id": message.basic_properties.message_id,
            }

Rabbit MQ Example Event

{
  "eventSource": "aws:rmq",
  "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8",
  "rmqMessagesByQueue": {
    "pizzaQueue::/": [
      {
        "basicProperties": {
          "contentType": "text/plain",
          "contentEncoding": null,
          "headers": {
            "header1": {
              "bytes": [118, 97, 108, 117, 101, 49]
            },
            "header2": {
              "bytes": [118, 97, 108, 117, 101, 50]
            },
            "numberInHeader": 10
          },
          "deliveryMode": 1,
          "priority": 34,
          "correlationId": null,
          "replyTo": null,
          "expiration": "60000",
          "messageId": null,
          "timestamp": "Jan 1, 1970, 12:33:41 AM",
          "type": null,
          "userId": "AIDACKCEVSQ6C2EXAMPLE",
          "appId": null,
          "clusterId": null,
          "bodySize": 80
        },
        "redelivered": false,
        "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="
      }
    ]
  }
}
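
In the sample event, the message body is base64-encoded JSON and some header values are wrapped as byte lists, while others (such as numberInHeader) are plain values. A minimal standard-library sketch of unpacking both follows, on values copied from the sample event. decode_message is a hypothetical helper and does not reflect the internals of RabbitMQEvent.

```python
import base64
import json
from typing import Any, Dict


def decode_message(message: Dict[str, Any]) -> Dict[str, Any]:
    """Decode the base64 JSON body and any byte-list headers of one RabbitMQ message."""
    body = json.loads(base64.b64decode(message["data"]))

    headers: Dict[str, Any] = {}
    for name, value in message["basicProperties"]["headers"].items():
        if isinstance(value, dict) and "bytes" in value:
            # Byte-list header: rebuild the original text
            headers[name] = bytes(value["bytes"]).decode("utf-8")
        else:
            # Plain header value: keep as-is
            headers[name] = value

    return {"body": body, "headers": headers}


# Trimmed copy of the message in the sample event
sample_message = {
    "basicProperties": {
        "headers": {
            "header1": {"bytes": [118, 97, 108, 117, 101, 49]},
            "header2": {"bytes": [118, 97, 108, 117, 101, 50]},
            "numberInHeader": 10,
        }
    },
    "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==",
}
decoded = decode_message(sample_message)
```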

S3

Integration with Amazon S3 enables automatic, serverless processing of object-level events in S3 buckets. When triggered by actions like object creation or deletion, Lambda functions receive detailed event information, allowing for real-time file processing, data transformations, and automated workflows.

app.py

from urllib.parse import unquote_plus

from aws_lambda_powertools.utilities.data_classes import S3Event, event_source


@event_source(data_class=S3Event)
def lambda_handler(event: S3Event, context):
    bucket_name = event.bucket_name

    # Multiple records can be delivered in a single event
    for record in event.records:
        object_key = unquote_plus(record.s3.get_object.key)
        object_etag = record.s3.get_object.etag
        return {
            "bucket": bucket_name,
            "object_key": object_key,
            "object_etag": object_etag,
        }

S3 Example Event

{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-2",
      "eventTime": "2019-09-03T19:37:27.192Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AWS:AIDAINPONIXQXHT3IKHL2"
      },
      "requestParameters": {
        "sourceIPAddress": "205.255.255.255"
      },
      "responseElements": {
        "x-amz-request-id": "D82B88E5F771F645",
        "x-amz-id-2": "vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo="
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1",
        "bucket": {
          "name": "lambda-artifacts-deafc19498e3f2df",
          "ownerIdentity": {
            "principalId": "A3I5XTEXAMAI3E"
          },
          "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
        },
        "object": {
          "key": "b21b84d653bb07b05b1e6b33684dc11b",
          "size": 1305107,
          "eTag": "b21b84d653bb07b05b1e6b33684dc11b",
          "sequencer": "0C0F6F405D6ED209E1"
        }
      }
    }
  ]
}
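
The handler above passes the object key through unquote_plus because S3 event notifications deliver object keys URL-encoded. A minimal sketch of the effect follows; the key is a made-up example and is not taken from the sample event, whose key contains no encoded characters.

```python
from urllib.parse import unquote_plus

# Hypothetical key as it might appear in a notification: "+" stands for a space,
# and "%28" / "%29" are percent-encoded parentheses.
encoded_key = "reports/red+flower%281%29.jpg"
decoded_key = unquote_plus(encoded_key)

# A key without special characters is returned unchanged
plain_key = unquote_plus("b21b84d653bb07b05b1e6b33684dc11b")
```

Using the decoded key matters as soon as it is passed to another S3 API call, which expects the real object name.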

S3 Batch Operations

This example is based on the AWS S3 Batch Operations documentation: Example Lambda function for S3 Batch Operations.

app.py

import boto3
from botocore.exceptions import ClientError

from aws_lambda_powertools.utilities.data_classes import S3BatchOperationEvent, S3BatchOperationResponse, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=S3BatchOperationEvent)
def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
    response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure")

    task = event.task
    src_key: str = task.s3_key
    src_bucket: str = task.s3_bucket

    s3 = boto3.client("s3", region_name="us-east-1")

    try:
        dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
        result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        if error_code == "RequestTimeout":
            result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.")
        else:
            result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
    except Exception as e:
        result = task.build_task_batch_response("PermanentFailure", str(e))
    finally:
        response.add_result(result)

    return response.asdict()


def do_some_work(s3_client, src_bucket: str, src_key: str): ...

S3 Batch Operations Example Event

{
  "invocationSchemaVersion": "2.0",
  "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
  "job": {
    "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce",
    "userArguments": {
      "k1": "v1",
      "k2": "v2"
    }
  },
  "tasks": [
    {
      "taskId": "dGFza2lkZ29lc2hlcmUK",
      "s3Key": "prefix/dataset/dataset.20231222.json.gz",
      "s3VersionId": null,
      "s3Bucket": "powertools-dataset"
    }
  ]
}
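
The error handling in the handler above boils down to a mapping from an S3 error code to a batch result code and message. As a sketch, that mapping can be pulled into a pure function so it can be unit-tested without calling S3. classify_error is a hypothetical helper name; the result codes and messages are the ones used in the example.

```python
from typing import Tuple


def classify_error(error_code: str, error_message: str) -> Tuple[str, str]:
    """Map an S3 client error to a (result_code, result_string) pair."""
    if error_code == "RequestTimeout":
        # Timeouts are worth retrying, so report a temporary failure
        return "TemporaryFailure", "Retry request to Amazon S3 due to timeout."
    # Anything else is treated as permanent for this task
    return "PermanentFailure", f"{error_code}: {error_message}"


timeout_result = classify_error("RequestTimeout", "Your socket connection timed out")
denied_result = classify_error("AccessDenied", "Access Denied")
```

The returned pair could then be passed to task.build_task_batch_response in the handler.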

S3 Object Lambda

This example is based on the AWS Blog post Introducing Amazon S3 Object Lambda – Use Your Code to Process Data as It Is Being Retrieved from S3.

app.py

import boto3
import requests

from aws_lambda_powertools import Logger
from aws_lambda_powertools.logging.correlation_paths import S3_OBJECT_LAMBDA
from aws_lambda_powertools.utilities.data_classes.s3_object_event import S3ObjectLambdaEvent

logger = Logger()
session = boto3.session.Session()
s3 = session.client("s3")


@logger.inject_lambda_context(correlation_id_path=S3_OBJECT_LAMBDA, log_event=True)
def lambda_handler(event, context):
    event = S3ObjectLambdaEvent(event)

    # Get object from S3
    response = requests.get(event.input_s3_url)
    original_object = response.content.decode("utf-8")

    # Make changes to the object about to be returned
    transformed_object = original_object.upper()

    # Write object back to S3 Object Lambda
    s3.write_get_object_response(
        Body=transformed_object,
        RequestRoute=event.request_route,
        RequestToken=event.request_token,
    )

    return {"status_code": 200}

S3 Object Lambda Example Event

{
  "xAmzRequestId": "1a5ed718-5f53-471d-b6fe-5cf62d88d02a",
  "getObjectContext": {
    "inputS3Url": "https://myap-123412341234.s3-accesspoint.us-east-1.amazonaws.com/s3.txt?X-Amz-Security-Token=...",
    "outputRoute": "io-iad-cell001",
    "outputToken": "..."
  },
  "configuration": {
    "accessPointArn": "arn:aws:s3-object-lambda:us-east-1:123412341234:accesspoint/myolap",
    "supportingAccessPointArn": "arn:aws:s3:us-east-1:123412341234:accesspoint/myap",
    "payload": "test"
  },
  "userRequest": {
    "url": "/s3.txt",
    "headers": {
      "Host": "myolap-123412341234.s3-object-lambda.us-east-1.amazonaws.com",
      "Accept-Encoding": "identity",
      "X-Amz-Content-SHA256": "e3b0c44297fc1c149afbf4c8995fb92427ae41e4649b934ca495991b7852b855"
    }
  },
  "userIdentity": {
    "type": "IAMUser",
    "principalId": "...",
    "arn": "arn:aws:iam::123412341234:user/myuser",
    "accountId": "123412341234",
    "accessKeyId": "..."
  },
  "protocolVersion": "1.00"
}

S3 EventBridge Notification

S3 EventBridge notifications enhance Lambda's ability to process S3 events by routing them through Amazon EventBridge. This integration offers advanced filtering, support for multiple destinations, and the standard EventBridge event envelope.

app.py

from aws_lambda_powertools.utilities.data_classes import S3EventBridgeNotificationEvent, event_source


@event_source(data_class=S3EventBridgeNotificationEvent)
def lambda_handler(event: S3EventBridgeNotificationEvent, context):
    bucket_name = event.detail.bucket.name
    file_key = event.detail.object.key
    if event.detail_type == "Object Created":
        print(f"Object {file_key} created in bucket {bucket_name}")
    return {
        "bucket": bucket_name,
        "file_key": file_key,
    }

S3 EventBridge Notification Example Event

{
  "version": "0",
  "id": "f5f1e65c-dc3a-93ca-6c1e-b1647eac7963",
  "detail-type": "Object Created",
  "source": "aws.s3",
  "account": "123456789012",
  "time": "2023-03-08T17:50:14Z",
  "region": "eu-west-1",
  "resources": [
    "arn:aws:s3:::example-bucket"
  ],
  "detail": {
    "version": "0",
    "bucket": {
      "name": "example-bucket"
    },
    "object": {
      "key": "IMG_m7fzo3.jpg",
      "size": 184662,
      "etag": "4e68adba0abe2dc8653dc3354e14c01d",
      "sequencer": "006408CAD69598B05E"
    },
    "request-id": "57H08PA84AB1JZW0",
    "requester": "123456789012",
    "source-ip-address": "34.252.34.74",
    "reason": "PutObject"
  }
}

Secrets Manager

AWS Secrets Manager rotation uses an AWS Lambda function to update the secret. For more information, see the AWS documentation on rotating AWS Secrets Manager secrets.

app.py

from aws_lambda_powertools.utilities import parameters
from aws_lambda_powertools.utilities.data_classes import SecretsManagerEvent, event_source

secrets_provider = parameters.SecretsProvider()


@event_source(data_class=SecretsManagerEvent)
def lambda_handler(event: SecretsManagerEvent, context):
    # Getting secret value using Parameter utility
    # See https://docs.powertools.aws.dev/lambda/python/latest/utilities/parameters/
    secret = secrets_provider.get(event.secret_id, VersionId=event.version_id, VersionStage="AWSCURRENT")

    # You need to work with secrets afterwards
    # Check more examples: https://github.com/aws-samples/aws-secrets-manager-rotation-lambdas

    return secret

Secrets Manager Example Event

{
  "SecretId": "arn:aws:secretsmanager:us-west-2:123456789012:secret:MyTestDatabaseSecret-a1b2c3",
  "ClientRequestToken": "550e8400-e29b-41d4-a716-446655440000",
  "Step": "createSecret"
}

SES

The integration with Simple Email Service (SES) enables serverless email processing. When configured, SES can trigger Lambda functions in response to incoming emails or delivery status notifications. The Lambda function receives an SES event containing details like sender, recipients, and email content.

app.py

from aws_lambda_powertools.utilities.data_classes import SESEvent, event_source


@event_source(data_class=SESEvent)
def lambda_handler(event: SESEvent, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        mail = record.ses.mail
        common_headers = mail.common_headers
        return {
            "mail": mail,
            "common_headers": common_headers,
        }

SES Example Event

{
  "Records": [
    {
      "eventVersion": "1.0",
      "ses": {
        "mail": {
          "commonHeaders": {
            "from": [
              "Jane Doe janedoe@example.com"
            ],
            "to": [
              "johndoe@example.com"
            ],
            "returnPath": "janedoe@example.com",
            "messageId": "<0123456789example.com>",
            "date": "Wed, 7 Oct 2015 12:34:56 -0700",
            "subject": "Test Subject"
          },
          "source": "janedoe@example.com",
          "timestamp": "1970-01-01T00:00:00.000Z",
          "destination": [
            "johndoe@example.com"
          ],
          "headers": [
            { "name": "Return-Path", "value": "janedoe@example.com" },
            { "name": "Received", "value": "from mailer.example.com (mailer.example.com [203.0.113.1]) by ..." },
            { "name": "DKIM-Signature", "value": "v=1; a=rsa-sha256; c=relaxed/relaxed; d=example.com; s=example; ..." },
            { "name": "MIME-Version", "value": "1.0" },
            { "name": "From", "value": "Jane Doe janedoe@example.com" },
            { "name": "Date", "value": "Wed, 7 Oct 2015 12:34:56 -0700" },
            { "name": "Message-ID", "value": "<0123456789example.com>" },
            { "name": "Subject", "value": "Test Subject" },
            { "name": "To", "value": "johndoe@example.com" },
            { "name": "Content-Type", "value": "text/plain; charset=UTF-8" }
          ],
          "headersTruncated": false,
          "messageId": "o3vrnil0e2ic28tr"
        },
        "receipt": {
          "recipients": [
            "johndoe@example.com"
          ],
          "timestamp": "1970-01-01T00:00:00.000Z",
          "spamVerdict": { "status": "PASS" },
          "dkimVerdict": { "status": "PASS" },
          "dmarcPolicy": "reject",
          "processingTimeMillis": 574,
          "action": {
            "type": "Lambda",
            "invocationType": "Event",
            "functionArn": "arn:aws:lambda:us-west-2:012345678912:function:Example"
          },
          "dmarcVerdict": { "status": "PASS" },
          "spfVerdict": { "status": "PASS" },
          "virusVerdict": { "status": "PASS" }
        }
      },
      "eventSource": "aws:ses"
    }
  ]
}

SNS

The integration with Simple Notification Service (SNS) enables serverless message processing. When configured, SNS can trigger Lambda functions in response to published messages or notifications. The Lambda function receives an SNS event containing details like the message body, subject, and metadata.

app.py

from aws_lambda_powertools.utilities.data_classes import SNSEvent, event_source


@event_source(data_class=SNSEvent)
def lambda_handler(event: SNSEvent, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        message = record.sns.message
        subject = record.sns.subject
        return {
            "message": message,
            "subject": subject,
        }

SNS Example Event

{
  "Records": [
    {
      "EventVersion": "1.0",
      "EventSubscriptionArn": "arn:aws:sns:us-east-2:123456789012:sns-la ...",
      "EventSource": "aws:sns",
      "Sns": {
        "SignatureVersion": "1",
        "Timestamp": "2019-01-02T12:45:07.000Z",
        "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==",
        "SigningCertUrl": "https://sns.us-east-2.amazonaws.com/SimpleNotification",
        "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
        "Message": "Hello from SNS!",
        "MessageAttributes": {
          "Test": {
            "Type": "String",
            "Value": "TestString"
          },
          "TestBinary": {
            "Type": "Binary",
            "Value": "TestBinary"
          }
        },
        "Type": "Notification",
        "UnsubscribeUrl": "https://sns.us-east-2.amazonaws.com/?Action=Unsubscribe",
        "TopicArn": "arn:aws:sns:us-east-2:123456789012:sns-lambda",
        "Subject": "TestInvoke"
      }
    }
  ]
}

SQS

The integration with Simple Queue Service (SQS) enables serverless queue processing. When configured, SQS can trigger Lambda functions in response to messages in the queue. The Lambda function receives an SQS event containing details like message body, attributes, and metadata.

app.py

from aws_lambda_powertools.utilities.data_classes import SQSEvent, SQSRecord, event_source


@event_source(data_class=SQSEvent)
def lambda_handler(event: SQSEvent, context):
    # Multiple records can be delivered in a single event
    for record in event.records:
        message, message_id = process_record(record)
        return {
            "message": message,
            "message_id": message_id,
        }


def process_record(record: SQSRecord):
    message = record.body
    message_id = record.message_id
    return message, message_id

SQS Example Event

{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
      "body": "Test message.",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {
        "testAttr": {
          "stringValue": "100",
          "binaryValue": "base64Str",
          "dataType": "Number"
        }
      },
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
      "awsRegion": "us-east-2"
    },
    {
      "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
      "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
      "body": "{\"message\": \"foo1\"}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082650636",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "1545082650649"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
      "awsRegion": "us-east-2"
    }
  ]
}

VPC Lattice V2

You can register your Lambda functions as targets within an Amazon VPC Lattice service network. By doing this, your Lambda function becomes a service within the network, and clients that have access to the VPC Lattice service network can call your service using Payload V2.

For more information, see the AWS documentation on using AWS Lambda with Amazon VPC Lattice.

app.py

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import VPCLatticeEventV2, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=VPCLatticeEventV2)
def lambda_handler(event: VPCLatticeEventV2, context: LambdaContext):
    logger.info(event.body)

    response = {
        "isBase64Encoded": False,
        "statusCode": 200,
        "statusDescription": "200 OK",
        "headers": {"Content-Type": "application/text"},
        "body": "VPC Lattice V2 Event ✨🎉✨",
    }

    return response

Lattice Example Event

{
  "version": "2.0",
  "path": "/todos",
  "method": "GET",
  "headers": {
    "user_agent": "curl/7.64.1",
    "x-forwarded-for": "10.213.229.10",
    "host": "test-lambda-service-3908sdf9u3u.dkfjd93.vpc-lattice-svcs.us-east-2.on.aws",
    "accept": "*/*"
  },
  "queryStringParameters": {
    "order-id": "1"
  },
  "body": "{\"message\": \"Hello from Lambda!\"}",
  "requestContext": {
    "serviceNetworkArn": "arn:aws:vpc-lattice:us-east-2:123456789012:servicenetwork/sn-0bf3f2882e9cc805a",
    "serviceArn": "arn:aws:vpc-lattice:us-east-2:123456789012:service/svc-0a40eebed65f8d69c",
    "targetGroupArn": "arn:aws:vpc-lattice:us-east-2:123456789012:targetgroup/tg-6d0ecf831eec9f09",
    "identity": {
      "sourceVpcArn": "arn:aws:ec2:region:123456789012:vpc/vpc-0b8276c84697e7339",
      "type": "AWS_IAM",
      "principal": "arn:aws:sts::123456789012:assumed-role/example-role/057d00f8b51257ba3c853a0f248943cf",
      "sessionName": "057d00f8b51257ba3c853a0f248943cf",
      "x509SanDns": "example.com"
    },
    "region": "us-east-2",
    "timeEpoch": "1696331543569073"
  }
}
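
One detail of the V2 sample event that is easy to miss: requestContext.timeEpoch arrives as a string, and its 16-digit value is consistent with microseconds since the Unix epoch. Under that assumption, a minimal sketch of converting it to a timezone-aware datetime follows. parse_time_epoch is a hypothetical helper, and integer arithmetic is used to avoid float rounding at microsecond precision.

```python
from datetime import datetime, timezone


def parse_time_epoch(time_epoch: str) -> datetime:
    """Convert a microsecond epoch string into a UTC datetime (assumes microseconds)."""
    seconds, microseconds = divmod(int(time_epoch), 1_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=microseconds)


# Value copied from the sample event
received_at = parse_time_epoch("1696331543569073")
```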

VPC Lattice V1

You can register your Lambda functions as targets within an Amazon VPC Lattice service network. By doing this, your Lambda function becomes a service within the network, and clients that have access to the VPC Lattice service network can call your service.

For more information, see the AWS documentation on using AWS Lambda with Amazon VPC Lattice.

app.py

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import VPCLatticeEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


@event_source(data_class=VPCLatticeEvent)
def lambda_handler(event: VPCLatticeEvent, context: LambdaContext):
    logger.info(event.body)

    response = {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "application/text"},
        "body": "Event Response to VPC Lattice 🔥🚀🔥",
    }

    return response

Lattice Example Event

{
  "raw_path": "/testpath",
  "method": "GET",
  "headers": {
    "user_agent": "curl/7.64.1",
    "x-forwarded-for": "10.213.229.10",
    "host": "test-lambda-service-3908sdf9u3u.dkfjd93.vpc-lattice-svcs.us-east-2.on.aws",
    "accept": "*/*"
  },
  "query_string_parameters": {
    "order-id": "1"
  },
  "body": "eyJ0ZXN0IjogImV2ZW50In0=",
  "is_base64_encoded": true
}
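
In the V1 sample event the body is base64-encoded, as flagged by is_base64_encoded, so logging the body as-is prints the encoded string. A minimal standard-library sketch of reading the body with that flag in mind follows, on the raw event dict. read_body is a hypothetical helper name.

```python
import base64
import json
from typing import Any, Dict


def read_body(event: Dict[str, Any]) -> str:
    """Return the request body as text, base64-decoding it when the event says so."""
    body = event.get("body") or ""
    if event.get("is_base64_encoded"):
        return base64.b64decode(body).decode("utf-8")
    return body


# Fields copied from the sample event
sample_event = {"body": "eyJ0ZXN0IjogImV2ZW50In0=", "is_base64_encoded": True}
payload = json.loads(read_body(sample_event))

# A body that is not base64-encoded is returned untouched
plain = read_body({"body": "plain text", "is_base64_encoded": False})
```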

IoT Core Events

IoT Core Thing Created/Updated/Deleted

You can use IoT Core registry events to trigger your Lambda functions. For details on this event type, see the AWS IoT Core documentation on registry events.

app.py

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.iot_registry_event import IoTCoreThingEvent


@event_source(data_class=IoTCoreThingEvent)
def lambda_handler(event: IoTCoreThingEvent, context):
    print(f"Received IoT Core event type {event.event_type}")

Example Event

{
  "eventType": "THING_EVENT",
  "eventId": "f5ae9b94-8b8e-4d8e-8c8f-b3266dd89853",
  "timestamp": 1234567890123,
  "operation": "CREATED",
  "accountId": "123456789012",
  "thingId": "b604f69c-aa9a-4d4a-829e-c480e958a0b5",
  "thingName": "MyThing",
  "versionNumber": 1,
  "thingTypeName": null,
  "attributes": {"attribute3": "value3", "attribute1": "value1", "attribute2": "value2"}
}

IoT Core Thing Type Created/Updated/Deprecated/Undeprecated/Deleted

You can use IoT Core registry events to trigger your Lambda functions. For details on this event type, see the AWS IoT Core documentation on registry events.

app.py

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.iot_registry_event import IoTCoreThingTypeEvent


@event_source(data_class=IoTCoreThingTypeEvent)
def lambda_handler(event: IoTCoreThingTypeEvent, context):
    print(f"Received IoT Core event type {event.event_type}")

Example Event

{
  "eventType": "THING_TYPE_EVENT",
  "eventId": "8827376c-4b05-49a3-9b3b-733729df7ed5",
  "timestamp": 1234567890123,
  "operation": "CREATED",
  "accountId": "123456789012",
  "thingTypeId": "c530ae83-32aa-4592-94d3-da29879d1aac",
  "thingTypeName": "MyThingType",
  "isDeprecated": false,
  "deprecationDate": null,
  "searchableAttributes": ["attribute1", "attribute2", "attribute3"],
  "propagatingAttributes": [
    {"userPropertyKey": "key", "thingAttribute": "model"},
    {"userPropertyKey": "key", "connectionAttribute": "iot:ClientId"}
  ],
  "description": "My thing type"
}

IoT Core Thing Type Associated/Disassociated with a Thing

You can use IoT Core registry events to trigger your Lambda functions. For more information on this specific event, see the AWS IoT Core registry events documentation.

app.py

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.iot_registry_event import IoTCoreThingTypeAssociationEvent


@event_source(data_class=IoTCoreThingTypeAssociationEvent)
def lambda_handler(event: IoTCoreThingTypeAssociationEvent, context):
    print(f"Received IoT Core event type {event.event_type}")

Example Event

{
  "eventId": "87f8e095-531c-47b3-aab5-5171364d138d",
  "eventType": "THING_TYPE_ASSOCIATION_EVENT",
  "operation": "ADDED",
  "thingId": "b604f69c-aa9a-4d4a-829e-c480e958a0b5",
  "thingName": "myThing",
  "thingTypeName": "MyThingType",
  "timestamp": 1234567890123
}

IoT Core Thing Group Created/Updated/Deleted

You can use IoT Core registry events to trigger your Lambda functions. For more information on this specific event, see the AWS IoT Core registry events documentation.

app.py

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.iot_registry_event import IoTCoreThingGroupEvent


@event_source(data_class=IoTCoreThingGroupEvent)
def lambda_handler(event: IoTCoreThingGroupEvent, context):
    print(f"Received IoT Core event type {event.event_type}")

Example Event

{
  "eventType": "THING_GROUP_EVENT",
  "eventId": "8b9ea8626aeaa1e42100f3f32b975899",
  "timestamp": 1603995417409,
  "operation": "UPDATED",
  "accountId": "571EXAMPLE833",
  "thingGroupId": "8757eec8-bb37-4cca-a6fa-403b003d139f",
  "thingGroupName": "Tg_level5",
  "versionNumber": 3,
  "parentGroupName": "Tg_level4",
  "parentGroupId": "5fce366a-7875-4c0e-870b-79d8d1dce119",
  "description": "New description for Tg_level5",
  "rootToParentThingGroups": [
    {
      "groupArn": "arn:aws:iot:us-west-2:571EXAMPLE833:thinggroup/TgTopLevel",
      "groupId": "36aa0482-f80d-4e13-9bff-1c0a75c055f6"
    },
    {
      "groupArn": "arn:aws:iot:us-west-2:571EXAMPLE833:thinggroup/Tg_level1",
      "groupId": "bc1643e1-5a85-4eac-b45a-92509cbe2a77"
    },
    {
      "groupArn": "arn:aws:iot:us-west-2:571EXAMPLE833:thinggroup/Tg_level2",
      "groupId": "0476f3d2-9beb-48bb-ae2c-ea8bd6458158"
    },
    {
      "groupArn": "arn:aws:iot:us-west-2:571EXAMPLE833:thinggroup/Tg_level3",
      "groupId": "1d9d4ffe-a6b0-48d6-9de6-2e54d1eae78f"
    },
    {
      "groupArn": "arn:aws:iot:us-west-2:571EXAMPLE833:thinggroup/Tg_level4",
      "groupId": "5fce366a-7875-4c0e-870b-79d8d1dce119"
    }
  ],
  "attributes": {"attribute1": "value1", "attribute3": "value3", "attribute2": "value2"},
  "dynamicGroupMappingId": null
}
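The rootToParentThingGroups list in the sample event describes the group's ancestors. As a rough sketch, and assuming the list is ordered from the root group down to the direct parent (as the field name suggests), the full hierarchy path could be rebuilt from the raw event dictionary. The helper name thing_group_path is hypothetical and not part of Powertools.

```python
def thing_group_path(event: dict) -> str:
    """Build a root-to-leaf path such as 'Root/Child' from a thing group event."""
    ancestors = event.get("rootToParentThingGroups") or []
    # The group name is the resource part after the last "/" in each ARN
    names = [group["groupArn"].rsplit("/", 1)[-1] for group in ancestors]
    names.append(event["thingGroupName"])
    return "/".join(names)


# Trimmed copy of the sample event (groupId fields omitted for brevity)
arn_prefix = "arn:aws:iot:us-west-2:571EXAMPLE833:thinggroup/"
sample_event = {
    "thingGroupName": "Tg_level5",
    "rootToParentThingGroups": [
        {"groupArn": arn_prefix + name}
        for name in ("TgTopLevel", "Tg_level1", "Tg_level2", "Tg_level3", "Tg_level4")
    ],
}

path = thing_group_path(sample_event)
print(path)  # TgTopLevel/Tg_level1/Tg_level2/Tg_level3/Tg_level4/Tg_level5
```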

IoT Thing Added/Removed from Thing Group

You can use IoT Core registry events to trigger your Lambda functions. For more information on this specific event, see the AWS IoT Core registry events documentation.

app.py

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.iot_registry_event import IoTCoreAddOrRemoveFromThingGroupEvent


@event_source(data_class=IoTCoreAddOrRemoveFromThingGroupEvent)
def lambda_handler(event: IoTCoreAddOrRemoveFromThingGroupEvent, context):
    print(f"Received IoT Core event type {event.event_type}")

Example Event

{
  "eventType": "THING_GROUP_MEMBERSHIP_EVENT",
  "eventId": "d684bd5f-6f6e-48e1-950c-766ac7f02fd1",
  "timestamp": 1234567890123,
  "operation": "ADDED",
  "accountId": "123456789012",
  "groupArn": "arn:aws:iot:ap-northeast-2:123456789012:thinggroup/MyChildThingGroup",
  "groupId": "06838589-373f-4312-b1f2-53f2192291c4",
  "thingArn": "arn:aws:iot:ap-northeast-2:123456789012:thing/MyThing",
  "thingId": "b604f69c-aa9a-4d4a-829e-c480e958a0b5",
  "membershipId": "8505ebf8-4d32-4286-80e9-c23a4a16bbd8"
}

IoT Child Group Added/Deleted from Parent Group

You can use IoT Core registry events to trigger your Lambda functions. For more information on this specific event, see the AWS IoT Core registry events documentation.

app.py

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.iot_registry_event import IoTCoreAddOrDeleteFromThingGroupEvent


@event_source(data_class=IoTCoreAddOrDeleteFromThingGroupEvent)
def lambda_handler(event: IoTCoreAddOrDeleteFromThingGroupEvent, context):
    print(f"Received IoT Core event type {event.event_type}")

Example Event

{
  "eventType": "THING_GROUP_HIERARCHY_EVENT",
  "eventId": "264192c7-b573-46ef-ab7b-489fcd47da41",
  "timestamp": 1234567890123,
  "operation": "ADDED",
  "accountId": "123456789012",
  "thingGroupId": "8f82a106-6b1d-4331-8984-a84db5f6f8cb",
  "thingGroupName": "MyRootThingGroup",
  "childGroupId": "06838589-373f-4312-b1f2-53f2192291c4",
  "childGroupName": "MyChildThingGroup"
}
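Every IoT Core registry sample event in this section carries an eventType field. If a single function receives several of these event types, one possible approach is to route on that field before handling the payload. The sketch below works on raw event dictionaries with the standard library only. The handler functions, the HANDLERS table, and route_registry_event are hypothetical names used for illustration, not Powertools APIs.

```python
from typing import Callable, Dict


def on_thing_event(event: dict) -> str:
    return f"thing {event['thingName']} {event['operation'].lower()}"


def on_hierarchy_event(event: dict) -> str:
    return (
        f"group {event['childGroupName']} {event['operation'].lower()} "
        f"under {event['thingGroupName']}"
    )


# Map each eventType value to the function that knows how to handle it
HANDLERS: Dict[str, Callable[[dict], str]] = {
    "THING_EVENT": on_thing_event,
    "THING_GROUP_HIERARCHY_EVENT": on_hierarchy_event,
}


def route_registry_event(event: dict) -> str:
    """Dispatch a raw registry event to its handler, ignoring unknown types."""
    handler = HANDLERS.get(event.get("eventType", ""))
    if handler is None:
        return f"ignored event type: {event.get('eventType')}"
    return handler(event)


result = route_registry_event(
    {
        "eventType": "THING_GROUP_HIERARCHY_EVENT",
        "operation": "ADDED",
        "thingGroupName": "MyRootThingGroup",
        "childGroupName": "MyChildThingGroup",
    }
)
print(result)  # group MyChildThingGroup added under MyRootThingGroup
```

Inside each handler, the matching data class could then be initialized directly from the dictionary, as described under Method 1 in Getting started.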

Advanced

Debugging

Alternatively, you can print an event object to inspect its fields. All classes come with a __str__ method that generates a dictionary-style string, which can be quite useful for debugging.

However, certain events may contain sensitive fields such as secret_access_key and session_token, which are labeled as [SENSITIVE] to prevent any accidental disclosure of confidential information.

If a field value (e.g., JSON) cannot be deserialized, it will appear as [Cannot be deserialized].

debugging.py

from aws_lambda_powertools.utilities.data_classes import (
    CodePipelineJobEvent,
    event_source,
)


@event_source(data_class=CodePipelineJobEvent)
def lambda_handler(event, context):
    print(event)

debugging_event.json

{
  "CodePipeline.job": {
    "id": "11111111-abcd-1111-abcd-111111abcdef",
    "accountId": "111111111111",
    "data": {
      "actionConfiguration": {
        "configuration": {
          "FunctionName": "MyLambdaFunctionForAWSCodePipeline",
          "UserParameters": "some-input-such-as-a-URL"
        }
      },
      "inputArtifacts": [
        {
          "name": "ArtifactName",
          "revision": null,
          "location": {
            "type": "S3",
            "s3Location": {
              "bucketName": "the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890",
              "objectKey": "the name of the application, for example CodePipelineDemoApplication.zip"
            }
          }
        }
      ],
      "outputArtifacts": [],
      "artifactCredentials": {
        "accessKeyId": "AKIAIOSFODNN7EXAMPLE",
        "secretAccessKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
        "sessionToken": "MIICiTCCAfICCQD6m7oRw0uXOjANBgkqhkiG9w0BAQUFADCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wHhcNMTEwNDI1MjA0NTIxWhcNMTIwNDI0MjA0NTIxWjCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMaK0dn+a4GmWIWJ21uUSfwfEvySWtC2XADZ4nB+BLYgVIk60CpiwsZ3G93vUEIO3IyNoH/f0wYK8m9TrDHudUZg3qX4waLG5M43q7Wgc/MbQITxOUSQv7c7ugFFDzQGBzZswY6786m86gpEIbb3OhjZnzcvQAaRHhdlQWIMm2nrAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAtCu4nUhVVxYUntneD9+h8Mg9q6q+auNKyExzyLwaxlAoo7TJHidbtS4J5iNmZgXL0FkbFFBjvSfpJIlJ00zbhNYS5f6GuoEDmFJl0ZxBHjJnyp378OD8uTs7fLvjx79LjSTbNYiytVbZPQUQ5Yaxu2jXnimvw3rrszlaEXAMPLE="
      },
      "continuationToken": "A continuation token if continuing job"
    }
  }
}

debugging_output.json

{
    "account_id": "111111111111",
    "data": {
        "action_configuration": {
            "configuration": {
                "decoded_user_parameters": "[Cannot be deserialized]",
                "function_name": "MyLambdaFunctionForAWSCodePipeline",
                "raw_event": "[SENSITIVE]",
                "user_parameters": "some-input-such-as-a-URL"
            },
            "raw_event": "[SENSITIVE]"
        },
        "artifact_credentials": {
            "access_key_id": "AKIAIOSFODNN7EXAMPLE",
            "expiration_time": "None",
            "raw_event": "[SENSITIVE]",
            "secret_access_key": "[SENSITIVE]",
            "session_token": "[SENSITIVE]"
        },
        "continuation_token": "A continuation token if continuing job",
        "encryption_key": "None",
        "input_artifacts": [
            {
                "location": {
                    "get_type": "S3",
                    "raw_event": "[SENSITIVE]",
                    "s3_location": {
                        "bucket_name": "the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890",
                        "key": "the name of the application, for example CodePipelineDemoApplication.zip",
                        "object_key": "the name of the application, for example CodePipelineDemoApplication.zip",
                        "raw_event": "[SENSITIVE]"
                    }
                },
                "name": "ArtifactName",
                "raw_event": "[SENSITIVE]",
                "revision": "None"
            }
        ],
        "output_artifacts": [],
        "raw_event": "[SENSITIVE]"
    },
    "decoded_user_parameters": "[Cannot be deserialized]",
    "get_id": "11111111-abcd-1111-abcd-111111abcdef",
    "input_bucket_name": "the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890",
    "input_object_key": "the name of the application, for example CodePipelineDemoApplication.zip",
    "raw_event": "[SENSITIVE]",
    "user_parameters": "some-input-such-as-a-URL"
}
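To illustrate the masking idea in isolation, here is a toy sketch of a wrapper whose __str__ method replaces selected keys with [SENSITIVE]. It is not how Powertools implements its data classes. The MaskedView class, its sensitive_keys parameter, and the MASK constant are hypothetical names chosen for this example.

```python
import json
from typing import Any, Dict, FrozenSet

MASK = "[SENSITIVE]"  # placeholder text, mirroring the output shown above


class MaskedView:
    """Toy wrapper: renders a dict as a string with chosen keys masked."""

    def __init__(self, data: Dict[str, Any], sensitive_keys: FrozenSet[str]):
        self._data = data
        self._sensitive_keys = sensitive_keys

    def _mask(self, value: Any) -> Any:
        # Walk nested dicts and lists so sensitive keys are masked at any depth
        if isinstance(value, dict):
            return {
                key: MASK if key in self._sensitive_keys else self._mask(item)
                for key, item in value.items()
            }
        if isinstance(value, list):
            return [self._mask(item) for item in value]
        return value

    def __str__(self) -> str:
        return json.dumps(self._mask(self._data), sort_keys=True)


credentials = MaskedView(
    {"access_key_id": "AKIAIOSFODNN7EXAMPLE", "secret_access_key": "not-a-real-secret"},
    sensitive_keys=frozenset({"secret_access_key", "session_token"}),
)
print(credentials)
# {"access_key_id": "AKIAIOSFODNN7EXAMPLE", "secret_access_key": "[SENSITIVE]"}
```

Masking at string-rendering time leaves the underlying data untouched, so code can still read the real values while logs and print statements only show the placeholder.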