Serverless Data Pipelines: Architecture Patterns
Serverless computing eliminates server management and provides automatic scaling with pay-per-use billing. These benefits matter for data pipelines with variable workloads.
This article covers architectural patterns for serverless data pipelines.
Why Serverless for Data Pipelines?
Traditional data pipeline architectures often require provisioning servers or clusters that can handle peak loads, resulting in idle resources during periods of low activity. Serverless computing addresses this inefficiency with several key benefits:
- Automatic scaling: Resources scale up and down based on workload
- Cost efficiency: Pay only for resources used during execution
- Reduced operational overhead: No server management or patching required
- High availability: Built-in redundancy across availability zones
- Event-driven processing: Native support for event-based architectures
Core Serverless Data Pipeline Patterns
Let’s explore several architectural patterns for building serverless data pipelines, with implementation examples using AWS services (though similar patterns apply to other cloud providers).
Pattern 1: Event-Triggered Processing
This pattern uses events to trigger data processing functions, creating a responsive pipeline that processes data as it arrives.
[Diagram: event-triggered processing — an object uploaded to the input S3 bucket triggers a Lambda processor, which writes results to a processed bucket.]
AWS Implementation Example:
# AWS CDK code for event-triggered data pipeline
from aws_cdk import (
Stack,
aws_s3 as s3,
aws_lambda as lambda_,
aws_s3_notifications as s3n,
)
from constructs import Construct
class EventTriggeredPipelineStack(Stack):
    """Event-triggered pipeline: S3 uploads invoke a Lambda processor.

    Objects written to the input bucket trigger the processor function,
    which writes the transformed result to a separate processed bucket.
    """

    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # S3 buckets for raw input and processed output
        input_bucket = s3.Bucket(self, "InputBucket")
        processed_bucket = s3.Bucket(self, "ProcessedBucket")

        # Lambda function that transforms each uploaded object
        processor = lambda_.Function(
            self, "DataProcessor",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="processor.handler",
            code=lambda_.Code.from_asset("lambda/processor"),
            environment={
                "PROCESSED_BUCKET": processed_bucket.bucket_name
            }
        )

        # BUG FIX: the processor calls get_object on the input bucket, so it
        # needs read access there in addition to write access on the
        # processed bucket; the original only granted the write side.
        input_bucket.grant_read(processor)
        processed_bucket.grant_write(processor)

        # Invoke the processor whenever a new object lands in the input bucket
        input_bucket.add_event_notification(
            s3.EventType.OBJECT_CREATED,
            s3n.LambdaDestination(processor)
        )
Lambda function implementation:
# processor.py
import boto3
import os
import json
from datetime import datetime
s3_client = boto3.client('s3')
processed_bucket = os.environ['PROCESSED_BUCKET']
def handler(event, context):
    """Process S3 ObjectCreated events: read each JSON object, enrich it,
    and write the result to the processed bucket.

    Enrichment adds a processing timestamp and, when an 'items' list is
    present, item/value totals. Re-raises on failure so Lambda's retry
    mechanisms engage.
    """
    processed = 0
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # Fetch the object body as UTF-8 text
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = response['Body'].read().decode('utf-8')

        try:
            json_data = json.loads(data)
            # Stamp the record with a processing timestamp
            json_data['processed_at'] = datetime.now().isoformat()

            # Example transformation: compute order totals
            if 'items' in json_data:
                json_data['total_items'] = sum(item['quantity'] for item in json_data['items'])
                json_data['total_value'] = sum(item['quantity'] * item['price'] for item in json_data['items'])

            # Write the enriched document under a processed/ prefix
            processed_key = f"processed/{key}"
            s3_client.put_object(
                Bucket=processed_bucket,
                Key=processed_key,
                Body=json.dumps(json_data),
                ContentType='application/json'
            )
            processed += 1
        except Exception as e:
            print(f"Error processing {key}: {str(e)}")
            raise

    # BUG FIX: the original returned from inside the loop, so only the first
    # record of a batched S3 event was ever processed.
    return {
        'statusCode': 200,
        'body': f"Successfully processed {processed} object(s)"
    }
