Data Quality Monitoring Automation

Simor Consulting | 01 Jul, 2024 | 11 min read

Data quality determines decision quality: poor data leads to flawed analytics and misguided business decisions. Manual data quality reviews don't scale, and they catch issues too late.

This article covers building automated, continuous data quality monitoring systems.

Why Automate Data Quality Monitoring?

Manual data quality checks face several limitations:

  1. Scale limitations: Human reviewers can only sample data, not examine entire datasets
  2. Delayed detection: Issues are often discovered long after they occur
  3. Inconsistent application: Different reviewers may apply different standards
  4. Resource intensity: Manual reviews consume valuable analyst time

Automated monitoring addresses these challenges by providing:

  • Continuous vigilance: 24/7 monitoring across all data pipelines
  • Comprehensive coverage: Ability to check entire datasets, not just samples
  • Immediate alerts: Real-time notification when issues arise
  • Consistent application: Rules applied uniformly across all data
  • Historical tracking: Ability to observe quality trends over time

Core Components of Automated Data Quality Monitoring

A complete data quality monitoring system typically includes these components:

1. Data Profiling and Metadata Collection

Before you can monitor data quality, you need to understand your data’s characteristics:

# Example of column profiling with pandas (Great Expectations builds its
# validation on top of statistics like these; its legacy API exposed no
# profile_column method, so we compute the profile directly)
import pandas as pd

# Load data
df = pd.read_csv("customer_transactions.csv")
col = df["transaction_amount"]

# Generate a basic data profile
print(f"Min value: {col.min()}")
print(f"Max value: {col.max()}")
print(f"Mean value: {col.mean()}")
print(f"Null count: {col.isna().sum()}")
print(f"Distinct values: {col.nunique()}")

Profiling should capture:

  • Statistical distributions
  • Data types and formats
  • Null/missing value patterns
  • Cardinality of fields
  • Data relationships and dependencies
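
A profiling pass over the points above can be sketched with plain pandas; the column and DataFrame here are illustrative:

```python
# Minimal column profiler covering the characteristics listed above
import pandas as pd

def profile_column(df: pd.DataFrame, column: str) -> dict:
    s = df[column]
    profile = {
        "dtype": str(s.dtype),              # data type / format
        "null_count": int(s.isna().sum()),  # missing-value pattern
        "null_pct": float(s.isna().mean()),
        "distinct_count": int(s.nunique()), # cardinality
    }
    if pd.api.types.is_numeric_dtype(s):
        # statistical distribution for numeric columns
        profile.update({
            "min": float(s.min()),
            "max": float(s.max()),
            "mean": float(s.mean()),
            "std": float(s.std()),
        })
    return profile

df = pd.DataFrame({"transaction_amount": [10.0, 25.5, None, 25.5]})
print(profile_column(df, "transaction_amount"))
```

Capturing relationships and dependencies between columns requires additional passes (for example, group-by counts or foreign-key lookups), but a per-column profile like this is the usual starting point.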

2. Quality Rule Definition Framework

Next, establish a framework for defining quality expectations:

# Example using the dbt test framework
# In your schema.yml file (unique and not_null are dbt built-ins;
# custom_regex_match and date_before_today are custom generic tests
# you would define yourself):
version: 2

models:
  - name: customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - not_null
          - unique
          - custom_regex_match:
              regex: '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
      - name: signup_date
        tests:
          - not_null
          - date_before_today

Quality rules typically include:

  • Correctness: Values conform to expected formats and fall within valid ranges
  • Completeness: Required fields contain data
  • Uniqueness: No unwanted duplicates exist
  • Consistency: Related data elements don’t contradict each other
  • Timeliness: Data is available when needed and reflects current state
  • Referential integrity: Foreign keys match existing primary keys
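
One way to make these dimensions concrete is to store rules as plain data that a validation engine interprets; a minimal sketch (the rule schema here is illustrative, not any particular library's format):

```python
# Declarative quality rules for one table, grouped by the dimensions above
RULES = {
    "customers": [
        {"dimension": "completeness", "check": "not_null", "column": "email"},
        {"dimension": "uniqueness",   "check": "unique",   "column": "customer_id"},
        {"dimension": "correctness",  "check": "in_range", "column": "age",
         "min": 0, "max": 120},
        {"dimension": "timeliness",   "check": "max_age_hours",
         "column": "updated_at", "hours": 24},
    ]
}

def rules_for_dimension(table: str, dimension: str) -> list:
    """Return the rules of one quality dimension for a table."""
    return [r for r in RULES.get(table, []) if r["dimension"] == dimension]

print(rules_for_dimension("customers", "completeness"))
```

Keeping rules as data rather than code makes them easy to version, review, and render in a dashboard.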

3. Validation Engine

The validation engine executes quality rules against your data:

# Example using Soda Core for validation
from soda.scan import Scan

scan = Scan()
scan.set_data_source_name("postgres")
scan.add_configuration_yaml_file("configuration.yml")  # data source credentials

# Add validation checks in SodaCL
scan.add_sodacl_yaml_str("""
checks for customers:
  - row_count > 0
  - duplicate_count(customer_id) = 0
  - missing_count(email) = 0
  - invalid_percent(email) = 0:
      valid format: email
  - avg(lifetime_value) between 100 and 5000
""")

# Execute the scan
scan.execute()

if scan.has_check_fails():
    print("Quality issues detected!")
    print(scan.get_logs_text())
else:
    print("All quality checks passed!")

Your validation engine should support:

  • Scheduled and event-triggered execution
  • Both batch and streaming data validation
  • Configurable severity levels for different types of issues
  • Performance optimization for large datasets
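
Configurable severity, in particular, is often just metadata on each check that the engine maps to an action; a minimal sketch (the severity names and actions are assumptions for illustration):

```python
# Each check carries a severity; the engine decides what a failure means
SEVERITY_BEHAVIOR = {
    "error": "halt_pipeline",  # block downstream processing
    "warn":  "alert_only",     # notify, but let data flow
    "info":  "log_only",       # record for trend analysis
}

def handle_failures(failed_checks: list) -> str:
    """Return the strongest action required by the failed checks."""
    order = ["halt_pipeline", "alert_only", "log_only"]
    actions = [SEVERITY_BEHAVIOR[c["severity"]] for c in failed_checks]
    for action in order:
        if action in actions:
            return action
    return "none"

failures = [{"name": "email_not_null", "severity": "warn"},
            {"name": "id_unique", "severity": "error"}]
print(handle_failures(failures))  # one error present, so the pipeline halts
```

This separation lets the same check run as a hard gate in one pipeline and as an informational monitor in another.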

4. Anomaly Detection System

Beyond defined rules, use anomaly detection to catch unexpected issues:

# Example using PyOD for anomaly detection
import pandas as pd
from pyod.models.iforest import IForest

# Load historical data and train model
historical_data = pd.read_csv("historical_order_metrics.csv")
X_train = historical_data[['order_count', 'average_order_value', 'return_rate']].values

# Initialize and fit the model
clf = IForest(contamination=0.05)  # Expect about 5% anomalies
clf.fit(X_train)

# Check today's data
today_data = pd.read_csv("today_order_metrics.csv")
X_test = today_data[['order_count', 'average_order_value', 'return_rate']].values

# Get anomaly scores and predictions
y_scores = clf.decision_function(X_test)  # Anomaly scores
y_preds = clf.predict(X_test)  # Binary labels (0: normal, 1: anomaly)

# Identify and report anomalies
for i, is_anomaly in enumerate(y_preds):
    if is_anomaly == 1:
        print(f"Anomaly detected on row {i}:")
        print(f"  Order count: {X_test[i][0]}")
        print(f"  AOV: ${X_test[i][1]:.2f}")
        print(f"  Return rate: {X_test[i][2]:.2%}")
        print(f"  Anomaly score: {y_scores[i]:.4f}")

Effective anomaly detection considers:

  • Seasonal patterns and trends
  • Multiple dimensions simultaneously
  • Relative changes and absolute thresholds
  • Historical context and business cycles
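
Combining relative changes with absolute thresholds against a seasonal baseline can be sketched as follows (the weekday baselines and limits are made-up illustrative values):

```python
# Flag a metric when it breaches an absolute floor OR deviates too far
# from the baseline for that day of week (a crude seasonal adjustment)
WEEKDAY_BASELINE = {0: 1000, 1: 1100, 2: 1050, 3: 1080, 4: 1200, 5: 600, 6: 550}

def is_suspicious(value: float, day_of_week: int,
                  abs_min: float = 100, max_rel_change: float = 0.5) -> bool:
    if value < abs_min:                 # absolute threshold
        return True
    baseline = WEEKDAY_BASELINE[day_of_week]
    rel_change = abs(value - baseline) / baseline
    return rel_change > max_rel_change  # relative threshold

print(is_suspicious(50, day_of_week=0))    # below the absolute floor
print(is_suspicious(1020, day_of_week=0))  # close to the Monday baseline
```

Without the weekday baseline, a normal Saturday volume of 600 would look like a 40% drop from the weekday average and trigger a false alarm.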

5. Alerting and Notification System

When issues are detected, stakeholders need to be notified:

# Example using Python for Slack notifications
import requests
import json

def send_data_quality_alert(webhook_url, issue_description, severity, affected_tables, metrics):
    # Format the message payload
    payload = {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"🚨 Data Quality Alert: {severity.upper()}"
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Issue:* {issue_description}\n*Affected Tables:* {', '.join(affected_tables)}"
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "*Metrics:*\n" + "\n".join([f"• {k}: {v}" for k, v in metrics.items()])
                }
            },
            {
                "type": "actions",
                "elements": [
                    {
                        "type": "button",
                        "text": {
                            "type": "plain_text",
                            "text": "View Details"
                        },
                        "url": "https://data-quality-dashboard.company.com/alerts/123"
                    }
                ]
            }
        ]
    }

    # Send the notification
    response = requests.post(
        webhook_url,
        data=json.dumps(payload),
        headers={'Content-Type': 'application/json'}
    )

    if response.status_code != 200:
        raise ValueError(f"Request to Slack returned an error {response.status_code}, response: {response.text}")

Your alerting system should include:

  • Severity-based routing and prioritization
  • Customizable notification channels (email, Slack, SMS, etc.)
  • Alert aggregation to prevent alert fatigue
  • Escalation paths for critical issues
  • Context and recommendations for remediation
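
Severity-based routing often reduces to a lookup from severity to notification channels; a minimal sketch (the channel names are illustrative):

```python
# Route alerts to channels according to severity
ROUTING = {
    "critical": ["pagerduty", "slack:#data-incidents", "email"],
    "warning":  ["slack:#data-quality"],
    "info":     ["dashboard"],
}

def channels_for(severity: str) -> list:
    # Unknown severities fall back to the dashboard so nothing is lost
    return ROUTING.get(severity, ["dashboard"])

print(channels_for("critical"))
```

Keeping the routing table in configuration (rather than hard-coding it into each check) makes escalation paths easy to adjust as teams change.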

6. Dashboard and Reporting Interface

Create a central location to monitor data quality metrics:

# Example using Streamlit for a simple data quality dashboard
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

st.title("Data Quality Dashboard")

# Load quality metrics data
quality_metrics = pd.read_csv("quality_metrics_history.csv")

# Show overall health score
current_health = quality_metrics.iloc[-1]["overall_health_score"]
st.metric(
    label="Overall Data Health Score",
    value=f"{current_health:.1f}%",
    delta=f"{current_health - quality_metrics.iloc[-2]['overall_health_score']:.1f}%"
)

# Display quality trends
st.subheader("Quality Score Trends")
fig = px.line(
    quality_metrics,
    x="date",
    y=["completeness_score", "accuracy_score", "consistency_score"],
    labels={"value": "Score (%)", "variable": "Metric"}
)
st.plotly_chart(fig)

# Show active issues
st.subheader("Active Quality Issues")
issues = pd.read_csv("active_issues.csv")
st.dataframe(issues)

# Display data volume vs. error rate
st.subheader("Data Volume vs. Error Rate")
fig = go.Figure()
fig.add_trace(go.Bar(
    x=quality_metrics["date"],
    y=quality_metrics["record_count"],
    name="Record Count"
))
fig.add_trace(go.Scatter(
    x=quality_metrics["date"],
    y=quality_metrics["error_rate"] * 100,
    name="Error Rate (%)",
    yaxis="y2"
))
fig.update_layout(
    yaxis=dict(title="Record Count"),
    yaxis2=dict(title="Error Rate (%)", overlaying="y", side="right")
)
st.plotly_chart(fig)

An effective dashboard provides:

  • Real-time quality metrics and trends
  • Drill-down capabilities for detailed analysis
  • Historical comparison and benchmarking
  • Impact assessment of quality issues
  • Integration with data catalogs and lineage tools

Implementation Approaches

There are several ways to implement automated data quality monitoring:

In-Pipeline Validation

Embed quality checks directly into your data pipelines:

# Example in Apache Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import pandas as pd
import great_expectations as ge

def extract_data():
    # Extract data logic here (illustrative stub)
    extracted_data = [{"customer_id": 1, "email": "a@example.com"}]
    return {"data": extracted_data}

def validate_data(**context):
    # Get data from previous task
    data = context['ti'].xcom_pull(task_ids='extract_data')['data']

    # Create Great Expectations DataFrame
    ge_df = ge.from_pandas(pd.DataFrame(data))

    # Validate data quality against a previously saved expectation suite
    validation_result = ge_df.validate(
        expectation_suite="source_data_quality_suite.json"
    )

    # If validation fails, raise exception to halt the pipeline
    if not validation_result["success"]:
        failed_expectations = [exp for exp in validation_result["results"] if not exp["success"]]
        raise ValueError(f"Data quality validation failed: {failed_expectations}")

    return {"data": data, "validation_result": validation_result}

def transform_data(**context):
    # Only runs if validation passes
    data = context['ti'].xcom_pull(task_ids='validate_data')['data']
    # Transform logic here

with DAG(
    'etl_with_quality_checks',
    start_date=days_ago(1),
    schedule_interval='@daily'
) as dag:

    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data
    )

    # Airflow 2.x passes the task context automatically;
    # provide_context is no longer needed
    validate = PythonOperator(
        task_id='validate_data',
        python_callable=validate_data
    )

    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )

    # More pipeline tasks...

    extract >> validate >> transform

Benefits:

  • Prevents bad data from propagating downstream
  • Provides immediate feedback during processing
  • Integrates naturally with existing pipelines

Limitations:

  • May increase pipeline complexity and execution time
  • Limited to scheduled pipeline runs
  • Usually focuses on structural quality rather than business context

Sidecar Monitoring

Run quality checks in parallel with your data pipelines:

# Example Docker Compose configuration for a quality monitoring sidecar
version: '3'
services:
  data-pipeline:
    image: data-pipeline-image
    volumes:
      - ./data:/data
    environment:
      - PIPELINE_CONFIG=production
    networks:
      - data-network

  quality-monitor:
    image: quality-monitor-image
    volumes:
      - ./data:/data:ro  # Read-only access to the same data
    environment:
      - MONITOR_CONFIG=production
      - ALERT_WEBHOOK=https://hooks.slack.com/services/XXX/YYY/ZZZ
    networks:
      - data-network
    depends_on:
      - data-pipeline

networks:
  data-network:

Benefits:

  • Doesn’t impact pipeline performance
  • Can be deployed/updated independently
  • Can monitor multiple pipelines simultaneously

Limitations:

  • Potential lag in issue detection
  • Requires additional infrastructure
  • May need custom integration with pipelines

Continuous Monitoring Service

Implement a dedicated service that continuously monitors data sources:

# Example using Prefect for a continuous monitoring service
from prefect import task, Flow
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
import pandas as pd
import sqlalchemy
import great_expectations as ge

# Configure monitoring schedule
schedule = IntervalSchedule(
    start_date=datetime.utcnow(),
    interval=timedelta(minutes=15)
)

@task
def get_database_connection():
    return sqlalchemy.create_engine("postgresql://user:password@host:port/database")

@task
def get_tables_to_monitor(conn):
    query = "SELECT table_name FROM monitoring_registry WHERE is_active = true"
    return pd.read_sql(query, conn)["table_name"].tolist()

@task
def check_table_quality(conn, table_name):
    # Query the latest data
    query = f"SELECT * FROM {table_name} WHERE created_at >= NOW() - INTERVAL '15 minutes'"
    df = pd.read_sql(query, conn)

    if df.empty:
        return {"table": table_name, "status": "NO_DATA"}

    # Create Great Expectations DataFrame
    ge_df = ge.from_pandas(df)

    # Get expectations for this table (project-specific helper,
    # e.g. reading rule definitions from a registry table)
    expectations = get_expectations_for_table(table_name)

    # Validate against expectations
    results = []
    for expectation in expectations:
        exp_type = expectation["type"]
        exp_args = expectation["args"]

        if exp_type == "column_values_to_not_be_null":
            result = ge_df.expect_column_values_to_not_be_null(**exp_args)
        elif exp_type == "column_values_to_be_between":
            result = ge_df.expect_column_values_to_be_between(**exp_args)
        else:
            # Skip expectation types this engine doesn't implement yet
            continue

        results.append(result)

    # Determine if there are any failures
    failures = [r for r in results if not r.success]

    if failures:
        return {
            "table": table_name,
            "status": "FAILURE",
            "failures": failures
        }
    else:
        return {
            "table": table_name,
            "status": "SUCCESS"
        }

@task
def send_alerts(results):
    failures = [r for r in results if r["status"] == "FAILURE"]
    if failures:
        for failure in failures:
            # send_alert_notification is a project-specific helper
            # (e.g. the Slack webhook function shown earlier)
            send_alert_notification(
                table=failure["table"],
                failures=failure["failures"]
            )

from prefect import unmapped  # share one connection across mapped tasks

with Flow("Continuous Data Quality Monitoring", schedule=schedule) as flow:
    conn = get_database_connection()
    tables = get_tables_to_monitor(conn)
    # Map the quality check over every registered table
    # (tables is a task result, so we use .map rather than a Python loop)
    results = check_table_quality.map(unmapped(conn), tables)
    send_alerts(results)

# Register the flow with the Prefect server (Prefect 1.x API)
flow.register(project_name="data-quality")

Benefits:

  • Provides near-real-time monitoring independent of pipelines
  • Centralized configuration and management
  • Can detect issues even when pipelines aren’t running

Limitations:

  • Requires additional infrastructure and management
  • May create additional load on data sources
  • Needs careful access control to data resources

Advanced Features and Considerations

Self-Learning Quality Rules

Implement systems that learn normal patterns and adjust quality thresholds automatically:

# Example of self-learning thresholds
import numpy as np
from sklearn.ensemble import IsolationForest

class AdaptiveThresholdMonitor:
    def __init__(self, metric_name, history_window=30, contamination=0.05):
        self.metric_name = metric_name
        self.history_window = history_window
        self.contamination = contamination
        self.history = []
        self.model = None

    def add_measurement(self, value, timestamp):
        self.history.append({"timestamp": timestamp, "value": value})

        # Keep only the history window
        if len(self.history) > self.history_window:
            self.history = self.history[-self.history_window:]

        # Once we have enough history, train the model
        if len(self.history) >= self.history_window:
            self._train_model()

    def _train_model(self):
        # Extract values and reshape for sklearn
        values = np.array([h["value"] for h in self.history]).reshape(-1, 1)

        # Train isolation forest model
        self.model = IsolationForest(contamination=self.contamination)
        self.model.fit(values)

    def is_anomaly(self, value):
        if self.model is None:
            # Not enough history to determine
            return False

        # Predict if this is an anomaly (-1 for anomalies, 1 for normal)
        prediction = self.model.predict(np.array([value]).reshape(1, -1))
        return prediction[0] == -1

    def get_current_thresholds(self):
        if len(self.history) < self.history_window:
            # Not enough data yet, use simple statistics
            values = [h["value"] for h in self.history]
            mean = np.mean(values)
            std = np.std(values)
            return {
                "lower": mean - 3 * std,
                "upper": mean + 3 * std,
                "type": "statistical"
            }

        # For isolation forest, we can't directly get thresholds
        # Instead, sample points and find decision boundary
        min_val = min(h["value"] for h in self.history)
        max_val = max(h["value"] for h in self.history)

        # Generate test points across the range
        test_range = np.linspace(min_val - 0.1 * (max_val - min_val),
                                max_val + 0.1 * (max_val - min_val),
                                100).reshape(-1, 1)

        # Get predictions for each point
        preds = self.model.predict(test_range)

        # Find threshold crossings
        threshold_points = []
        for i in range(1, len(preds)):
            if preds[i] != preds[i-1]:
                threshold_points.append(test_range[i][0])

        if not threshold_points:
            # No clear boundary found
            return {
                "model": "isolation_forest",
                "thresholds": "non-linear"
            }

        return {
            "model": "isolation_forest",
            "thresholds": threshold_points
        }

Root Cause Analysis

Automatically identify potential causes of data quality issues:

# Example of root cause analysis
def analyze_root_cause(issue_table, issue_column, issue_description):
    # Connect to the data warehouse
    conn = get_warehouse_connection()

    # Get data lineage information
    lineage = get_column_lineage(issue_table, issue_column)

    # Check upstream tables in the lineage
    upstream_issues = []
    for upstream_table, upstream_column in lineage["upstream_columns"]:
        # Check if this column has issues
        query = f"""
        SELECT COUNT(*) as issue_count
        FROM {upstream_table}
        WHERE {get_issue_condition(upstream_column, issue_description)}
        """
        result = pd.read_sql(query, conn)

        if result.iloc[0]["issue_count"] > 0:
            # This upstream table has similar issues
            upstream_issues.append({
                "table": upstream_table,
                "column": upstream_column,
                "issue_count": result.iloc[0]["issue_count"]
            })

    # Check for recent changes to ETL code
    etl_changes = get_recent_etl_changes(issue_table)

    # Check for data volume anomalies
    volume_check_query = f"""
    SELECT
        DATE_TRUNC('day', created_at) as day,
        COUNT(*) as record_count
    FROM {issue_table}
    WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
    GROUP BY 1
    ORDER BY 1
    """
    volume_data = pd.read_sql(volume_check_query, conn)

    # Check for schema changes
    schema_changes = get_recent_schema_changes(issue_table)

    return {
        "upstream_issues": upstream_issues,
        "etl_changes": etl_changes,
        "volume_anomalies": analyze_volume_trends(volume_data),
        "schema_changes": schema_changes
    }

Impact Assessment

Determine the business impact of data quality issues:

# Example of impact assessment
def assess_quality_issue_impact(issue_details):
    impact_score = 0
    impact_areas = []

    # Check downstream dependencies
    downstream_tables = get_downstream_tables(issue_details["table"])

    # Check for critical business processes affected
    for table in downstream_tables:
        business_processes = get_business_processes_using_table(table)
        for process in business_processes:
            impact_score += process["criticality_score"]
            impact_areas.append({
                "process": process["name"],
                "criticality": process["criticality_score"],
                "owners": process["owners"]
            })

    # Check for affected dashboards/reports
    affected_dashboards = get_dashboards_using_table(issue_details["table"])
    for dashboard in affected_dashboards:
        impact_score += dashboard["usage_score"]
        impact_areas.append({
            "dashboard": dashboard["name"],
            "users": dashboard["user_count"],
            "last_viewed": dashboard["last_viewed"]
        })

    # Check for ML models using this data
    affected_models = get_ml_models_using_table(issue_details["table"])
    for model in affected_models:
        impact_score += model["importance_score"]
        impact_areas.append({
            "model": model["name"],
            "usage": model["usage_description"],
            "owner": model["owner"]
        })

    return {
        "overall_impact_score": impact_score,
        "impact_level": categorize_impact_score(impact_score),
        "impact_areas": impact_areas
    }

Auto-Remediation

For certain types of issues, implement automatic fixes:

# Example of auto-remediation for common issues
def apply_auto_remediation(issue):
    remediation_applied = False

    if issue["type"] == "missing_values" and issue["column"] in AUTO_REMEDIATE_COLUMNS:
        # Get default value for this column
        default_value = get_default_value_for_column(issue["table"], issue["column"])

        # Apply default value
        query = f"""
        UPDATE {issue["table"]}
        SET {issue["column"]} = %s
        WHERE {issue["column"]} IS NULL
        """
        execute_query(query, [default_value])
        remediation_applied = True

    elif issue["type"] == "out_of_range_values" and issue["column"] in AUTO_REMEDIATE_COLUMNS:
        # Get valid range for this column
        valid_range = get_valid_range_for_column(issue["table"], issue["column"])

        # Clamp values to valid range
        query = f"""
        UPDATE {issue["table"]}
        SET {issue["column"]} = LEAST(GREATEST({issue["column"]}, %s), %s)
        WHERE {issue["column"]} < %s OR {issue["column"]} > %s
        """
        execute_query(query, [valid_range["min"], valid_range["max"], valid_range["min"], valid_range["max"]])
        remediation_applied = True

    elif issue["type"] == "duplicate_records":
        # Keep only one record per duplicate group
        query = f"""
        DELETE FROM {issue["table"]} a
        USING {issue["table"]} b
        WHERE a.{issue["primary_key"]} > b.{issue["primary_key"]}
        AND a.{issue["duplicate_key"]} = b.{issue["duplicate_key"]}
        """
        execute_query(query)
        remediation_applied = True

    # Log the remediation action
    log_remediation_action(issue, remediation_applied)

    return remediation_applied

Implementation Challenges and Solutions

Scaling with Data Volume

As data volumes grow, quality checks can become expensive:

Solution: Sampling and Partitioning

# Example of intelligent sampling for quality checks
# (conn is a database connection passed in by the caller)
def smart_quality_check(conn, table_name, column_name, check_type):
    # Determine table size
    row_count = get_table_row_count(table_name)

    if row_count > 10_000_000:
        # Very large table - stratified sampling: up to 1,000 random rows
        # from each of 100 strata ordered by the column of interest
        sample_query = f"""
        WITH strata AS (
            SELECT t.*,
                   NTILE(100) OVER (ORDER BY {column_name}) AS stratum
            FROM {table_name} t
        ),
        ranked AS (
            SELECT *,
                   ROW_NUMBER() OVER (PARTITION BY stratum ORDER BY random()) AS rn
            FROM strata
        )
        SELECT *
        FROM ranked
        WHERE rn <= 1000
        """
        sample_df = pd.read_sql(sample_query, conn)

    elif row_count > 1_000_000:
        # Large table - use random sampling
        sample_query = f"""
        SELECT *
        FROM {table_name}
        TABLESAMPLE BERNOULLI (1)
        LIMIT 10000
        """
        sample_df = pd.read_sql(sample_query, conn)

    else:
        # Small enough table - use all data
        sample_df = pd.read_sql(f"SELECT * FROM {table_name}", conn)

    # Now perform the quality check on the sample
    return perform_quality_check(sample_df, column_name, check_type)

Managing Alert Fatigue

A flood of alerts quickly teaches teams to ignore them:

Solution: Alert Aggregation and Prioritization

# Example of alert aggregation
from datetime import datetime

class AlertManager:
    def __init__(self):
        self.active_alerts = {}
        self.alert_history = []

    def add_alert(self, alert):
        # Generate a key for this type of alert
        alert_key = f"{alert['table']}_{alert['column']}_{alert['issue_type']}"

        if alert_key in self.active_alerts:
            # Update existing alert
            existing_alert = self.active_alerts[alert_key]
            existing_alert["occurrence_count"] += 1
            existing_alert["last_detected"] = alert["detected_at"]
            existing_alert["samples"].extend(alert["samples"])
            existing_alert["samples"] = existing_alert["samples"][-5:]  # Keep last 5 samples

            # Only send notification if this is a significant update
            if self.should_resend_notification(existing_alert):
                self.send_notification(existing_alert)
        else:
            # New alert
            alert["occurrence_count"] = 1
            alert["first_detected"] = alert["detected_at"]
            alert["status"] = "active"
            self.active_alerts[alert_key] = alert

            # Calculate priority
            alert["priority"] = self.calculate_priority(alert)

            # Send initial notification
            self.send_notification(alert)

    def resolve_alert(self, alert_key):
        if alert_key in self.active_alerts:
            alert = self.active_alerts[alert_key]
            alert["status"] = "resolved"
            alert["resolved_at"] = datetime.now()

            # Move to history
            self.alert_history.append(alert)
            del self.active_alerts[alert_key]

            # Send resolution notification
            self.send_resolution_notification(alert)

    def calculate_priority(self, alert):
        # Calculate priority based on multiple factors
        priority = 0

        # Factor 1: Impact of the table
        table_importance = get_table_importance(alert["table"])
        priority += table_importance * 5

        # Factor 2: Type of issue
        if alert["issue_type"] in ["data_loss", "corruption"]:
            priority += 30
        elif alert["issue_type"] in ["schema_change", "reference_violation"]:
            priority += 20
        elif alert["issue_type"] in ["missing_values", "duplicates"]:
            priority += 10

        # Factor 3: Percentage of affected rows
        affected_pct = alert.get("affected_percentage", 0)
        if affected_pct > 50:
            priority += 20
        elif affected_pct > 10:
            priority += 10
        elif affected_pct > 1:
            priority += 5

        # Categorize priority
        if priority >= 40:
            return "high"
        elif priority >= 20:
            return "medium"
        else:
            return "low"

    def should_resend_notification(self, alert):
        # Logic to determine if we should send another notification
        # Based on time elapsed, severity, and number of occurrences
        if alert["priority"] == "high":
            # Resend high priority alerts every hour
            time_since_last = datetime.now() - alert.get("last_notification", alert["first_detected"])
            return time_since_last.total_seconds() > 3600

        elif alert["priority"] == "medium":
            # Resend medium priority alerts daily
            time_since_last = datetime.now() - alert.get("last_notification", alert["first_detected"])
            return time_since_last.total_seconds() > 86400

        else:
            # Low priority - only resend if occurrence count crosses thresholds
            return alert["occurrence_count"] in [10, 100, 1000]

    def send_notification(self, alert):
        # Logic to send notification based on priority
        alert["last_notification"] = datetime.now()

        if alert["priority"] == "high":
            # Send to urgent channels
            send_slack_notification(URGENT_CHANNEL, format_alert(alert))
            send_email_notification(get_table_owners(alert["table"]), format_alert_email(alert))

        elif alert["priority"] == "medium":
            # Send to general channels
            send_slack_notification(GENERAL_CHANNEL, format_alert(alert))

        else:
            # Low priority - just log in dashboard
            update_dashboard_alerts(alert)

Balancing Precision and Recall

Thresholds that are too tight produce false positives; thresholds that are too loose miss real issues:

Solution: Adaptive Thresholds

# Example of adaptive thresholds based on historical patterns
from datetime import timedelta
import numpy as np
import pandas as pd

class AdaptiveThreshold:
    def __init__(self, column_name, initial_threshold=0.05):
        self.column_name = column_name
        self.base_threshold = initial_threshold
        self.history = []
        self.seasonal_patterns = {}
        self.trend = None

    def add_measurement(self, value, timestamp):
        self.history.append({"timestamp": timestamp, "value": value})

        # Keep last 90 days of history
        cutoff = timestamp - timedelta(days=90)
        self.history = [h for h in self.history if h["timestamp"] >= cutoff]

        # Update patterns when we have enough data
        if len(self.history) >= 30:
            self._update_patterns()

    def _update_patterns(self):
        df = pd.DataFrame(self.history)

        # Convert timestamp to datetime
        df["timestamp"] = pd.to_datetime(df["timestamp"])

        # Extract time components
        df["hour"] = df["timestamp"].dt.hour
        df["day_of_week"] = df["timestamp"].dt.dayofweek
        df["day_of_month"] = df["timestamp"].dt.day
        df["month"] = df["timestamp"].dt.month

        # Calculate hourly patterns
        hourly = df.groupby("hour")["value"].agg(["mean", "std"]).reset_index()
        self.seasonal_patterns["hourly"] = hourly.set_index("hour").to_dict('index')

        # Calculate day of week patterns
        dow = df.groupby("day_of_week")["value"].agg(["mean", "std"]).reset_index()
        self.seasonal_patterns["day_of_week"] = dow.set_index("day_of_week").to_dict('index')

        # Calculate monthly patterns
        monthly = df.groupby("month")["value"].agg(["mean", "std"]).reset_index()
        self.seasonal_patterns["monthly"] = monthly.set_index("month").to_dict('index')

        # Calculate overall trend
        df = df.sort_values("timestamp")
        if len(df) >= 60:
            # Use last 60 days for trend calculation
            recent_df = df.tail(60)
            x = np.array(range(len(recent_df)))
            y = recent_df["value"].values

            # Linear regression for trend
            z = np.polyfit(x, y, 1)
            self.trend = {
                "slope": z[0],
                "intercept": z[1]
            }

    def get_threshold(self, timestamp):
        # Start with base threshold
        threshold = self.base_threshold

        # If we have enough history, adjust based on patterns
        if self.seasonal_patterns and len(self.history) >= 30:
            # Extract time components
            dt = pd.to_datetime(timestamp)
            hour = dt.hour
            day_of_week = dt.dayofweek
            month = dt.month

            # Adjust based on hourly pattern
            if hour in self.seasonal_patterns["hourly"]:
                hourly_std = self.seasonal_patterns["hourly"][hour]["std"]
                threshold = max(threshold, hourly_std * 2)

            # Adjust based on day of week
            if day_of_week in self.seasonal_patterns["day_of_week"]:
                dow_std = self.seasonal_patterns["day_of_week"][day_of_week]["std"]
                threshold = max(threshold, dow_std * 2)

            # Adjust based on month
            if month in self.seasonal_patterns["monthly"]:
                monthly_std = self.seasonal_patterns["monthly"][month]["std"]
                threshold = max(threshold, monthly_std * 2)

        return threshold

    def is_anomaly(self, value, timestamp):
        threshold = self.get_threshold(timestamp)

        # If we have seasonal patterns, compare to the expected value
        if self.seasonal_patterns:
            dt = pd.to_datetime(timestamp)
            hour = dt.hour
            day_of_week = dt.dayofweek

            # Get expected value based on hour and day of week
            expected_value_hour = self.seasonal_patterns["hourly"].get(hour, {}).get("mean")
            expected_value_dow = self.seasonal_patterns["day_of_week"].get(day_of_week, {}).get("mean")

            if expected_value_hour is not None and expected_value_dow is not None:
                # Use average of hour and day of week expectations
                expected_value = (expected_value_hour + expected_value_dow) / 2

                # Adjust for trend if available
                if self.trend:
                    # Calculate days since first data point
                    days_diff = (timestamp - self.history[0]["timestamp"]).days
                    trend_adjustment = self.trend["slope"] * days_diff
                    expected_value += trend_adjustment

                # Relative deviation; guard against a zero expected value
                # (falls through to the statistical approach below if zero)
                if expected_value != 0:
                    deviation = abs((value - expected_value) / expected_value)
                    return deviation > threshold

        # If we don't have patterns yet, use simple statistical approach
        if len(self.history) >= 10:
            values = [h["value"] for h in self.history]
            mean_value = sum(values) / len(values)
            std_dev = (sum((x - mean_value) ** 2 for x in values) / len(values)) ** 0.5

            # Check if value is more than 3 standard deviations from mean
            return abs(value - mean_value) > 3 * std_dev

        # Too little history to judge; don't flag anything yet
        return False
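The statistical fallback at the end of `is_anomaly` can be exercised on its own. The sketch below is a minimal, self-contained version of that 3-standard-deviation rule; the function name and sample values are illustrative:

```python
from statistics import mean, pstdev

def is_simple_anomaly(history, value, k=3.0):
    """Flag `value` if it deviates more than k population standard
    deviations from the mean of `history` (needs at least 10 points)."""
    if len(history) < 10:
        return False  # too little history to judge
    mu = mean(history)
    sigma = pstdev(history)
    return abs(value - mu) > k * sigma

# Steady daily row counts around 100, then a sudden drop
history = [100, 101, 99, 102, 98, 100, 101, 99, 100, 100]
print(is_simple_anomaly(history, 100))  # False: within normal variation
print(is_simple_anomaly(history, 40))   # True: far outside 3 std devs
```

This is the behavior the `AdaptiveThreshold` class converges to before it has accumulated 30 days of seasonal history.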

Decision Rules

Use this checklist for data quality monitoring decisions:

  1. If bad data causes downstream failures, add validation checks before the pipeline runs
  2. If you only sample data for quality checks, expand to full-dataset validation; the issues you miss are often the ones that matter
  3. If alert fatigue is a problem, aggregate similar alerts rather than sending one per incident
  4. If thresholds are causing false positives, use historical data to set adaptive thresholds
  5. If you don’t know what good data looks like, profile it first before defining rules
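Rule 3 can be sketched as a simple aggregation step before notification: group raw alerts by table and check so a burst of failures produces one summary message instead of hundreds. The alert fields and severity ranking below are illustrative, not a specific library's API:

```python
from collections import defaultdict

# Illustrative severity ranking; a plain string max() would sort alphabetically
SEVERITY = {"low": 0, "medium": 1, "high": 2}

def aggregate_alerts(alerts):
    """Collapse alerts sharing a (table, check) key into one summary
    alert carrying the incident count and the worst priority seen."""
    groups = defaultdict(list)
    for alert in alerts:
        groups[(alert["table"], alert["check"])].append(alert)

    summaries = []
    for (table, check), group in groups.items():
        worst = max(group, key=lambda a: SEVERITY[a["priority"]])
        summaries.append({
            "table": table,
            "check": check,
            "count": len(group),
            "priority": worst["priority"],
        })
    return summaries

raw = [
    {"table": "orders", "check": "null_rate", "priority": "low"},
    {"table": "orders", "check": "null_rate", "priority": "high"},
    {"table": "users", "check": "row_count", "priority": "medium"},
]
print(aggregate_alerts(raw))  # two summary alerts instead of three raw ones
```

Running this aggregation on a short window (e.g. every few minutes) keeps the routing logic shown earlier intact while cutting notification volume.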

Data quality monitoring only adds value if you act on the alerts.

