Incremental ML: Continuous Learning Systems

Simor Consulting | 12 Jul, 2024 | 11 Mins read

A traditional ML workflow trains a model on historical data, deploys it, and leaves it unchanged until performance degrades. This breaks down in dynamic environments where data patterns evolve. Incremental ML instead updates the model continuously as new data arrives.

This article covers designing and implementing continuous learning systems.

Why Static Models Fall Short

Before diving into incremental learning approaches, let’s understand the limitations of traditional static models:

  1. Concept Drift: Real-world data distributions change over time, causing model performance to degrade
  2. Data Volume Challenges: Retraining on all historical data becomes computationally expensive as data grows
  3. Latency in Adaptation: Infrequent retraining means systems respond slowly to emerging patterns
  4. Missed Learning Opportunities: Valuable feedback data is often unused until the next retraining cycle

Consider a demand forecasting model for an e-commerce platform. Consumer behavior shifts due to seasonality, trends, or unexpected events (like a pandemic). A static model can’t adapt to these changes until a manual retraining cycle, potentially leading to weeks or months of suboptimal predictions.
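
To make the cost of a frozen model concrete, here is a minimal sketch on synthetic data. It assumes a toy "model" that is just a mean forecast, and the function name and numbers are invented for illustration; it is not a real forecasting setup.

```python
def static_vs_incremental(stream, train_size, alpha=0.1):
    """Compare a frozen mean forecast with an exponentially weighted one.

    Returns (static_mae, incremental_mae) measured on the observations
    that follow the first `train_size` values.
    """
    train, live = stream[:train_size], stream[train_size:]
    static_forecast = sum(train) / len(train)  # fitted once, never updated
    incremental_forecast = static_forecast     # same starting point

    static_err = incremental_err = 0.0
    for y in live:
        static_err += abs(y - static_forecast)
        incremental_err += abs(y - incremental_forecast)
        # Incremental update: move the forecast a fraction alpha toward y
        incremental_forecast += alpha * (y - incremental_forecast)

    return static_err / len(live), incremental_err / len(live)


# Synthetic demand: 100 units/day for 50 days, then a shift to 150 units/day
demand = [100.0] * 50 + [150.0] * 50
static_mae, incremental_mae = static_vs_incremental(demand, train_size=30)
print(f"static MAE: {static_mae:.2f}, incremental MAE: {incremental_mae:.2f}")
```

The static forecast stays wrong by the full size of the shift, while the incremental one closes the gap within a few dozen observations.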

Incremental Learning Approaches

Incremental learning systems update models continuously as new data arrives. Here are the key approaches:

1. Online Learning

In online learning, models update with each new training example:

# Example using River (the successor to creme and scikit-multiflow), a Python library for online ML
from river import linear_model
from river import metrics
from river import preprocessing

# Initialize model pipeline
scaler = preprocessing.StandardScaler()
model = linear_model.LinearRegression()
pipeline = scaler | model

# Metrics to track
metric = metrics.MAE()

# Online learning loop
# (stream_of_data() is a placeholder that yields (feature_dict, target) pairs)
for x, y in stream_of_data():
    # Make a prediction
    y_pred = pipeline.predict_one(x)

    # Update performance metric
    metric.update(y, y_pred)

    # Learn from the example
    pipeline.learn_one(x, y)

    # Log current performance
    print(f"Current MAE: {metric.get()}")

Advantages:

  • Immediate adaptation to new patterns
  • Minimal memory footprint
  • Continuous improvement

Challenges:

  • Susceptibility to noisy data or outliers
  • Potential for catastrophic forgetting
  • Limited to algorithms that support online updates
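
As a rough illustration of what a per-example update does under the hood, the sketch below implements linear regression with one stochastic gradient step per example in plain Python. The class is hypothetical and only mirrors the predict_one/learn_one naming used above; it is not River's implementation.

```python
class OnlineLinearRegression:
    """Linear regression trained with one SGD step per example (squared loss)."""

    def __init__(self, learning_rate=0.01):
        self.learning_rate = learning_rate
        self.weights = {}  # feature name -> weight, created lazily
        self.bias = 0.0

    def predict_one(self, x):
        return self.bias + sum(self.weights.get(k, 0.0) * v for k, v in x.items())

    def learn_one(self, x, y):
        # Gradient of 0.5 * (prediction - y)^2 with respect to each weight
        error = self.predict_one(x) - y
        for k, v in x.items():
            self.weights[k] = self.weights.get(k, 0.0) - self.learning_rate * error * v
        self.bias -= self.learning_rate * error
        return self


model = OnlineLinearRegression(learning_rate=0.1)

# Noise-free synthetic stream drawn from y = 2*a + 1
for step in range(5000):
    a = (step % 10) / 10.0
    model.learn_one({"a": a}, 2.0 * a + 1.0)

print(model.weights["a"], model.bias)
```

Only the current weights are kept in memory, which is where the small footprint comes from. The same property explains the sensitivity to outliers: a single extreme example moves the weights immediately.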

2. Mini-batch Learning

Mini-batch learning updates the model after accumulating a small batch of examples:

# Example using PyTorch for mini-batch incremental learning
import time

import torch
from torch.utils.data import DataLoader, TensorDataset

def mini_batch_training_loop(model, optimizer, new_data_queue, batch_size=32):
    while True:
        # Collect data until we have a full mini-batch
        batch_x, batch_y = [], []

        while len(batch_x) < batch_size:
            if new_data_queue.empty():
                # Wait for more data if queue is empty
                time.sleep(1)
                continue

            x, y = new_data_queue.get()
            batch_x.append(x)
            batch_y.append(y)

        # Convert to tensors
        x_tensor = torch.tensor(batch_x, dtype=torch.float32)
        y_tensor = torch.tensor(batch_y, dtype=torch.float32)

        # Create dataset and dataloader
        dataset = TensorDataset(x_tensor, y_tensor)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        # Update model with mini-batch
        for x_batch, y_batch in dataloader:
            # Forward pass (reshape so predictions and targets have matching shapes)
            y_pred = model(x_batch).reshape(y_batch.shape)
            loss = torch.nn.functional.mse_loss(y_pred, y_batch)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Log performance metrics
        log_performance_metrics(model, x_tensor, y_tensor)

Advantages:

  • More stable than pure online learning
  • Allows use of batch-oriented algorithms
  • Better utilization of vectorized operations

Challenges:

  • Slightly delayed adaptation compared to online learning
  • Requires careful batch size selection
  • Resource consumption increases with batch size
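
One way to keep adaptation delay bounded is to flush a partial batch when data stops arriving instead of waiting indefinitely. The sketch below shows that idea with the standard library only; collect_batch and its parameters are hypothetical names, not part of PyTorch.

```python
import queue


def collect_batch(data_queue, batch_size, max_wait_seconds=1.0):
    """Return up to `batch_size` items, stopping early if the queue stays empty."""
    batch = []
    while len(batch) < batch_size:
        try:
            # Block for at most max_wait_seconds waiting for the next example
            batch.append(data_queue.get(timeout=max_wait_seconds))
        except queue.Empty:
            break  # flush a partial batch rather than stalling forever
    return batch


q = queue.Queue()
for i in range(5):
    q.put((float(i), 2.0 * i))

first = collect_batch(q, batch_size=3, max_wait_seconds=0.01)
second = collect_batch(q, batch_size=3, max_wait_seconds=0.01)
print(len(first), len(second))
```

The timeout trades batch fullness for latency: a short timeout adapts faster on quiet streams, a long one yields fuller and more stable batches.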

3. Incremental Batch Learning

This approach periodically trains on the batch of data accumulated since the last update, using the existing model as the starting point:

# Example using Scikit-learn's partial_fit
from sklearn.linear_model import SGDClassifier
import numpy as np
from datetime import datetime, timedelta

# Initialize model
# "log_loss" replaces the older "log" name, which was deprecated in scikit-learn 1.1
model = SGDClassifier(loss="log_loss", random_state=42)

# Initial training
X_initial, y_initial = get_initial_training_data()
model.partial_fit(X_initial, y_initial, classes=np.unique(y_initial))

# Incremental batch learning loop
last_update = datetime.now()
batch_data_x, batch_data_y = [], []

while True:
    # Collect new data
    new_x, new_y = get_new_data()
    batch_data_x.append(new_x)
    batch_data_y.append(new_y)

    # Check if it's time for batch update (every 1 hour)
    if datetime.now() - last_update > timedelta(hours=1) and len(batch_data_x) > 0:
        # Convert to numpy arrays
        X_batch = np.array(batch_data_x)
        y_batch = np.array(batch_data_y)

        # Update model with new batch
        model.partial_fit(X_batch, y_batch)

        # Clear batch data
        batch_data_x, batch_data_y = [], []
        last_update = datetime.now()

        # Evaluate and log performance
        performance = evaluate_model(model)
        log_performance(performance)

Advantages:

  • Balances adaptation speed with stability
  • Works with a wider range of algorithms
  • Efficient use of computational resources

Challenges:

  • Requires algorithms that support warm-starting
  • More complex to implement than pure online learning
  • Batch timing selection impacts performance
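
Because batch timing matters, it can help to isolate the "is it time to update?" decision in one small, testable object. The following is a sketch under assumed thresholds; UpdatePolicy and its parameter names are hypothetical.

```python
from datetime import datetime, timedelta


class UpdatePolicy:
    """Trigger an update when enough time has passed or enough data has piled up."""

    def __init__(self, max_interval=timedelta(hours=1), max_examples=10_000, min_examples=1):
        self.max_interval = max_interval
        self.max_examples = max_examples
        self.min_examples = min_examples

    def should_update(self, n_buffered, last_update, now):
        if n_buffered < self.min_examples:
            return False  # nothing worth training on yet
        if n_buffered >= self.max_examples:
            return True   # buffer is full: update early
        return now - last_update >= self.max_interval


policy = UpdatePolicy(max_interval=timedelta(hours=1), max_examples=1000)
start = datetime(2024, 7, 12, 9, 0)
print(policy.should_update(10, start, start + timedelta(minutes=30)))
print(policy.should_update(10, start, start + timedelta(hours=1)))
```

Passing the current time in as an argument, rather than calling datetime.now() inside the method, keeps the policy deterministic and easy to unit test.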

Implementing Continuous Learning Systems

Now let’s explore how to build a robust continuous learning system in production.

System Architecture

A well-designed incremental learning system typically includes these components:

┌────────────────┐     ┌───────────────┐     ┌─────────────────┐
│  Data Sources  │────▶│ Feature Store │────▶│ Training Pipeline│
└────────────────┘     └───────────────┘     └─────────────────┘
                                                     │
┌────────────────┐     ┌───────────────┐             ▼
│   Monitoring   │◀────│  Model Store  │◀────┌─────────────────┐
└────────────────┘     └───────────────┘     │ Model Evaluation │
        │                                    └─────────────────┘
        ▼                                            ▲
┌────────────────┐     ┌───────────────┐             │
│  Drift Detector│────▶│ Model Registry│─────────────┘
└────────────────┘     └───────────────┘

Let’s implement key components of this architecture:

1. Feature Store for Incremental Learning

A feature store helps maintain consistency between training and inference:

# Example using Feast for feature management in incremental learning
# (written against the Field/schema API of recent Feast releases)
from datetime import datetime, timedelta

from feast import Entity, FeatureStore, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Define entities and features
customer = Entity(name="customer_id", join_keys=["customer_id"])

customer_features = FileSource(
    path="s3://bucket/customer_features.parquet",
    timestamp_field="event_timestamp",
)

customer_feature_view = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=90),  # Features are valid for 90 days
    schema=[
        Field(name="total_purchases", dtype=Float32),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_purchase", dtype=Int64),
    ],
    online=True,
    source=customer_features,
)

# Initialize a feature store
store = FeatureStore(repo_path="feature_repo")

# Get training data for incremental learning
def get_incremental_training_data(start_time, end_time):
    # Get historical feature values and labels
    training_df = store.get_historical_features(
        entity_df=get_training_entities(start_time, end_time),
        features=[
            "customer_features:total_purchases",
            "customer_features:avg_order_value",
            "customer_features:days_since_last_purchase",
        ],
    ).to_df()

    return training_df

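# Function to continuously update feature store
def update_feature_store(new_data_stream):
    # Register the feature definitions once; they do not change per batch
    store.apply([customer, customer_feature_view])

    for batch in new_data_stream:
        # Each batch is assumed to have been written to the offline source already.
        # Materialize features that arrived since the last run to the online store
        store.materialize_incremental(end_date=datetime.now())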
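
The consistency a feature store provides for training data comes largely from point-in-time lookups: each training row only sees feature values that were known at its event time and are still within the TTL. The sketch below illustrates that rule with the standard library; it is a simplified model of the idea, not Feast's implementation, and point_in_time_lookup is a hypothetical name.

```python
from bisect import bisect_right
from datetime import datetime, timedelta


def point_in_time_lookup(feature_rows, event_time, ttl):
    """Return the latest feature value at or before event_time, if within ttl.

    feature_rows is a list of (timestamp, value) pairs sorted by timestamp.
    """
    timestamps = [ts for ts, _ in feature_rows]
    idx = bisect_right(timestamps, event_time) - 1
    if idx < 0:
        return None  # no value was known yet at event_time
    ts, value = feature_rows[idx]
    if event_time - ts > ttl:
        return None  # the most recent value is too old to trust
    return value


avg_order_value = [
    (datetime(2024, 1, 1), 10.0),
    (datetime(2024, 1, 10), 12.0),
    (datetime(2024, 3, 1), 20.0),
]
ttl = timedelta(days=90)

print(point_in_time_lookup(avg_order_value, datetime(2024, 1, 5), ttl))
print(point_in_time_lookup(avg_order_value, datetime(2024, 1, 15), ttl))
```

The lookup for 5 January returns the 1 January value even though newer values exist, which is what prevents future information from leaking into training rows.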

2. Drift Detection and Monitoring

Detecting when data or predictions drift is crucial for incremental learning:

# Example using Alibi-Detect for drift detection
from alibi_detect.cd import KSDrift
import numpy as np
from datetime import datetime, timedelta
import time

# Initialize drift detector with reference data
reference_data = get_reference_data()
drift_detector = KSDrift(
    reference_data,
    p_val=0.05,  # Statistical significance threshold
    alternative='two-sided'
)

# Function to monitor drift in a prediction stream
def monitor_prediction_drift(prediction_stream, alert_webhook=None):
    batch = []
    last_check = datetime.now()

    for prediction in prediction_stream:
        # Collect predictions in a batch
        batch.append(prediction)

        # Check for drift every hour or when batch size reaches 1000
        if (datetime.now() - last_check > timedelta(hours=1) or len(batch) >= 1000) and len(batch) > 0:
            # Convert batch to numpy array
            prediction_batch = np.array(batch)

            # Run drift detection
            drift_result = drift_detector.predict(prediction_batch)

            # Log drift metrics
            log_drift_metrics(drift_result)

            # KSDrift returns one p-value per feature; summarize with the minimum
            min_p_val = float(np.min(drift_result['data']['p_val']))

            if drift_result['data']['is_drift'] == 1:
                # Send alert if a webhook is configured
                if alert_webhook:
                    send_drift_alert(
                        alert_webhook,
                        f"Prediction drift detected with p-value: {min_p_val:.4g}"
                    )

                # Trigger model update if drift is significant
                if min_p_val < 0.01:
                    trigger_model_update()

            # Reset batch and update last check time
            batch = []
            last_check = datetime.now()
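
For intuition about what a Kolmogorov-Smirnov drift check measures, the sketch below computes the two-sample KS statistic (the largest gap between two empirical CDFs) and a standard asymptotic approximation of its p-value. It is a plain-Python illustration with hypothetical function names, not the code Alibi-Detect runs, and the approximation is only reliable for reasonably large samples.

```python
import math


def ks_statistic(reference, current):
    """Largest absolute difference between the two empirical CDFs."""
    ref, cur = sorted(reference), sorted(current)
    i = j = 0
    d = 0.0
    while i < len(ref) and j < len(cur):
        x = min(ref[i], cur[j])
        # Advance both samples past every value <= x
        while i < len(ref) and ref[i] <= x:
            i += 1
        while j < len(cur) and cur[j] <= x:
            j += 1
        d = max(d, abs(i / len(ref) - j / len(cur)))
    return d


def ks_p_value(d, n, m, terms=100):
    """Asymptotic two-sided p-value for a KS statistic d with sample sizes n and m."""
    if d <= 0.0:
        return 1.0
    en = math.sqrt(n * m / (n + m))
    lam = (en + 0.12 + 0.11 / en) * d
    total = sum((-1) ** (k - 1) * math.exp(-2.0 * k * k * lam * lam)
                for k in range(1, terms + 1))
    return min(1.0, max(0.0, 2.0 * total))


reference = list(range(100))
shifted = [v + 50 for v in reference]  # same shape, shifted by half the range

d = ks_statistic(reference, shifted)
p = ks_p_value(d, len(reference), len(shifted))
print(f"KS statistic: {d:.2f}, approximate p-value: {p:.2e}")
```

A statistic near 0 means the two samples look alike; a value near 1 means they barely overlap.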

3. Versioned Model Registry

A model registry keeps track of model versions in continuous learning:

# Example using MLflow for model versioning in incremental learning
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
from datetime import datetime

# Configure MLflow
mlflow.set_tracking_uri("http://mlflow-server:5000")
client = MlflowClient()

# Function to register and version a model after incremental training
def register_incremental_model(model, metrics, features_used, version_description):
    # Get current time for versioning
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

    # Start MLflow run
    with mlflow.start_run(run_name=f"incremental_update_{timestamp}"):
        # Log model
        mlflow.sklearn.log_model(model, "model")

        # Log metrics
        for metric_name, metric_value in metrics.items():
            mlflow.log_metric(metric_name, metric_value)

        # Log feature information
        mlflow.log_param("features_used", features_used)
        mlflow.log_param("update_time", timestamp)

        run_id = mlflow.active_run().info.run_id

    # Register a new version of the model
    # (MLflow creates the registered model automatically if it doesn't exist yet)
    result = mlflow.register_model(f"runs:/{run_id}/model", "recommendation_model")
    new_version = result.version

    # Add version description
    client.update_model_version(
        name="recommendation_model",
        version=new_version,
        description=version_description
    )

    # Transition to staging
    client.transition_model_version_stage(
        name="recommendation_model",
        version=new_version,
        stage="Staging"
    )

    return new_version

# Function to promote model to production after validation
def promote_model_to_production(model_name, version, validation_metrics):
    # Log promotion decision and validation metrics
    with mlflow.start_run(run_name=f"promotion_{model_name}_v{version}"):
        for metric_name, metric_value in validation_metrics.items():
            mlflow.log_metric(metric_name, metric_value)

        mlflow.log_param("promotion_time", datetime.now().strftime("%Y%m%d%H%M%S"))

    # Transition to production and archive the previous production version(s)
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production",
        archive_existing_versions=True
    )
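
The stage transitions above follow a simple lifecycle: every new version starts in Staging, and promoting one version to Production archives whichever version was there before. The in-memory sketch below models only that lifecycle so it can be reasoned about and tested without an MLflow server; InMemoryRegistry is a hypothetical stand-in, not an MLflow class.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryRegistry:
    """Toy registry that tracks the stage of each model version."""
    stages: dict = field(default_factory=dict)  # version number -> stage name

    def register(self):
        version = len(self.stages) + 1
        self.stages[version] = "Staging"
        return version

    def promote(self, version):
        # Archive whatever is currently in production, then promote
        for v, stage in self.stages.items():
            if stage == "Production":
                self.stages[v] = "Archived"
        self.stages[version] = "Production"

    def production_version(self):
        for v, stage in self.stages.items():
            if stage == "Production":
                return v
        return None


registry = InMemoryRegistry()
v1 = registry.register()
registry.promote(v1)
v2 = registry.register()
registry.promote(v2)
print(registry.stages)
```

Keeping exactly one Production version at a time is what makes rollback straightforward: the archived version is still in the registry and can be promoted again.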

4. Continuous Integration/Deployment Pipeline

Automating the entire incremental learning cycle is key:

# Example CI/CD pipeline for incremental ML using Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

# Define the DAG
incremental_ml_dag = DAG(
    'incremental_ml_pipeline',
    default_args={
        'owner': 'data_science',
        'depends_on_past': False,
        'email': ['data_science@example.com'],
        'email_on_failure': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Incremental ML training pipeline',
    schedule_interval=timedelta(hours=6),  # Run every 6 hours
    start_date=days_ago(1),
    catchup=False,
    tags=['ml', 'incremental'],
)

# Define task functions
def extract_new_data(**context):
    execution_date = context['execution_date']
    previous_execution = execution_date - timedelta(hours=6)

    # Get data that arrived between previous and current execution
    new_data = fetch_data_for_timerange(previous_execution, execution_date)

    # Pass data to next task
    return {
        'data_path': store_data_temporarily(new_data),
        'record_count': len(new_data)
    }

def check_data_quality(**context):
    ti = context['ti']
    data_info = ti.xcom_pull(task_ids='extract_new_data')

    # Load the data
    data = load_data(data_info['data_path'])

    # Run quality checks
    quality_metrics = run_data_quality_checks(data)

    # Decide whether to proceed with training
    if quality_metrics['overall_score'] < 0.8:
        raise ValueError(f"Data quality too low: {quality_metrics}")

    return {
        'data_path': data_info['data_path'],
        'quality_metrics': quality_metrics
    }

def train_incremental_model(**context):
    ti = context['ti']
    data_info = ti.xcom_pull(task_ids='check_data_quality')

    # Load the new data
    new_data = load_data(data_info['data_path'])

    # Load current production model
    current_model = load_production_model()

    # Update model with new data
    updated_model, training_metrics = update_model_incrementally(current_model, new_data)

    # Save updated model
    model_path = save_model_temporarily(updated_model)

    return {
        'model_path': model_path,
        'training_metrics': training_metrics
    }

def evaluate_model_performance(**context):
    ti = context['ti']
    model_info = ti.xcom_pull(task_ids='train_incremental_model')

    # Load the updated model
    updated_model = load_model(model_info['model_path'])

    # Evaluate on holdout test set
    evaluation_metrics = evaluate_model(updated_model)

    # Compare with current production model
    current_model = load_production_model()
    current_model_metrics = evaluate_model(current_model)

    # Calculate improvement
    improvement = calculate_improvement(evaluation_metrics, current_model_metrics)

    return {
        'model_path': model_info['model_path'],
        'evaluation_metrics': evaluation_metrics,
        'improvement': improvement
    }

def register_and_deploy_model(**context):
    ti = context['ti']
    eval_info = ti.xcom_pull(task_ids='evaluate_model_performance')

    # Only deploy if there's significant improvement or it's been a week since last deployment
    if eval_info['improvement'] > 0.05 or is_week_since_last_deployment():
        # Load the updated model
        updated_model = load_model(eval_info['model_path'])

        # Register in model registry
        model_version = register_incremental_model(
            updated_model,
            eval_info['evaluation_metrics'],
            get_feature_list(),
            f"Incremental update with {eval_info['improvement']:.2%} improvement"
        )

        # Promote to production
        promote_model_to_production(
            "recommendation_model",
            model_version,
            eval_info['evaluation_metrics']
        )

        # Deploy to serving infrastructure
        deploy_model_to_serving("recommendation_model", model_version)

        return {
            'deployed': True,
            'version': model_version,
            'improvement': eval_info['improvement']
        }
    else:
        return {
            'deployed': False,
            'reason': 'Insufficient improvement'
        }

# Define the tasks
# (Airflow 2 passes the task context to **context automatically,
# so the old provide_context=True argument is not needed)
extract_task = PythonOperator(
    task_id='extract_new_data',
    python_callable=extract_new_data,
    dag=incremental_ml_dag,
)

quality_check_task = PythonOperator(
    task_id='check_data_quality',
    python_callable=check_data_quality,
    dag=incremental_ml_dag,
)

train_task = PythonOperator(
    task_id='train_incremental_model',
    python_callable=train_incremental_model,
    dag=incremental_ml_dag,
)

evaluate_task = PythonOperator(
    task_id='evaluate_model_performance',
    python_callable=evaluate_model_performance,
    dag=incremental_ml_dag,
)

deploy_task = PythonOperator(
    task_id='register_and_deploy_model',
    python_callable=register_and_deploy_model,
    dag=incremental_ml_dag,
)

# Define task dependencies
extract_task >> quality_check_task >> train_task >> evaluate_task >> deploy_task
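
The deployment gate in the final task is easier to test when it is pulled out into a pure function. The sketch below is one possible version; should_deploy and its thresholds are hypothetical, and unlike the pipeline above it assumes a stale model should only be replaced when the candidate is at least not worse.

```python
from datetime import datetime, timedelta


def should_deploy(improvement, last_deployed, now,
                  min_improvement=0.05, max_age=timedelta(days=7)):
    """Return (deploy, reason) for a candidate model."""
    if improvement > min_improvement:
        return True, "significant improvement"
    # Assumption: refresh a stale model only if the candidate is not worse
    if improvement >= 0 and now - last_deployed >= max_age:
        return True, "scheduled refresh"
    return False, "insufficient improvement"


now = datetime(2024, 7, 12, 12, 0)
print(should_deploy(0.08, now - timedelta(days=1), now))
print(should_deploy(0.01, now - timedelta(days=8), now))
print(should_deploy(-0.02, now - timedelta(days=8), now))
```

Returning the reason alongside the decision makes the outcome easy to log from the Airflow task.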

Handling Challenges in Incremental Learning

Catastrophic Forgetting

Incremental models may “forget” previously learned patterns when updated with new data:

# Example: Elastic Weight Consolidation to prevent catastrophic forgetting
import torch
import torch.nn as nn
import torch.optim as optim
import copy

class ElasticWeightConsolidation:
    def __init__(self, model, fisher_estimation_sample_size=1024):
        self.model = model
        self.device = next(model.parameters()).device

        # Store a copy of the model before updating
        self.old_model = copy.deepcopy(model)
        self.fisher_matrix = self._calculate_fisher_matrix(fisher_estimation_sample_size)

    def _calculate_fisher_matrix(self, sample_size):
        # Get samples of previously seen data to estimate the Fisher Information Matrix
        # (get_data_loader is a placeholder for your own data loading code)
        data_loader = get_data_loader(sample_size=sample_size)

        # Initialize Fisher matrix (same shape as model parameters)
        fisher = {}
        for name, param in self.model.named_parameters():
            fisher[name] = torch.zeros_like(param)

        # Set model to evaluation mode
        self.model.eval()

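        # Accumulate the diagonal of the Fisher Information Matrix
        n_batches = 0
        for input_data, _ in data_loader:
            input_data = input_data.to(self.device)

            # Forward pass
            self.model.zero_grad()
            output = self.model(input_data)

            # Compute log likelihood
            log_probs = nn.functional.log_softmax(output, dim=1)
            # Sample labels from the model's predictions
            # (squeeze only the sample dimension so a batch of one still works)
            sampled_labels = torch.multinomial(torch.exp(log_probs), 1).squeeze(1)

            # Compute loss
            loss = nn.functional.nll_loss(log_probs, sampled_labels)

            # Backward pass
            loss.backward()

            # Accumulate squared gradients (skip parameters without gradients)
            for name, param in self.model.named_parameters():
                if param.grad is not None:
                    fisher[name] += param.grad.detach().pow(2)
            n_batches += 1

        # Average over batches (a batch-level approximation of the per-sample Fisher)
        for name in fisher:
            fisher[name] /= max(n_batches, 1)

        return fisher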

    def ewc_loss(self, ewc_lambda=5000):
        """
        Compute EWC regularization loss to prevent catastrophic forgetting
        """
        loss = 0
        for name, param in self.model.named_parameters():
            # Get corresponding parameter from old model
            old_param = self.old_model.state_dict()[name]
            # Get Fisher information
            fisher_info = self.fisher_matrix[name]
            # Compute EWC loss: Fisher * (theta - theta_old)^2
            loss += (fisher_info * (param - old_param).pow(2)).sum()

        return ewc_lambda * loss / 2

# Using EWC in incremental training
def train_with_ewc(model, new_data_loader, ewc_lambda=5000):
    # Initialize EWC
    ewc = ElasticWeightConsolidation(model)

    # Setup optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    model.train()
    for epoch in range(5):
        for inputs, targets in new_data_loader:
            # Forward pass
            outputs = model(inputs)

            # Task loss
            task_loss = nn.functional.cross_entropy(outputs, targets)

            # EWC loss to prevent forgetting
            ewc_loss = ewc.ewc_loss(ewc_lambda)

            # Total loss
            loss = task_loss + ewc_loss

            # Backward pass and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            print(f"Task loss: {task_loss.item()}, EWC loss: {ewc_loss.item()}")

    return model
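
A small numeric example may help show why the EWC penalty protects some weights more than others. The sketch below evaluates the same penalty, lambda / 2 * sum(F_i * (theta_i - theta_old_i)^2), on two hand-picked parameters; the values are invented for illustration.

```python
def ewc_penalty(params, old_params, fisher, ewc_lambda):
    """EWC penalty: lambda / 2 * sum_i F_i * (theta_i - theta_old_i)^2."""
    return 0.5 * ewc_lambda * sum(
        f * (p - o) ** 2 for p, o, f in zip(params, old_params, fisher)
    )


old_params = [1.0, -2.0]
fisher = [10.0, 0.1]  # first weight mattered a lot for old data, second barely

# Move each weight by the same amount (0.5) and compare the cost
move_important = ewc_penalty([1.5, -2.0], old_params, fisher, ewc_lambda=100)
move_unimportant = ewc_penalty([1.0, -1.5], old_params, fisher, ewc_lambda=100)
print(move_important, move_unimportant)
```

The same shift costs 100 times more on the weight with high Fisher information, so gradient descent on the combined loss prefers to absorb new data by changing the weights the old task did not depend on.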

Concept Drift Detection

Explicitly handling concept drift helps incremental learning adapt appropriately:

# Example of concept drift detection to trigger model retraining
from datetime import datetime

from river import drift

# Initialize drift detector
drift_detector = drift.ADWIN(delta=0.002)

# Function to monitor errors and detect drift
def monitor_predictions_for_drift(true_values, predictions):
    # The detector is re-created below, so rebind the module-level name
    global drift_detector
    drift_detected = False

    for y_true, y_pred in zip(true_values, predictions):
        # Absolute error for numeric targets; otherwise True for incorrect, False for correct
        error = abs(y_true - y_pred) if isinstance(y_true, (int, float)) else (y_true != y_pred)

        # Update drift detector
        drift_detector.update(error)

        # Check if drift is detected
        if drift_detector.drift_detected:
            drift_detected = True
            # Log drift detection
            print(f"Concept drift detected at {datetime.now()}")
            # Reset the detector
            drift_detector = drift.ADWIN(delta=0.002)

    return drift_detected

# In an online learning loop
def online_learning_with_drift_detection(model, data_stream):
    for x, y_true in data_stream:
        # Make prediction
        y_pred = model.predict_one(x)

        # Check for drift
        if monitor_predictions_for_drift([y_true], [y_pred]):
            # Drift detected: pick ONE of the strategies below
            print("Adapting to detected concept drift...")
            # Option 1: adjust the learning rate to adapt faster
            # (assumes your model exposes such an attribute)
            #   model.learning_rate *= 2
            # Option 2: forget history by starting from a fresh, untrained copy
            model = model.clone()
            # Option 3: switch to a different model (select_alternative_model is a placeholder)
            #   model = select_alternative_model()

        # Update the model
        model.learn_one(x, y_true)
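
To show the mechanics of an error-based drift detector without any dependencies, here is a minimal sketch of the Page-Hinkley test, a simpler alternative to ADWIN that flags a sustained increase in the mean of a stream. The class and its default thresholds are illustrative assumptions, not River's implementation.

```python
class PageHinkley:
    """Page-Hinkley test for detecting an increase in the mean of a stream."""

    def __init__(self, delta=0.005, threshold=5.0):
        self.delta = delta          # tolerated deviation from the running mean
        self.threshold = threshold  # alarm level for the cumulative deviation
        self.n = 0
        self.mean = 0.0
        self.cumulative = 0.0
        self.minimum = 0.0

    def update(self, value):
        """Feed one observation; return True if drift is signalled."""
        self.n += 1
        self.mean += (value - self.mean) / self.n
        self.cumulative += value - self.mean - self.delta
        self.minimum = min(self.minimum, self.cumulative)
        return self.cumulative - self.minimum > self.threshold


# Error stream: the model is always right, then always wrong from index 200
errors = [0.0] * 200 + [1.0] * 200

detector = PageHinkley()
first_alarm = None
for index, error in enumerate(errors):
    if detector.update(error):
        first_alarm = index
        break

print(f"Drift signalled at index {first_alarm}")
```

The threshold controls the trade-off between detection delay and false alarms: a lower value reacts sooner but is more easily triggered by noise.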

Performance Monitoring over Time

Tracking incremental model performance is crucial:

# Example of a performance tracking system for incremental models
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

class IncrementalPerformanceTracker:
    def __init__(self, metrics_names, window_size=1000):
        self.metrics_names = metrics_names
        self.window_size = window_size
        self.performance_log = []
        self.current_window = {metric: [] for metric in metrics_names}
        self.window_count = 0

    def add_prediction(self, y_true, y_pred, timestamp=None):
        if timestamp is None:
            timestamp = datetime.now()

        # Calculate metrics for this prediction
        metrics = calculate_metrics(y_true, y_pred, self.metrics_names)

        # Add to current window
        for metric_name, metric_value in metrics.items():
            self.current_window[metric_name].append(metric_value)

        self.window_count += 1

        # If window is full, calculate averages and log
        if self.window_count >= self.window_size:
            window_averages = {
                metric: sum(values) / len(values)
                for metric, values in self.current_window.items()
            }

            self.performance_log.append({
                'timestamp': timestamp,
                **window_averages
            })

            # Reset window
            self.current_window = {metric: [] for metric in self.metrics_names}
            self.window_count = 0

    def get_performance_trend(self, metric_name, days=7):
        # Convert log to DataFrame
        if not self.performance_log:
            return pd.DataFrame(columns=['timestamp', metric_name])

        df = pd.DataFrame(self.performance_log)

        # Filter for recent data
        cutoff = datetime.now() - timedelta(days=days)
        recent_df = df[df['timestamp'] >= cutoff]

        return recent_df[['timestamp', metric_name]]

    def plot_performance_trend(self, metric_name, days=7):
        trend_data = self.get_performance_trend(metric_name, days)

        if trend_data.empty:
            print(f"No data available for {metric_name} in the last {days} days")
            return

        plt.figure(figsize=(10, 6))
        plt.plot(trend_data['timestamp'], trend_data[metric_name])
        plt.title(f"{metric_name} Performance Trend (Last {days} Days)")
        plt.xlabel("Date")
        plt.ylabel(metric_name)
        plt.grid(True)
        plt.tight_layout()

        # Add regression line to show trend
        z = np.polyfit(range(len(trend_data)), trend_data[metric_name], 1)
        p = np.poly1d(z)
        plt.plot(trend_data['timestamp'], p(range(len(trend_data))), "r--", alpha=0.8)

        return plt

# Example usage in a prediction service
performance_tracker = IncrementalPerformanceTracker(
    metrics_names=['accuracy', 'f1_score', 'auc'],
    window_size=100
)

def prediction_service(request):
    # Extract features
    features = extract_features(request)

    # Make prediction using current model
    prediction = current_model.predict(features)

    # If ground truth becomes available later
    if 'actual_value' in request:
        performance_tracker.add_prediction(
            request['actual_value'],
            prediction
        )

    return prediction
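
When a full tracker is more than is needed, a fixed-size sliding window gives a quick view of recent performance. The sketch below keeps the last N outcomes in a deque; RollingAccuracy is a hypothetical helper, separate from the tracker class above.

```python
from collections import deque


class RollingAccuracy:
    """Accuracy over the most recent `window_size` predictions."""

    def __init__(self, window_size=100):
        # deque with maxlen drops the oldest outcome automatically
        self.window = deque(maxlen=window_size)

    def update(self, y_true, y_pred):
        self.window.append(1 if y_true == y_pred else 0)

    def get(self):
        if not self.window:
            return None
        return sum(self.window) / len(self.window)


rolling = RollingAccuracy(window_size=4)
for y_true, y_pred in [(1, 1), (0, 0), (1, 1), (0, 0)]:
    rolling.update(y_true, y_pred)
accuracy_before = rolling.get()

# Two wrong predictions push the two oldest correct ones out of the window
for y_true, y_pred in [(1, 0), (0, 1)]:
    rolling.update(y_true, y_pred)
accuracy_after = rolling.get()

print(accuracy_before, accuracy_after)
```

Unlike a cumulative average, the windowed value drops quickly when recent predictions go wrong, which makes it a better signal for incremental models.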

Practical Incremental Learning Examples

Example 1: Incremental Recommendation System

# Example of incremental learning for a recommendation system
from surprise import SVD, Dataset, Reader, accuracy
from surprise.model_selection import train_test_split
import matplotlib.pyplot as plt
import pandas as pd
from collections import defaultdict

class IncrementalRecommender:
    RATING_COLUMNS = ['user_id', 'item_id', 'rating']

    def __init__(self, factors=100, epochs=20):
        # Initialize the model
        self.model = SVD(n_factors=factors, n_epochs=epochs)
        self.reader = Reader(rating_scale=(1, 5))

        # All ratings seen so far. Surprise's SVD.fit() always retrains from
        # scratch, so each update refits on the accumulated history
        self.ratings_df = None

        # Keep track of user-item interactions
        self.user_items = defaultdict(set)

        # Track performance over time
        self.performance_history = []

    def _track_interactions(self, ratings_df):
        """Record which items each user has rated"""
        rows = ratings_df[self.RATING_COLUMNS].itertuples(index=False, name=None)
        for user_id, item_id, _ in rows:
            self.user_items[user_id].add(item_id)

    def initial_fit(self, ratings_df):
        """Initial training with historical data"""
        self.ratings_df = ratings_df[self.RATING_COLUMNS].reset_index(drop=True)

        # Prepare data
        data = Dataset.load_from_df(self.ratings_df, self.reader)

        # Split for evaluation
        trainset, testset = train_test_split(data, test_size=0.25)

        # Train model
        self.model.fit(trainset)

        # Evaluate
        predictions = self.model.test(testset)
        rmse = accuracy.rmse(predictions)

        # Record performance (all_ratings() is a generator, so use n_ratings)
        self.performance_history.append({
            'timestamp': pd.Timestamp.now(),
            'rmse': rmse,
            'n_samples': trainset.n_ratings
        })

        # Update user-item tracking
        self._track_interactions(self.ratings_df)

        return rmse

    def incremental_fit(self, new_ratings_df):
        """Update model with new ratings"""
        if new_ratings_df.empty:
            return None

        new_ratings = new_ratings_df[self.RATING_COLUMNS].reset_index(drop=True)

        # Hold out part of the new data before training, so evaluation is
        # not done on ratings the model has already seen
        holdout = new_ratings.sample(min(len(new_ratings) // 4, 100))
        train_new = new_ratings.drop(holdout.index)

        # Build a trainset from all ratings seen so far plus the new ones
        history = [df for df in (self.ratings_df, train_new) if df is not None]
        all_train = pd.concat(history, ignore_index=True)
        new_trainset = Dataset.load_from_df(all_train, self.reader).build_full_trainset()

        # Refit the model (a full retrain on the accumulated data)
        self.model.fit(new_trainset)

        # Evaluate on the held-out new ratings, if there are any
        rmse = None
        if not holdout.empty:
            new_testset = list(holdout.itertuples(index=False, name=None))
            predictions = self.model.test(new_testset)
            rmse = accuracy.rmse(predictions)

        # Record performance
        self.performance_history.append({
            'timestamp': pd.Timestamp.now(),
            'rmse': rmse,
            'n_samples': new_trainset.n_ratings
        })

        # Keep every new rating (including the holdout) for the next refit
        self.ratings_df = pd.concat([all_train, holdout], ignore_index=True)

        # Update user-item tracking
        self._track_interactions(new_ratings)

        return rmse

    def recommend(self, user_id, n=10):
        """Generate recommendations for a user"""
        # Get items the user hasn't rated yet
        if user_id in self.user_items:
            user_rated_items = self.user_items[user_id]
            # all_ratings() yields Surprise's inner ids; convert to raw ids before comparing
            trainset = self.model.trainset
            all_items = {trainset.to_raw_iid(i) for _, i, _ in trainset.all_ratings()}
            candidate_items = list(all_items - user_rated_items)
        else:
            # New user - recommend popular items
            return self.recommend_popular(n)

        # Generate predictions for candidate items
        predictions = [self.model.predict(user_id, item_id) for item_id in candidate_items]

        # Sort by predicted rating
        predictions.sort(key=lambda x: x.est, reverse=True)

        # Return top n recommendations
        return [pred.iid for pred in predictions[:n]]

    def recommend_popular(self, n=10):
        """Recommend popular items for new users"""
        # Calculate item popularity
        trainset = self.model.trainset
        item_ratings = defaultdict(list)
        for u, i, r in trainset.all_ratings():
            item_ratings[i].append(r)

        # Calculate average rating per item (only items with at least 10 ratings)
        item_avg = {i: sum(rs)/len(rs) for i, rs in item_ratings.items() if len(rs) >= 10}

        # Sort by average rating
        popular_items = sorted(item_avg.items(), key=lambda x: x[1], reverse=True)

        # Return top n popular items as raw item ids (all_ratings() yields inner ids)
        return [trainset.to_raw_iid(item) for item, rating in popular_items[:n]]

    def plot_performance_trend(self):
        """Plot RMSE over time"""
        if not self.performance_history:
            return "No performance data available yet"

        perf_df = pd.DataFrame(self.performance_history)

        plt.figure(figsize=(12, 6))
        plt.subplot(2, 1, 1)
        plt.plot(perf_df['timestamp'], perf_df['rmse'])
        plt.title('RMSE over Time')
        plt.ylabel('RMSE')
        plt.grid(True)

        plt.subplot(2, 1, 2)
        plt.plot(perf_df['timestamp'], perf_df['n_samples'])
        plt.title('Training Set Size over Time')
        plt.ylabel('Number of Samples')
        plt.grid(True)

        plt.tight_layout()
        return plt

Example 2: Incremental Fraud Detection

# Example of incremental fraud detection system
import numbers
import time
from datetime import datetime

import pandas as pd
from river import compose, forest, metrics, preprocessing

class IncrementalFraudDetector:
    def __init__(self):
        # Define the feature preprocessing pipeline:
        # scale numeric features and one-hot encode string features
        self.preprocessor = (
            (compose.SelectType(numbers.Number) | preprocessing.StandardScaler()) +
            (compose.SelectType(str) | preprocessing.OneHotEncoder())
        )

        # Define the model. Recent River releases expose the adaptive random
        # forest as forest.ARFClassifier; older releases expose it as
        # ensemble.AdaptiveRandomForestClassifier
        self.model = forest.ARFClassifier(
            n_models=10,
            seed=42
        )

        # Combined pipeline
        self.pipeline = self.preprocessor | self.model

        # Metrics to track
        self.metrics = {
            'accuracy': metrics.Accuracy(),
            'auc': metrics.ROCAUC(),
            'f1': metrics.F1(),
            'precision': metrics.Precision(),
            'recall': metrics.Recall(),
            'confusion_matrix': metrics.ConfusionMatrix()
        }

        # Store performance history
        self.performance_history = []
        self.last_log_time = time.time()
        self.predictions_since_last_log = 0
        self.log_interval = 1000  # Log after every 1000 predictions

    def predict(self, transaction):
        """Make a fraud prediction for a single transaction"""
        # Make prediction (before learning)
        fraud_score = self.pipeline.predict_proba_one(transaction)
        # Extract probability of fraud (class 1)
        fraud_probability = fraud_score.get(1, 0)

        # Apply threshold for decision
        is_fraud = fraud_probability >= 0.5

        return {
            'is_fraud': bool(is_fraud),
            'fraud_probability': fraud_probability,
            'timestamp': datetime.now().isoformat()
        }

    def update(self, transaction, is_fraud):
        """Update the model with a labeled transaction"""
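        # Get prediction before updating (test-then-train evaluation)
        is_fraud = bool(is_fraud)
        y_proba = self.pipeline.predict_proba_one(transaction)
        y_pred = y_proba.get(True, 0.0) >= 0.5

        # Update metrics: ROC AUC needs the predicted probabilities,
        # the other metrics need the predicted label
        for metric_name, metric in self.metrics.items():
            if metric_name == 'auc':
                metric.update(is_fraud, y_proba)
            else:
                metric.update(is_fraud, y_pred)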

        # Learn from this example
        self.pipeline.learn_one(transaction, is_fraud)

        # Increment counter
        self.predictions_since_last_log += 1

        # Log performance periodically
        if self.predictions_since_last_log >= self.log_interval:
            self._log_performance()
            self.predictions_since_last_log = 0

    def _log_performance(self):
        """Log current performance metrics"""
        current_time = datetime.now()

        # Collect metrics
        current_metrics = {
            name: metric.get() for name, metric in self.metrics.items()
            if name != 'confusion_matrix'  # Special handling for confusion matrix
        }

        # Add confusion matrix counts for the fraud (positive) class
        cm = self.metrics['confusion_matrix']
        current_metrics['true_positives'] = cm.true_positives(True)
        current_metrics['false_positives'] = cm.false_positives(True)
        current_metrics['true_negatives'] = cm.true_negatives(True)
        current_metrics['false_negatives'] = cm.false_negatives(True)

        # Add timestamp
        current_metrics['timestamp'] = current_time

        # Store metrics
        self.performance_history.append(current_metrics)

        # Log to console
        print(f"[{current_time}] Performance metrics:")
        for name, value in current_metrics.items():
            if name != 'timestamp':
                print(f"  {name}: {value}")

    def get_performance_trend(self, metric_name='auc'):
        """Get performance trend for a specific metric"""
        if not self.performance_history:
            return pd.DataFrame()

        # Convert to DataFrame
        perf_df = pd.DataFrame(self.performance_history)

        # Ensure metric exists
        if metric_name not in perf_df.columns:
            raise ValueError(f"Metric {metric_name} not found in performance history")

        return perf_df[['timestamp', metric_name]]

    def save_model(self, filepath):
        """Save the model to disk"""
        # River models can be pickled
        import pickle
        with open(filepath, 'wb') as f:
            pickle.dump(self.pipeline, f)

    @classmethod
    def load_model(cls, filepath):
        """Load a model from disk"""
        import pickle
        detector = cls()
        with open(filepath, 'rb') as f:
            detector.pipeline = pickle.load(f)
        return detector

# Example usage in a transaction processing system
fraud_detector = IncrementalFraudDetector()

def process_transaction(transaction_data):
    # Make prediction
    prediction = fraud_detector.predict(transaction_data)

    # Apply business rules
    if prediction['fraud_probability'] > 0.9:
        # High confidence fraud - block transaction
        action = "block"
    elif prediction['fraud_probability'] > 0.7:
        # Suspicious - require additional verification
        action = "verify"
    else:
        # Likely legitimate - approve
        action = "approve"

    # Log transaction and decision
    log_transaction(transaction_data, prediction, action)

    return {
        'action': action,
        'fraud_score': prediction['fraud_probability'],
        'transaction_id': transaction_data.get('transaction_id')
    }

def process_feedback(transaction_id, actual_fraud):
    """Process feedback when fraud status is confirmed"""
    # Retrieve transaction data
    transaction_data = get_transaction_by_id(transaction_id)

    if transaction_data:
        # Update model with confirmed label
        fraud_detector.update(transaction_data, actual_fraud)

        # Log feedback
        log_feedback(transaction_id, actual_fraud)

        return {"status": "success", "message": "Feedback processed"}
    else:
        return {"status": "error", "message": "Transaction not found"}
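The usage example calls log_transaction and get_transaction_by_id without defining them. Fraud labels usually arrive days or weeks after the prediction, so the features seen at prediction time have to be kept somewhere until feedback comes in. The following is a minimal in-memory sketch of that idea, not the article's actual implementation: PendingTransactionStore, its max_size and ttl_seconds parameters, and the two wrapper functions are hypothetical, and a production system would more likely use a database or key-value store.

```python
import time
from collections import OrderedDict


class PendingTransactionStore:
    """Hypothetical helper: keeps prediction-time features until a label arrives."""

    def __init__(self, max_size=100_000, ttl_seconds=90 * 24 * 3600):
        self.max_size = max_size        # assumed cap on stored transactions
        self.ttl_seconds = ttl_seconds  # assumed maximum wait for a label
        self._items = OrderedDict()     # transaction_id -> (stored_at, features)

    def put(self, transaction_id, features, now=None):
        now = time.time() if now is None else now
        self._items[transaction_id] = (now, dict(features))
        self._items.move_to_end(transaction_id)
        # Evict the oldest entries once the cap is exceeded
        while len(self._items) > self.max_size:
            self._items.popitem(last=False)

    def pop(self, transaction_id, now=None):
        """Return the stored features, or None if unknown, evicted, or expired."""
        now = time.time() if now is None else now
        entry = self._items.pop(transaction_id, None)
        if entry is None:
            return None
        stored_at, features = entry
        if now - stored_at > self.ttl_seconds:
            return None
        return features


# Hypothetical wiring for the hooks used in the example above
store = PendingTransactionStore()

def log_transaction(transaction_data, prediction, action):
    # Keep the exact features the model saw, keyed by transaction id
    store.put(transaction_data["transaction_id"], transaction_data)

def get_transaction_by_id(transaction_id):
    # Each transaction is consumed once, so a label is learned at most once
    return store.pop(transaction_id)
```

Popping the entry on lookup means duplicate feedback for the same transaction returns None and is reported as "Transaction not found" rather than being learned twice. Whether that is the right behavior depends on how your feedback source handles corrections.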

Decision Rules

Use this checklist for incremental ML decisions:

  1. If your data patterns change frequently, incremental learning usually beats periodic retraining
  2. If catastrophic forgetting is a concern, add regularization to preserve important weights
  3. If concept drift is suspected, monitor prediction distributions and trigger retraining when drift exceeds thresholds
  4. If model updates need validation before deployment, use a staging promotion process
  5. If online learning is too volatile, use mini-batch updates for stability
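Rule 3 can be sketched with the Population Stability Index (PSI), one common way to compare a reference window of prediction scores against a recent window. This is an illustrative choice rather than the method the article prescribes. The function names, the 10-bin layout, and the 0.2 threshold (a widely used rule of thumb) are assumptions to tune for your own data, and the scores are assumed to be probabilities in [0, 1].

```python
import math


def population_stability_index(reference, recent, n_bins=10, eps=1e-6):
    """PSI between two samples of scores in [0, 1], using equal-width bins."""
    if not reference or not recent:
        raise ValueError("both samples must be non-empty")

    def bin_fractions(scores):
        counts = [0] * n_bins
        for s in scores:
            # Clamp so that s == 1.0 falls into the last bin
            idx = min(max(int(s * n_bins), 0), n_bins - 1)
            counts[idx] += 1
        # Floor empty bins at eps to keep the logarithm finite
        return [max(c / len(scores), eps) for c in counts]

    ref = bin_fractions(reference)
    cur = bin_fractions(recent)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref, cur))


def should_retrain(reference, recent, threshold=0.2):
    """Flag drift when the PSI exceeds the (assumed) threshold."""
    return population_stability_index(reference, recent) > threshold


# Usage: scores spread evenly at deployment time vs. scores piled up near 1.0
reference_scores = [i / 100 for i in range(100)]
recent_scores = [0.9 + i / 1000 for i in range(100)]
print(should_retrain(reference_scores, reference_scores))  # identical windows
print(should_retrain(reference_scores, recent_scores))     # shifted window
```

Monitoring predicted scores needs no labels, which makes it useful when feedback is delayed. It only detects changes in what the model outputs, though, so it complements label-based metrics such as the AUC history rather than replacing them.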

Incremental learning adds system complexity. Only use it when the alternative (periodic full retraining) is worse.