Pattern 2: Fan-Out Processing
This pattern distributes incoming data to multiple processing functions, enabling parallel processing and specialization of tasks.
[Diagram: fan-out processing — a router Lambda publishes classification events to an EventBridge bus, whose rules fan them out to per-type SQS queues, each drained by a dedicated processor Lambda.]
AWS Implementation Example with EventBridge and SQS:
# AWS CDK code for fan-out pattern
from aws_cdk import (
Stack,
aws_lambda as lambda_,
aws_s3 as s3,
aws_s3_notifications as s3n,
aws_sqs as sqs,
aws_lambda_event_sources as lambda_events,
aws_events as events,
aws_events_targets as targets,
)
from constructs import Construct
class FanOutPipelineStack(Stack):
    """Fan-out pipeline: a router Lambda classifies incoming S3 objects and
    publishes events to EventBridge, which routes them to per-type SQS
    queues, each consumed by a dedicated processor Lambda.
    """

    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Buckets for raw input and processed output
        input_bucket = s3.Bucket(self, "InputBucket")
        output_bucket = s3.Bucket(self, "OutputBucket")

        # Router Lambda: inspects objects and emits routing events
        router = lambda_.Function(
            self, "RouterFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="router.handler",
            code=lambda_.Code.from_asset("lambda/router")
        )

        # BUG FIX: router.py publishes with EventBusName='DataPipelineBus'.
        # Without an explicit event_bus_name, CDK generates a unique physical
        # name and the router's put_events calls would target a bus that does
        # not exist.
        bus = events.EventBus(
            self, "DataPipelineBus",
            event_bus_name="DataPipelineBus"
        )

        # One queue + processor + routing rule per data type; the loop
        # replaces three near-identical copy-pasted blocks.
        for suffix in ("a", "b", "c"):
            label = suffix.upper()

            queue = sqs.Queue(self, f"Type{label}Queue")

            processor = lambda_.Function(
                self, f"Processor{label}",
                runtime=lambda_.Runtime.PYTHON_3_9,
                handler=f"processor_{suffix}.handler",
                code=lambda_.Code.from_asset(f"lambda/processor_{suffix}"),
                environment={"OUTPUT_BUCKET": output_bucket.bucket_name}
            )

            # Drain the queue with the processor and let it write results
            processor.add_event_source(lambda_events.SqsEventSource(queue))
            output_bucket.grant_write(processor)

            # Route matching data-events from the bus into this type's queue
            events.Rule(
                self, f"Type{label}Rule",
                event_bus=bus,
                event_pattern=events.EventPattern(
                    detail_type=["data-event"],
                    detail={"data_type": [f"type_{suffix}"]}
                ),
                targets=[targets.SqsQueue(queue)]
            )

        # New objects trigger the router
        input_bucket.add_event_notification(
            s3.EventType.OBJECT_CREATED,
            s3n.LambdaDestination(router)
        )

        # BUG FIX: router.py reads the uploaded object to classify it, so it
        # needs read access on the input bucket as well as put-events rights.
        input_bucket.grant_read(router)
        bus.grant_put_events(router)
Router function implementation:
# router.py
import boto3
import json
import os
s3_client = boto3.client('s3')
events_client = boto3.client('events')
def handler(event, context):
    """Classify each uploaded S3 object and publish a routing event to
    EventBridge so the matching per-type queue receives it.

    Classification: an explicit 'type' field in the JSON body wins;
    otherwise the object-key prefix decides, defaulting to type_c.
    Re-raises on failure so the S3 notification is retried.
    """
    routed = 0
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # Fetch the object so its content can inform routing
        response = s3_client.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read().decode('utf-8')

        try:
            data = json.loads(content)

            # Determine the data type: content field first, then key prefix
            if 'type' in data:
                data_type = data['type']
            elif key.startswith('customer_'):
                data_type = 'type_a'
            elif key.startswith('product_'):
                data_type = 'type_b'
            else:
                data_type = 'type_c'

            # Publish the routing event. NOTE: 'DataPipelineBus' must match
            # the event_bus_name of the bus created by the CDK stack.
            events_client.put_events(
                Entries=[
                    {
                        'Source': 'data-pipeline',
                        'DetailType': 'data-event',
                        'Detail': json.dumps({
                            'bucket': bucket,
                            'key': key,
                            'data_type': data_type
                        }),
                        'EventBusName': 'DataPipelineBus'
                    }
                ]
            )
            routed += 1
        except Exception as e:
            print(f"Error routing {key}: {str(e)}")
            raise

    # BUG FIX: returning inside the loop meant only the first record of a
    # batched event was routed; return once after the whole batch.
    return {
        'statusCode': 200,
        'body': f"Successfully routed {routed} object(s)"
    }
