Jekyll SEO checklist for GitHub Pages - Latest Updates
Jekyll SEO checklist for GitHub Pages
March 27, 2026
Serverless computing has revolutionized how developers build and deploy applications in the cloud. AWS Serverless services enable you to focus on writing code without worrying about infrastructure management, scaling, or server maintenance. This comprehensive guide explores the AWS Serverless ecosystem and how you can leverage it to build robust, scalable applications.
AWS Serverless is a collection of cloud services that allow you to run applications and manage infrastructure without provisioning or managing servers. The key benefits include:
| Aspect | Traditional | Serverless |
|---|---|---|
| Infrastructure | Manual provisioning | Automatic scaling |
| Cost Model | Fixed monthly cost | Pay-per-use |
| Maintenance | OS patches, security updates | AWS handles maintenance |
| Scaling | Manual or auto-scaling groups | Automatic, event-driven |
| Availability | Manual failover setup | Built-in multi-AZ |
AWS Lambda is the cornerstone of serverless computing on AWS. It allows you to run code in response to events without provisioning servers.
Key Features:
Technical Specifications:
Common Use Cases:
Lambda Execution Context:
import json
import boto3
import os
# Initialize outside handler for connection reuse
s3_client = boto3.client('s3')
dynamodb_resource = boto3.resource('dynamodb')
# Global variables persist across invocations
counter = 0
def lambda_handler(event, context):
"""
Advanced Lambda function with connection pooling and error handling
"""
global counter
counter += 1
try:
# Extract request data
http_method = event.get('httpMethod', 'GET')
path_parameters = event.get('pathParameters', {})
query_parameters = event.get('queryStringParameters', {})
body = json.loads(event.get('body', '{}')) if event.get('body') else {}
# Business logic
result = process_request(http_method, path_parameters, query_parameters, body)
# Response format
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type,X-Amz-Date,Authorization,X-Api-Key'
},
'body': json.dumps({
'message': 'Success',
'data': result,
'invocation_count': counter,
'request_id': context.aws_request_id,
'remaining_time': context.get_remaining_time_in_millis()
})
}
except Exception as e:
# Structured error logging
error_details = {
'error': str(e),
'type': type(e).__name__,
'request_id': context.aws_request_id,
'function_name': context.function_name,
'function_version': context.function_version
}
# Log to CloudWatch
print(json.dumps(error_details))
return {
'statusCode': 500,
'body': json.dumps({
'error': 'Internal Server Error',
'request_id': context.aws_request_id
})
}
def process_request(method, path_params, query_params, body):
"""Business logic implementation"""
# Your application logic here
return {'status': 'processed', 'timestamp': str(datetime.utcnow())}
Lambda Performance Optimization:
# Optimize for cold starts
import json
from datetime import datetime
# Heavy initialization outside handler
db_connection = None
def get_db_connection():
global db_connection
if db_connection is None:
# Initialize database connection
db_connection = create_database_connection()
return db_connection
def lambda_handler(event, context):
# Fast path - reuse connections
conn = get_db_connection()
# Process request
return response
API Gateway is a fully managed service that makes it easy to create, publish, and secure APIs at any scale.
Key Features:
Technical Specifications:
Integration Options:
API Gateway Configuration Example:
# OpenAPI 3.0 specification for API Gateway
openapi: 3.0.0
info:
title: Serverless E-commerce API
version: 1.0.0
description: E-commerce backend API
paths:
/products:
get:
summary: List all products
parameters:
- name: category
in: query
schema:
type: string
- name: limit
in: query
schema:
type: integer
default: 10
responses:
'200':
description: List of products
content:
application/json:
schema:
type: object
properties:
products:
type: array
items:
$ref: '#/components/schemas/Product'
x-amazon-apigateway-integration:
type: aws_proxy
httpMethod: POST
uri: arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${GetProductsFunction.Arn}/invocations
responses:
default:
statusCode: '200'
/products/{productId}:
get:
summary: Get product by ID
parameters:
- name: productId
in: path
required: true
schema:
type: string
responses:
'200':
description: Product details
content:
application/json:
schema:
$ref: '#/components/schemas/Product'
x-amazon-apigateway-integration:
type: aws_proxy
httpMethod: POST
uri: arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${GetProductFunction.Arn}/invocations
components:
schemas:
Product:
type: object
properties:
id:
type: string
name:
type: string
price:
type: number
category:
type: string
description:
type: string
x-amazon-apigateway-request-validators:
ALL:
validateRequestBody: true
validateRequestParameters: true
PARAMS_ONLY:
validateRequestBody: false
validateRequestParameters: true
x-amazon-apigateway-gateway-responses:
BAD_REQUEST_BODY:
statusCode: 400
responseTemplates:
application/json: '{"error": "Invalid request body"}'
API Gateway Custom Domain:
import boto3
def setup_custom_domain():
client = boto3.client('apigateway')
# Create custom domain name
response = client.create_domain_name(
domainName='api.yourapp.com',
certificateArn='arn:aws:acm:us-east-1:123456789012:certificate/12345678-1234-1234-1234-123456789012',
endpointConfiguration={
'types': ['REGIONAL']
}
)
# Create base path mapping
client.create_base_path_mapping(
domainName='api.yourapp.com',
restApiId='your-api-id',
basePath='v1',
stage='prod'
)
Step Functions allows you to coordinate multiple AWS services into serverless workflows.
Key Features:
Technical Specifications:
Use Cases:
Advanced Step Functions Example:
{
"Comment": "Advanced E-commerce Order Processing Workflow",
"StartAt": "ValidateOrder",
"States": {
"ValidateOrder": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:ValidateOrder",
"Payload.$": "$"
},
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException", "Lambda.AWSLambdaException"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "HandleValidationError",
"ResultPath": "$.error"
}
],
"Next": "CheckInventory"
},
"CheckInventory": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.inventoryAvailable",
"BooleanEquals": true,
"Next": "ProcessPayment"
}
],
"Default": "NotifyOutOfStock"
},
"ProcessPayment": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:ProcessPayment",
"Payload.$": "$"
},
"TimeoutSeconds": 30,
"HeartbeatSeconds": 10,
"Next": "UpdateInventory"
},
"UpdateInventory": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "ReserveInventory",
"States": {
"ReserveInventory": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "Inventory",
"Key": {
"productId.$": "$.productId"
},
"UpdateExpression": "SET quantity = quantity - :qty",
"ExpressionAttributeValues": {
":qty": {
"N.$": "$.quantity"
}
}
},
"End": true
}
}
},
{
"StartAt": "UpdateOrderStatus",
"States": {
"UpdateOrderStatus": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:UpdateOrderStatus",
"Payload": {
"orderId.$": "$.orderId",
"status": "PROCESSING"
}
},
"End": true
}
}
}
],
"Next": "SendConfirmation"
},
"SendConfirmation": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:us-east-1:123456789012:OrderNotifications",
"Message.$": "$.confirmationMessage",
"Subject": "Order Confirmation"
},
"Next": "OrderComplete"
},
"OrderComplete": {
"Type": "Succeed"
},
"NotifyOutOfStock": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:NotifyOutOfStock",
"Payload.$": "$"
},
"Next": "OrderFailed"
},
"HandleValidationError": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:LogValidationError",
"Payload": {
"error.$": "$.error",
"order.$": "$"
}
},
"Next": "OrderFailed"
},
"OrderFailed": {
"Type": "Fail",
"Cause": "Order processing failed",
"Error": "OrderProcessingError"
}
}
}
Express Workflow for High-Volume Processing:
{
"Comment": "Express workflow for real-time data processing",
"StartAt": "ProcessEvent",
"States": {
"ProcessEvent": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "EXPRESS"
},
"StartAt": "TransformData",
"States": {
"TransformData": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:TransformData",
"Payload.$": "$"
},
"End": true
}
}
},
"MaxConcurrency": 100,
"ItemSelector": {
"data.$": "$.events[*]"
},
"End": true
}
}
}
EventBridge is a serverless event bus that makes it easy to connect applications together using data from your own apps, integrated SaaS applications, and other AWS services.
Key Features:
Technical Specifications:
EventBridge Event Structure:
{
"version": "0",
"id": "12345678-1234-1234-1234-123456789012",
"detail-type": "OrderCreated",
"source": "com.myapp.orders",
"account": "123456789012",
"time": "2026-03-31T11:23:45Z",
"region": "us-east-1",
"resources": [
"arn:aws:orders:us-east-1:123456789012:order/12345"
],
"detail": {
"orderId": "12345",
"customerId": "customer-67890",
"amount": 99.99,
"items": [
{
"productId": "prod-111",
"quantity": 2,
"price": 49.99
}
],
"metadata": {
"userAgent": "Mozilla/5.0...",
"ipAddress": "192.168.1.100"
}
}
}
EventBridge Architecture Example:
import json
import boto3
from datetime import datetime
eventbridge = boto3.client('events')
lambda_client = boto3.client('lambda')
def publish_order_event(order_data):
"""Publish order creation event to EventBridge"""
event = {
'Source': 'com.myapp.orders',
'DetailType': 'OrderCreated',
'Detail': json.dumps({
'orderId': order_data['orderId'],
'customerId': order_data['customerId'],
'amount': order_data['total'],
'timestamp': datetime.utcnow().isoformat(),
'items': order_data['items']
}),
'EventBusName': 'default'
}
response = eventbridge.put_events(Entries=[event])
return response
def create_event_rules():
"""Create EventBridge rules for different event types"""
# Rule for order processing
eventbridge.put_rule(
Name='OrderProcessingRule',
EventPattern=json.dumps({
'source': ['com.myapp.orders'],
'detail-type': ['OrderCreated']
}),
State='ENABLED',
Description='Trigger order processing workflow'
)
# Add target for Step Functions
eventbridge.put_targets(
Rule='OrderProcessingRule',
Targets=[
{
'Id': 'OrderProcessingStepFunction',
'Arn': 'arn:aws:states:us-east-1:123456789012:stateMachine:OrderProcessing',
'RoleArn': 'arn:aws:iam::123456789012:role/EventBridgeStepFunctionRole',
'InputTransformer': {
'InputPathsMap': {
'order': '$.detail'
},
'InputTemplate': '{"order": <order>}'
}
}
]
)
# Rule for customer notifications
eventbridge.put_rule(
Name='CustomerNotificationRule',
EventPattern=json.dumps({
'source': ['com.myapp.orders'],
'detail-type': ['OrderCreated', 'OrderShipped', 'OrderDelivered']
}),
State='ENABLED'
)
# Add target for notification Lambda
eventbridge.put_targets(
Rule='CustomerNotificationRule',
Targets=[
{
'Id': 'NotificationLambda',
'Arn': 'arn:aws:lambda:us-east-1:123456789012:function:SendNotification',
'RoleArn': 'arn:aws:iam::123456789012:role/EventBridgeLambdaRole'
}
]
)
EventBridge Pipes Integration:
def create_event_pipe():
"""Create EventBridge Pipe for S3 to Lambda integration"""
pipes_client = boto3.client('pipes')
response = pipes_client.create_pipe(
Name='S3FileProcessingPipe',
RoleArn='arn:aws:iam::123456789012:role/EventBridgePipeRole',
Source='arn:aws:s3:::my-bucket',
SourceParameters={
'FilterCriteria': {
'Filters': [
{
'Pattern': json.dumps({
'event': ['ObjectCreated'],
's3': {
'object': {
'key': [{
'prefix': 'uploads/'
}]
}
}
})
}
]
},
'S3Parameters': {
'BucketName': 'my-bucket'
}
},
Target='arn:aws:lambda:us-east-1:123456789012:function:ProcessFile',
TargetParameters={
'InputTemplate': '{"bucket": <$.bucket>, "key": <$.object.key>, "size": <$.object.size>}'
}
)
return response
Microservices Implementation Example:
# SAM Template for Microservices Architecture
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: 'Serverless Microservices E-commerce Platform'
Globals:
Function:
Runtime: python3.9
Timeout: 30
MemorySize: 256
Environment:
Variables:
ENVIRONMENT: !Ref Environment
REGION: !Ref AWS::Region
Resources:
# API Gateway
ApiGateway:
Type: AWS::Serverless::Api
Properties:
StageName: !Ref Environment
Cors:
AllowMethods: "'*'"
AllowHeaders: "'*'"
AllowOrigin: "'*'"
Domain:
DomainName: api.ecommerce.com
CertificateArn: !Ref ApiCertificate
# User Service
UserService:
Type: AWS::Serverless::Function
Properties:
CodeUri: services/user/
Handler: app.lambda_handler
Events:
RegisterUser:
Type: Api
Properties:
RestApiId: !Ref ApiGateway
Path: /users/register
Method: post
LoginUser:
Type: Api
Properties:
RestApiId: !Ref ApiGateway
Path: /users/login
Method: post
GetUserProfile:
Type: Api
Properties:
RestApiId: !Ref ApiGateway
Path: /users/{userId}
Method: get
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref UsersTable
UsersTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub "${Environment}-users"
AttributeDefinitions:
- AttributeName: userId
AttributeType: S
- AttributeName: email
AttributeType: S
KeySchema:
- AttributeName: userId
KeyType: HASH
GlobalSecondaryIndexes:
- IndexName: EmailIndex
KeySchema:
- AttributeName: email
KeyType: HASH
Projection:
ProjectionType: ALL
BillingMode: PAY_PER_REQUEST
# Product Service
ProductService:
Type: AWS::Serverless::Function
Properties:
CodeUri: services/product/
Handler: app.lambda_handler
Events:
ListProducts:
Type: Api
Properties:
RestApiId: !Ref ApiGateway
Path: /products
Method: get
GetProduct:
Type: Api
Properties:
RestApiId: !Ref ApiGateway
Path: /products/{productId}
Method: get
Policies:
- DynamoDBReadPolicy:
TableName: !Ref ProductsTable
ProductsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub "${Environment}-products"
AttributeDefinitions:
- AttributeName: productId
AttributeType: S
- AttributeName: category
AttributeType: S
KeySchema:
- AttributeName: productId
KeyType: HASH
GlobalSecondaryIndexes:
- IndexName: CategoryIndex
KeySchema:
- AttributeName: category
KeyType: HASH
Projection:
ProjectionType: ALL
BillingMode: PAY_PER_REQUEST
# Order Service with Step Functions
OrderService:
Type: AWS::Serverless::Function
Properties:
CodeUri: services/order/
Handler: app.lambda_handler
Events:
CreateOrder:
Type: Api
Properties:
RestApiId: !Ref ApiGateway
Path: /orders
Method: post
OrderProcessingWorkflow:
Type: AWS::States::StateMachine
Properties:
DefinitionString:
!Sub
- |
{
"Comment": "Order Processing Workflow",
"StartAt": "ValidateOrder",
"States": {
"ValidateOrder": {
"Type": "Task",
"Resource": "${ValidateOrderFunction.Arn}",
"Next": "ProcessPayment"
},
"ProcessPayment": {
"Type": "Task",
"Resource": "${ProcessPaymentFunction.Arn}",
"Next": "UpdateInventory"
},
"UpdateInventory": {
"Type": "Task",
"Resource": "${UpdateInventoryFunction.Arn}",
"Next": "SendConfirmation"
},
"SendConfirmation": {
"Type": "Task",
"Resource": "${SendConfirmationFunction.Arn}",
"End": true
}
}
}
- ValidateOrderFunction: !GetAtt ValidateOrder.Arn
ProcessPaymentFunction: !GetAtt ProcessPayment.Arn
UpdateInventoryFunction: !GetAtt UpdateInventory.Arn
SendConfirmationFunction: !GetAtt SendConfirmation.Arn
Role: !Ref OrderWorkflowRole
# Event-driven Architecture
EventBus:
Type: AWS::Events::EventBus
Properties:
Name: !Sub "${Environment}-ecommerce-events"
OrderCreatedRule:
Type: AWS::Events::Rule
Properties:
EventBusName: !Ref EventBus
EventPattern:
source:
- "com.ecommerce.orders"
detail-type:
- "OrderCreated"
Targets:
- Arn: !GetAtt OrderProcessingWorkflow.Arn
RoleArn: !GetAtt EventBridgeRole.Arn
Id: "OrderWorkflowTarget"
# Lambda Layer: Database Utilities
db_utils.py
# Layer Structure
aws_lambda_layers/
├── python/
│ ├── db_utils.py
│ ├── auth_utils.py
│ ├── logging_utils.py
│ └── requirements.txt
└── layer.yaml
# db_utils.py - Shared database utilities
import boto3
import json
from typing import Dict, Any, List
from contextlib import contextmanager
class DatabaseManager:
def __init__(self, table_name: str, region: str = 'us-east-1'):
self.dynamodb = boto3.resource('dynamodb', region_name=region)
self.table = self.dynamodb.Table(table_name)
@contextmanager
def transaction(self):
"""Context manager for DynamoDB transactions"""
try:
yield
except Exception as e:
raise e
def get_item(self, key: Dict[str, Any]) -> Dict[str, Any]:
"""Get item from DynamoDB table"""
response = self.table.get_item(Key=key)
return response.get('Item', {})
def put_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
"""Put item to DynamoDB table"""
response = self.table.put_item(Item=item)
return response
def query_items(self, key_condition_expression: str,
expression_attribute_values: Dict[str, Any],
index_name: str = None) -> List[Dict[str, Any]]:
"""Query items from DynamoDB table"""
kwargs = {
'KeyConditionExpression': key_condition_expression,
'ExpressionAttributeValues': expression_attribute_values
}
if index_name:
kwargs['IndexName'] = index_name
response = self.table.query(**kwargs)
return response.get('Items', [])
def update_item(self, key: Dict[str, Any],
update_expression: str,
expression_attribute_values: Dict[str, Any] = None,
expression_attribute_names: Dict[str, str] = None) -> Dict[str, Any]:
"""Update item in DynamoDB table"""
kwargs = {
'Key': key,
'UpdateExpression': update_expression
}
if expression_attribute_values:
kwargs['ExpressionAttributeValues'] = expression_attribute_values
if expression_attribute_names:
kwargs['ExpressionAttributeNames'] = expression_attribute_names
response = self.table.update_item(**kwargs)
return response.get('Attributes', {})
# auth_utils.py - Shared authentication utilities
import jwt
import hashlib
import secrets
from datetime import datetime, timedelta
class AuthManager:
def __init__(self, secret_key: str):
self.secret_key = secret_key
def generate_token(self, user_id: str, expires_in: int = 3600) -> str:
"""Generate JWT token"""
payload = {
'user_id': user_id,
'exp': datetime.utcnow() + timedelta(seconds=expires_in),
'iat': datetime.utcnow()
}
return jwt.encode(payload, self.secret_key, algorithm='HS256')
def verify_token(self, token: str) -> Dict[str, Any]:
"""Verify JWT token"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
return payload
except jwt.ExpiredSignatureError:
raise Exception('Token has expired')
except jwt.InvalidTokenError:
raise Exception('Invalid token')
def hash_password(self, password: str) -> str:
"""Hash password using SHA-256"""
salt = secrets.token_hex(16)
password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
return f"{salt}:{password_hash}"
def verify_password(self, password: str, hashed_password: str) -> bool:
"""Verify password against hash"""
salt, password_hash = hashed_password.split(':')
computed_hash = hashlib.sha256((password + salt).encode()).hexdigest()
return computed_hash == password_hash
# logging_utils.py - Shared logging utilities
import logging
import json
import traceback
from datetime import datetime
class StructuredLogger:
def __init__(self, service_name: str, log_level: str = 'INFO'):
self.service_name = service_name
self.logger = logging.getLogger(service_name)
self.logger.setLevel(getattr(logging, log_level.upper()))
# Configure JSON formatter
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
self.logger.addHandler(handler)
def log_request(self, event: Dict[str, Any], context: Any):
"""Log incoming request"""
log_data = {
'event_type': 'request_start',
'service': self.service_name,
'request_id': context.aws_request_id,
'function_name': context.function_name,
'timestamp': datetime.utcnow().isoformat(),
'http_method': event.get('httpMethod'),
'path': event.get('path'),
'user_agent': event.get('headers', {}).get('User-Agent')
}
self.logger.info(json.dumps(log_data))
def log_response(self, status_code: int, response_body: Dict[str, Any],
context: Any, duration_ms: int):
"""Log response"""
log_data = {
'event_type': 'request_end',
'service': self.service_name,
'request_id': context.aws_request_id,
'status_code': status_code,
'duration_ms': duration_ms,
'timestamp': datetime.utcnow().isoformat()
}
self.logger.info(json.dumps(log_data))
def log_error(self, error: Exception, event: Dict[str, Any], context: Any):
"""Log error with stack trace"""
log_data = {
'event_type': 'error',
'service': self.service_name,
'request_id': context.aws_request_id,
'error_type': type(error).__name__,
'error_message': str(error),
'stack_trace': traceback.format_exc(),
'timestamp': datetime.utcnow().isoformat()
}
self.logger.error(json.dumps(log_data))
class JsonFormatter(logging.Formatter):
def format(self, record):
log_data = {
'timestamp': datetime.utcnow().isoformat(),
'level': record.levelname,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno
}
return json.dumps(log_data)
# Main Lambda function using layers
import json
import time
from datetime import datetime
from db_utils import DatabaseManager
from auth_utils import AuthManager
from logging_utils import StructuredLogger
# Initialize shared components (outside handler for reuse)
db_manager = DatabaseManager(table_name='users')
auth_manager = AuthManager(secret_key='your-secret-key')
logger = StructuredLogger(service_name='user-service')
def lambda_handler(event, context):
"""User registration Lambda function using layers"""
start_time = time.time()
try:
# Log request
logger.log_request(event, context)
# Extract request data
body = json.loads(event.get('body', '{}'))
email = body.get('email')
password = body.get('password')
name = body.get('name')
# Validate input
if not all([email, password, name]):
return {
'statusCode': 400,
'body': json.dumps({'error': 'Missing required fields'})
}
# Check if user already exists
existing_user = db_manager.query_items(
key_condition_expression='email = :email',
expression_attribute_values={':email': email},
index_name='EmailIndex'
)
if existing_user:
return {
'statusCode': 409,
'body': json.dumps({'error': 'User already exists'})
}
# Create user
user_id = f"user_{int(time.time())}"
hashed_password = auth_manager.hash_password(password)
user_data = {
'userId': user_id,
'email': email,
'password': hashed_password,
'name': name,
'createdAt': datetime.utcnow().isoformat(),
'status': 'active'
}
db_manager.put_item(user_data)
# Generate auth token
token = auth_manager.generate_token(user_id)
response_data = {
'userId': user_id,
'email': email,
'name': name,
'token': token
}
# Log response
duration_ms = int((time.time() - start_time) * 1000)
logger.log_response(201, response_data, context, duration_ms)
return {
'statusCode': 201,
'body': json.dumps(response_data)
}
except Exception as e:
# Log error
logger.log_error(e, event, context)
return {
'statusCode': 500,
'body': json.dumps({'error': 'Internal server error'})
}
### Step 2: Creating a Lambda Function
Here's a simple Lambda function in Python:
```python
import json
def lambda_handler(event, context):
"""
Simple Lambda function that processes incoming events
"""
try:
# Extract data from the event
name = event.get('name', 'World')
# Process the request
response_message = f"Hello, {name}! Your serverless function is working."
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json'
},
'body': json.dumps({
'message': response_message,
'timestamp': context.aws_request_id
})
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({
'error': str(e)
})
}
### Step 3: Creating API Gateway Integration
Set up API Gateway to trigger your Lambda function:
1. Create a new REST API in API Gateway
2. Define a resource (e.g., `/hello`)
3. Create a POST method
4. Set up Lambda proxy integration
5. Deploy the API to a stage
### Step 4: Testing the Application
Test your serverless application using the AWS Console or AWS CLI:
```bash
aws lambda invoke \
--function-name my-function \
--payload '{"name": "AWS Serverless"}' \
response.json
Build event-driven applications using EventBridge and Lambda:
import json
import boto3
def lambda_handler(event, context):
"""
Process events from EventBridge
"""
for record in event['Records']:
source = record['source']
detail_type = record['detail-type']
detail = record['detail']
# Route events based on type
if detail_type == 'UserCreated':
handle_user_creation(detail)
elif detail_type == 'OrderPlaced':
handle_order_processing(detail)
return {'status': 'processed'}
def handle_user_creation(user_data):
"""Handle user creation event"""
# Send welcome email
# Create user profile
# Initialize user preferences
pass
def handle_order_processing(order_data):
"""Handle order processing event"""
# Validate inventory
# Process payment
# Update order status
pass
### Durable Functions with Step Functions
Create complex workflows with Step Functions:
```json
{
"Comment": "A state machine that processes orders",
"StartAt": "ValidateOrder",
"States": {
"ValidateOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:ValidateOrder",
"Next": "ProcessPayment"
},
"ProcessPayment": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:ProcessPayment",
"Next": "UpdateInventory"
},
"UpdateInventory": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:UpdateInventory",
"End": true
}
}
}
### API Gateway Custom Authorizers
Implement custom authentication:
```python
import json
import jwt
def lambda_handler(event, context):
"""
Custom authorizer for API Gateway
"""
token = event['authorizationToken']
try:
# Verify JWT token
decoded = jwt.decode(token, 'your-secret-key', algorithms=['HS256'])
# Generate IAM policy
policy = {
'principalId': decoded['user_id'],
'policyDocument': {
'Version': '2012-10-17',
'Statement': [
{
'Action': 'execute-api:Invoke',
'Effect': 'Allow',
'Resource': event['methodArn']
}
]
}
}
return policy
except jwt.InvalidTokenError:
raise Exception('Unauthorized')
## Best Practices for AWS Serverless
### Performance Optimization
1. **Cold Start Mitigation**
- Use provisioned concurrency for critical functions
- Keep functions small and lightweight
- Choose optimal runtime (Node.js/Python for faster cold starts)
2. **Memory Configuration**
- Monitor memory usage with CloudWatch
- Right-size memory allocation (affects CPU and network)
- Use AWS Compute Optimizer for recommendations
3. **Connection Pooling**
- Reuse database connections across invocations
- Initialize clients outside the handler
- Use VPC endpoints for AWS service access
### Security Best Practices
1. **IAM Permissions**
- Follow principle of least privilege
- Use IAM roles for Lambda functions
- Implement resource-based policies where appropriate
2. **Environment Variables**
- Use AWS Secrets Manager for sensitive data
- Encrypt environment variables
- Rotate credentials regularly
3. **VPC Configuration**
- Place functions in private subnets
- Use NAT gateways for internet access
- Implement security groups and NACLs
### Monitoring and Observability
1. **Logging Strategy**
- Use structured logging (JSON format)
- Implement log levels (DEBUG, INFO, WARN, ERROR)
- Centralize logs with CloudWatch Logs
2. **Metrics and Alarms**
- Create custom CloudWatch metrics
- Set up alarms for errors and throttling
- Monitor duration and memory usage
3. **Distributed Tracing**
- Enable AWS X-Ray for request tracing
- Implement custom subsegments
- Use X-Ray service maps for visualization
## Deployment Strategies
### Infrastructure as Code
Use AWS SAM or CDK for infrastructure management:
**AWS SAM Template Example:**
```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: 'Serverless application'
Resources:
MyFunction:
Type: AWS::Serverless::Function
Properties:
Handler: index.lambda_handler
Runtime: python3.9
CodeUri: ./src
Events:
MyApi:
Type: Api
Properties:
Path: /hello
Method: post
Environment:
Variables:
TABLE_NAME: !Ref MyTable
MyTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: MyDataTable
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
BillingMode: PAY_PER_REQUEST
Implement automated deployment with AWS CodePipeline:
Manage different environments (dev, staging, prod):
Symptoms: High latency on first invocation Solutions:
Symptoms: Functions timing out Solutions:
Symptoms: OutOfMemory errors Solutions:
The serverless landscape continues to evolve with new services and capabilities:
Before you start, ensure you have:
Here’s a simple Lambda function in Python:
import json
def lambda_handler(event, context):
"""
Simple Lambda function that processes incoming events
"""
try:
# Extract data from the event
name = event.get('name', 'World')
# Process the request
response_message = f"Hello, {name}! Your serverless function is working."
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json'
},
'body': json.dumps({
'message': response_message,
'timestamp': context.aws_request_id
})
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({
'error': str(e)
})
}
### Durable Functions with Step Functions
Create complex workflows with Step Functions:
```json
{
"Comment": "A state machine that processes orders",
"StartAt": "ValidateOrder",
"States": {
"ValidateOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:ValidateOrder",
"Next": "ProcessPayment"
},
"ProcessPayment": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:ProcessPayment",
"Next": "UpdateInventory"
},
"UpdateInventory": {
"Type": "Task",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:UpdateInventory",
"End": true
}
}
}
### Event-Driven Architecture
Build event-driven applications using EventBridge and Lambda:
```python
import json
import boto3
def lambda_handler(event, context):
"""
Process events from EventBridge
"""
for record in event['Records']:
source = record['source']
detail_type = record['detail-type']
detail = record['detail']
# Route events based on type
if detail_type == 'UserCreated':
handle_user_creation(detail)
elif detail_type == 'OrderPlaced':
handle_order_processing(detail)
return {'status': 'processed'}
def handle_user_creation(user_data):
"""Handle user creation event"""
# Send welcome email
# Create user profile
# Initialize user preferences
pass
def handle_order_processing(order_data):
"""Handle order processing event"""
# Validate inventory
# Process payment
# Update order status
pass
### API Gateway Custom Authorizers
Implement custom authentication:
```python
import json
import jwt
def lambda_handler(event, context):
"""
Custom authorizer for API Gateway
"""
token = event['authorizationToken']
try:
# Verify JWT token
decoded = jwt.decode(token, 'your-secret-key', algorithms=['HS256'])
# Generate IAM policy
policy = {
'principalId': decoded['user_id'],
'policyDocument': {
'Version': '2012-10-17',
'Statement': [
{
'Action': 'execute-api:Invoke',
'Effect': 'Allow',
'Resource': event['methodArn']
}
]
}
}
return policy
except jwt.InvalidTokenError:
raise Exception('Unauthorized')
## Performance Optimization Techniques
### Cold Start Mitigation Strategies
<div class="cold-start-diagram">
<style>
.cold-start-diagram {
display: flex;
justify-content: space-around;
align-items: center;
padding: 20px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 15px;
margin: 20px 0;
flex-wrap: wrap;
}
.strategy {
background: rgba(255,255,255,0.1);
border-radius: 10px;
padding: 15px;
margin: 10px;
min-width: 200px;
text-align: center;
color: white;
border: 2px solid rgba(255,255,255,0.3);
transition: all 0.3s ease;
}
.strategy:hover {
transform: translateY(-5px);
background: rgba(255,255,255,0.2);
}
.strategy-title {
font-weight: bold;
margin-bottom: 10px;
font-size: 1.1em;
}
.strategy-desc {
font-size: 0.9em;
}
</style>
<div class="cold-start-diagram">
<div class="strategy">
<div class="strategy-title">Provisioned Concurrency</div>
<div class="strategy-desc">Keep functions warm and ready</div>
</div>
<div class="strategy">
<div class="strategy-title">Optimize Runtime</div>
<div class="strategy-desc">Choose faster runtimes like Node.js</div>
</div>
<div class="strategy">
<div class="strategy-title">Reduce Package Size</div>
<div class="strategy-desc">Minimize dependencies and code</div>
</div>
<div class="strategy">
<div class="strategy-title">Connection Reuse</div>
<div class="strategy-desc">Initialize clients outside handler</div>
</div>
</div>
</div>
**Implementation Example:**
```python
# Optimized Lambda function for cold start mitigation
import json
import boto3
import time
from datetime import datetime
# Initialize outside handler (executed once per container)
s3_client = None
dynamodb_resource = None
connection_pool = {}
def initialize_clients():
"""Initialize AWS clients once"""
global s3_client, dynamodb_resource
if s3_client is None:
s3_client = boto3.client('s3')
if dynamodb_resource is None:
dynamodb_resource = boto3.resource('dynamodb')
# Initialize database connection pool
if 'db_connection' not in connection_pool:
connection_pool['db_connection'] = create_db_connection()
def create_db_connection():
"""Create database connection"""
# Your database connection logic
return "database_connection_object"
def lambda_handler(event, context):
"""Optimized Lambda handler"""
start_time = time.time()
try:
# Initialize clients (only on cold start)
initialize_clients()
# Fast path - reuse existing connections
db_conn = connection_pool['db_connection']
# Process request
result = process_request_with_db(event, db_conn)
# Log performance metrics
duration = (time.time() - start_time) * 1000
log_performance_metrics(duration, context)
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Success',
'data': result,
'processing_time_ms': duration
})
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def log_performance_metrics(duration, context):
"""Log performance metrics to CloudWatch"""
print(json.dumps({
'metric_type': 'performance',
'duration_ms': duration,
'function_name': context.function_name,
'request_id': context.aws_request_id,
'timestamp': datetime.utcnow().isoformat()
}))
### Memory and Performance Tuning
```python
# Performance monitoring and auto-tuning
import json
import time
import psutil
import os
from datetime import datetime
class PerformanceMonitor:
def __init__(self):
self.start_time = None
self.memory_usage = []
self.cpu_usage = []
def start_monitoring(self):
"""Start performance monitoring"""
self.start_time = time.time()
self.memory_usage = []
self.cpu_usage = []
def record_metrics(self):
"""Record current performance metrics"""
if self.start_time:
# Memory usage (percentage)
memory_percent = psutil.virtual_memory().percent
self.memory_usage.append(memory_percent)
# CPU usage (percentage)
cpu_percent = psutil.cpu_percent()
self.cpu_usage.append(cpu_percent)
def get_summary(self):
"""Get performance summary"""
if not self.start_time:
return {}
duration = time.time() - self.start_time
return {
'duration_seconds': duration,
'avg_memory_usage': sum(self.memory_usage) / len(self.memory_usage) if self.memory_usage else 0,
'max_memory_usage': max(self.memory_usage) if self.memory_usage else 0,
'avg_cpu_usage': sum(self.cpu_usage) / len(self.cpu_usage) if self.cpu_usage else 0,
'max_cpu_usage': max(self.cpu_usage) if self.cpu_usage else 0,
'memory_samples': len(self.memory_usage),
'cpu_samples': len(self.cpu_usage)
}
# Global performance monitor
monitor = PerformanceMonitor()
def lambda_handler(event, context):
"""Lambda function with performance monitoring"""
monitor.start_monitoring()
try:
# Record metrics during processing
monitor.record_metrics()
# Your business logic here
result = process_business_logic(event)
# Record final metrics
monitor.record_metrics()
# Get performance summary
perf_summary = monitor.get_summary()
# Log to CloudWatch
print(json.dumps({
'event_type': 'performance_summary',
'function_name': context.function_name,
'request_id': context.aws_request_id,
'metrics': perf_summary,
'timestamp': datetime.utcnow().isoformat()
}))
return {
'statusCode': 200,
'body': json.dumps({
'result': result,
'performance': perf_summary
})
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def process_business_logic(event):
"""Simulate business logic processing"""
time.sleep(0.1) # Simulate work
return {'status': 'processed', 'data': event}
## Security Best Practices
### Implementing Zero Trust Architecture
<div class="security-diagram">
<style>
.security-diagram {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
gap: 15px;
padding: 20px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 15px;
margin: 20px 0;
}
.security-layer {
background: rgba(255,255,255,0.1);
border-radius: 10px;
padding: 15px;
border: 2px solid rgba(255,255,255,0.3);
color: white;
}
.layer-title {
font-weight: bold;
margin-bottom: 10px;
font-size: 1.1em;
}
.security-items {
font-size: 0.9em;
}
.security-item {
margin: 5px 0;
padding-left: 15px;
position: relative;
}
.security-item:before {
content: "🔒";
position: absolute;
left: 0;
}
</style>
<div class="security-diagram">
<div class="security-layer">
<div class="layer-title">Identity & Access</div>
<div class="security-items">
<div class="security-item">IAM Roles with Least Privilege</div>
<div class="security-item">Cognito User Pools</div>
<div class="security-item">JWT Token Validation</div>
<div class="security-item">Multi-Factor Authentication</div>
</div>
</div>
<div class="security-layer">
<div class="layer-title">Network Security</div>
<div class="security-items">
<div class="security-item">VPC Endpoints</div>
<div class="security-item">Security Groups</div>
<div class="security-item">WAF Integration</div>
<div class="security-item">DDoS Protection</div>
</div>
</div>
<div class="security-layer">
<div class="layer-title">Data Protection</div>
<div class="security-items">
<div class="security-item">Encryption at Rest</div>
<div class="security-item">Encryption in Transit</div>
<div class="security-item">Secrets Manager</div>
<div class="security-item">Data Masking</div>
</div>
</div>
<div class="security-layer">
<div class="layer-title">Monitoring & Compliance</div>
<div class="security-items">
<div class="security-item">CloudTrail Logging</div>
<div class="security-item">GuardDuty</div>
<div class="security-item">Security Hub</div>
<div class="security-item">Audit Logs</div>
</div>
</div>
</div>
</div>
**Security Implementation Example:**
```python
# Secure Lambda function with comprehensive security measures
import json
import boto3
import jwt
import hashlib
import os
from datetime import datetime, timedelta
from cryptography.fernet import Fernet
class SecurityManager:
def __init__(self):
self.secrets_client = boto3.client('secretsmanager')
self.kms_client = boto3.client('kms')
self.encryption_key = None
def get_secret(self, secret_name):
"""Retrieve secret from AWS Secrets Manager"""
try:
response = self.secrets_client.get_secret_value(SecretId=secret_name)
return json.loads(response['SecretString'])
except Exception as e:
raise Exception(f"Failed to retrieve secret: {e}")
def get_encryption_key(self):
"""Get or create encryption key"""
if self.encryption_key is None:
secret = self.get_secret('application-encryption-key')
self.encryption_key = secret['encryption_key'].encode()
return self.encryption_key
def encrypt_data(self, data):
"""Encrypt sensitive data"""
key = self.get_encryption_key()
f = Fernet(key)
return f.encrypt(data.encode()).decode()
def decrypt_data(self, encrypted_data):
"""Decrypt sensitive data"""
key = self.get_encryption_key()
f = Fernet(key)
return f.decrypt(encrypted_data.encode()).decode()
def validate_jwt_token(self, token, secret_name='jwt-secret'):
"""Validate JWT token"""
try:
secret = self.get_secret(secret_name)
payload = jwt.decode(token, secret['secret'], algorithms=['HS256'])
return payload
except jwt.ExpiredSignatureError:
raise Exception('Token has expired')
except jwt.InvalidTokenError:
raise Exception('Invalid token')
def hash_sensitive_data(self, data):
"""Hash sensitive data for storage"""
return hashlib.sha256(data.encode()).hexdigest()
# Global security manager
security_manager = SecurityManager()
def lambda_handler(event, context):
"""Secure Lambda function with comprehensive security"""
try:
# Extract and validate authorization header
auth_header = event.get('headers', {}).get('Authorization', '')
if not auth_header.startswith('Bearer '):
return {
'statusCode': 401,
'body': json.dumps({'error': 'Missing or invalid authorization header'})
}
token = auth_header.split(' ')[1]
# Validate JWT token
try:
user_payload = security_manager.validate_jwt_token(token)
user_id = user_payload['user_id']
except Exception as e:
return {
'statusCode': 401,
'body': json.dumps({'error': 'Invalid token', 'detail': str(e)})
}
# Log security event
log_security_event('token_validated', user_id, context.aws_request_id)
# Process request with security measures
body = json.loads(event.get('body', '{}'))
# Encrypt sensitive data before processing
if 'sensitive_data' in body:
encrypted_data = security_manager.encrypt_data(body['sensitive_data'])
body['encrypted_sensitive_data'] = encrypted_data
del body['sensitive_data']
# Your business logic here
result = process_secure_request(body, user_id)
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'X-XSS-Protection': '1; mode=block'
},
'body': json.dumps({
'message': 'Success',
'data': result,
'request_id': context.aws_request_id
})
}
except Exception as e:
# Log security incident
log_security_event('security_error', 'unknown', context.aws_request_id, str(e))
return {
'statusCode': 500,
'body': json.dumps({'error': 'Internal server error'})
}
def process_secure_request(data, user_id):
"""Process request with security measures"""
# Implement your secure business logic
return {
'user_id': user_id,
'processed_at': datetime.utcnow().isoformat(),
'data_hash': security_manager.hash_sensitive_data(str(data))
}
def log_security_event(event_type, user_id, request_id, details=None):
"""Log security events to CloudWatch"""
log_data = {
'event_type': 'security',
'security_event': event_type,
'user_id': user_id,
'request_id': request_id,
'timestamp': datetime.utcnow().isoformat()
}
if details:
log_data['details'] = details
print(json.dumps(log_data))
## Monitoring and Observability
### Comprehensive Monitoring Setup
<div class="monitoring-diagram">
<style>
.monitoring-diagram {
display: flex;
flex-direction: column;
align-items: center;
padding: 20px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 15px;
margin: 20px 0;
}
.monitoring-layers {
display: flex;
justify-content: space-around;
width: 100%;
flex-wrap: wrap;
margin: 10px 0;
}
.monitoring-layer {
background: rgba(255,255,255,0.1);
border-radius: 10px;
padding: 15px;
margin: 5px;
min-width: 150px;
text-align: center;
color: white;
border: 2px solid rgba(255,255,255,0.3);
}
.layer-name {
font-weight: bold;
margin-bottom: 8px;
}
.services {
font-size: 0.8em;
}
.service {
background: rgba(255,255,255,0.2);
padding: 2px 6px;
border-radius: 10px;
margin: 2px;
display: inline-block;
}
.arrow {
font-size: 20px;
color: white;
margin: 5px 0;
}
</style>
<div class="monitoring-diagram">
<div class="monitoring-layers">
<div class="monitoring-layer">
<div class="layer-name">Application Metrics</div>
<div class="services">
<div class="service">Custom Metrics</div>
<div class="service">Business KPIs</div>
<div class="service">User Behavior</div>
</div>
</div>
<div class="monitoring-layer">
<div class="layer-name">Infrastructure</div>
<div class="services">
<div class="service">CloudWatch</div>
<div class="service">X-Ray</div>
<div class="service">VPC Flow Logs</div>
</div>
</div>
<div class="monitoring-layer">
<div class="layer-name">Security</div>
<div class="services">
<div class="service">GuardDuty</div>
<div class="service">CloudTrail</div>
<div class="service">Security Hub</div>
</div>
</div>
<div class="monitoring-layer">
<div class="layer-name">Cost</div>
<div class="services">
<div class="service">Cost Explorer</div>
<div class="service">Budgets</div>
<div class="service">Usage Metrics</div>
</div>
</div>
</div>
<div class="arrow">⬇</div>
<div class="monitoring-layer" style="width: 80%; background: linear-gradient(90deg, #FF6B6B, #4ECDC4);">
<div class="layer-name">Centralized Dashboard & Alerting</div>
<div class="services">
<div class="service">CloudWatch Dashboard</div>
<div class="service">SNS Alerts</div>
<div class="service">PagerDuty Integration</div>
</div>
</div>
</div>
</div>
**Advanced Monitoring Implementation:**
```python
# Comprehensive monitoring and observability
import json
import time
import boto3
import psutil
import traceback
from datetime import datetime
from contextlib import contextmanager
from typing import Dict, Any, List
class ObservabilityManager:
def __init__(self, service_name: str):
self.service_name = service_name
self.cloudwatch = boto3.client('cloudwatch')
self.xray = boto3.client('xray')
self.metrics_buffer = []
self.start_time = None
@contextmanager
def trace_operation(self, operation_name: str):
"""Context manager for tracing operations"""
start_time = time.time()
try:
yield
duration = (time.time() - start_time) * 1000
self.record_metric(f'{operation_name}_duration_ms', duration, 'Milliseconds')
except Exception as e:
duration = (time.time() - start_time) * 1000
self.record_metric(f'{operation_name}_error_duration_ms', duration, 'Milliseconds')
self.record_metric(f'{operation_name}_errors', 1, 'Count')
raise
def record_metric(self, metric_name: str, value: float, unit: str = 'Count'):
"""Record custom metric"""
self.metrics_buffer.append({
'MetricName': metric_name,
'Value': value,
'Unit': unit,
'Timestamp': datetime.utcnow(),
'Dimensions': [
{
'Name': 'ServiceName',
'Value': self.service_name
}
]
})
# Flush buffer if it gets too large
if len(self.metrics_buffer) >= 20:
self.flush_metrics()
def flush_metrics(self):
"""Flush metrics to CloudWatch"""
if self.metrics_buffer:
try:
self.cloudwatch.put_metric_data(
Namespace='ServerlessApplications',
MetricData=self.metrics_buffer
)
self.metrics_buffer.clear()
except Exception as e:
print(f"Failed to flush metrics: {e}")
def log_structured_event(self, event_type: str, data: Dict[str, Any], level: str = 'INFO'):
"""Log structured event"""
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'level': level,
'service': self.service_name,
'event_type': event_type,
**data
}
print(json.dumps(log_entry))
def record_error(self, error: Exception, context: Any, additional_data: Dict[str, Any] = None):
"""Record error with full context"""
error_data = {
'error_type': type(error).__name__,
'error_message': str(error),
'stack_trace': traceback.format_exc(),
'function_name': getattr(context, 'function_name', 'unknown'),
'request_id': getattr(context, 'aws_request_id', 'unknown'),
'memory_limit': getattr(context, 'memory_limit_in_mb', 'unknown')
}
if additional_data:
error_data.update(additional_data)
self.log_structured_event('error', error_data, 'ERROR')
self.record_metric('errors_total', 1, 'Count')
def record_performance_metrics(self, start_time: float, context: Any):
"""Record performance metrics"""
duration = (time.time() - start_time) * 1000
memory_used = getattr(context, 'memory_limit_in_mb', 0)
self.record_metric('invocation_duration_ms', duration, 'Milliseconds')
self.record_metric('memory_used_mb', memory_used, 'Megabytes')
# System metrics
memory_percent = psutil.virtual_memory().percent
cpu_percent = psutil.cpu_percent()
self.record_metric('system_memory_percent', memory_percent, 'Percent')
self.record_metric('system_cpu_percent', cpu_percent, 'Percent')
# Performance thresholds
if duration > 5000: # 5 seconds
self.record_metric('slow_invocations', 1, 'Count')
if memory_percent > 80:
self.record_metric('high_memory_usage', 1, 'Count')
# Global observability manager
observability = ObservabilityManager('serverless-api')
def lambda_handler(event, context):
"""Lambda function with comprehensive observability"""
start_time = time.time()
try:
# Start request tracking
observability.log_structured_event('request_start', {
'request_id': context.aws_request_id,
'http_method': event.get('httpMethod'),
'path': event.get('path'),
'user_agent': event.get('headers', {}).get('User-Agent')
})
with observability.trace_operation('business_logic'):
# Your business logic here
result = process_business_request(event)
# Record success metrics
observability.record_metric('successful_invocations', 1, 'Count')
observability.record_performance_metrics(start_time, context)
# Log completion
observability.log_structured_event('request_complete', {
'request_id': context.aws_request_id,
'status': 'success',
'result_size': len(str(result))
})
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Success',
'data': result,
'request_id': context.aws_request_id
})
}
except Exception as e:
# Record error
observability.record_error(e, context, {
'http_method': event.get('httpMethod'),
'path': event.get('path')
})
return {
'statusCode': 500,
'body': json.dumps({
'error': 'Internal server error',
'request_id': context.aws_request_id
})
}
finally:
# Flush any remaining metrics
observability.flush_metrics()
def process_business_request(event):
"""Simulate business logic processing"""
# Simulate some work
time.sleep(0.1)
return {'status': 'processed', 'timestamp': datetime.utcnow().isoformat()}
## Deployment Strategies
### Infrastructure as Code
Use AWS SAM or CDK for infrastructure management:
**AWS SAM Template Example:**
```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: 'Serverless application'
Resources:
MyFunction:
Type: AWS::Serverless::Function
Properties:
Handler: index.lambda_handler
Runtime: python3.9
CodeUri: ./src
Events:
MyApi:
Type: Api
Properties:
Path: /hello
Method: post
Environment:
Variables:
TABLE_NAME: !Ref MyTable
MyTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: MyDataTable
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
BillingMode: PAY_PER_REQUEST
Implement automated deployment with AWS CodePipeline:
Manage different environments (dev, staging, prod):
Symptoms: High latency on first invocation Solutions:
Symptoms: Functions timing out Solutions:
Symptoms: OutOfMemory errors Solutions:
The serverless landscape continues to evolve with new services and capabilities:
AWS Serverless provides a powerful, flexible platform for building modern applications without the complexity of managing infrastructure. By understanding the core services, implementing best practices, and following proper architecture patterns, you can build scalable, cost-effective applications that focus on delivering business value.
Start small with simple Lambda functions, gradually incorporate more advanced services like Step Functions and EventBridge, and continuously optimize based on real-world usage patterns. The serverless journey is iterative, and each application provides valuable insights for the next.
The future of cloud computing is serverless, and AWS provides the most comprehensive platform for building production-ready applications today.
Published on 2026-03-31 11:23 UTC