Serverless Data Pipelines: Architecture Patterns

Simor Consulting | 05 Jun, 2024 | 08 Mins read

Serverless computing eliminates server management and provides automatic scaling with pay-per-use billing. These benefits matter for data pipelines with variable workloads.

This article covers architectural patterns for serverless data pipelines.

Why Serverless for Data Pipelines?

Traditional data pipeline architectures often require provisioning servers or clusters that can handle peak loads, resulting in idle resources during periods of low activity. Serverless computing addresses this inefficiency with several key benefits:

  1. Automatic scaling: Resources scale up and down based on workload
  2. Cost efficiency: Pay only for resources used during execution
  3. Reduced operational overhead: No server management or patching required
  4. High availability: Built-in redundancy across availability zones
  5. Event-driven processing: Native support for event-based architectures
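Benefit 2 can be made concrete with a rough cost estimate. The sketch below uses assumed, illustrative per-GB-second and per-request prices (check current pricing for real numbers); the point is the shape of the calculation:

```python
# Rough monthly Lambda cost for a spiky pipeline.
# Both prices are assumed for illustration only.
GB_SECOND_PRICE = 0.0000166667    # assumed USD per GB-second
REQUEST_PRICE = 0.20 / 1_000_000  # assumed USD per request

def monthly_lambda_cost(invocations: int, avg_duration_s: float, memory_gb: float) -> float:
    """Estimate monthly cost as compute (GB-seconds) plus request charges."""
    compute = invocations * avg_duration_s * memory_gb * GB_SECOND_PRICE
    requests = invocations * REQUEST_PRICE
    return compute + requests

# 2M invocations/month, 500 ms average duration, 512 MB memory
cost = monthly_lambda_cost(2_000_000, 0.5, 0.5)
print(f"${cost:.2f}/month")  # roughly $8.73 with the assumed prices
```

With a provisioned cluster, you would pay for peak capacity around the clock; here you pay only for the GB-seconds actually consumed.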

Core Serverless Data Pipeline Patterns

Let’s explore several architectural patterns for building serverless data pipelines, with implementation examples using AWS services (though similar patterns apply to other cloud providers).

Pattern 1: Event-Triggered Processing

This pattern uses events to trigger data processing functions, creating a responsive pipeline that processes data as it arrives.

Pipeline flow: input S3 bucket → S3 event notification → Lambda processor → processed S3 bucket.

AWS Implementation Example:

# AWS CDK code for event-triggered data pipeline
from aws_cdk import (
    Stack,
    aws_s3 as s3,
    aws_lambda as lambda_,
    aws_s3_notifications as s3n,
)
from constructs import Construct

class EventTriggeredPipelineStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Create S3 buckets for input and processed data
        input_bucket = s3.Bucket(self, "InputBucket")
        processed_bucket = s3.Bucket(self, "ProcessedBucket")

        # Create Lambda function for data processing
        processor = lambda_.Function(
            self, "DataProcessor",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="processor.handler",
            code=lambda_.Code.from_asset("lambda/processor"),
            environment={
                "PROCESSED_BUCKET": processed_bucket.bucket_name
            }
        )

        # Grant permissions
        processed_bucket.grant_write(processor)

        # Create S3 event notification to trigger Lambda
        input_bucket.add_event_notification(
            s3.EventType.OBJECT_CREATED,
            s3n.LambdaDestination(processor)
        )

Lambda function implementation:

# processor.py
import boto3
import os
import json
from datetime import datetime

s3_client = boto3.client('s3')
processed_bucket = os.environ['PROCESSED_BUCKET']

def handler(event, context):
    # Extract S3 event details
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # Get the object
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = response['Body'].read().decode('utf-8')

        # Process the data (example: JSON transformation)
        try:
            json_data = json.loads(data)
            # Add processing timestamp
            json_data['processed_at'] = datetime.now().isoformat()

            # Example transformation: calculate totals
            if 'items' in json_data:
                json_data['total_items'] = sum(item['quantity'] for item in json_data['items'])
                json_data['total_value'] = sum(item['quantity'] * item['price'] for item in json_data['items'])

            # Write to processed bucket
            processed_key = f"processed/{key}"
            s3_client.put_object(
                Bucket=processed_bucket,
                Key=processed_key,
                Body=json.dumps(json_data),
                ContentType='application/json'
            )

        except Exception as e:
            print(f"Error processing {key}: {str(e)}")
            raise

    # Return after the loop: an S3 event can carry more than one record
    return {
        'statusCode': 200,
        'body': f"Processed {len(event['Records'])} object(s)"
    }
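For local testing, the handler can be invoked directly with a minimal version of the event S3 delivers to Lambda (bucket and key below are placeholders):

```json
{
  "Records": [
    {
      "s3": {
        "bucket": { "name": "my-input-bucket" },
        "object": { "key": "orders/order-123.json" }
      }
    }
  ]
}
```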

Pattern 2: Fan-Out Processing

This pattern distributes incoming data to multiple processing functions, enabling parallel processing and specialization of tasks.

Pipeline flow: input S3 bucket → router Lambda → EventBridge bus → per-type rules → SQS queues → processor Lambdas → output S3 bucket.

AWS Implementation Example with EventBridge and SQS:

# AWS CDK code for fan-out pattern
from aws_cdk import (
    Stack,
    aws_lambda as lambda_,
    aws_s3 as s3,
    aws_s3_notifications as s3n,
    aws_sqs as sqs,
    aws_lambda_event_sources as lambda_events,
    aws_events as events,
    aws_events_targets as targets,
)
from constructs import Construct

class FanOutPipelineStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Create S3 bucket for input
        input_bucket = s3.Bucket(self, "InputBucket")

        # Create output bucket
        output_bucket = s3.Bucket(self, "OutputBucket")

        # Create the router function
        router = lambda_.Function(
            self, "RouterFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="router.handler",
            code=lambda_.Code.from_asset("lambda/router")
        )

        # Create event bus for routing (explicit name so the router's
        # put_events call can reference it; without this, CDK generates
        # a name that would not match the hardcoded one in router.py)
        bus = events.EventBus(self, "DataPipelineBus", event_bus_name="DataPipelineBus")

        # Create SQS queues for different data types
        type_a_queue = sqs.Queue(self, "TypeAQueue")
        type_b_queue = sqs.Queue(self, "TypeBQueue")
        type_c_queue = sqs.Queue(self, "TypeCQueue")

        # Create processor functions
        processor_a = lambda_.Function(
            self, "ProcessorA",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="processor_a.handler",
            code=lambda_.Code.from_asset("lambda/processor_a"),
            environment={"OUTPUT_BUCKET": output_bucket.bucket_name}
        )

        processor_b = lambda_.Function(
            self, "ProcessorB",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="processor_b.handler",
            code=lambda_.Code.from_asset("lambda/processor_b"),
            environment={"OUTPUT_BUCKET": output_bucket.bucket_name}
        )

        processor_c = lambda_.Function(
            self, "ProcessorC",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="processor_c.handler",
            code=lambda_.Code.from_asset("lambda/processor_c"),
            environment={"OUTPUT_BUCKET": output_bucket.bucket_name}
        )

        # Connect processors to SQS queues
        processor_a.add_event_source(lambda_events.SqsEventSource(type_a_queue))
        processor_b.add_event_source(lambda_events.SqsEventSource(type_b_queue))
        processor_c.add_event_source(lambda_events.SqsEventSource(type_c_queue))

        # Grant permissions to write to output bucket
        output_bucket.grant_write(processor_a)
        output_bucket.grant_write(processor_b)
        output_bucket.grant_write(processor_c)

        # Connect S3 events to router
        input_bucket.add_event_notification(
            s3.EventType.OBJECT_CREATED,
            s3n.LambdaDestination(router)
        )

        # Grant permissions to router
        bus.grant_put_events(router)

        # Create EventBridge rules to route events to queues
        events.Rule(
            self, "TypeARule",
            event_bus=bus,
            event_pattern=events.EventPattern(
                detail_type=["data-event"],
                detail={"data_type": ["type_a"]}
            ),
            targets=[targets.SqsQueue(type_a_queue)]
        )

        events.Rule(
            self, "TypeBRule",
            event_bus=bus,
            event_pattern=events.EventPattern(
                detail_type=["data-event"],
                detail={"data_type": ["type_b"]}
            ),
            targets=[targets.SqsQueue(type_b_queue)]
        )

        events.Rule(
            self, "TypeCRule",
            event_bus=bus,
            event_pattern=events.EventPattern(
                detail_type=["data-event"],
                detail={"data_type": ["type_c"]}
            ),
            targets=[targets.SqsQueue(type_c_queue)]
        )

Router function implementation:

# router.py
import boto3
import json
import os

s3_client = boto3.client('s3')
events_client = boto3.client('events')

def handler(event, context):
    # Process S3 events
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # Get object content
        response = s3_client.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read().decode('utf-8')

        try:
            # Parse content as JSON
            data = json.loads(content)

            # Determine data type based on content
            data_type = None
            if 'type' in data:
                data_type = data['type']
            elif key.startswith('customer_'):
                data_type = 'type_a'
            elif key.startswith('product_'):
                data_type = 'type_b'
            else:
                data_type = 'type_c'

            # Send event to EventBridge
            response = events_client.put_events(
                Entries=[
                    {
                        'Source': 'data-pipeline',
                        'DetailType': 'data-event',
                        'Detail': json.dumps({
                            'bucket': bucket,
                            'key': key,
                            'data_type': data_type
                        }),
                        'EventBusName': 'DataPipelineBus'
                    }
                ]
            )

        except Exception as e:
            print(f"Error routing {key}: {str(e)}")
            raise

    # Return after the loop: an S3 event can carry more than one record
    return {
        'statusCode': 200,
        'body': f"Routed {len(event['Records'])} object(s)"
    }

Pattern 3: Orchestrated Workflows

For complex pipelines with multiple sequential or conditional steps, an orchestration service manages the execution flow.

Pipeline flow: input S3 bucket → trigger Lambda → Step Functions state machine (Extract → Transform → Load → Notify).

AWS Implementation using Step Functions:

# AWS CDK code for orchestrated workflow
from aws_cdk import (
    Stack,
    aws_lambda as lambda_,
    aws_stepfunctions as sfn,
    aws_stepfunctions_tasks as tasks,
    aws_s3 as s3,
    aws_s3_notifications as s3n,
)
from constructs import Construct

class OrchestratedPipelineStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Create S3 buckets
        input_bucket = s3.Bucket(self, "InputBucket")
        processed_bucket = s3.Bucket(self, "ProcessedBucket")

        # Create Lambda functions for pipeline steps
        extract_fn = lambda_.Function(
            self, "ExtractFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="extract.handler",
            code=lambda_.Code.from_asset("lambda/extract"),
            environment={"BUCKET_NAME": input_bucket.bucket_name}
        )

        transform_fn = lambda_.Function(
            self, "TransformFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="transform.handler",
            code=lambda_.Code.from_asset("lambda/transform")
        )

        load_fn = lambda_.Function(
            self, "LoadFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="load.handler",
            code=lambda_.Code.from_asset("lambda/load"),
            environment={"OUTPUT_BUCKET": processed_bucket.bucket_name}
        )

        notify_fn = lambda_.Function(
            self, "NotifyFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="notify.handler",
            code=lambda_.Code.from_asset("lambda/notify")
        )

        # Grant necessary permissions
        input_bucket.grant_read(extract_fn)
        processed_bucket.grant_write(load_fn)

        # Define Step Functions tasks
        extract_task = tasks.LambdaInvoke(
            self, "Extract",
            lambda_function=extract_fn,
            output_path="$.Payload"
        )

        transform_task = tasks.LambdaInvoke(
            self, "Transform",
            lambda_function=transform_fn,
            output_path="$.Payload"
        )

        load_task = tasks.LambdaInvoke(
            self, "Load",
            lambda_function=load_fn,
            output_path="$.Payload"
        )

        notify_task = tasks.LambdaInvoke(
            self, "Notify",
            lambda_function=notify_fn
        )

        # Define error handling
        fail_state = sfn.Fail(
            self, "ProcessingFailed",
            cause="Data Processing Failed",
            error="DataPipelineError"
        )

        # A Chain has no catch(); attach a catch-all to each processing
        # task instead (retries can be added with task.add_retry() to
        # mirror the Retry blocks in the JSON definition below)
        for task in (extract_task, transform_task, load_task):
            task.add_catch(fail_state, errors=["States.ALL"])

        # Define workflow
        definition = extract_task \
            .next(transform_task) \
            .next(load_task) \
            .next(notify_task)

        # Create state machine
        state_machine = sfn.StateMachine(
            self, "DataPipelineStateMachine",
            definition=definition
        )

        # Create trigger Lambda
        trigger_fn = lambda_.Function(
            self, "TriggerFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="trigger.handler",
            code=lambda_.Code.from_asset("lambda/trigger"),
            environment={"STATE_MACHINE_ARN": state_machine.state_machine_arn}
        )

        # Grant permission to start execution
        state_machine.grant_start_execution(trigger_fn)

        # Add S3 event notification
        input_bucket.add_event_notification(
            s3.EventType.OBJECT_CREATED,
            s3n.LambdaDestination(trigger_fn)
        )

Step Functions State Machine Definition (JSON):

{
  "Comment": "Data Processing Pipeline Workflow",
  "StartAt": "Extract",
  "States": {
    "Extract": {
      "Type": "Task",
      "Resource": "${ExtractFunctionArn}",
      "Next": "Transform",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "ProcessingFailed"
        }
      ]
    },
    "Transform": {
      "Type": "Task",
      "Resource": "${TransformFunctionArn}",
      "Next": "Load",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "ProcessingFailed"
        }
      ]
    },
    "Load": {
      "Type": "Task",
      "Resource": "${LoadFunctionArn}",
      "Next": "Notify",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "ProcessingFailed"
        }
      ]
    },
    "Notify": {
      "Type": "Task",
      "Resource": "${NotifyFunctionArn}",
      "End": true
    },
    "ProcessingFailed": {
      "Type": "Fail",
      "Cause": "Data Processing Failed",
      "Error": "DataPipelineError"
    }
  }
}
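The Retry blocks above give exponential backoff: with IntervalSeconds 1, MaxAttempts 3, and BackoffRate 2, Step Functions waits 1s, 2s, then 4s between attempts. A quick sketch of the schedule:

```python
def retry_schedule(interval_seconds: float, max_attempts: int, backoff_rate: float) -> list:
    """Wait time before each retry attempt under Step Functions' backoff rules."""
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]

print(retry_schedule(1, 3, 2))  # [1, 2, 4]
```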

Pattern 4: Stream Processing

This pattern is ideal for continuous data processing, using stream-based services to process data in near real-time.

Pipeline flow: producers → Kinesis data stream → Lambda stream processor → DynamoDB results table.

AWS Implementation with Kinesis and Lambda:

# AWS CDK code for stream processing
from aws_cdk import (
    Stack,
    Duration,
    aws_kinesis as kinesis,
    aws_lambda as lambda_,
    aws_lambda_event_sources as lambda_events,
    aws_dynamodb as dynamodb,
)
from constructs import Construct

class StreamProcessingStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Create Kinesis data stream
        data_stream = kinesis.Stream(
            self, "DataStream",
            shard_count=1
        )

        # Create DynamoDB table for processed results
        results_table = dynamodb.Table(
            self, "ResultsTable",
            partition_key=dynamodb.Attribute(
                name="id",
                type=dynamodb.AttributeType.STRING
            ),
            stream=dynamodb.StreamViewType.NEW_IMAGE
        )

        # Create stream processor Lambda
        processor = lambda_.Function(
            self, "StreamProcessor",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="processor.handler",
            code=lambda_.Code.from_asset("lambda/stream_processor"),
            environment={
                "RESULTS_TABLE": results_table.table_name
            }
        )

        # Add Kinesis as event source
        processor.add_event_source(
            lambda_events.KinesisEventSource(
                data_stream,
                starting_position=lambda_.StartingPosition.LATEST,
                batch_size=100,
                max_batching_window=Duration.seconds(60)
            )
        )

        # Grant permissions
        results_table.grant_write_data(processor)

Stream processor Lambda implementation:

# processor.py
import boto3
import base64
import json
import os
from datetime import datetime

# Initialize DynamoDB client
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['RESULTS_TABLE'])

def handler(event, context):
    records_processed = 0

    for record in event['Records']:
        # Decode and parse record data
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        try:
            data = json.loads(payload)

            # Process data (example: calculate anomaly score)
            process_data(data)

            records_processed += 1

        except Exception as e:
            print(f"Error processing record: {str(e)}")

    print(f"Processed {records_processed} records")
    return {
        'statusCode': 200,
        'body': json.dumps(f'Processed {records_processed} records')
    }

def process_data(data):
    # Add processing timestamp
    data['processed_at'] = datetime.now().isoformat()

    # Example: Detect anomalies
    if 'value' in data:
        # Simple threshold-based anomaly detection
        anomaly_score = 0
        if 'threshold' in data:
            threshold = data['threshold']
            value = data['value']

            if value > threshold * 1.5:
                anomaly_score = (value / threshold) - 1
                data['anomaly'] = True
            else:
                data['anomaly'] = False

        data['anomaly_score'] = anomaly_score

    # Store in DynamoDB (boto3 rejects Python floats, so convert any
    # floats in the payload to Decimal first)
    from decimal import Decimal
    item = json.loads(json.dumps(data), parse_float=Decimal)
    item_id = data.get('id', str(datetime.now().timestamp()))

    table.put_item(
        Item={
            'id': item_id,
            'timestamp': item.get('timestamp', datetime.now().isoformat()),
            'data': item,
            'anomaly_score': item.get('anomaly_score', 0),
            'processed_at': item['processed_at']
        }
    )
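Kinesis delivers record payloads base64-encoded, which is why the handler decodes before parsing. The decode step can be checked locally with a round-trip (the sample payload is illustrative):

```python
import base64
import json

payload = {'id': 'sensor-1', 'value': 120, 'threshold': 75}

# What Kinesis hands to Lambda in record['kinesis']['data']
encoded = base64.b64encode(json.dumps(payload).encode('utf-8')).decode('utf-8')

# What the handler does with it
decoded = json.loads(base64.b64decode(encoded).decode('utf-8'))
assert decoded == payload
```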

Pattern 5: Periodic Batch Processing

For scheduled data processing tasks, this pattern uses serverless functions triggered by time-based events.

Pipeline flow: EventBridge schedule → Lambda batch processor → source S3 bucket → destination S3 bucket.

AWS Implementation with EventBridge Scheduler:

# AWS CDK code for periodic batch processing
from aws_cdk import (
    Stack,
    Duration,
    aws_events as events,
    aws_events_targets as targets,
    aws_lambda as lambda_,
    aws_s3 as s3,
)
from aws_cdk import aws_glue_alpha as glue  # L2 Glue constructs live in the separate alpha package
from constructs import Construct

class PeriodicBatchStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Create S3 buckets
        source_bucket = s3.Bucket(self, "SourceBucket")
        destination_bucket = s3.Bucket(self, "DestinationBucket")

        # Create Glue database for cataloging batch outputs
        # (table definitions omitted in this example)
        database = glue.Database(self, "BatchDatabase")

        # Create batch processor Lambda
        batch_processor = lambda_.Function(
            self, "BatchProcessor",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="batch.handler",
            code=lambda_.Code.from_asset("lambda/batch_processor"),
            environment={
                "SOURCE_BUCKET": source_bucket.bucket_name,
                "DESTINATION_BUCKET": destination_bucket.bucket_name
            },
            timeout=Duration.minutes(15)  # Longer timeout for batch jobs
        )

        # Grant permissions
        source_bucket.grant_read(batch_processor)
        destination_bucket.grant_write(batch_processor)

        # Create EventBridge rule for daily execution
        daily_rule = events.Rule(
            self, "DailyProcessingRule",
            schedule=events.Schedule.cron(
                minute="0",
                hour="1",  # 1 AM UTC
                day="*",
                month="*",
                year="*"
            )
        )

        # Add Lambda as target
        daily_rule.add_target(targets.LambdaFunction(batch_processor))

Batch processor Lambda implementation:

# batch.py
import boto3
import os
import json
from datetime import datetime, timedelta

s3_client = boto3.client('s3')

SOURCE_BUCKET = os.environ['SOURCE_BUCKET']
DESTINATION_BUCKET = os.environ['DESTINATION_BUCKET']

def handler(event, context):
    # Get yesterday's date (for daily batch processing)
    yesterday = datetime.now() - timedelta(days=1)
    date_prefix = yesterday.strftime('%Y/%m/%d/')

    # List objects with yesterday's prefix
    response = s3_client.list_objects_v2(
        Bucket=SOURCE_BUCKET,
        Prefix=date_prefix
    )

    processed_count = 0
    if 'Contents' in response:
        for obj in response['Contents']:
            key = obj['Key']

            # Process each object
            process_object(key)
            processed_count += 1

    return {
        'statusCode': 200,
        'body': json.dumps(f'Processed {processed_count} objects for {date_prefix}')
    }

def process_object(key):
    # Get object
    response = s3_client.get_object(
        Bucket=SOURCE_BUCKET,
        Key=key
    )
    content = response['Body'].read().decode('utf-8')

    # Process the data (example: aggregate daily metrics)
    try:
        # Parse as JSON
        data = json.loads(content)

        # Example aggregation: sum values by category
        aggregated = {}
        if 'records' in data:
            for record in data['records']:
                category = record.get('category', 'unknown')
                value = record.get('value', 0)

                if category not in aggregated:
                    aggregated[category] = 0

                aggregated[category] += value

        # Create summary with totals
        summary = {
            'date': key.split('/')[0] + '-' + key.split('/')[1] + '-' + key.split('/')[2],
            'total_records': len(data.get('records', [])),
            'aggregated_values': aggregated,
            'total_value': sum(aggregated.values()),
            'processed_at': datetime.now().isoformat()
        }

        # Write to destination bucket
        dest_key = f"processed/{key.split('/')[-1]}"
        s3_client.put_object(
            Bucket=DESTINATION_BUCKET,
            Key=dest_key,
            Body=json.dumps(summary),
            ContentType='application/json'
        )

        print(f"Successfully processed {key} to {dest_key}")

    except Exception as e:
        print(f"Error processing {key}: {str(e)}")
        raise

Best Practices for Serverless Data Pipelines

To build robust serverless data pipelines, follow these best practices:

1. Design for Idempotency

Ensure that processing functions can handle duplicate events without side effects:

# Idempotent Lambda function example
import boto3
import hashlib
import os
from datetime import datetime

dynamodb = boto3.resource('dynamodb')
processed_table = dynamodb.Table(os.environ['PROCESSED_ITEMS_TABLE'])

def handler(event, context):
    for record in event['Records']:
        # Generate a unique ID for this record
        record_body = record['body']
        record_id = hashlib.md5(record_body.encode()).hexdigest()

        # Check if we've processed this record before
        response = processed_table.get_item(
            Key={'record_id': record_id}
        )

        if 'Item' in response:
            print(f"Record {record_id} already processed, skipping")
            continue

        # Process the record
        process_record(record_body)

        # Mark as processed (for stricter exactly-once semantics, use a
        # conditional put before processing instead of this get-then-put)
        processed_table.put_item(
            Item={
                'record_id': record_id,
                'processed_at': datetime.now().isoformat()
            }
        )

2. Implement Dead Letter Queues (DLQs)

Use DLQs to capture and handle failed processing attempts:

# AWS CDK code for DLQ configuration
# (snippet inside a Stack; assumes aws_sqs as sqs, aws_lambda as lambda_,
#  and aws_lambda_event_sources as lambda_events are imported)

# Create Dead Letter Queue
dlq = sqs.Queue(self, "DeadLetterQueue")

# Configure source queue to use DLQ
source_queue = sqs.Queue(
    self, "SourceQueue",
    dead_letter_queue=sqs.DeadLetterQueue(
        max_receive_count=3,  # After 3 failed attempts
        queue=dlq
    )
)

# Create Lambda for DLQ processing
dlq_processor = lambda_.Function(
    self, "DLQProcessor",
    runtime=lambda_.Runtime.PYTHON_3_9,
    handler="dlq_handler.handler",
    code=lambda_.Code.from_asset("lambda/dlq_handler")
)

# Connect DLQ processor
dlq_processor.add_event_source(lambda_events.SqsEventSource(dlq))

3. Implement Proper Error Handling

Handle errors gracefully in your Lambda functions:

import json

def handler(event, context):
    try:
        # Main processing logic
        result = process_data(event)
        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }
    except ValueError as e:
        # Handle validation errors
        print(f"Validation error: {str(e)}")
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Validation error', 'message': str(e)})
        }
    except Exception as e:
        # Log unexpected errors
        print(f"Unexpected error: {str(e)}")
        # Re-raise to trigger retry or DLQ mechanisms
        raise

4. Use Appropriate Timeout and Memory Settings

Configure Lambda functions with appropriate timeouts and memory based on their workload:

# Memory-intensive processor
transform_fn = lambda_.Function(
    self, "TransformFunction",
    runtime=lambda_.Runtime.PYTHON_3_9,
    handler="transform.handler",
    code=lambda_.Code.from_asset("lambda/transform"),
    memory_size=1024,  # 1 GB for memory-intensive tasks
    timeout=Duration.seconds(60)
)

# Quick processor with lower memory needs
notification_fn = lambda_.Function(
    self, "NotificationFunction",
    runtime=lambda_.Runtime.PYTHON_3_9,
    handler="notify.handler",
    code=lambda_.Code.from_asset("lambda/notify"),
    memory_size=128,  # 128 MB is sufficient for simple tasks
    timeout=Duration.seconds(10)
)

5. Monitoring and Observability

Implement comprehensive monitoring with CloudWatch and X-Ray:

# AWS CDK code for monitoring configuration
# (snippet inside a Stack; assumes "from aws_cdk import aws_cloudwatch as cloudwatch")
processor_lambda = lambda_.Function(
    self, "ProcessorFunction",
    runtime=lambda_.Runtime.PYTHON_3_9,
    handler="processor.handler",
    code=lambda_.Code.from_asset("lambda/processor"),
    tracing=lambda_.Tracing.ACTIVE,  # Enable X-Ray tracing
)

# Create custom CloudWatch Dashboard
dashboard = cloudwatch.Dashboard(
    self, "PipelineDashboard",
    dashboard_name="ServerlessDataPipeline"
)

# Add Lambda metrics to dashboard
dashboard.add_widgets(
    cloudwatch.GraphWidget(
        title="Processor Invocations",
        left=[processor_lambda.metric_invocations()]
    ),
    cloudwatch.GraphWidget(
        title="Processor Errors",
        left=[processor_lambda.metric_errors()]
    ),
    cloudwatch.GraphWidget(
        title="Processor Duration",
        left=[processor_lambda.metric_duration()]
    )
)

Decision Rules

Use this checklist for serverless data pipeline decisions:

  1. If your pipeline has variable workloads, evaluate serverless against provisioned capacity costs
  2. If you need strict latency guarantees, account for Lambda cold starts, which can add anywhere from tens of milliseconds to a few seconds depending on runtime and package size
  3. If a single task needs more than Lambda's 15-minute limit, split it across Step Functions steps or move it to a container service such as ECS/Fargate
  4. If you need VPC access, configure Lambda with proper security groups and subnets
  5. If you process sensitive data, apply serverless-specific encryption and access controls

Serverless adds vendor lock-in. Factor this into your architecture decisions.
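Rule 1 can be turned into a quick break-even check. The prices below are assumed placeholders; what matters is the shape of the comparison, not the numbers:

```python
# Break-even between pay-per-use Lambda and an always-on instance.
# The per-GB-second price is assumed for illustration.
GB_SECOND_PRICE = 0.0000166667  # assumed USD per GB-second

def serverless_monthly(invocations: int, duration_s: float, memory_gb: float) -> float:
    """Monthly compute cost in GB-seconds at the assumed price."""
    return invocations * duration_s * memory_gb * GB_SECOND_PRICE

def cheaper(invocations: int, duration_s: float, memory_gb: float,
            instance_monthly: float) -> str:
    """Pick the cheaper option for a given monthly workload."""
    return ("serverless"
            if serverless_monthly(invocations, duration_s, memory_gb) < instance_monthly
            else "provisioned")

# Spiky, low-volume pipeline vs. a hypothetical $70/month instance
print(cheaper(1_000_000, 1.0, 1.0, 70.0))   # serverless
print(cheaper(20_000_000, 1.0, 1.0, 70.0))  # provisioned
```

At sustained high volume the always-on instance wins; serverless pays off when the workload is bursty or idle much of the time.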

Ready to Implement These AI Data Engineering Solutions?

Get a comprehensive AI Readiness Assessment to determine the best approach for your organization's data infrastructure and AI implementation needs.