Pattern 3: Orchestrated Workflows
For complex pipelines with multiple sequential or conditional steps, an orchestration service manages the execution flow.
[Diagram: orchestrated workflow — a trigger Lambda starts a Step Functions state machine that runs Extract, Transform, Load, and Notify steps in sequence, with failures routed to a Fail state.]
AWS Implementation using Step Functions:
# AWS CDK code for orchestrated workflow
from aws_cdk import (
Stack,
aws_lambda as lambda_,
aws_stepfunctions as sfn,
aws_stepfunctions_tasks as tasks,
aws_s3 as s3,
aws_s3_notifications as s3n,
)
from constructs import Construct
class OrchestratedPipelineStack(Stack):
    """Step Functions-orchestrated ETL workflow.

    A trigger Lambda starts the state machine when objects land in the
    input bucket; the workflow runs Extract -> Transform -> Load -> Notify
    and routes any step failure to a terminal Fail state.
    """

    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Buckets for raw input and processed output
        input_bucket = s3.Bucket(self, "InputBucket")
        processed_bucket = s3.Bucket(self, "ProcessedBucket")

        # Lambda functions implementing the pipeline steps
        extract_fn = lambda_.Function(
            self, "ExtractFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="extract.handler",
            code=lambda_.Code.from_asset("lambda/extract"),
            environment={"BUCKET_NAME": input_bucket.bucket_name}
        )
        transform_fn = lambda_.Function(
            self, "TransformFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="transform.handler",
            code=lambda_.Code.from_asset("lambda/transform")
        )
        load_fn = lambda_.Function(
            self, "LoadFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="load.handler",
            code=lambda_.Code.from_asset("lambda/load"),
            environment={"OUTPUT_BUCKET": processed_bucket.bucket_name}
        )
        notify_fn = lambda_.Function(
            self, "NotifyFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="notify.handler",
            code=lambda_.Code.from_asset("lambda/notify")
        )

        # Least-privilege grants for the steps that touch S3
        input_bucket.grant_read(extract_fn)
        processed_bucket.grant_write(load_fn)

        # Step Functions tasks; output_path="$.Payload" unwraps the Lambda
        # invoke envelope so each step receives the previous step's return
        # value directly.
        extract_task = tasks.LambdaInvoke(
            self, "Extract",
            lambda_function=extract_fn,
            output_path="$.Payload"
        )
        transform_task = tasks.LambdaInvoke(
            self, "Transform",
            lambda_function=transform_fn,
            output_path="$.Payload"
        )
        load_task = tasks.LambdaInvoke(
            self, "Load",
            lambda_function=load_fn,
            output_path="$.Payload"
        )
        notify_task = tasks.LambdaInvoke(
            self, "Notify",
            lambda_function=notify_fn
        )

        # Terminal failure state
        fail_state = sfn.Fail(
            self, "ProcessingFailed",
            cause="Data Processing Failed",
            error="DataPipelineError"
        )

        # BUG FIX: the original called `.catch()` on the chained definition,
        # but chains built with .next() do not support catch handlers; each
        # task must attach its own catch via add_catch(). This mirrors the
        # per-state Catch blocks in the JSON definition below.
        for task in (extract_task, transform_task, load_task):
            task.add_catch(fail_state, errors=["States.ALL"])

        definition = extract_task \
            .next(transform_task) \
            .next(load_task) \
            .next(notify_task)

        state_machine = sfn.StateMachine(
            self, "DataPipelineStateMachine",
            definition=definition
        )

        # Trigger Lambda that starts one execution per uploaded object
        trigger_fn = lambda_.Function(
            self, "TriggerFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="trigger.handler",
            code=lambda_.Code.from_asset("lambda/trigger"),
            environment={"STATE_MACHINE_ARN": state_machine.state_machine_arn}
        )
        state_machine.grant_start_execution(trigger_fn)

        # New objects in the input bucket kick off the workflow
        input_bucket.add_event_notification(
            s3.EventType.OBJECT_CREATED,
            s3n.LambdaDestination(trigger_fn)
        )
