Implementing Data Observability: Beyond Monitoring
Traditional data monitoring checks predefined metrics. Data observability provides comprehensive visibility into health, quality, and behavior across the data lifecycle. It helps teams detect and resolve issues before they impact downstream systems.
This article walks through implementing a data observability framework, from defining objectives to instrumenting your stack, collecting and analyzing telemetry, and acting on the signals.
Understanding Data Observability
Data observability adapts concepts from software observability to the data domain, focusing on five critical pillars:
The Five Pillars of Data Observability
- Data Freshness: How up-to-date is your data? When was it last updated? Is it arriving on schedule?
- Data Quality: Is the data accurate, complete, consistent, and valid according to business rules?
- Data Schema: Is the structure of your data consistent? Have there been schema changes or drift?
- Data Lineage: Where did the data originate, and what transformations has it undergone?
- Data Usage: How is data being accessed and utilized across the organization?
Beyond Traditional Monitoring
Traditional data monitoring typically involves predefined thresholds and rules. Observability expands on this by:
- Enabling exploratory analysis of unknown issues
- Providing context through metadata and lineage information
- Supporting root cause analysis through interconnected layers of telemetry
- Focusing on business impact rather than just technical metrics
Building a Data Observability Framework
Implementing effective data observability requires a structured approach spanning people, processes, and technology.
Step 1: Define Data Observability Objectives
Start by clearly articulating what you want to achieve:
# Example Data Observability Objectives Document
## Business Objectives
- Reduce time to detect data quality issues from days to hours
- Decrease incident investigation time by 50%
- Improve data team productivity by 30%
- Minimize business impact from data incidents
## Technical Objectives
- Achieve end-to-end visibility of data flows across all systems
- Automatically detect anomalies in data quality and pipeline performance
- Trace data lineage for all critical data assets
- Monitor schema changes and their impact
Step 2: Map Your Data Ecosystem
Create a comprehensive inventory of your data landscape:
# Example data asset inventory structure
data_assets = [
{
"id": "sales_transactions",
"type": "table",
"source_system": "ERP",
"ingestion_method": "CDC",
"update_frequency": "hourly",
"downstream_dependencies": ["sales_analytics", "financial_reporting"],
"business_criticality": "high",
"data_owners": ["finance_team", "sales_operations"],
"data_domains": ["sales", "finance"],
"pii_classification": "contains_pii",
"freshness_slo": {"max_delay": "3 hours"},
"quality_rules": [
{"rule": "no_null_values", "columns": ["transaction_id", "amount", "customer_id"]},
{"rule": "positive_values", "columns": ["amount"]},
{"rule": "unique_values", "columns": ["transaction_id"]}
]
}
]
This mapping helps prioritize observability efforts and identify critical points for instrumentation.
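The inventory is more than documentation: fields like freshness_slo can drive checks directly. Below is a minimal sketch of such a check; get_last_update_time is a hypothetical lookup you would implement against your metadata store.
from datetime import datetime, timedelta

def parse_delay(delay_str):
    """Parse a delay like '3 hours' into a timedelta."""
    value, unit = delay_str.split()
    unit = unit if unit.endswith("s") else unit + "s"  # normalize 'hour' -> 'hours'
    return timedelta(**{unit: int(value)})

def find_stale_assets(assets, get_last_update_time):
    """Return assets whose last update is older than their freshness SLO."""
    stale = []
    now = datetime.utcnow()
    for asset in assets:
        max_delay = parse_delay(asset["freshness_slo"]["max_delay"])
        last_update = get_last_update_time(asset["id"])  # hypothetical metadata lookup
        if now - last_update > max_delay:
            stale.append({"id": asset["id"],
                          "overdue_by": now - last_update - max_delay})
    return stale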
Step 3: Instrument Your Data Stack
Deploy instrumentation across your data ecosystem to collect the necessary telemetry:
Data Pipeline Instrumentation
# Example: Instrumenting an Apache Airflow DAG for observability
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import time
import traceback
import uuid
# Import observability libraries
from data_observability import DataObserver, DataQualityCheck
# Create observability context
observer = DataObserver(
service_name="sales_data_pipeline",
environment="production"
)
def extract_sales_data(**context):
# Generate execution ID for tracing
execution_id = str(uuid.uuid4())
context['ti'].xcom_push(key='execution_id', value=execution_id)
# Start observability span
with observer.start_span(operation="extract_sales_data", execution_id=execution_id):
start_time = time.time()
try:
# Record metadata about the operation
observer.record_metadata(
operation="extract_sales_data",
source_system="sales_db",
table="transactions",
row_count_before=0
)
# Actual data extraction logic here
# ...
sales_data = extract_from_source()
rows_extracted = len(sales_data)
# Record successful outcome and metrics
observer.record_metrics(
operation="extract_sales_data",
execution_id=execution_id,
metrics={
"rows_extracted": rows_extracted,
"execution_time_ms": (time.time() - start_time) * 1000
}
)
return sales_data
except Exception as e:
# Record failure with details
observer.record_failure(
operation="extract_sales_data",
execution_id=execution_id,
error_message=str(e),
stack_trace=traceback.format_exc()
)
            raise  # re-raise so Airflow marks the task as failed
def transform_sales_data(**context):
    execution_id = context['ti'].xcom_pull(key='execution_id')
    # Pull the extracted data from XCom (pushed via the extract task's return value)
    sales_data = context['ti'].xcom_pull(task_ids='extract_sales_data')
with observer.start_span(operation="transform_sales_data", execution_id=execution_id):
        # Transformation logic here produces `transformed_data`
        transformed_data = sales_data  # placeholder for the real transformation steps
# Record schema information
observer.record_schema(
dataset="transformed_sales",
schema={
"transaction_id": "string",
"customer_id": "string",
"product_id": "string",
"amount": "decimal",
"transaction_date": "timestamp"
}
)
# Perform and record data quality checks
quality_results = DataQualityCheck(sales_data).run_checks([
{"check": "completeness", "column": "customer_id", "threshold": 0.99},
{"check": "uniqueness", "column": "transaction_id", "threshold": 1.0},
{"check": "validity", "column": "amount", "condition": "amount > 0"}
])
observer.record_data_quality(
dataset="transformed_sales",
execution_id=execution_id,
quality_results=quality_results
)
return transformed_data
# Define the DAG
dag = DAG(
'sales_data_pipeline',
default_args={
'owner': 'data_engineering',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
},
description='Sales data ETL pipeline with observability',
schedule_interval=timedelta(hours=1),
start_date=datetime(2023, 1, 1),
catchup=False,
tags=['sales', 'observability'],
)
extract_task = PythonOperator(
task_id='extract_sales_data',
python_callable=extract_sales_data,
dag=dag,
)
transform_task = PythonOperator(
task_id='transform_sales_data',
python_callable=transform_sales_data,
dag=dag,
)
load_task = PythonOperator(
    # Similar pattern for the load operation
    task_id='load_sales_data',
    python_callable=load_sales_data,  # defined like the tasks above
    dag=dag,
)
# Define task dependencies
extract_task >> transform_task >> load_task
Database Instrumentation
# Example: Tracking database schema changes
from sqlalchemy import create_engine, event, Table, MetaData
from data_observability import SchemaObserver
# Initialize schema observer
schema_observer = SchemaObserver(
service="customer_database",
environment="production",
send_to_observability_platform=True
)
# Connect to database
engine = create_engine('postgresql://user:password@localhost:5432/customer_db')
# Set up event listeners
@event.listens_for(Table, 'after_create')
def receive_after_create(target, connection, **kw):
schema_observer.record_schema_change(
table_name=target.name,
schema_name=target.schema,
change_type="table_created",
details={
"columns": [{"name": column.name, "type": str(column.type)}
for column in target.columns]
}
)
@event.listens_for(Table, 'after_drop')
def receive_after_drop(target, connection, **kw):
schema_observer.record_schema_change(
table_name=target.name,
schema_name=target.schema,
change_type="table_dropped"
)
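These listeners only observe DDL issued through this SQLAlchemy application. A common complement is to periodically snapshot information_schema and diff against the previous snapshot, which also catches changes made by other tools. A minimal sketch (PostgreSQL-style information_schema assumed; where you persist snapshots is up to you):
from sqlalchemy import text

def snapshot_columns(engine, schema_name="public"):
    """Capture {table: {column: data_type}} from information_schema."""
    query = text(
        "SELECT table_name, column_name, data_type "
        "FROM information_schema.columns "
        "WHERE table_schema = :schema"
    )
    snapshot = {}
    with engine.connect() as conn:
        for table_name, column_name, data_type in conn.execute(query, {"schema": schema_name}):
            snapshot.setdefault(table_name, {})[column_name] = data_type
    return snapshot

def diff_snapshots(previous, current):
    """Return added, removed, and retyped columns per table."""
    changes = []
    for table in set(previous) | set(current):
        before, after = previous.get(table, {}), current.get(table, {})
        for col in after.keys() - before.keys():
            changes.append({"table": table, "column": col, "change": "column_added"})
        for col in before.keys() - after.keys():
            changes.append({"table": table, "column": col, "change": "column_removed"})
        for col in before.keys() & after.keys():
            if before[col] != after[col]:
                changes.append({"table": table, "column": col, "change": "type_changed",
                                "from": before[col], "to": after[col]})
    return changes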
Data Warehouse Instrumentation
For modern data warehouses, you can leverage built-in features or external tools:
-- Snowflake: Setting up query tagging for observability
ALTER SESSION SET QUERY_TAG = 'data_source=crm,data_consumer=marketing_analytics,query_type=dashboard';
-- Creating access history views for usage analytics
CREATE OR REPLACE VIEW data_observability.query_history AS
SELECT
query_id,
session_id,
user_name,
role_name,
warehouse_name,
database_name,
schema_name,
query_text,
start_time,
end_time,
total_elapsed_time,
bytes_scanned,
rows_produced,
query_tag
FROM
snowflake.account_usage.query_history
WHERE
start_time > DATEADD(day, -7, CURRENT_TIMESTAMP());
Step 4: Implement Observability Data Collection
Create a central system to ingest, process, and store observability data:
# Example: Data Observability Collector Service
from datetime import datetime

class DataObservabilityCollector:
def __init__(self, storage_backend, streaming_backend=None):
self.storage = storage_backend # e.g., Elasticsearch, MongoDB, etc.
self.streaming = streaming_backend # e.g., Kafka for real-time alerts
def ingest_pipeline_event(self, event):
"""Ingest pipeline execution events"""
# Add metadata
event['received_timestamp'] = datetime.utcnow().isoformat()
# Process for anomalies
self._check_for_anomalies(event)
# Persist event
self.storage.store_pipeline_event(event)
# Stream for real-time processing if configured
if self.streaming:
self.streaming.publish('pipeline_events', event)
def ingest_data_quality_result(self, result):
"""Ingest data quality check results"""
# Add to time series storage
self.storage.store_quality_result(result)
# Check thresholds and generate alerts
if self._violates_threshold(result):
self._generate_alert(result)
def ingest_schema_change(self, change_event):
"""Record schema changes"""
previous_schema = self.storage.get_latest_schema(
change_event['database'],
change_event['table']
)
# Compare with previous schema
if previous_schema:
changes = self._compute_schema_diff(previous_schema, change_event['new_schema'])
change_event['diff'] = changes
# Assess impact of changes
impacted_assets = self._assess_schema_change_impact(
change_event['database'],
change_event['table'],
changes
)
change_event['impacted_assets'] = impacted_assets
# Store the change
self.storage.store_schema_change(change_event)
# Generate notifications for impactful changes
if change_event.get('impacted_assets'):
self._notify_schema_change(change_event)
def ingest_usage_metrics(self, usage_data):
"""Record data asset usage metrics"""
# Associate usage with data assets
enriched_data = self._enrich_usage_data(usage_data)
# Store for trend analysis
self.storage.store_usage_metrics(enriched_data)
# Update asset popularity scores
self._update_popularity_scores(enriched_data)
def _check_for_anomalies(self, event):
"""Detect anomalies in pipeline events"""
# Implementation using statistical or ML-based anomaly detection
pass
def _violates_threshold(self, quality_result):
"""Check if quality result violates defined thresholds"""
# Implementation of threshold checking
pass
# Other helper methods
# ...
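Wiring the collector up might look like the following; ElasticsearchStorage and KafkaPublisher are hypothetical adapter classes standing in for whichever backends you choose:
# Hypothetical backend adapters -- substitute whatever your stack provides
storage = ElasticsearchStorage(hosts=["http://localhost:9200"])
streaming = KafkaPublisher(bootstrap_servers="localhost:9092")

collector = DataObservabilityCollector(storage, streaming_backend=streaming)

collector.ingest_pipeline_event({
    "pipeline": "sales_data_pipeline",
    "event_type": "data_loaded",
    "dataset_id": "sales_transactions",
    "rows_loaded": 18250,
    "timestamp": "2023-06-01T10:00:00Z",
})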
Step 5: Develop Observability Analytics
Extract insights from collected observability data:
# Example: Data Observability Analytics
import statistics

class DataObservabilityAnalytics:
def __init__(self, storage_backend):
self.storage = storage_backend
def calculate_freshness_metrics(self, dataset_id, time_window="7d"):
"""Calculate data freshness metrics for a dataset"""
events = self.storage.get_pipeline_events(
dataset_id=dataset_id,
event_type="data_loaded",
time_window=time_window
)
if not events:
return {"status": "insufficient_data"}
# Analyze arrival patterns
arrivals = [event['timestamp'] for event in events]
        intervals = self._calculate_intervals(arrivals)
        if not intervals:
            return {"status": "insufficient_data", "last_arrival": max(arrivals)}
        return {
"last_arrival": max(arrivals),
"average_interval_hours": statistics.mean(intervals) / 3600,
"std_dev_interval_hours": statistics.stdev(intervals) / 3600 if len(intervals) > 1 else 0,
"expected_next_arrival": self._predict_next_arrival(arrivals),
"trend": self._calculate_trend(intervals)
}
def analyze_data_quality_trends(self, dataset_id, metric, time_window="30d"):
"""Analyze trends in data quality metrics"""
quality_results = self.storage.get_quality_results(
dataset_id=dataset_id,
metric=metric,
time_window=time_window
)
# Extract time series
timestamps = [r['timestamp'] for r in quality_results]
values = [r['value'] for r in quality_results]
# Calculate trend
trend = self._calculate_linear_regression(timestamps, values)
# Detect anomalies
anomalies = self._detect_anomalies_using_iqr(values)
return {
"current_value": values[-1] if values else None,
"min_value": min(values) if values else None,
"max_value": max(values) if values else None,
"trend_coefficient": trend["coefficient"],
"trend_direction": "improving" if trend["coefficient"] > 0 else "deteriorating",
"anomalies": anomalies
}
def analyze_schema_stability(self, dataset_id, time_window="90d"):
"""Analyze schema stability for a dataset"""
schema_changes = self.storage.get_schema_changes(
dataset_id=dataset_id,
time_window=time_window
)
# Count changes by type
change_counts = {}
for change in schema_changes:
for change_type, details in change['diff'].items():
change_counts[change_type] = change_counts.get(change_type, 0) + len(details)
return {
"total_changes": len(schema_changes),
"change_frequency": len(schema_changes) / 90, # changes per day
"change_types": change_counts,
"stability_score": self._calculate_schema_stability_score(schema_changes)
}
def analyze_data_usage(self, dataset_id, time_window="30d"):
"""Analyze how a dataset is being used"""
usage_data = self.storage.get_usage_metrics(
dataset_id=dataset_id,
time_window=time_window
)
# Aggregate by user type
usage_by_user_type = {}
for entry in usage_data:
user_type = entry['user_type']
usage_by_user_type[user_type] = usage_by_user_type.get(user_type, 0) + 1
# Aggregate by application
usage_by_application = {}
for entry in usage_data:
app = entry['application']
usage_by_application[app] = usage_by_application.get(app, 0) + 1
return {
"total_access_count": len(usage_data),
"unique_users": len(set(entry['user_id'] for entry in usage_data)),
"usage_by_user_type": usage_by_user_type,
"usage_by_application": usage_by_application,
"usage_trend": self._calculate_usage_trend(usage_data)
}
# Helper methods
def _calculate_intervals(self, timestamps):
"""Calculate time intervals between events"""
if len(timestamps) <= 1:
return []
sorted_timestamps = sorted(timestamps)
return [(sorted_timestamps[i] - sorted_timestamps[i-1]).total_seconds()
for i in range(1, len(sorted_timestamps))]
def _predict_next_arrival(self, timestamps):
"""Predict when next data should arrive"""
# Implementation of time series forecasting
pass
def _calculate_trend(self, values):
"""Calculate trend from a series of values"""
# Implementation of trend analysis
pass
# Other analytics methods
# ...
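Several helpers above are left as stubs. As one concrete example, the IQR-based anomaly detection referenced in analyze_data_quality_trends could look like this sketch (shown as a standalone function; as a method it would take self):
import numpy as np

def detect_anomalies_using_iqr(values, k=1.5):
    """Flag points outside [Q1 - k*IQR, Q3 + k*IQR]."""
    if len(values) < 4:
        return []  # too few points for meaningful quartiles
    q1, q3 = np.percentile(values, [25, 75])
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    return [
        {"index": i, "value": float(v)}
        for i, v in enumerate(values)
        if v < lower or v > upper
    ]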
Step 6: Build Observability Dashboards and Alerts
Create visualizations and notification systems to make observability data actionable:
# Example: Data Observability Dashboard Configuration
dashboard_config = {
"title": "Data Observability Overview",
"refresh_rate": "5m",
"layout": "grid",
"panels": [
{
"title": "Data Freshness Status",
"type": "status_panel",
"data_source": "freshness_metrics",
"query": """
SELECT
asset_name,
last_updated,
expected_update,
CASE
WHEN NOW() - last_updated > expected_interval * 2 THEN 'critical'
WHEN NOW() - last_updated > expected_interval * 1.5 THEN 'warning'
ELSE 'normal'
END as status
FROM data_assets
ORDER BY status, asset_name
""",
"visualization": {
"type": "table",
"color_by_value": {"column": "status"}
}
},
{
"title": "Data Quality Trends",
"type": "time_series",
"data_source": "quality_metrics",
"query": """
SELECT
time_bucket('1d', timestamp) as day,
asset_name,
quality_dimension,
AVG(score) as avg_score
FROM quality_measurements
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY day, asset_name, quality_dimension
ORDER BY day
""",
"visualization": {
"type": "line_chart",
"x_axis": "day",
"y_axis": "avg_score",
"series": "quality_dimension",
"facet_by": "asset_name"
}
},
{
"title": "Recent Schema Changes",
"type": "event_list",
"data_source": "schema_changes",
"query": """
SELECT
timestamp,
asset_name,
change_type,
details,
impact_score
FROM schema_changes
WHERE timestamp > NOW() - INTERVAL '7 days'
ORDER BY timestamp DESC
LIMIT 10
""",
"visualization": {
"type": "timeline",
"color_by": "impact_score"
}
},
{
"title": "Pipeline Health",
"type": "status_panel",
"data_source": "pipeline_executions",
"query": """
WITH recent_executions AS (
SELECT
pipeline_name,
MAX(execution_time) as last_run,
AVG(duration) as avg_duration,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failures,
COUNT(*) as total_runs
FROM pipeline_executions
WHERE execution_time > NOW() - INTERVAL '24 hours'
GROUP BY pipeline_name
)
SELECT
pipeline_name,
last_run,
avg_duration,
failures,
total_runs,
CASE
WHEN failures > 0 THEN 'critical'
WHEN avg_duration > expected_duration * 1.5 THEN 'warning'
ELSE 'normal'
END as status
FROM recent_executions
JOIN pipeline_metadata USING (pipeline_name)
ORDER BY status, pipeline_name
""",
"visualization": {
"type": "status_grid",
"color_by": "status"
}
}
],
"filters": [
{"name": "time_range", "type": "time_picker", "default": "last 24 hours"},
{"name": "data_domain", "type": "multi_select", "source": "data_domains"},
{"name": "criticality", "type": "select", "options": ["high", "medium", "low"]}
],
"alerts": [
{
"name": "stale_data_alert",
"description": "Alert when data freshness exceeds threshold",
"query": """
SELECT
asset_name,
NOW() - last_updated as staleness
FROM data_assets
WHERE NOW() - last_updated > expected_interval * 1.5
""",
"threshold": "rows > 0",
"frequency": "10m",
"channels": ["slack", "email"],
"severity": "warning"
},
{
"name": "data_quality_drop_alert",
"description": "Alert on significant quality score drops",
"query": """
WITH current_scores AS (
SELECT
asset_name,
quality_dimension,
AVG(score) as current_score
FROM quality_measurements
WHERE timestamp > NOW() - INTERVAL '1 hour'
GROUP BY asset_name, quality_dimension
),
baseline_scores AS (
SELECT
asset_name,
quality_dimension,
AVG(score) as baseline_score
FROM quality_measurements
WHERE timestamp BETWEEN NOW() - INTERVAL '7 days' AND NOW() - INTERVAL '1 hour'
GROUP BY asset_name, quality_dimension
)
SELECT
c.asset_name,
c.quality_dimension,
c.current_score,
b.baseline_score,
(c.current_score - b.baseline_score) / b.baseline_score as change_pct
FROM current_scores c
JOIN baseline_scores b USING (asset_name, quality_dimension)
WHERE (c.current_score - b.baseline_score) / b.baseline_score < -0.10 -- 10% drop
""",
"threshold": "rows > 0",
"frequency": "1h",
"channels": ["slack", "email", "pagerduty"],
"severity": "critical"
}
]
}
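Configuration alone does nothing; some process must evaluate the alert queries on schedule. A minimal evaluator sketch, where run_query and notify are stand-ins for your query engine and notification integrations (a production scheduler would honor each alert's own frequency rather than a fixed loop):
import time

def evaluate_alerts(alerts, run_query, notify):
    """Naive alert loop: fire whenever an alert query returns rows."""
    while True:
        for alert in alerts:
            rows = run_query(alert["query"])
            if rows:  # matches the "rows > 0" threshold convention above
                notify(
                    channels=alert["channels"],
                    severity=alert["severity"],
                    message=f"{alert['name']}: {len(rows)} violating rows",
                )
        time.sleep(60)

# evaluate_alerts(dashboard_config["alerts"], run_query, notify)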
Technical Architecture for Data Observability
A robust data observability platform typically consists of these components:
┌────────────────────────────────────────────────────────────────────────┐
│                     Data Sources & Instrumentation                     │
├────────────┬────────────────┬────────────────┬──────────────┬──────────┤
│ Databases  │ Data Pipelines │ Data Warehouse │ Data Lake    │ APIs     │
└────────────┴────────────────┴────────────────┴──────────────┴──────────┘
                                    │
                                    ▼
┌────────────────────────────────────────────────────────────────────────┐
│                        Collection & Processing                         │
├────────────┬────────────────┬────────────────┬──────────────┬──────────┤
│ Event Bus  │ Streaming      │ Batch          │ Collectors   │ Agents   │
│ (Kafka)    │ Processing     │ Processing     │ & Proxies    │ & SDKs   │
└────────────┴────────────────┴────────────────┴──────────────┴──────────┘
                                    │
                                    ▼
┌────────────────────────────────────────────────────────────────────────┐
│                                Storage                                 │
├────────────┬────────────────┬────────────────┬──────────────┬──────────┤
│ Time Series│ Document Store │ Graph Database │ Object Store │ Metadata │
│ Database   │ (MongoDB)      │ (Neo4j)        │ (S3)         │ Registry │
└────────────┴────────────────┴────────────────┴──────────────┴──────────┘
                                    │
                                    ▼
┌────────────────────────────────────────────────────────────────────────┐
│                        Analytics & Intelligence                        │
├────────────┬────────────────┬────────────────┬──────────────┬──────────┤
│ Statistical│ Anomaly        │ Impact         │ Root Cause   │ ML-Based │
│ Analysis   │ Detection      │ Analysis       │ Analysis     │ Models   │
└────────────┴────────────────┴────────────────┴──────────────┴──────────┘
                                    │
                                    ▼
┌────────────────────────────────────────────────────────────────────────┐
│                       Presentation & Automation                        │
├────────────┬────────────────┬────────────────┬──────────────┬──────────┤
│ Dashboards │ Alerts &       │ Self-Healing   │ APIs &       │ Reports  │
│            │ Notifications  │ Automation     │ Integration  │          │
└────────────┴────────────────┴────────────────┴──────────────┴──────────┘
Specific Technology Choices
Different organizations may select various technologies based on their existing data stack:
| Component | Open Source Options | Commercial Options |
|---|---|---|
| Collection | Telegraf, Logstash, Fluentd | Datadog Agent, New Relic Agent |
| Event Bus | Apache Kafka, RabbitMQ | AWS Kinesis, Google Pub/Sub |
| Processing | Apache Spark, Apache Flink | Databricks, Dataflow |
| Time Series DB | InfluxDB, Prometheus, TimescaleDB | Amazon Timestream |
| Document Store | MongoDB, Elasticsearch | Cosmos DB, Firestore |
| Graph Database | Neo4j, JanusGraph | Amazon Neptune, Cosmos DB Gremlin API |
| Dashboards | Grafana, Kibana | Datadog, New Relic |
| Alerts | Prometheus Alertmanager | PagerDuty, Opsgenie |
Implementing Key Use Cases
Let’s explore implementations for critical data observability use cases:
Automated Data Quality Monitoring
# Example: Statistical Profile-based Data Quality Monitoring
import pandas as pd
import numpy as np
from scipy import stats
from datetime import datetime, timedelta
class DataQualityMonitor:
def __init__(self, connection, observability_client):
self.connection = connection
self.observability = observability_client
self.profiles = {} # Storage for baseline profiles
def learn_baseline(self, table_name, column_name, lookback_days=30):
"""Learn baseline statistical profile for a column"""
query = f"""
SELECT {column_name}
FROM {table_name}
WHERE created_at >= DATEADD(day, -{lookback_days}, CURRENT_DATE())
"""
df = pd.read_sql(query, self.connection)
# Handle different data types
if pd.api.types.is_numeric_dtype(df[column_name]):
profile = self._create_numeric_profile(df[column_name])
elif pd.api.types.is_string_dtype(df[column_name]):
profile = self._create_string_profile(df[column_name])
elif pd.api.types.is_datetime64_dtype(df[column_name]):
profile = self._create_datetime_profile(df[column_name])
else:
profile = self._create_generic_profile(df[column_name])
# Store profile
self.profiles[f"{table_name}.{column_name}"] = {
"profile": profile,
"updated_at": datetime.now(),
"sample_size": len(df)
}
return profile
def monitor_new_data(self, table_name, column_name, time_window="1d"):
"""Monitor recent data against baseline profile"""
# Get baseline profile
profile_key = f"{table_name}.{column_name}"
if profile_key not in self.profiles:
raise ValueError(f"No baseline profile for {profile_key}")
        baseline = self.profiles[profile_key]
        # Get recent data for the monitoring window (e.g., "1d" -> one day)
        window_days = int(time_window.rstrip("d"))
        query = f"""
        SELECT {column_name}
        FROM {table_name}
        WHERE created_at >= DATEADD(day, -{window_days}, CURRENT_DATE())
        """
df = pd.read_sql(query, self.connection)
if len(df) == 0:
self.observability.record_event(
event_type="data_quality_check",
status="skipped",
details={
"table": table_name,
"column": column_name,
"reason": "no_data_in_window"
}
)
return None
# Run appropriate comparison based on data type
if pd.api.types.is_numeric_dtype(df[column_name]):
results = self._compare_numeric_data(df[column_name], baseline["profile"])
elif pd.api.types.is_string_dtype(df[column_name]):
results = self._compare_string_data(df[column_name], baseline["profile"])
elif pd.api.types.is_datetime64_dtype(df[column_name]):
results = self._compare_datetime_data(df[column_name], baseline["profile"])
else:
results = self._compare_generic_data(df[column_name], baseline["profile"])
# Add metadata to results
results["table"] = table_name
results["column"] = column_name
results["baseline_sample_size"] = baseline["sample_size"]
results["current_sample_size"] = len(df)
results["timestamp"] = datetime.now().isoformat()
# Record results to observability platform
self.observability.record_data_quality_check(results)
# Generate alerts if necessary
if results["status"] == "anomaly":
self.observability.trigger_alert(
alert_type="data_quality_anomaly",
severity=results["severity"],
details=results
)
return results
def _create_numeric_profile(self, series):
"""Create statistical profile for numeric data"""
# Remove nulls for statistical calculations
clean_series = series.dropna()
profile = {
"data_type": "numeric",
"null_count": series.isna().sum(),
"null_percentage": series.isna().mean() * 100,
"min": clean_series.min(),
"max": clean_series.max(),
"mean": clean_series.mean(),
"median": clean_series.median(),
"std": clean_series.std(),
"skew": clean_series.skew(),
"kurtosis": clean_series.kurtosis(),
"percentiles": {
"p1": np.percentile(clean_series, 1),
"p5": np.percentile(clean_series, 5),
"p25": np.percentile(clean_series, 25),
"p75": np.percentile(clean_series, 75),
"p95": np.percentile(clean_series, 95),
"p99": np.percentile(clean_series, 99),
},
"histogram_bins": np.histogram(clean_series, bins=10)[0].tolist(),
"histogram_edges": np.histogram(clean_series, bins=10)[1].tolist(),
"unique_count": clean_series.nunique()
}
return profile
def _create_string_profile(self, series):
"""Create statistical profile for string data"""
# Remove nulls for statistical calculations
clean_series = series.dropna()
# Calculate string length statistics
length_series = clean_series.str.len()
profile = {
"data_type": "string",
"null_count": series.isna().sum(),
"null_percentage": series.isna().mean() * 100,
"unique_count": clean_series.nunique(),
"unique_percentage": (clean_series.nunique() / len(clean_series)) * 100 if len(clean_series) > 0 else 0,
"empty_count": (clean_series == '').sum(),
"empty_percentage": ((clean_series == '').sum() / len(clean_series)) * 100 if len(clean_series) > 0 else 0,
"min_length": length_series.min(),
"max_length": length_series.max(),
"avg_length": length_series.mean(),
"top_values": clean_series.value_counts().head(10).to_dict(),
"pattern_counts": {
"numeric_only": clean_series.str.match(r'^\d+$').sum(),
"alphabetic_only": clean_series.str.match(r'^[a-zA-Z]+$').sum(),
"alphanumeric": clean_series.str.match(r'^[a-zA-Z0-9]+$').sum(),
"email_pattern": clean_series.str.match(r'^[\w.-]+@[\w.-]+\.\w+$').sum(),
"url_pattern": clean_series.str.match(r'^https?://').sum()
}
}
return profile
def _create_datetime_profile(self, series):
"""Create statistical profile for datetime data"""
# Remove nulls for statistical calculations
clean_series = series.dropna()
profile = {
"data_type": "datetime",
"null_count": series.isna().sum(),
"null_percentage": series.isna().mean() * 100,
"min_date": clean_series.min().isoformat() if not clean_series.empty else None,
"max_date": clean_series.max().isoformat() if not clean_series.empty else None,
"unique_count": clean_series.nunique(),
"unique_percentage": (clean_series.nunique() / len(clean_series)) * 100 if len(clean_series) > 0 else 0,
"day_of_week_counts": clean_series.dt.day_name().value_counts().to_dict(),
"month_counts": clean_series.dt.month_name().value_counts().to_dict(),
"year_counts": clean_series.dt.year.value_counts().to_dict(),
"hour_distribution": clean_series.dt.hour.value_counts().to_dict()
}
return profile
def _create_generic_profile(self, series):
"""Create basic profile for other data types"""
profile = {
"data_type": "other",
"null_count": series.isna().sum(),
"null_percentage": series.isna().mean() * 100,
"unique_count": series.nunique(),
"unique_percentage": (series.nunique() / len(series)) * 100 if len(series) > 0 else 0,
"top_values": series.value_counts().head(10).to_dict()
}
return profile
def _compare_numeric_data(self, current_series, baseline_profile):
"""Compare current numeric data against baseline"""
# Remove nulls for statistical calculations
clean_series = current_series.dropna()
# Basic stats of current data
current_stats = {
"null_percentage": current_series.isna().mean() * 100,
"min": clean_series.min(),
"max": clean_series.max(),
"mean": clean_series.mean(),
"median": clean_series.median(),
"std": clean_series.std()
}
# Define anomaly thresholds
null_threshold = baseline_profile["null_percentage"] * 1.5
mean_threshold = baseline_profile["std"] * 3
min_max_threshold = baseline_profile["std"] * 5
# Check for anomalies
anomalies = []
if current_stats["null_percentage"] > null_threshold:
anomalies.append({
"type": "high_null_percentage",
"baseline": baseline_profile["null_percentage"],
"current": current_stats["null_percentage"],
"deviation": (current_stats["null_percentage"] - baseline_profile["null_percentage"]) / max(baseline_profile["null_percentage"], 0.001),
"severity": "high" if current_stats["null_percentage"] > null_threshold * 2 else "medium"
})
if abs(current_stats["mean"] - baseline_profile["mean"]) > mean_threshold:
anomalies.append({
"type": "mean_shift",
"baseline": baseline_profile["mean"],
"current": current_stats["mean"],
"deviation": (current_stats["mean"] - baseline_profile["mean"]) / baseline_profile["std"],
"severity": "high" if abs(current_stats["mean"] - baseline_profile["mean"]) > mean_threshold * 2 else "medium"
})
if current_stats["min"] < baseline_profile["min"] - min_max_threshold:
anomalies.append({
"type": "unusually_low_values",
"baseline_min": baseline_profile["min"],
"current_min": current_stats["min"],
"deviation": (current_stats["min"] - baseline_profile["min"]) / baseline_profile["std"],
"severity": "medium"
})
if current_stats["max"] > baseline_profile["max"] + min_max_threshold:
anomalies.append({
"type": "unusually_high_values",
"baseline_max": baseline_profile["max"],
"current_max": current_stats["max"],
"deviation": (current_stats["max"] - baseline_profile["max"]) / baseline_profile["std"],
"severity": "medium"
})
        # Distribution comparison via a two-sample KS test; since only summary
        # statistics are stored, the baseline is approximated with a synthetic
        # normal sample (i.e., this assumes the baseline is roughly normal)
try:
ks_statistic, p_value = stats.ks_2samp(
clean_series.sample(min(1000, len(clean_series))),
pd.Series(np.random.normal(
baseline_profile["mean"],
baseline_profile["std"],
min(1000, len(clean_series))
))
)
if p_value < 0.01 and ks_statistic > 0.1:
anomalies.append({
"type": "distribution_shift",
"ks_statistic": float(ks_statistic),
"p_value": float(p_value),
"severity": "high" if p_value < 0.001 and ks_statistic > 0.2 else "medium"
})
        except Exception:
# Skip distribution test if it fails
pass
# Determine overall status
if not anomalies:
status = "normal"
severity = "none"
else:
status = "anomaly"
severity = "high" if any(a["severity"] == "high" for a in anomalies) else "medium"
return {
"status": status,
"severity": severity,
"current_stats": current_stats,
"anomalies": anomalies
}
# Similar methods for comparing string, datetime, and generic data
# ...
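The elided comparison methods follow the same shape as _compare_numeric_data. Here is a sketch of _compare_string_data, checking null-rate drift and the appearance of values outside the baseline's top values; the thresholds are illustrative, not tuned:
    def _compare_string_data(self, current_series, baseline_profile):
        """Compare current string data against the baseline profile (sketch)."""
        clean_series = current_series.dropna()
        current_stats = {
            "null_percentage": current_series.isna().mean() * 100,
            "unique_percentage": (clean_series.nunique() / len(clean_series)) * 100
            if len(clean_series) > 0 else 0,
        }
        anomalies = []
        # Null-rate drift: flag a 1.5x increase, with a one-point floor to avoid
        # noise when the baseline null rate is near zero
        if current_stats["null_percentage"] > baseline_profile["null_percentage"] * 1.5 + 1:
            anomalies.append({
                "type": "high_null_percentage",
                "baseline": baseline_profile["null_percentage"],
                "current": current_stats["null_percentage"],
                "severity": "medium",
            })
        # Unexpected values: for low-cardinality columns, measure the share of
        # rows whose value was not among the baseline's top values
        known_values = set(baseline_profile["top_values"])
        if baseline_profile["unique_count"] <= 20 and len(clean_series) > 0:
            unseen_share = float((~clean_series.isin(known_values)).mean())
            if unseen_share > 0.05:
                anomalies.append({
                    "type": "unexpected_values",
                    "unseen_share": unseen_share,
                    "severity": "high" if unseen_share > 0.2 else "medium",
                })
        status = "anomaly" if anomalies else "normal"
        severity = ("high" if any(a["severity"] == "high" for a in anomalies)
                    else "medium" if anomalies else "none")
        return {"status": status, "severity": severity,
                "current_stats": current_stats, "anomalies": anomalies}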
Pipeline Health Monitoring
# Example: Pipeline Health Monitor
from datetime import datetime

class PipelineHealthMonitor:
def __init__(self, observability_client):
self.observability = observability_client
    def get_pipeline_metadata(self, pipeline_id):
        """Retrieve metadata about a pipeline (first matching row, or None)"""
        rows = self.observability.query(f"""
            SELECT *
            FROM pipeline_metadata
            WHERE pipeline_id = '{pipeline_id}'
        """)
        return rows[0] if rows else None
def get_pipeline_trends(self, pipeline_id, days=30):
"""Analyze execution trends for a pipeline"""
executions = self.observability.query(f"""
SELECT
execution_id,
start_time,
end_time,
status,
duration_seconds,
rows_processed,
error_message
FROM pipeline_executions
WHERE pipeline_id = '{pipeline_id}'
AND start_time >= DATE_SUB(CURRENT_DATE(), INTERVAL {days} DAY)
ORDER BY start_time DESC
""")
if len(executions) == 0:
return {"status": "no_data"}
# Calculate success rate
success_rate = sum(1 for e in executions if e["status"] == "success") / len(executions)
# Calculate execution time stats
durations = [e["duration_seconds"] for e in executions if e["status"] == "success"]
if durations:
avg_duration = sum(durations) / len(durations)
min_duration = min(durations)
max_duration = max(durations)
p95_duration = sorted(durations)[int(len(durations) * 0.95)]
else:
avg_duration = min_duration = max_duration = p95_duration = 0
# Calculate throughput
throughputs = [e["rows_processed"] / e["duration_seconds"]
for e in executions
if e["status"] == "success" and e["duration_seconds"] > 0]
avg_throughput = sum(throughputs) / len(throughputs) if throughputs else 0
# Detect execution time trend
if len(durations) >= 5:
recent_avg = sum(durations[:5]) / 5
older_avg = sum(durations[-5:]) / 5
duration_trend = (recent_avg - older_avg) / older_avg if older_avg > 0 else 0
else:
duration_trend = 0
# Find common errors
errors = [e["error_message"] for e in executions if e["status"] == "failed" and e["error_message"]]
error_counts = {}
for error in errors:
error_counts[error] = error_counts.get(error, 0) + 1
common_errors = [{"message": msg, "count": count}
for msg, count in sorted(error_counts.items(),
key=lambda x: x[1],
reverse=True)[:3]]
return {
"success_rate": success_rate,
"execution_counts": {
"success": sum(1 for e in executions if e["status"] == "success"),
"failed": sum(1 for e in executions if e["status"] == "failed"),
"running": sum(1 for e in executions if e["status"] == "running")
},
"duration_stats": {
"avg_seconds": avg_duration,
"min_seconds": min_duration,
"max_seconds": max_duration,
"p95_seconds": p95_duration,
"trend_pct": duration_trend
},
"throughput_stats": {
"avg_rows_per_second": avg_throughput
},
"common_errors": common_errors,
"last_execution": {
"time": executions[0]["start_time"],
"status": executions[0]["status"]
},
"health_status": self._calculate_health_status(success_rate, duration_trend, common_errors)
}
def detect_anomalies(self, pipeline_id):
"""Detect anomalies in pipeline execution"""
# Get recent executions
recent = self.observability.query(f"""
SELECT
execution_id,
start_time,
end_time,
status,
duration_seconds,
rows_processed
FROM pipeline_executions
WHERE pipeline_id = '{pipeline_id}'
AND start_time >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
ORDER BY start_time DESC
""")
if len(recent) < 3:
return {"status": "insufficient_data"}
# Get historical data for baseline
historical = self.observability.query(f"""
SELECT
AVG(duration_seconds) as avg_duration,
STDDEV(duration_seconds) as stddev_duration,
AVG(rows_processed) as avg_rows,
STDDEV(rows_processed) as stddev_rows
FROM pipeline_executions
WHERE pipeline_id = '{pipeline_id}'
AND status = 'success'
AND start_time BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 60 DAY)
AND DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
""")
if not historical or historical[0]["avg_duration"] is None:
return {"status": "insufficient_historical_data"}
baseline = historical[0]
# Check latest execution against baseline
latest = recent[0]
anomalies = []
# Execution time anomaly
if latest["status"] == "success":
z_score_duration = (latest["duration_seconds"] - baseline["avg_duration"]) / max(baseline["stddev_duration"], 1)
if abs(z_score_duration) > 3:
anomalies.append({
"type": "execution_time",
"z_score": z_score_duration,
"baseline": baseline["avg_duration"],
"current": latest["duration_seconds"],
"description": f"Execution time {'increased' if z_score_duration > 0 else 'decreased'} significantly"
})
# Rows processed anomaly
if latest["status"] == "success":
z_score_rows = (latest["rows_processed"] - baseline["avg_rows"]) / max(baseline["stddev_rows"], 1)
if abs(z_score_rows) > 3:
anomalies.append({
"type": "rows_processed",
"z_score": z_score_rows,
"baseline": baseline["avg_rows"],
"current": latest["rows_processed"],
"description": f"Rows processed {'increased' if z_score_rows > 0 else 'decreased'} significantly"
})
# Schedule adherence
if len(recent) >= 2:
# Get pipeline schedule
pipeline_metadata = self.get_pipeline_metadata(pipeline_id)
if pipeline_metadata and "schedule_interval" in pipeline_metadata:
expected_interval = self._parse_interval_to_seconds(pipeline_metadata["schedule_interval"])
# Calculate actual intervals
intervals = []
for i in range(len(recent) - 1):
start_time1 = datetime.fromisoformat(recent[i]["start_time"])
start_time2 = datetime.fromisoformat(recent[i+1]["start_time"])
interval = (start_time1 - start_time2).total_seconds()
intervals.append(interval)
avg_interval = sum(intervals) / len(intervals)
interval_diff = abs(avg_interval - expected_interval) / expected_interval
if interval_diff > 0.5: # Over 50% deviation from expected schedule
anomalies.append({
"type": "schedule_adherence",
"expected_interval_seconds": expected_interval,
"actual_interval_seconds": avg_interval,
"deviation_pct": interval_diff * 100,
"description": f"Pipeline schedule deviated significantly from expected interval"
})
return {
"status": "anomaly" if anomalies else "normal",
"anomalies": anomalies
}
def _calculate_health_status(self, success_rate, duration_trend, common_errors):
"""Calculate overall pipeline health status"""
if success_rate < 0.5 or (success_rate < 0.8 and len(common_errors) > 0):
return "critical"
elif success_rate < 0.8 or duration_trend > 0.3 or len(common_errors) > 0:
return "warning"
else:
return "healthy"
def _parse_interval_to_seconds(self, interval_str):
"""Parse interval string like '1h' or '30m' to seconds"""
unit = interval_str[-1].lower()
value = int(interval_str[:-1])
if unit == 'd':
return value * 86400
elif unit == 'h':
return value * 3600
elif unit == 'm':
return value * 60
elif unit == 's':
return value
else:
raise ValueError(f"Unknown interval unit: {unit}")
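Running the monitor on a schedule then becomes a few calls; the pipeline ID below is illustrative, and observability_client is whatever client instance your platform exposes:
monitor = PipelineHealthMonitor(observability_client)

trends = monitor.get_pipeline_trends("sales_data_pipeline", days=30)
print(trends["health_status"], trends["success_rate"])

report = monitor.detect_anomalies("sales_data_pipeline")
if report["status"] == "anomaly":
    for anomaly in report["anomalies"]:
        print(anomaly["type"], anomaly["description"])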
Automatic Lineage Tracking
# Example: Data Lineage Tracking with OpenLineage integration
from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job, Dataset
from datetime import datetime
import uuid
class DataLineageTracker:
def __init__(self, openlineage_url, namespace):
self.client = OpenLineageClient(url=openlineage_url)
self.namespace = namespace
def start_job_run(self, job_name, inputs=None, parent_run_id=None):
"""Record the start of a data job execution"""
run_id = str(uuid.uuid4())
# Create event
event = RunEvent(
eventType=RunState.START,
eventTime=datetime.now().isoformat(),
run=self._create_run(run_id, parent_run_id),
job=self._create_job(job_name),
inputs=self._create_datasets(inputs) if inputs else None,
producer="data_observability_framework"
)
# Send event
self.client.emit(event)
return run_id
def complete_job_run(self, run_id, job_name, outputs=None, status="COMPLETE"):
"""Record the completion of a data job execution"""
event = RunEvent(
eventType=RunState.COMPLETE if status == "COMPLETE" else RunState.FAIL,
eventTime=datetime.now().isoformat(),
run=self._create_run(run_id),
job=self._create_job(job_name),
outputs=self._create_datasets(outputs) if outputs else None,
producer="data_observability_framework"
)
self.client.emit(event)
def record_input_output_relation(self, job_name, inputs, outputs):
"""Record the relationship between input and output datasets"""
run_id = str(uuid.uuid4())
start_event = RunEvent(
eventType=RunState.START,
eventTime=datetime.now().isoformat(),
run=self._create_run(run_id),
job=self._create_job(job_name),
inputs=self._create_datasets(inputs),
producer="data_observability_framework"
)
complete_event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=datetime.now().isoformat(),
run=self._create_run(run_id),
job=self._create_job(job_name),
inputs=self._create_datasets(inputs),
outputs=self._create_datasets(outputs),
producer="data_observability_framework"
)
self.client.emit(start_event)
self.client.emit(complete_event)
def _create_run(self, run_id, parent_run_id=None):
"""Create a run object"""
run_facets = {}
if parent_run_id:
run_facets["parent"] = {
"_producer": "data_observability_framework",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ParentRunFacet.json",
"run": {
"runId": parent_run_id
},
"job": {
"namespace": self.namespace,
"name": "parent_job"
}
}
return Run(run_id, run_facets)
def _create_job(self, job_name):
"""Create a job object"""
return Job(self.namespace, job_name)
def _create_datasets(self, datasets):
"""Create dataset objects"""
result = []
for ds in datasets:
# Handle different formats of dataset specifications
if isinstance(ds, dict):
namespace = ds.get("namespace", self.namespace)
name = ds.get("name")
# Optionally add facets with dataset metadata
facets = {}
if "schema" in ds:
facets["schema"] = {
"_producer": "data_observability_framework",
"_schemaURL": "https://openlineage.io/spec/1-0-0/OpenLineage.json#/$defs/SchemaDatasetFacet",
"fields": [
{"name": field["name"], "type": field["type"]}
for field in ds["schema"]
]
}
if "dataSource" in ds:
facets["dataSource"] = {
"_producer": "data_observability_framework",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasourceDatasetFacet.json",
"name": ds["dataSource"].get("name", ""),
"uri": ds["dataSource"].get("uri", "")
}
result.append(Dataset(namespace, name, facets))
else:
# Simple string representation
result.append(Dataset(self.namespace, ds))
return result
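Usage wraps a job's execution in start and complete events. The URL below assumes a locally running OpenLineage-compatible backend such as Marquez; the job and dataset names are illustrative:
tracker = DataLineageTracker("http://localhost:5000", namespace="analytics")

run_id = tracker.start_job_run(
    "daily_sales_rollup",
    inputs=[{"name": "raw.sales_transactions"}]
)
# ... run the actual transformation ...
tracker.complete_job_run(
    run_id,
    "daily_sales_rollup",
    outputs=[{"name": "mart.daily_sales"}]
)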
Case Studies: Data Observability in Practice
Financial Services: Ensuring Data Accuracy for Regulatory Reporting
Challenge: A large bank needed to ensure data quality and freshness for critical regulatory reporting.
Solution:
- Implemented comprehensive data observability across 200+ data assets
- Added quality checks at each stage of data transformation
- Built automated lineage tracking for audit trails
- Created real-time alerts for data quality issues
Implementation:
# Example: Regulatory Reporting Data Quality Framework
class RegulatoryReportingQuality:
def __init__(self, observability_client):
self.observability = observability_client
def register_critical_dataset(self, dataset_id, regulatory_requirements):
"""Register a dataset for regulatory compliance monitoring"""
self.observability.register_asset({
"id": dataset_id,
"criticality": "high",
"regulatory_reports": regulatory_requirements.get("reports", []),
"compliance_rules": regulatory_requirements.get("rules", []),
"data_owners": regulatory_requirements.get("owners", []),
"attestation_required": regulatory_requirements.get("attestation_required", True),
"quality_slas": regulatory_requirements.get("quality_slas", {})
})
# Set up baseline quality measurements
if "quality_metrics" in regulatory_requirements:
for metric in regulatory_requirements["quality_metrics"]:
self.observability.register_quality_metric(
dataset_id=dataset_id,
metric_name=metric["name"],
metric_type=metric["type"],
threshold=metric.get("threshold"),
critical=metric.get("critical", False)
)
Results:
- 72% reduction in data quality incidents
- Regulatory report preparation time reduced by 40%
- Audit findings decreased by 65%
E-commerce: Ensuring Reliable Analytics for Merchandising
Challenge: An e-commerce company struggled with merchandising decisions due to unreliable analytics data.
Solution:
- Implemented end-to-end pipeline observability
- Added anomaly detection for key product metrics
- Built dashboards showing data health by domain
- Created self-healing mechanisms for common data issues
Implementation:
# Example: E-commerce Metrics Anomaly Detection
class ProductMetricsMonitor:
def __init__(self, observability_client):
self.observability = observability_client
def detect_product_metrics_anomalies(self):
"""Detect anomalies in product performance metrics"""
results = []
# Get product categories
categories = self.observability.query("""
SELECT DISTINCT category
FROM product_catalog
WHERE active = TRUE
""")
        for row in categories:
            category = row["category"]
            # Get metrics for this category
            metrics = self.observability.query(f"""
SELECT
date,
SUM(views) as total_views,
SUM(add_to_carts) as total_atc,
SUM(purchases) as total_purchases,
SUM(revenue) as total_revenue,
AVG(conversion_rate) as avg_conversion
FROM product_performance
WHERE category = '{category}'
AND date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY date
ORDER BY date
""")
# Check for anomalies in each metric
for metric in ["total_views", "total_atc", "total_purchases", "total_revenue", "avg_conversion"]:
values = [m[metric] for m in metrics]
anomalies = self._detect_anomalies_in_time_series(values)
if anomalies:
results.append({
"category": category,
"metric": metric,
"anomalies": anomalies,
"current_value": values[-1] if values else None,
"avg_value": sum(values) / len(values) if values else None
})
return results
Results:
- Improved data quality score from 68% to 94%
- Reduced time to detect data anomalies from days to minutes
- Increased merchandiser trust in data from 43% to 86%
Healthcare: Ensuring Data Privacy and Quality for Patient Records
Challenge: A healthcare provider needed to maintain high data quality for patient records while ensuring privacy compliance.
Solution:
- Implemented data observability with privacy-preserving metrics
- Added automated data lineage to track PHI across systems
- Built compliance dashboards showing potential privacy risks
- Created data quality rules specific to clinical data
Implementation:
# Example: Privacy-Preserving Data Quality Monitoring
class PrivacyAwareQualityMonitoring:
def __init__(self, observability_client):
self.observability = observability_client
def monitor_phi_columns(self, database, table, phi_columns):
"""Monitor PHI columns without accessing actual values"""
results = {}
for column in phi_columns:
# Use privacy-preserving metrics that don't expose values
metrics = self.observability.query(f"""
SELECT
COUNT(*) as total_count,
SUM(CASE WHEN {column} IS NULL THEN 1 ELSE 0 END) as null_count,
COUNT(DISTINCT {column}) as distinct_count
FROM {database}.{table}
""")
if metrics:
# Calculate quality metrics without looking at values
null_rate = metrics[0]["null_count"] / metrics[0]["total_count"] if metrics[0]["total_count"] > 0 else 0
cardinality_ratio = metrics[0]["distinct_count"] / metrics[0]["total_count"] if metrics[0]["total_count"] > 0 else 0
results[column] = {
"null_rate": null_rate,
"cardinality_ratio": cardinality_ratio,
"anomalies": []
}
# Check for potential issues
if null_rate > 0.1: # More than 10% nulls in PHI column
results[column]["anomalies"].append({
"type": "high_null_rate",
"value": null_rate,
"threshold": 0.1
})
if cardinality_ratio < 0.001: # Unusually low cardinality
results[column]["anomalies"].append({
"type": "low_cardinality",
"value": cardinality_ratio,
"threshold": 0.001
})
return results
Results:
- Zero privacy breaches related to data processing
- Improved data completeness in patient records by 28%
- Reduced manual compliance checks by 70%
Decision Rules
Use this checklist for data observability decisions:
- If you don’t know when data is stale, add freshness monitoring first
- If schema changes break pipelines, add schema validation and change alerting
- If lineage is unclear, instrument your transforms to capture upstream sources
- If issues repeat, build automated remediation rather than alert escalation
- If observability tools aren’t used, the problem is usually alert fatigue, not tooling
Data observability only adds value if teams act on the signals.