DataOps Automation with Dagster, Prefect 2 & Airflow 2

Simor Consulting | 08 Aug, 2025 | 04 Mins read

A fintech company’s data platform ground to a halt when a schema change cascaded through dozens of pipelines. Their homegrown orchestration system—a maze of cron jobs and bash scripts—offered no visibility. Data scientists could not access fresh data. Executives flew blind without dashboards. Risk models operated on stale information.

This is what happens when orchestration is an afterthought.

Why Traditional Orchestration Fails

Most organizations begin with:

  • Cron jobs scheduling ETL scripts
  • Bash scripts chaining commands
  • Success meaning absence of error emails
  • Failure meaning frantic debugging
  • Dependencies as tribal knowledge
  • Monitoring meaning checking if files appeared

This works with ten pipelines processing gigabytes. It collapses with hundreds processing terabytes.

Dependency complexity: Modern pipelines form complex graphs where upstream failures cascade unpredictably.
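
That graph structure is exactly what an orchestrator exploits: it topologically sorts tasks into a safe execution order and halts downstream work when something upstream fails. A minimal, stdlib-only sketch of the idea (the task names are illustrative, not from any framework):

```python
from graphlib import TopologicalSorter

# Each task lists the upstream tasks it depends on
pipeline = {
    "raw_events": [],
    "cleaned_events": ["raw_events"],
    "user_features": ["cleaned_events"],
    "fraud_scores": ["user_features", "cleaned_events"],
}

# A safe execution order: every task runs after all of its dependencies
order = list(TopologicalSorter(pipeline).static_order())

# Simulate an upstream failure: everything downstream is skipped, not run blindly
failed = {"cleaned_events"}
skipped = set()
for task in order:
    if any(dep in failed or dep in skipped for dep in pipeline[task]):
        skipped.add(task)

assert order[0] == "raw_events"
assert skipped == {"user_features", "fraud_scores"}
```

With cron, that skip logic lives in nobody's head until 3 AM; with an orchestrator, it is the default behavior.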

Heterogeneous workloads: Different tasks require different execution environments. Spark jobs need clusters. Python scripts need specific libraries. SQL transformations need database connections.

Dynamic workflows: Business logic is not static. The number of files varies. Processing steps depend on data content. Workflows branch based on conditions.

Operational visibility: When pipelines fail at 3 AM, engineers need to understand what happened, why, and what to do.
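
Modern orchestrators answer those 3 AM questions by recording every run as structured, queryable metadata rather than scattered log files. A toy, stdlib-only sketch of the idea (the `run_task` helper and task names are hypothetical):

```python
import time

run_log = []  # in a real system: a database that the UI and alerting query

def run_task(name, fn):
    """Execute a task and record a structured event, success or failure."""
    event = {"task": name, "started_at": time.time()}
    try:
        event["result"] = fn()
        event["status"] = "success"
    except Exception as exc:
        event["status"] = "failed"
        event["error"] = repr(exc)  # captured for later triage, not lost
    event["finished_at"] = time.time()
    run_log.append(event)

run_task("extract", lambda: "42 rows")
run_task("load", lambda: 1 / 0)

# The on-call engineer queries what failed and why, instead of grepping logs
failures = [e for e in run_log if e["status"] == "failed"]
assert failures[0]["task"] == "load"
```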

Modern DataOps Principles

Infrastructure as code: Workflows defined programmatically, version controlled, tested, deployed like applications.

Observable by design: Every task, every run, every piece of data tracked and queryable.

Failure as normal: Systems designed assuming things fail. Automatic retries, circuit breakers, graceful degradation.
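
All three orchestrators below expose retries as a first-class setting; conceptually it is just exponential backoff around a flaky call. A stdlib-only sketch, with a deliberately flaky function for illustration:

```python
import time

def with_retries(fn, attempts=3, base_delay=0.01):
    """Retry fn with exponential backoff; re-raise after the last attempt."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)  # 0.01s, 0.02s, ...

calls = {"n": 0}

def flaky_extract():
    # Fails twice, then succeeds -- typical of transient network errors
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("warehouse unavailable")
    return "ok"

assert with_retries(flaky_extract) == "ok"
assert calls["n"] == 3
```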

Developer experience first: Tools that make the right thing the easy thing.

Dagster: Asset-Centric Orchestration

Dagster reimagined orchestration around assets rather than tasks: instead of scheduling jobs, you declare the data artifacts your pipelines produce (tables, files, models), and Dagster derives the execution graph from their dependencies.

Asset focus provides:

  • Explicit lineage: trace any data back to its sources
  • Clear dependencies: assets depend on other assets
  • Natural testing: test individual assets or subgraphs
  • Built-in observability: know what data exists, when updated, its quality

Dagster in Practice

from dagster import asset, AssetIn, Output, AssetKey, MetadataValue
from dagster_pandas import PandasColumn, create_dagster_pandas_dataframe_type
import pandas as pd

TransactionSchema = create_dagster_pandas_dataframe_type(
    name="TransactionSchema",
    columns=[
        PandasColumn.string_column("transaction_id", non_nullable=True),
        PandasColumn.datetime_column("timestamp", non_nullable=True),
        PandasColumn.float_column("amount", min_value=0),
        PandasColumn.string_column("user_id", non_nullable=True),
        PandasColumn.string_column("merchant_id"),
        PandasColumn.boolean_column("is_fraudulent")
    ]
)

@asset(
    description="Raw transactions from payment processor",
    required_resource_keys={"warehouse"},
    metadata={
        "owner": "payments_team",
        "sla_hours": 1,
        "pii": True
    }
)
def raw_transactions(context) -> Output[pd.DataFrame]:
    query = """
    SELECT * FROM payment_processor.transactions
    WHERE date = CURRENT_DATE
    """
    df = context.resources.warehouse.query(query)
    context.log.info(f"Loaded {len(df)} transactions")

    return Output(
        df,
        metadata={
            "row_count": len(df),
            "amount_sum": float(df['amount'].sum()),
            "preview": MetadataValue.md(df.head().to_markdown())
        }
    )

@asset(
    ins={"raw_transactions": AssetIn(key=AssetKey("raw_transactions"))},
    description="Cleaned and validated transactions",
    dagster_type=TransactionSchema
)
def cleaned_transactions(context, raw_transactions: pd.DataFrame) -> pd.DataFrame:
    df = raw_transactions.drop_duplicates(subset=['transaction_id'])
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['amount'] = pd.to_numeric(df['amount'], errors='coerce')

    initial_count = len(df)
    df = df[df['amount'] > 0]
    removed_count = initial_count - len(df)

    if removed_count > 0:
        context.log.warning(f"Removed {removed_count} invalid transactions")

    df['hour_of_day'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek

    return df

Prefect 2: Pythonic Workflows

Prefect 2 made Python workflows first-class citizens:

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd

@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
def extract_transactions(date: str, source: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f"Extracting {source} transactions for {date}")
    df = read_from_source(source, date)
    if df.empty:
        logger.warning(f"No data found for {source} on {date}")
    return df

@flow(name="Dynamic ETL Pipeline")
def etl_pipeline(
    date: str,
    sources: list[str],
    validation_rules: dict,
    transform_config: dict
) -> dict:
    logger = get_run_logger()
    logger.info(f"Starting ETL for {date} with {len(sources)} sources")

    raw_data = []
    for source in sources:
        df = extract_transactions.submit(date, source)
        raw_data.append(df)

    combined = pd.concat([df.result() for df in raw_data])
    validated = validate_data(combined, validation_rules)
    transformed = transform_transactions(validated, transform_config)

    if len(transformed) > 1000000:
        logger.info("Large dataset - triggering distributed processing")
        results = distributed_processing_flow(transformed)
    else:
        results = simple_processing(transformed)

    return {
        "records_processed": len(transformed),
        "sources": sources,
        "results": results
    }
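
The `cache_key_fn=task_input_hash` setting above memoizes task runs by hashing their inputs, so a re-run within the expiration window skips execution entirely. Conceptually (a stdlib-only sketch of the idea, not Prefect's implementation):

```python
import hashlib
import json
import time

_cache = {}  # cache key -> (expires_at, result)

def cached_call(fn, *args, ttl_seconds=3600, **kwargs):
    """Run fn only if no unexpired result exists for these exact inputs."""
    key = hashlib.sha256(
        json.dumps([fn.__name__, args, kwargs], sort_keys=True, default=str).encode()
    ).hexdigest()
    now = time.time()
    if key in _cache and _cache[key][0] > now:
        return _cache[key][1]          # cache hit: skip re-execution
    result = fn(*args, **kwargs)
    _cache[key] = (now + ttl_seconds, result)
    return result

calls = []

def extract(date, source):
    calls.append((date, source))
    return f"{source}:{date}"

cached_call(extract, "2025-08-08", "payments")
cached_call(extract, "2025-08-08", "payments")  # served from cache
assert len(calls) == 1
```

Prefect handles the hard parts this sketch ignores: persisting results across processes, invalidating on code changes, and concurrent runs.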

Prefect Deployments

from datetime import timedelta
from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob
from prefect.filesystems import S3
from prefect.server.schemas.schedules import IntervalSchedule

k8s_infrastructure = KubernetesJob(
    namespace="data-pipelines",
    image="company/data-platform:latest",
    image_pull_policy="Always",
    service_account_name="prefect-agent",
    env={
        "ENVIRONMENT": "production",
        "AWS_REGION": "us-east-1"
    }
)

fraud_detection_deployment = Deployment.build_from_flow(
    flow=etl_pipeline,
    name="fraud-detection-etl",
    parameters={
        "sources": ["payments", "chargebacks", "merchant_data"],
        "validation_rules": {
            "amount_positive": lambda df: df['amount'] > 0,
            "user_exists": lambda df: df['user_id'].notna()
        },
        "transform_config": {
            "normalize_amounts": True,
            "encode_categories": True
        }
    },
    infrastructure=k8s_infrastructure,
    storage=S3(bucket_path="prefect-deployments"),
    work_queue_name="ml-pipelines",
    tags=["ml", "fraud", "critical"],
    schedule=IntervalSchedule(interval=timedelta(hours=1))
)

fraud_detection_deployment.apply()

Airflow 2: Enterprise Orchestration

Airflow 2 modernized DAG concepts while maintaining backward compatibility:

from airflow.decorators import task, dag
from airflow.operators.python import get_current_context
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from datetime import datetime

@dag(
    schedule_interval='@hourly',
    start_date=datetime(2023, 1, 1),
    catchup=False,
    max_active_runs=2,
    tags=['ml', 'production', 'fraud']
)
def fraud_detection_pipeline():

    @task(multiple_outputs=True, pool='data_extraction', queue='critical')
    def extract_features() -> dict:
        context = get_current_context()
        execution_date = context['execution_date']
        transactions = extract_transactions(execution_date)
        user_profiles = extract_user_profiles(execution_date)
        return {
            'transactions': transactions.to_json(),
            'user_profiles': user_profiles.to_json(),
            'metadata': {
                'execution_date': str(execution_date),
                'record_count': len(transactions)
            }
        }

    @task.virtualenv(requirements=['scikit-learn==1.0.2'], system_site_packages=False)
    def train_model(features: dict) -> str:
        import pandas as pd
        from sklearn.ensemble import RandomForestClassifier
        transactions = pd.read_json(features['transactions'])
        # Split features from the fraud label carried in the extracted data
        X = transactions.drop(columns=['is_fraudulent'])
        y = transactions['is_fraudulent']
        model = RandomForestClassifier(n_estimators=100)
        model.fit(X, y)
        return save_model(model, features['metadata'])

    from kubernetes.client import models as k8s

    gpu_training = KubernetesPodOperator(
        task_id='train_deep_model',
        name='deep-fraud-detection',
        namespace='ml-pipelines',
        image='company/ml-training:latest',
        # Request a GPU via standard Kubernetes resource limits
        container_resources=k8s.V1ResourceRequirements(limits={'nvidia.com/gpu': '1'}),
        arguments=['python', 'train_deep_model.py', '--execution-date', '{{ ds }}'],
        get_logs=True,
        is_delete_operator_pod=True
    )

    features = extract_features()
    sklearn_model = train_model(features)
    evaluation = check_model_performance(sklearn_model)  # data dependency wires the sklearn path
    gpu_training >> evaluation  # explicit dependency on the GPU pod task

fraud_detection_pipeline()

Airflow Provider Ecosystem

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

validate_source_data = GreatExpectationsOperator(
    task_id='validate_source_data',
    expectation_suite_name='transactions.critical',
    fail_task_on_validation_failure=True
)

transform_transactions = SparkSubmitOperator(
    task_id='transform_transactions',
    application='/spark-jobs/transform_transactions.py',
    conf={'spark.sql.adaptive.enabled': 'true'},
    executor_memory='4g',
    num_executors=10
)

Decision Rules

Choose Dagster when:

  • Data lineage and observability are critical
  • You think in assets, not tasks
  • Testing and local development matter
  • You want opinionated best practices

Choose Prefect 2 when:

  • Python is your primary language
  • Workflows are highly dynamic
  • You need minimal infrastructure
  • Developer experience is paramount

Choose Airflow 2 when:

  • You need extensive integrations
  • Complex scheduling requirements exist
  • Enterprise features matter
  • Team has Airflow experience

The underlying principle: orchestration is not about scheduling jobs. It is about building resilient systems that handle complexity, scale, and constant change.

Modern orchestrators provide infrastructure-as-code, observable-by-design, and failure-resilient systems. The choice between Dagster, Prefect, and Airflow matters less than adopting one with modern DataOps principles.