Step Functions State Machine Definition (JSON):
{
"Comment": "Data Processing Pipeline Workflow",
"StartAt": "Extract",
"States": {
"Extract": {
"Type": "Task",
"Resource": "${ExtractFunctionArn}",
"Next": "Transform",
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "ProcessingFailed"
}
]
},
"Transform": {
"Type": "Task",
"Resource": "${TransformFunctionArn}",
"Next": "Load",
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "ProcessingFailed"
}
]
},
"Load": {
"Type": "Task",
"Resource": "${LoadFunctionArn}",
"Next": "Notify",
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "ProcessingFailed"
}
]
},
"Notify": {
"Type": "Task",
"Resource": "${NotifyFunctionArn}",
"End": true
},
"ProcessingFailed": {
"Type": "Fail",
"Cause": "Data Processing Failed",
"Error": "DataPipelineError"
}
}
}
Pattern 4: Stream Processing
This pattern is ideal for continuous data processing, using stream-based services to process data in near real-time.
[Diagram: stream processing — producers write to a Kinesis data stream that a Lambda consumes in batches, storing processed results in a DynamoDB table.]
AWS Implementation with Kinesis and Lambda:
# AWS CDK code for stream processing
from aws_cdk import (
    Duration,  # BUG FIX: needed for max_batching_window below
    Stack,
    aws_kinesis as kinesis,
    aws_lambda as lambda_,
    aws_lambda_event_sources as lambda_events,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class StreamProcessingStack(Stack):
    """Near-real-time pipeline: a Lambda consumes a Kinesis stream in
    batches and writes processed results to DynamoDB."""

    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Kinesis data stream (single shard is enough for the example)
        data_stream = kinesis.Stream(
            self, "DataStream",
            shard_count=1
        )

        # Results table; a stream of new images is enabled so downstream
        # consumers could react to writes
        results_table = dynamodb.Table(
            self, "ResultsTable",
            partition_key=dynamodb.Attribute(
                name="id",
                type=dynamodb.AttributeType.STRING
            ),
            stream=dynamodb.StreamViewType.NEW_IMAGE
        )

        # Lambda that processes each stream batch
        processor = lambda_.Function(
            self, "StreamProcessor",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="processor.handler",
            code=lambda_.Code.from_asset("lambda/stream_processor"),
            environment={
                "RESULTS_TABLE": results_table.table_name
            }
        )

        # Batch up to 100 records or 60 seconds, starting at the stream tip.
        # BUG FIX: the original used core.Duration, but `core` was never
        # imported; in CDK v2 Duration is imported from aws_cdk directly.
        processor.add_event_source(
            lambda_events.KinesisEventSource(
                data_stream,
                starting_position=lambda_.StartingPosition.LATEST,
                batch_size=100,
                max_batching_window=Duration.seconds(60)
            )
        )

        results_table.grant_write_data(processor)
Stream processor Lambda implementation:
# processor.py
import boto3
import base64
import json
import os
from datetime import datetime
# Initialize DynamoDB client
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['RESULTS_TABLE'])
def handler(event, context):
    """Consume a batch of Kinesis records, decoding and processing each one.

    Records that fail to parse or process are logged and skipped; the rest
    of the batch continues. Returns a count of successfully handled records.
    """
    count = 0
    for rec in event['Records']:
        # Kinesis delivers payloads base64-encoded
        raw = base64.b64decode(rec['kinesis']['data']).decode('utf-8')
        try:
            process_data(json.loads(raw))
        except Exception as exc:
            print(f"Error processing record: {str(exc)}")
        else:
            count += 1

    print(f"Processed {count} records")
    return {
        'statusCode': 200,
        'body': json.dumps(f'Processed {count} records')
    }
