A fintech company’s data platform ground to a halt when a schema change cascaded through dozens of pipelines. Their homegrown orchestration system—a maze of cron jobs and bash scripts—offered no visibility. Data scientists could not access fresh data. Executives flew blind without dashboards. Risk models operated on stale information.
This is what happens when orchestration is an afterthought.
Why Traditional Orchestration Fails
Most organizations begin with:
- Cron jobs scheduling ETL scripts
- Bash scripts chaining commands
- Success meaning absence of error emails
- Failure meaning frantic debugging
- Dependencies as tribal knowledge
- Monitoring meaning checking if files appeared
This works with ten pipelines processing gigabytes. It collapses with hundreds processing terabytes.
Dependency complexity: Modern pipelines form complex graphs where upstream failures cascade unpredictably.
Heterogeneous workloads: Different tasks require different execution environments. Spark jobs need clusters. Python scripts need specific libraries. SQL transformations need database connections.
Dynamic workflows: Business logic is not static. The number of files varies. Processing steps depend on data content. Workflows branch based on conditions.
Operational visibility: When pipelines fail at 3 AM, engineers need to understand what happened, why, and what to do.
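The cascade problem is easy to make concrete. Given a hypothetical dependency graph (the asset names below are illustrative), a few lines of Python compute how far a single upstream failure reaches, which is exactly the set that a cron-and-bash setup keeps only in engineers' heads:

```python
# Hypothetical pipeline dependency graph: edges point downstream.
deps = {
    "raw_transactions": ["cleaned_transactions"],
    "cleaned_transactions": ["fraud_features", "daily_report"],
    "fraud_features": ["fraud_model"],
    "daily_report": [],
    "fraud_model": [],
}

def blast_radius(graph, failed):
    """Return every asset transitively downstream of a failed node."""
    impacted, stack = set(), [failed]
    while stack:
        node = stack.pop()
        for child in graph.get(node, []):
            if child not in impacted:
                impacted.add(child)
                stack.append(child)
    return impacted

print(sorted(blast_radius(deps, "raw_transactions")))
# ['cleaned_transactions', 'daily_report', 'fraud_features', 'fraud_model']
```

A modern orchestrator maintains this graph declaratively and answers the question per run; with cron, nobody can even ask it.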
Modern DataOps Principles
Infrastructure as code: Workflows defined programmatically, version controlled, tested, deployed like applications.
Observable by design: Every task, every run, every piece of data tracked and queryable.
Failure as normal: Systems designed assuming things fail. Automatic retries, circuit breakers, graceful degradation.
Developer experience first: Tools that make the right thing the easy thing.
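"Failure as normal" is the principle the frameworks below bake in with `retries=` parameters. Stripped of any framework, the core mechanism is just exponential backoff with jitter; a minimal sketch (the `flaky_extract` function is a made-up stand-in for a transient network call):

```python
import random
import time

def with_retries(fn, attempts=3, base_delay=0.1):
    """Call fn; on failure, retry with exponential backoff plus jitter."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the real error
            time.sleep(base_delay * 2 ** attempt + random.uniform(0, 0.05))

# A flaky extract that succeeds on the third call.
calls = {"n": 0}
def flaky_extract():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient network blip")
    return "rows"

print(with_retries(flaky_extract))  # rows
```

The jitter matters in practice: without it, hundreds of failed tasks retry in lockstep and hammer the recovering service simultaneously.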
Dagster: Asset-Centric Orchestration
Dagster reimagined orchestration around assets (the tables, files, and models that pipelines produce) rather than the tasks that produce them.
Asset focus provides:
- Explicit lineage: trace any data back to its sources
- Clear dependencies: assets depend on other assets
- Natural testing: test individual assets or subgraphs
- Built-in observability: know what data exists, when updated, its quality
Dagster in Practice
```python
from dagster import asset, AssetIn, AssetKey, MetadataValue, Output
from dagster_pandas import PandasColumn, create_dagster_pandas_dataframe_type
import pandas as pd

TransactionSchema = create_dagster_pandas_dataframe_type(
    name="TransactionSchema",
    columns=[
        PandasColumn.string_column("transaction_id", non_nullable=True),
        PandasColumn.datetime_column("timestamp", non_nullable=True),
        PandasColumn.float_column("amount", min_value=0),
        PandasColumn.string_column("user_id", non_nullable=True),
        PandasColumn.string_column("merchant_id"),
        PandasColumn.boolean_column("is_fraudulent"),
    ],
)

@asset(
    description="Raw transactions from payment processor",
    required_resource_keys={"warehouse"},
    metadata={
        "owner": "payments_team",
        "sla_hours": 1,
        "pii": True,
    },
)
def raw_transactions(context) -> Output[pd.DataFrame]:
    query = """
        SELECT * FROM payment_processor.transactions
        WHERE date = CURRENT_DATE
    """
    df = context.resources.warehouse.query(query)
    context.log.info(f"Loaded {len(df)} transactions")
    return Output(
        df,
        metadata={
            "row_count": len(df),
            "amount_sum": float(df["amount"].sum()),
            "preview": MetadataValue.md(df.head().to_markdown()),
        },
    )
```
```python
@asset(
    ins={"raw_transactions": AssetIn(key=AssetKey("raw_transactions"))},
    description="Cleaned and validated transactions",
    dagster_type=TransactionSchema,
)
def cleaned_transactions(context, raw_transactions: pd.DataFrame) -> pd.DataFrame:
    df = raw_transactions.drop_duplicates(subset=["transaction_id"]).copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df["amount"] = pd.to_numeric(df["amount"], errors="coerce")

    initial_count = len(df)
    df = df[df["amount"] > 0]
    removed_count = initial_count - len(df)
    if removed_count > 0:
        context.log.warning(f"Removed {removed_count} invalid transactions")

    df["hour_of_day"] = df["timestamp"].dt.hour
    df["day_of_week"] = df["timestamp"].dt.dayofweek
    return df
```
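This is where "natural testing" pays off: an asset body is plain Python, so the cleaning logic can be exercised without a scheduler or a warehouse. A minimal sketch with a hypothetical in-memory fixture (pandas only, the cleaning steps inlined as in `cleaned_transactions`):

```python
import pandas as pd

# Hypothetical fixture mirroring the raw transaction columns:
# one duplicate row and one negative amount.
raw = pd.DataFrame({
    "transaction_id": ["t1", "t1", "t2"],
    "timestamp": ["2023-01-01 10:00", "2023-01-01 10:00", "2023-01-01 23:30"],
    "amount": ["10.0", "10.0", "-5.0"],
})

# Same steps as the asset: dedupe, coerce types, drop invalid amounts.
df = raw.drop_duplicates(subset=["transaction_id"]).copy()
df["timestamp"] = pd.to_datetime(df["timestamp"])
df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
df = df[df["amount"] > 0]

print(len(df))  # only the deduplicated, positive-amount row survives
```

The same shape works as a pytest case, and Dagster's `materialize` helper can run whole asset subgraphs in-process the same way.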
Prefect 2: Pythonic Workflows
Prefect 2 made Python workflows first-class citizens:
```python
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd

@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def extract_transactions(date: str, source: str) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info(f"Extracting {source} transactions for {date}")
    df = read_from_source(source, date)  # source-specific reader, defined elsewhere
    if df.empty:
        logger.warning(f"No data found for {source} on {date}")
    return df

@flow(name="Dynamic ETL Pipeline")
def etl_pipeline(
    date: str,
    sources: list[str],
    validation_rules: dict,
    transform_config: dict,
) -> dict:
    logger = get_run_logger()
    logger.info(f"Starting ETL for {date} with {len(sources)} sources")

    # Fan out: one concurrent extraction task per source.
    raw_data = []
    for source in sources:
        raw_data.append(extract_transactions.submit(date, source))
    combined = pd.concat([future.result() for future in raw_data])

    validated = validate_data(combined, validation_rules)
    transformed = transform_transactions(validated, transform_config)

    # Branch at runtime based on data volume.
    if len(transformed) > 1_000_000:
        logger.info("Large dataset - triggering distributed processing")
        results = distributed_processing_flow(transformed)
    else:
        results = simple_processing(transformed)

    return {
        "records_processed": len(transformed),
        "sources": sources,
        "results": results,
    }
```
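The fan-out in `etl_pipeline` (one `.submit` per source, then gather with `.result()`) has the same shape as stdlib futures, which is much of why Prefect 2 feels native to Python developers. A framework-free sketch, with a made-up `extract` standing in for the real task:

```python
from concurrent.futures import ThreadPoolExecutor

def extract(source, date):
    # Stand-in for extract_transactions: fake three rows per source.
    return [f"{source}:{date}:row{i}" for i in range(3)]

sources = ["payments", "chargebacks", "merchant_data"]
with ThreadPoolExecutor() as pool:
    # Submit returns immediately with a future; result() blocks until done.
    futures = [pool.submit(extract, s, "2023-01-01") for s in sources]
    combined = [row for f in futures for row in f.result()]

print(len(combined))  # 9
```

What Prefect adds on top of this pattern is the part futures lack: retries, caching, logging, and a recorded state for every submission.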
Prefect Deployments
```python
from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect.infrastructure import KubernetesJob
from prefect.server.schemas.schedules import IntervalSchedule
from datetime import timedelta

k8s_infrastructure = KubernetesJob(
    namespace="data-pipelines",
    image="company/data-platform:latest",
    image_pull_policy="Always",
    service_account_name="prefect-agent",
    env={
        "ENVIRONMENT": "production",
        "AWS_REGION": "us-east-1",
    },
)

fraud_detection_deployment = Deployment.build_from_flow(
    flow=etl_pipeline,
    name="fraud-detection-etl",
    parameters={
        "sources": ["payments", "chargebacks", "merchant_data"],
        # Deployment parameters are serialized to JSON, so validation
        # rules are passed by name and resolved to callables in the flow.
        "validation_rules": {
            "amount_positive": "amount > 0",
            "user_exists": "user_id is not null",
        },
        "transform_config": {
            "normalize_amounts": True,
            "encode_categories": True,
        },
    },
    infrastructure=k8s_infrastructure,
    storage=S3(bucket_path="prefect-deployments"),
    work_queue_name="ml-pipelines",
    tags=["ml", "fraud", "critical"],
    schedule=IntervalSchedule(interval=timedelta(hours=1)),
)
```
Airflow 2: Enterprise Orchestration
Airflow 2 modernized DAG concepts while maintaining backward compatibility:
```python
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from datetime import datetime
from kubernetes.client import models as k8s

# extract_transactions, extract_user_profiles, save_model and
# check_model_performance are project helpers defined elsewhere.

@dag(
    schedule_interval='@hourly',
    start_date=datetime(2023, 1, 1),
    catchup=False,
    max_active_runs=2,
    tags=['ml', 'production', 'fraud'],
)
def fraud_detection_pipeline():
    @task(multiple_outputs=True, pool='data_extraction', queue='critical')
    def extract_features() -> dict:
        context = get_current_context()
        execution_date = context['execution_date']
        transactions = extract_transactions(execution_date)
        user_profiles = extract_user_profiles(execution_date)
        return {
            'transactions': transactions.to_json(),
            'user_profiles': user_profiles.to_json(),
            'metadata': {
                'execution_date': str(execution_date),
                'record_count': len(transactions),
            },
        }

    @task.virtualenv(requirements=['scikit-learn==1.0.2'], system_site_packages=False)
    def train_model(features: dict) -> str:
        import pandas as pd
        from sklearn.ensemble import RandomForestClassifier

        transactions = pd.read_json(features['transactions'])
        X = transactions.drop(columns=['is_fraudulent'])
        y = transactions['is_fraudulent']
        model = RandomForestClassifier(n_estimators=100)
        model.fit(X, y)
        return save_model(model, features['metadata'])

    gpu_training = KubernetesPodOperator(
        task_id='train_deep_model',
        name='deep-fraud-detection',
        namespace='ml-pipelines',
        image='company/ml-training:latest',
        container_resources=k8s.V1ResourceRequirements(limits={'nvidia.com/gpu': '1'}),
        arguments=['python', 'train_deep_model.py', '--execution-date', '{{ ds }}'],
        get_logs=True,
        is_delete_operator_pod=True,
    )

    features = extract_features()
    sklearn_model = train_model(features)
    [sklearn_model, gpu_training] >> check_model_performance(sklearn_model)

fraud_detection_dag = fraud_detection_pipeline()
```
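The `{{ ds }}` in the pod arguments is one of Airflow's Jinja macros: operator arguments are templates that Airflow renders per run, substituting the logical date as `YYYY-MM-DD`. A minimal illustration of that substitution using jinja2 directly, outside Airflow:

```python
from jinja2 import Template

# Airflow renders templated operator fields before each run; here the
# '{{ ds }}' macro is simulated with a plain jinja2 render.
args = ['python', 'train_deep_model.py', '--execution-date', '{{ ds }}']
rendered = [Template(a).render(ds='2023-01-01') for a in args]
print(rendered[-1])  # 2023-01-01
```

This is why the same pod definition can run an hourly backfill over months of history: each run sees its own rendered date, not the wall-clock one.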
Airflow Provider Ecosystem
```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from airflow_dbt.operators.dbt_operator import DbtRunOperator
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

validate_source_data = GreatExpectationsOperator(
    task_id='validate_source_data',
    expectation_suite_name='transactions.critical',
    fail_task_on_validation_failure=True,
)

transform_transactions = SparkSubmitOperator(
    task_id='transform_transactions',
    application='/spark-jobs/transform_transactions.py',
    conf={'spark.sql.adaptive.enabled': 'true'},
    executor_memory='4g',
    num_executors=10,
)
```
Decision Rules
Choose Dagster when:
- Data lineage and observability are critical
- You think in assets, not tasks
- Testing and local development matter
- You want opinionated best practices
Choose Prefect 2 when:
- Python is your primary language
- Workflows are highly dynamic
- You need minimal infrastructure
- Developer experience is paramount
Choose Airflow 2 when:
- You need extensive integrations
- Complex scheduling requirements exist
- Enterprise features matter
- Team has Airflow experience
The underlying principle: orchestration is not about scheduling jobs. It is about building resilient systems that handle complexity, scale, and constant change.
Modern orchestrators deliver workflows that are defined as code, observable by design, and resilient to failure. The specific choice among Dagster, Prefect, and Airflow matters less than adopting one of them together with the DataOps principles they embody.