Data quality determines decision quality. Poor data leads to flawed analytics and misguided business decisions. Manual data quality reviews don’t scale, and they catch issues too late.
This article covers building automated, continuous data quality monitoring systems.
Why Automate Data Quality Monitoring?
Manual data quality checks face several limitations:
- Scale limitations: Human reviewers can only sample data, not examine entire datasets
- Delayed detection: Issues are often discovered long after they occur
- Inconsistent application: Different reviewers may apply different standards
- Resource intensity: Manual reviews consume valuable analyst time
Automated monitoring addresses these challenges by providing:
- Continuous vigilance: 24/7 monitoring across all data pipelines
- Comprehensive coverage: Ability to check entire datasets, not just samples
- Immediate alerts: Real-time notification when issues arise
- Consistent application: Rules applied uniformly across all data
- Historical tracking: Ability to observe quality trends over time
Core Components of Automated Data Quality Monitoring
A complete data quality monitoring system typically includes these components:
1. Data Profiling and Metadata Collection
Before you can monitor data quality, you need to understand your data’s characteristics:
```python
# Example: basic column profiling with pandas (tools such as Great
# Expectations or ydata-profiling automate this across entire tables)
import pandas as pd

# Load data
df = pd.read_csv("customer_transactions.csv")
col = df["transaction_amount"]

# Generate a simple data profile
print(f"Min value: {col.min()}")
print(f"Max value: {col.max()}")
print(f"Mean value: {col.mean()}")
print(f"Null count: {col.isna().sum()}")
print(f"Distinct values: {col.nunique()}")
```
Profiling should capture:
- Statistical distributions
- Data types and formats
- Null/missing value patterns
- Cardinality of fields
- Data relationships and dependencies
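A minimal sketch of such a profile with plain pandas (the sample data below is made up for illustration):

```python
import pandas as pd

# Hypothetical sample data; in practice this comes from your warehouse
df = pd.DataFrame({
    "customer_id": [1, 2, 2, 4, None],
    "email": ["a@x.com", None, "b@x.com", "c@x.com", "d@x.com"],
})

profile = pd.DataFrame({
    "dtype": df.dtypes.astype(str),      # data types and formats
    "null_rate": df.isna().mean(),       # null/missing value patterns
    "distinct_values": df.nunique(),     # cardinality of fields
})
print(profile)
```

Persisting a profile like this per run gives you the baseline that later anomaly detection compares against.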
2. Quality Rule Definition Framework
Next, establish a framework for defining quality expectations:
```yaml
# Example using the dbt test framework
# In your schema.yml file:
version: 2

models:
  - name: customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - not_null
          - unique
          - custom_regex_match:   # custom generic test defined in your project
              regex: '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
      - name: signup_date
        tests:
          - not_null
          - date_before_today     # custom generic test
```
Quality rules typically include:
- Correctness: Values conform to expected formats and fall within valid ranges
- Completeness: Required fields contain data
- Uniqueness: No unwanted duplicates exist
- Consistency: Related data elements don’t contradict each other
- Timeliness: Data is available when needed and reflects current state
- Referential integrity: Foreign keys match existing primary keys
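As a rough, tool-agnostic illustration, here is how three of these dimensions translate into executable checks with pandas; the tables and columns are invented for the example:

```python
import pandas as pd

# Invented example tables
customers = pd.DataFrame({
    "customer_id": [1, 2, 3, 3],
    "email": ["a@x.com", "b@x.com", None, "c@x.com"],
    "country_id": [10, 10, 99, 10],
})
countries = pd.DataFrame({"country_id": [10, 20]})

checks = {
    # Completeness: required fields contain data
    "email_complete": customers["email"].notna().all(),
    # Uniqueness: no unwanted duplicates on the primary key
    "customer_id_unique": not customers["customer_id"].duplicated().any(),
    # Referential integrity: foreign keys resolve to the parent table
    "country_fk_valid": customers["country_id"].isin(countries["country_id"]).all(),
}

failed = [name for name, ok in checks.items() if not ok]
print(failed)  # all three checks fail on this sample
```

In a real system each rule would also carry metadata (owner, severity, remediation hint) rather than just a boolean.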
3. Validation Engine
The validation engine executes quality rules against your data:
```python
# Example using Soda Core for validation
from soda.scan import Scan

scan = Scan()
scan.set_data_source_name("postgres")
scan.add_configuration_yaml_file("configuration.yml")  # connection details

# Add validation checks in SodaCL
scan.add_sodacl_yaml_str("""
checks for customers:
  - row_count > 0
  - duplicate_count(customer_id) = 0
  - missing_count(email) = 0
  - invalid_count(email) = 0:
      valid regex: '^[\\w.-]+@[\\w.-]+\\.[a-zA-Z]{2,}$'
  - avg(lifetime_value) between 100 and 5000
""")

# Execute the scan
scan.execute()
if scan.has_check_fails():
    print("Quality issues detected!")
    print(scan.get_logs_text())
else:
    print("All quality checks passed!")
```
Your validation engine should support:
- Scheduled and event-triggered execution
- Both batch and streaming data validation
- Configurable severity levels for different types of issues
- Performance optimization for large datasets
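Severity levels in particular are worth sketching: a simple runner can block the pipeline on critical failures while merely reporting warnings. This is a minimal illustration; the check registry, column names, and thresholds are hypothetical:

```python
import pandas as pd

# Hypothetical check registry with severity levels
CHECKS = [
    {"name": "pk_not_null", "severity": "critical",
     "fn": lambda df: df["id"].notna().all()},
    {"name": "email_mostly_filled", "severity": "warning",
     "fn": lambda df: df["email"].notna().mean() > 0.95},
]

def run_checks(df):
    """Run every check; halt only on critical failures, report the rest."""
    failed = [c for c in CHECKS if not c["fn"](df)]
    critical = [c["name"] for c in failed if c["severity"] == "critical"]
    if critical:
        raise ValueError(f"Critical checks failed: {critical}")
    return [c["name"] for c in failed]  # non-blocking warnings

batch = pd.DataFrame({"id": [1, 2, 3], "email": ["a@x.com", None, None]})
print(run_checks(batch))  # the email check fails, but the pipeline continues
```

The same registry can drive both scheduled batch runs and per-event validation.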
4. Anomaly Detection System
Beyond defined rules, use anomaly detection to catch unexpected issues:
```python
# Example using PyOD for anomaly detection
import pandas as pd
from pyod.models.iforest import IForest

# Load historical data and train model
historical_data = pd.read_csv("historical_order_metrics.csv")
X_train = historical_data[['order_count', 'average_order_value', 'return_rate']].values

# Initialize and fit the model
clf = IForest(contamination=0.05)  # Expect about 5% anomalies
clf.fit(X_train)

# Check today's data
today_data = pd.read_csv("today_order_metrics.csv")
X_test = today_data[['order_count', 'average_order_value', 'return_rate']].values

# Get anomaly scores and predictions
y_scores = clf.decision_function(X_test)  # Anomaly scores
y_preds = clf.predict(X_test)             # Binary labels (0: normal, 1: anomaly)

# Identify and report anomalies
for i, is_anomaly in enumerate(y_preds):
    if is_anomaly == 1:
        print(f"Anomaly detected on row {i}:")
        print(f"  Order count: {X_test[i][0]}")
        print(f"  AOV: ${X_test[i][1]:.2f}")
        print(f"  Return rate: {X_test[i][2]:.2%}")
        print(f"  Anomaly score: {y_scores[i]:.4f}")
```
Effective anomaly detection considers:
- Seasonal patterns and trends
- Multiple dimensions simultaneously
- Relative changes and absolute thresholds
- Historical context and business cycles
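For the seasonal-pattern point, a lightweight approach that needs no ML library is to compare each observation against the baseline for the same day of week. A sketch on synthetic data:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
dates = pd.date_range("2024-01-01", periods=90, freq="D")
# Synthetic metric with a weekly cycle: weekends run lower
base = np.where(dates.dayofweek >= 5, 100, 200)
history = pd.DataFrame({"date": dates, "orders": base + rng.normal(0, 10, len(dates))})

# Per-day-of-week baselines (mean and std of the metric)
stats = history.groupby(history["date"].dt.dayofweek)["orders"].agg(["mean", "std"])

def is_anomaly(value, when, z=3.0):
    """Flag values more than z std devs from that weekday's baseline."""
    m, s = stats.loc[when.dayofweek]
    return abs(value - m) > z * s

# A Saturday value of 200 is unremarkable overall but anomalous for a Saturday
print(is_anomaly(200, pd.Timestamp("2024-04-06")))
```

Per-slot baselines like this catch values that look normal in aggregate but are wrong for their time slot, which a single global threshold misses.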
5. Alerting and Notification System
When issues are detected, stakeholders need to be notified:
```python
# Example using Python for Slack notifications
import requests

def send_data_quality_alert(webhook_url, issue_description, severity, affected_tables, metrics):
    # Format the message payload using Slack Block Kit
    payload = {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"🚨 Data Quality Alert: {severity.upper()}"
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Issue:* {issue_description}\n*Affected Tables:* {', '.join(affected_tables)}"
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "*Metrics:*\n" + "\n".join([f"• {k}: {v}" for k, v in metrics.items()])
                }
            },
            {
                "type": "actions",
                "elements": [
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "View Details"},
                        "url": "https://data-quality-dashboard.company.com/alerts/123"
                    }
                ]
            }
        ]
    }

    # Send the notification
    response = requests.post(webhook_url, json=payload)
    if response.status_code != 200:
        raise ValueError(
            f"Request to Slack returned an error {response.status_code}, response: {response.text}"
        )
```
Your alerting system should include:
- Severity-based routing and prioritization
- Customizable notification channels (email, Slack, SMS, etc.)
- Alert aggregation to prevent alert fatigue
- Escalation paths for critical issues
- Context and recommendations for remediation
6. Dashboard and Reporting Interface
Create a central location to monitor data quality metrics:
```python
# Example using Streamlit for a simple data quality dashboard
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

st.title("Data Quality Dashboard")

# Load quality metrics data
quality_metrics = pd.read_csv("quality_metrics_history.csv")

# Show overall health score
current_health = quality_metrics.iloc[-1]["overall_health_score"]
st.metric(
    label="Overall Data Health Score",
    value=f"{current_health:.1f}%",
    delta=f"{current_health - quality_metrics.iloc[-2]['overall_health_score']:.1f}%"
)

# Display quality trends
st.subheader("Quality Score Trends")
fig = px.line(
    quality_metrics,
    x="date",
    y=["completeness_score", "accuracy_score", "consistency_score"],
    labels={"value": "Score (%)", "variable": "Metric"}
)
st.plotly_chart(fig)

# Show active issues
st.subheader("Active Quality Issues")
issues = pd.read_csv("active_issues.csv")
st.dataframe(issues)

# Display data volume vs. error rate
st.subheader("Data Volume vs. Error Rate")
fig = go.Figure()
fig.add_trace(go.Bar(
    x=quality_metrics["date"],
    y=quality_metrics["record_count"],
    name="Record Count"
))
fig.add_trace(go.Scatter(
    x=quality_metrics["date"],
    y=quality_metrics["error_rate"] * 100,
    name="Error Rate (%)",
    yaxis="y2"
))
fig.update_layout(
    yaxis=dict(title="Record Count"),
    yaxis2=dict(title="Error Rate (%)", overlaying="y", side="right")
)
st.plotly_chart(fig)
```
An effective dashboard provides:
- Real-time quality metrics and trends
- Drill-down capabilities for detailed analysis
- Historical comparison and benchmarking
- Impact assessment of quality issues
- Integration with data catalogs and lineage tools
Implementation Approaches
There are several ways to implement automated data quality monitoring:
In-Pipeline Validation
Embed quality checks directly into your data pipelines:
```python
# Example in Apache Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import pandas as pd
import great_expectations as ge

def extract_data():
    # Extract data logic here; return rows as a list of dicts
    extracted_data = ...
    return {"data": extracted_data}

def validate_data(**context):
    # Get data from previous task
    data = context['ti'].xcom_pull(task_ids='extract_data')['data']

    # Create Great Expectations DataFrame (legacy pandas API)
    ge_df = ge.from_pandas(pd.DataFrame(data))

    # Validate against a saved expectation suite (path to the suite JSON)
    validation_result = ge_df.validate(expectation_suite="source_data_quality_suite.json")

    # If validation fails, raise an exception to halt the pipeline
    if not validation_result["success"]:
        failed_expectations = [exp for exp in validation_result["results"] if not exp["success"]]
        raise ValueError(f"Data quality validation failed: {failed_expectations}")
    return {"data": data, "validation_result": validation_result}

def transform_data(**context):
    # Only runs if validation passes
    data = context['ti'].xcom_pull(task_ids='validate_data')['data']
    # Transform logic here

with DAG(
    'etl_with_quality_checks',
    start_date=days_ago(1),
    schedule_interval='@daily'
) as dag:
    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data
    )
    validate = PythonOperator(
        task_id='validate_data',
        python_callable=validate_data
    )
    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )
    # More pipeline tasks...
    extract >> validate >> transform
```
Benefits:
- Prevents bad data from propagating downstream
- Provides immediate feedback during processing
- Integrates naturally with existing pipelines
Limitations:
- May increase pipeline complexity and execution time
- Limited to scheduled pipeline runs
- Usually focuses on structural quality rather than business context
Sidecar Monitoring
Run quality checks in parallel with your data pipelines:
```yaml
# Example Docker Compose configuration for a quality monitoring sidecar
version: '3'
services:
  data-pipeline:
    image: data-pipeline-image
    volumes:
      - ./data:/data
    environment:
      - PIPELINE_CONFIG=production
    networks:
      - data-network

  quality-monitor:
    image: quality-monitor-image
    volumes:
      - ./data:/data:ro  # Read-only access to the same data
    environment:
      - MONITOR_CONFIG=production
      - ALERT_WEBHOOK=https://hooks.slack.com/services/XXX/YYY/ZZZ
    networks:
      - data-network
    depends_on:
      - data-pipeline

networks:
  data-network:
```
Benefits:
- Doesn’t impact pipeline performance
- Can be deployed/updated independently
- Can monitor multiple pipelines simultaneously
Limitations:
- Potential lag in issue detection
- Requires additional infrastructure
- May need custom integration with pipelines
Continuous Monitoring Service
Implement a dedicated service that continuously monitors data sources:
```python
# Example using Prefect 1.x for a continuous monitoring service
from prefect import task, Flow, unmapped
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
import pandas as pd
import sqlalchemy
import great_expectations as ge

# Configure monitoring schedule
schedule = IntervalSchedule(
    start_date=datetime.utcnow(),
    interval=timedelta(minutes=15)
)

@task
def get_database_connection():
    return sqlalchemy.create_engine("postgresql://user:password@host:port/database")

@task
def get_tables_to_monitor(conn):
    query = "SELECT table_name FROM monitoring_registry WHERE is_active = true"
    return pd.read_sql(query, conn)["table_name"].tolist()

@task
def check_table_quality(conn, table_name):
    # Query the latest data
    query = f"SELECT * FROM {table_name} WHERE created_at >= NOW() - INTERVAL '15 minutes'"
    df = pd.read_sql(query, conn)

    if df.empty:
        return {"table": table_name, "status": "NO_DATA"}

    # Create Great Expectations DataFrame
    ge_df = ge.from_pandas(df)

    # Get expectations for this table (helper assumed to exist elsewhere)
    expectations = get_expectations_for_table(table_name)

    # Validate against expectations
    results = []
    for expectation in expectations:
        exp_type = expectation["type"]
        exp_args = expectation["args"]
        if exp_type == "column_values_to_not_be_null":
            result = ge_df.expect_column_values_to_not_be_null(**exp_args)
        elif exp_type == "column_values_to_be_between":
            result = ge_df.expect_column_values_to_be_between(**exp_args)
        else:
            # Add more expectation types as needed
            continue
        results.append(result)

    # Determine if there are any failures
    failures = [r for r in results if not r.success]
    if failures:
        return {"table": table_name, "status": "FAILURE", "failures": failures}
    return {"table": table_name, "status": "SUCCESS"}

@task
def send_alerts(results):
    failures = [r for r in results if r["status"] == "FAILURE"]
    for failure in failures:
        # Notification helper assumed to exist elsewhere
        send_alert_notification(table=failure["table"], failures=failure["failures"])

with Flow("Continuous Data Quality Monitoring", schedule=schedule) as flow:
    conn = get_database_connection()
    tables = get_tables_to_monitor(conn)
    # Fan out one check task per table; the table list is only known at run time,
    # so use task mapping rather than a Python loop
    results = check_table_quality.map(conn=unmapped(conn), table_name=tables)
    send_alerts(results)

# Register the flow with the Prefect server
flow.register(project_name="data-quality")
```
Benefits:
- Provides near-real-time monitoring independent of pipelines
- Centralized configuration and management
- Can detect issues even when pipelines aren’t running
Limitations:
- Requires additional infrastructure and management
- May create additional load on data sources
- Needs careful access control to data resources
Advanced Features and Considerations
Self-Learning Quality Rules
Implement systems that learn normal patterns and adjust quality thresholds automatically:
```python
# Example of self-learning thresholds
import numpy as np
from sklearn.ensemble import IsolationForest

class AdaptiveThresholdMonitor:
    def __init__(self, metric_name, history_window=30, contamination=0.05):
        self.metric_name = metric_name
        self.history_window = history_window
        self.contamination = contamination
        self.history = []
        self.model = None

    def add_measurement(self, value, timestamp):
        self.history.append({"timestamp": timestamp, "value": value})

        # Keep only the history window
        if len(self.history) > self.history_window:
            self.history = self.history[-self.history_window:]

        # Once we have enough history, train the model
        if len(self.history) >= self.history_window:
            self._train_model()

    def _train_model(self):
        # Extract values and reshape for sklearn
        values = np.array([h["value"] for h in self.history]).reshape(-1, 1)

        # Train isolation forest model
        self.model = IsolationForest(contamination=self.contamination)
        self.model.fit(values)

    def is_anomaly(self, value):
        if self.model is None:
            # Not enough history to determine
            return False

        # Predict if this is an anomaly (-1 for anomalies, 1 for normal)
        prediction = self.model.predict(np.array([value]).reshape(1, -1))
        return prediction[0] == -1

    def get_current_thresholds(self):
        if len(self.history) < self.history_window:
            # Not enough data yet, use simple statistics
            values = [h["value"] for h in self.history]
            mean = np.mean(values)
            std = np.std(values)
            return {
                "lower": mean - 3 * std,
                "upper": mean + 3 * std,
                "type": "statistical"
            }

        # For isolation forest, we can't directly read off thresholds.
        # Instead, sample points across the range and find where the
        # model's decision flips.
        min_val = min(h["value"] for h in self.history)
        max_val = max(h["value"] for h in self.history)

        test_range = np.linspace(min_val - 0.1 * (max_val - min_val),
                                 max_val + 0.1 * (max_val - min_val),
                                 100).reshape(-1, 1)
        preds = self.model.predict(test_range)

        # Find threshold crossings
        threshold_points = []
        for i in range(1, len(preds)):
            if preds[i] != preds[i - 1]:
                threshold_points.append(test_range[i][0])

        if not threshold_points:
            # No clear boundary found
            return {"model": "isolation_forest", "thresholds": "non-linear"}
        return {"model": "isolation_forest", "thresholds": threshold_points}
```
Root Cause Analysis
Automatically identify potential causes of data quality issues:
```python
# Example of root cause analysis
# (get_warehouse_connection, get_column_lineage, get_issue_condition,
#  get_recent_etl_changes, analyze_volume_trends, and
#  get_recent_schema_changes are internal helpers assumed to exist elsewhere)
def analyze_root_cause(issue_table, issue_column, issue_description):
    # Connect to the data warehouse
    conn = get_warehouse_connection()

    # Get data lineage information
    lineage = get_column_lineage(issue_table, issue_column)

    # Check upstream tables in the lineage
    upstream_issues = []
    for upstream_table, upstream_column in lineage["upstream_columns"]:
        # Check if this column has similar issues
        query = f"""
            SELECT COUNT(*) as issue_count
            FROM {upstream_table}
            WHERE {get_issue_condition(upstream_column, issue_description)}
        """
        result = pd.read_sql(query, conn)
        if result.iloc[0]["issue_count"] > 0:
            # This upstream table has similar issues
            upstream_issues.append({
                "table": upstream_table,
                "column": upstream_column,
                "issue_count": result.iloc[0]["issue_count"]
            })

    # Check for recent changes to ETL code
    etl_changes = get_recent_etl_changes(issue_table)

    # Check for data volume anomalies
    volume_check_query = f"""
        SELECT
            DATE_TRUNC('day', created_at) as day,
            COUNT(*) as record_count
        FROM {issue_table}
        WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
        GROUP BY 1
        ORDER BY 1
    """
    volume_data = pd.read_sql(volume_check_query, conn)

    # Check for schema changes
    schema_changes = get_recent_schema_changes(issue_table)

    return {
        "upstream_issues": upstream_issues,
        "etl_changes": etl_changes,
        "volume_anomalies": analyze_volume_trends(volume_data),
        "schema_changes": schema_changes
    }
```
Impact Assessment
Determine the business impact of data quality issues:
```python
# Example of impact assessment
# (the get_* and categorize_impact_score helpers are assumed to query
#  your metadata catalog)
def assess_quality_issue_impact(issue_details):
    impact_score = 0
    impact_areas = []

    # Check downstream dependencies
    downstream_tables = get_downstream_tables(issue_details["table"])

    # Check for critical business processes affected
    for table in downstream_tables:
        business_processes = get_business_processes_using_table(table)
        for process in business_processes:
            impact_score += process["criticality_score"]
            impact_areas.append({
                "process": process["name"],
                "criticality": process["criticality_score"],
                "owners": process["owners"]
            })

    # Check for affected dashboards/reports
    affected_dashboards = get_dashboards_using_table(issue_details["table"])
    for dashboard in affected_dashboards:
        impact_score += dashboard["usage_score"]
        impact_areas.append({
            "dashboard": dashboard["name"],
            "users": dashboard["user_count"],
            "last_viewed": dashboard["last_viewed"]
        })

    # Check for ML models using this data
    affected_models = get_ml_models_using_table(issue_details["table"])
    for model in affected_models:
        impact_score += model["importance_score"]
        impact_areas.append({
            "model": model["name"],
            "usage": model["usage_description"],
            "owner": model["owner"]
        })

    return {
        "overall_impact_score": impact_score,
        "impact_level": categorize_impact_score(impact_score),
        "impact_areas": impact_areas
    }
```
Auto-Remediation
For certain types of issues, implement automatic fixes:
```python
# Example of auto-remediation for common issues
def apply_auto_remediation(issue):
    remediation_applied = False

    if issue["type"] == "missing_values" and issue["column"] in AUTO_REMEDIATE_COLUMNS:
        # Get default value for this column
        default_value = get_default_value_for_column(issue["table"], issue["column"])

        # Apply default value
        query = f"""
            UPDATE {issue["table"]}
            SET {issue["column"]} = %s
            WHERE {issue["column"]} IS NULL
        """
        execute_query(query, [default_value])
        remediation_applied = True

    elif issue["type"] == "out_of_range_values" and issue["column"] in AUTO_REMEDIATE_COLUMNS:
        # Get valid range for this column
        valid_range = get_valid_range_for_column(issue["table"], issue["column"])

        # Clamp values to the valid range
        query = f"""
            UPDATE {issue["table"]}
            SET {issue["column"]} = LEAST(GREATEST({issue["column"]}, %s), %s)
            WHERE {issue["column"]} < %s OR {issue["column"]} > %s
        """
        execute_query(query, [valid_range["min"], valid_range["max"],
                              valid_range["min"], valid_range["max"]])
        remediation_applied = True

    elif issue["type"] == "duplicate_records":
        # Keep only one record per duplicate group (PostgreSQL syntax)
        query = f"""
            DELETE FROM {issue["table"]} a
            USING {issue["table"]} b
            WHERE a.{issue["primary_key"]} > b.{issue["primary_key"]}
            AND a.{issue["duplicate_key"]} = b.{issue["duplicate_key"]}
        """
        execute_query(query)
        remediation_applied = True

    # Log the remediation action
    log_remediation_action(issue, remediation_applied)
    return remediation_applied
```
Implementation Challenges and Solutions
Scaling with Data Volume
As data volumes grow, quality checks can become expensive:
Solution: Sampling and Partitioning
```python
# Example of intelligent sampling for quality checks (PostgreSQL)
def smart_quality_check(conn, table_name, column_name, check_type):
    # Determine table size
    row_count = get_table_row_count(conn, table_name)

    if row_count > 10_000_000:
        # Very large table - stratified sampling: split the column's range
        # into 100 strata and take up to 1,000 random rows from each,
        # so rare value ranges are still represented
        sample_query = f"""
            WITH strata AS (
                SELECT t.*,
                       NTILE(100) OVER (ORDER BY {column_name}) AS stratum
                FROM {table_name} t
            ),
            sampled AS (
                SELECT *,
                       ROW_NUMBER() OVER (PARTITION BY stratum ORDER BY random()) AS rn
                FROM strata
            )
            SELECT * FROM sampled WHERE rn <= 1000
        """
        sample_df = pd.read_sql(sample_query, conn)
    elif row_count > 1_000_000:
        # Large table - use random sampling
        sample_query = f"""
            SELECT *
            FROM {table_name}
            TABLESAMPLE BERNOULLI (1)
            LIMIT 10000
        """
        sample_df = pd.read_sql(sample_query, conn)
    else:
        # Small enough table - use all data
        sample_df = pd.read_sql(f"SELECT * FROM {table_name}", conn)

    # Now perform the quality check on the sample
    return perform_quality_check(sample_df, column_name, check_type)
```
Managing Alert Fatigue
A flood of alerts quickly trains people to ignore all of them:
Solution: Alert Aggregation and Prioritization
```python
# Example of alert aggregation
from datetime import datetime

class AlertManager:
    def __init__(self):
        self.active_alerts = {}
        self.alert_history = []

    def add_alert(self, alert):
        # Generate a key for this type of alert
        alert_key = f"{alert['table']}_{alert['column']}_{alert['issue_type']}"

        if alert_key in self.active_alerts:
            # Update existing alert
            existing_alert = self.active_alerts[alert_key]
            existing_alert["occurrence_count"] += 1
            existing_alert["last_detected"] = alert["detected_at"]
            existing_alert["samples"].extend(alert["samples"])
            existing_alert["samples"] = existing_alert["samples"][-5:]  # Keep last 5 samples

            # Only send notification if this is a significant update
            if self.should_resend_notification(existing_alert):
                self.send_notification(existing_alert)
        else:
            # New alert
            alert["occurrence_count"] = 1
            alert["first_detected"] = alert["detected_at"]
            alert["status"] = "active"
            self.active_alerts[alert_key] = alert

            # Calculate priority
            alert["priority"] = self.calculate_priority(alert)

            # Send initial notification
            self.send_notification(alert)

    def resolve_alert(self, alert_key):
        if alert_key in self.active_alerts:
            alert = self.active_alerts[alert_key]
            alert["status"] = "resolved"
            alert["resolved_at"] = datetime.now()

            # Move to history
            self.alert_history.append(alert)
            del self.active_alerts[alert_key]

            # Send resolution notification
            self.send_resolution_notification(alert)

    def calculate_priority(self, alert):
        # Calculate priority based on multiple factors
        priority = 0

        # Factor 1: Importance of the affected table
        table_importance = get_table_importance(alert["table"])
        priority += table_importance * 5

        # Factor 2: Type of issue
        if alert["issue_type"] in ["data_loss", "corruption"]:
            priority += 30
        elif alert["issue_type"] in ["schema_change", "reference_violation"]:
            priority += 20
        elif alert["issue_type"] in ["missing_values", "duplicates"]:
            priority += 10

        # Factor 3: Percentage of affected rows
        affected_pct = alert.get("affected_percentage", 0)
        if affected_pct > 50:
            priority += 20
        elif affected_pct > 10:
            priority += 10
        elif affected_pct > 1:
            priority += 5

        # Categorize priority
        if priority >= 40:
            return "high"
        elif priority >= 20:
            return "medium"
        else:
            return "low"

    def should_resend_notification(self, alert):
        # Decide whether to re-notify based on elapsed time, priority,
        # and number of occurrences
        if alert["priority"] == "high":
            # Resend high priority alerts every hour
            time_since_last = datetime.now() - alert.get("last_notification", alert["first_detected"])
            return time_since_last.total_seconds() > 3600
        elif alert["priority"] == "medium":
            # Resend medium priority alerts daily
            time_since_last = datetime.now() - alert.get("last_notification", alert["first_detected"])
            return time_since_last.total_seconds() > 86400
        else:
            # Low priority - only resend when occurrence count crosses thresholds
            return alert["occurrence_count"] in [10, 100, 1000]

    def send_notification(self, alert):
        # Route notification based on priority; the send_* helpers and
        # channel constants are assumed to exist elsewhere
        alert["last_notification"] = datetime.now()

        if alert["priority"] == "high":
            # Send to urgent channels
            send_slack_notification(URGENT_CHANNEL, format_alert(alert))
            send_email_notification(get_table_owners(alert["table"]), format_alert_email(alert))
        elif alert["priority"] == "medium":
            # Send to general channels
            send_slack_notification(GENERAL_CHANNEL, format_alert(alert))
        else:
            # Low priority - just log in dashboard
            update_dashboard_alerts(alert)
```
Balancing Precision and Recall
Thresholds that are too tight generate false positives; thresholds that are too loose let real issues slip through:
Solution: Adaptive Thresholds
```python
# Example of adaptive thresholds based on historical patterns
from datetime import timedelta
import numpy as np
import pandas as pd

class AdaptiveThreshold:
    def __init__(self, column_name, initial_threshold=0.05):
        self.column_name = column_name
        self.base_threshold = initial_threshold
        self.history = []
        self.seasonal_patterns = {}
        self.trend = None

    def add_measurement(self, value, timestamp):
        self.history.append({"timestamp": timestamp, "value": value})

        # Keep last 90 days of history
        cutoff = timestamp - timedelta(days=90)
        self.history = [h for h in self.history if h["timestamp"] >= cutoff]

        # Update patterns when we have enough data
        if len(self.history) >= 30:
            self._update_patterns()

    def _update_patterns(self):
        df = pd.DataFrame(self.history)
        df["timestamp"] = pd.to_datetime(df["timestamp"])

        # Extract time components
        df["hour"] = df["timestamp"].dt.hour
        df["day_of_week"] = df["timestamp"].dt.dayofweek
        df["month"] = df["timestamp"].dt.month

        # Calculate hourly patterns
        hourly = df.groupby("hour")["value"].agg(["mean", "std"]).reset_index()
        self.seasonal_patterns["hourly"] = hourly.set_index("hour").to_dict('index')

        # Calculate day of week patterns
        dow = df.groupby("day_of_week")["value"].agg(["mean", "std"]).reset_index()
        self.seasonal_patterns["day_of_week"] = dow.set_index("day_of_week").to_dict('index')

        # Calculate monthly patterns
        monthly = df.groupby("month")["value"].agg(["mean", "std"]).reset_index()
        self.seasonal_patterns["monthly"] = monthly.set_index("month").to_dict('index')

        # Calculate overall trend
        df = df.sort_values("timestamp")
        if len(df) >= 60:
            # Use the last 60 observations for trend calculation
            recent_df = df.tail(60)
            x = np.array(range(len(recent_df)))
            y = recent_df["value"].values

            # Linear regression for trend
            z = np.polyfit(x, y, 1)
            self.trend = {"slope": z[0], "intercept": z[1]}

    def get_threshold(self, timestamp):
        # Start with base threshold
        threshold = self.base_threshold

        # If we have enough history, widen the threshold to cover the
        # observed variability in each seasonal cycle
        if self.seasonal_patterns and len(self.history) >= 30:
            dt = pd.to_datetime(timestamp)

            if dt.hour in self.seasonal_patterns["hourly"]:
                threshold = max(threshold, self.seasonal_patterns["hourly"][dt.hour]["std"] * 2)
            if dt.dayofweek in self.seasonal_patterns["day_of_week"]:
                threshold = max(threshold, self.seasonal_patterns["day_of_week"][dt.dayofweek]["std"] * 2)
            if dt.month in self.seasonal_patterns["monthly"]:
                threshold = max(threshold, self.seasonal_patterns["monthly"][dt.month]["std"] * 2)

        return threshold

    def is_anomaly(self, value, timestamp):
        threshold = self.get_threshold(timestamp)

        # If we have seasonal patterns, compare to the expected value
        if self.seasonal_patterns:
            dt = pd.to_datetime(timestamp)

            # Get expected value based on hour and day of week
            expected_value_hour = self.seasonal_patterns["hourly"].get(dt.hour, {}).get("mean")
            expected_value_dow = self.seasonal_patterns["day_of_week"].get(dt.dayofweek, {}).get("mean")

            if expected_value_hour is not None and expected_value_dow is not None:
                # Use average of hour and day of week expectations
                expected_value = (expected_value_hour + expected_value_dow) / 2

                # Adjust for trend if available
                if self.trend:
                    days_diff = (timestamp - self.history[0]["timestamp"]).days
                    expected_value += self.trend["slope"] * days_diff

                # Calculate relative deviation (guard against division by zero)
                if expected_value != 0:
                    deviation = abs((value - expected_value) / expected_value)
                    return deviation > threshold

        # If we don't have patterns yet, fall back to a simple 3-sigma rule
        if len(self.history) >= 10:
            values = [h["value"] for h in self.history]
            mean_value = np.mean(values)
            std_dev = np.std(values)
            return abs(value - mean_value) > 3 * std_dev

        # Too little history - don't flag anything yet
        return False
```
Decision Rules
Use this checklist for data quality monitoring decisions:
- If bad data causes downstream failures, add validation checks before the pipeline runs
- If you only sample data for quality checks, accept that rare but important issues can slip through; run full scans on business-critical fields
- If alert fatigue is a problem, aggregate similar alerts rather than sending one per incident
- If thresholds are causing false positives, use historical data to set adaptive thresholds
- If you don’t know what good data looks like, profile it first before defining rules
Data quality monitoring only adds value if you act on the alerts.