def process_data(data):
    """Enrich one record with a timestamp and anomaly info, then persist it.

    Anomaly detection is a simple threshold test: a value more than 1.5x
    its threshold is flagged, with a score of (value/threshold) - 1.
    """
    # Add processing timestamp
    data['processed_at'] = datetime.now().isoformat()

    # Threshold-based anomaly detection (only when both fields are present)
    if 'value' in data:
        anomaly_score = 0
        if 'threshold' in data:
            threshold = data['threshold']
            value = data['value']
            if value > threshold * 1.5:
                anomaly_score = (value / threshold) - 1
                data['anomaly'] = True
            else:
                data['anomaly'] = False
        data['anomaly_score'] = anomaly_score

    # BUG FIX: the boto3 DynamoDB resource rejects Python floats ("Float
    # types are not supported. Use Decimal types instead."), and both the
    # anomaly score and arbitrary incoming values may be floats. Round-trip
    # through JSON so every float becomes a Decimal before put_item.
    from decimal import Decimal
    safe_data = json.loads(json.dumps(data), parse_float=Decimal)

    # Fall back to a timestamp-derived ID when the record has none
    item_id = data.get('id', str(datetime.now().timestamp()))
    table.put_item(
        Item={
            'id': item_id,
            'timestamp': data.get('timestamp', datetime.now().isoformat()),
            'data': safe_data,
            'anomaly_score': safe_data.get('anomaly_score', 0),
            'processed_at': data['processed_at']
        }
    )
Pattern 5: Periodic Batch Processing
For scheduled data processing tasks, this pattern uses serverless functions triggered by time-based events.
[Diagram: periodic batch processing — an EventBridge cron schedule invokes a batch Lambda that reads the previous day's objects from a source bucket and writes aggregated summaries to a destination bucket.]
AWS Implementation with EventBridge Scheduler:
# AWS CDK code for periodic batch processing
from aws_cdk import (
    Duration,  # BUG FIX: needed for the Lambda timeout below
    Stack,
    aws_events as events,
    aws_events_targets as targets,
    aws_lambda as lambda_,
    aws_s3 as s3,
    aws_glue as glue,
)
from constructs import Construct


class PeriodicBatchStack(Stack):
    """Scheduled batch pipeline: an EventBridge cron rule invokes a Lambda
    once a day to aggregate the previous day's objects."""

    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Buckets for raw input and aggregated output
        source_bucket = s3.Bucket(self, "SourceBucket")
        destination_bucket = s3.Bucket(self, "DestinationBucket")

        # Glue database for cataloging batch outputs.
        # NOTE(review): created but not referenced elsewhere in this stack —
        # presumably tables are registered against it out of band; confirm.
        database = glue.Database(self, "BatchDatabase")

        # Batch processor Lambda; batch jobs can run long, so use the
        # maximum Lambda timeout of 15 minutes.
        # BUG FIX: Duration was used here but never imported in the original.
        batch_processor = lambda_.Function(
            self, "BatchProcessor",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="batch.handler",
            code=lambda_.Code.from_asset("lambda/batch_processor"),
            environment={
                "SOURCE_BUCKET": source_bucket.bucket_name,
                "DESTINATION_BUCKET": destination_bucket.bucket_name
            },
            timeout=Duration.minutes(15)
        )

        source_bucket.grant_read(batch_processor)
        destination_bucket.grant_write(batch_processor)

        # Run daily at 01:00 UTC
        daily_rule = events.Rule(
            self, "DailyProcessingRule",
            schedule=events.Schedule.cron(
                minute="0",
                hour="1",
                day="*",
                month="*",
                year="*"
            )
        )
        daily_rule.add_target(targets.LambdaFunction(batch_processor))
Batch processor Lambda implementation:
# batch.py
import boto3
import os
import json
from datetime import datetime, timedelta
s3_client = boto3.client('s3')
SOURCE_BUCKET = os.environ['SOURCE_BUCKET']
DESTINATION_BUCKET = os.environ['DESTINATION_BUCKET']
def handler(event, context):
    """Daily batch entry point: process every source object written under
    yesterday's YYYY/MM/DD/ prefix.

    Invoked on a schedule (the event payload is unused).
    """
    # Yesterday's date determines which prefix to sweep
    yesterday = datetime.now() - timedelta(days=1)
    date_prefix = yesterday.strftime('%Y/%m/%d/')

    # BUG FIX: list_objects_v2 returns at most 1000 keys per call; the
    # original stopped at the first page, silently skipping busy days.
    # A paginator walks every page of the listing.
    paginator = s3_client.get_paginator('list_objects_v2')
    processed_count = 0
    for page in paginator.paginate(Bucket=SOURCE_BUCKET, Prefix=date_prefix):
        for obj in page.get('Contents', []):
            process_object(obj['Key'])
            processed_count += 1

    return {
        'statusCode': 200,
        'body': json.dumps(f'Processed {processed_count} objects for {date_prefix}')
    }
def process_object(key):
    """Aggregate one source object's records and write a daily summary.

    The key is expected to look like 'YYYY/MM/DD/<filename>'. The summary
    sums record values per category and is written to the destination
    bucket. Re-raises on failure so the caller can surface the error.
    """
    response = s3_client.get_object(
        Bucket=SOURCE_BUCKET,
        Key=key
    )
    content = response['Body'].read().decode('utf-8')

    try:
        data = json.loads(content)

        # Sum values per category
        aggregated = {}
        for record in data.get('records', []):
            category = record.get('category', 'unknown')
            value = record.get('value', 0)
            aggregated[category] = aggregated.get(category, 0) + value

        # First three path segments are the YYYY/MM/DD prefix
        parts = key.split('/')
        summary = {
            'date': '-'.join(parts[0:3]),
            'total_records': len(data.get('records', [])),
            'aggregated_values': aggregated,
            'total_value': sum(aggregated.values()),
            'processed_at': datetime.now().isoformat()
        }

        # BUG FIX: the original wrote to processed/<filename> only, so files
        # with the same name on different days overwrote each other. Keep
        # the full date path in the destination key.
        dest_key = f"processed/{key}"
        s3_client.put_object(
            Bucket=DESTINATION_BUCKET,
            Key=dest_key,
            Body=json.dumps(summary),
            ContentType='application/json'
        )
        print(f"Successfully processed {key} to {dest_key}")
    except Exception as e:
        print(f"Error processing {key}: {str(e)}")
        raise
Best Practices for Serverless Data Pipelines
To build robust serverless data pipelines, follow these best practices:
1. Design for Idempotency
Ensure that processing functions can handle duplicate events without side effects:
# Idempotent Lambda function example
import boto3
import hashlib
import json
import os
from datetime import datetime  # BUG FIX: datetime was used but never imported

dynamodb = boto3.resource('dynamodb')
processed_table = dynamodb.Table(os.environ['PROCESSED_ITEMS_TABLE'])


def handler(event, context):
    """Process SQS records exactly once, deduplicating by content hash.

    Each record's body is hashed; if the hash already exists in the
    dedup table the record is skipped, otherwise it is processed and the
    hash recorded so redeliveries become no-ops.
    """
    for record in event['Records']:
        # Derive a deterministic ID from the record body
        record_body = record['body']
        record_id = hashlib.md5(record_body.encode()).hexdigest()

        # Skip records we have already handled
        response = processed_table.get_item(
            Key={'record_id': record_id}
        )
        if 'Item' in response:
            print(f"Record {record_id} already processed, skipping")
            continue

        # Process, then record the ID so retries become no-ops.
        # NOTE(review): a crash between these two calls reprocesses the
        # record on redelivery — this is at-least-once, which is why
        # process_record itself must still tolerate duplicates.
        process_record(record_body)
        processed_table.put_item(
            Item={
                'record_id': record_id,
                'processed_at': datetime.now().isoformat()
            }
        )
2. Implement Dead Letter Queues (DLQs)
Use DLQs to capture and handle failed processing attempts:
# AWS CDK code for DLQ configuration
processing_queue = sqs.Queue(self, "ProcessingQueue")

# Queue that collects messages which repeatedly failed processing
dlq = sqs.Queue(self, "DeadLetterQueue")

# Messages that fail delivery three times are moved to the DLQ
source_queue = sqs.Queue(
    self, "SourceQueue",
    dead_letter_queue=sqs.DeadLetterQueue(
        max_receive_count=3,
        queue=dlq
    )
)

# Lambda that inspects and handles dead-lettered messages, wired to
# consume directly from the DLQ
dlq_processor = lambda_.Function(
    self, "DLQProcessor",
    runtime=lambda_.Runtime.PYTHON_3_9,
    handler="dlq_handler.handler",
    code=lambda_.Code.from_asset("lambda/dlq_handler")
)
dlq_processor.add_event_source(lambda_events.SqsEventSource(dlq))
3. Implement Proper Error Handling
Handle errors gracefully in your Lambda functions:
def handler(event, context):
    """Layered error handling: respond 400 for validation problems, and
    re-raise unexpected failures so Lambda retry / DLQ mechanisms engage."""
    try:
        result = process_data(event)
        payload = json.dumps(result)
    except ValueError as err:
        # Expected, client-side problem: report it rather than retry
        print(f"Validation error: {str(err)}")
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Validation error', 'message': str(err)})
        }
    except Exception as err:
        # Unexpected failure: log, then propagate to trigger retry/DLQ
        print(f"Unexpected error: {str(err)}")
        raise

    return {
        'statusCode': 200,
        'body': payload
    }
4. Use Appropriate Timeout and Memory Settings
Configure Lambda functions with appropriate timeouts and memory based on their workload:
# Memory-intensive processor: generous RAM and a longer deadline
transform_fn = lambda_.Function(
    self, "TransformFunction",
    runtime=lambda_.Runtime.PYTHON_3_9,
    handler="transform.handler",
    code=lambda_.Code.from_asset("lambda/transform"),
    memory_size=1024,  # 1 GB for heavy transformations
    timeout=Duration.seconds(60)
)

# Lightweight notifier: minimal memory and a short timeout
notification_fn = lambda_.Function(
    self, "NotificationFunction",
    runtime=lambda_.Runtime.PYTHON_3_9,
    handler="notify.handler",
    code=lambda_.Code.from_asset("lambda/notify"),
    memory_size=128,  # the smallest size is plenty for a simple notification
    timeout=Duration.seconds(10)
)
5. Monitoring and Observability
Implement comprehensive monitoring with CloudWatch and X-Ray:
# AWS CDK code for monitoring configuration
# Enable X-Ray tracing so each invocation of the processor is traced
processor_lambda = lambda_.Function(
    self, "ProcessorFunction",
    runtime=lambda_.Runtime.PYTHON_3_9,
    handler="processor.handler",
    code=lambda_.Code.from_asset("lambda/processor"),
    tracing=lambda_.Tracing.ACTIVE,
)

# Custom CloudWatch dashboard for pipeline health
dashboard = cloudwatch.Dashboard(
    self, "PipelineDashboard",
    dashboard_name="ServerlessDataPipeline"
)

# One graph widget per key Lambda metric
metric_graphs = [
    cloudwatch.GraphWidget(title=title, left=[metric()])
    for title, metric in (
        ("Processor Invocations", processor_lambda.metric_invocations),
        ("Processor Errors", processor_lambda.metric_errors),
        ("Processor Duration", processor_lambda.metric_duration),
    )
]
dashboard.add_widgets(*metric_graphs)
Decision Rules
Use this checklist for serverless data pipeline decisions:
- If your pipeline has variable workloads, evaluate serverless against provisioned capacity costs
- If you need strict latency guarantees, remember Lambda’s ~1s cold start overhead
- If processing requires >15 minutes, use Step Functions or ECS instead of Lambda
- If you need VPC access, configure Lambda with proper security groups and subnets
- If you process sensitive data, apply serverless-specific encryption and access controls
Serverless adds vendor lock-in. Factor this into your architecture decisions.