Traditional ML trains on historical data, deploys a frozen model, and waits until performance degrades before retraining. This approach fails in dynamic environments where data patterns evolve. Incremental ML instead updates models continuously as new data arrives.
This article covers designing and implementing continuous learning systems.
## Why Static Models Fall Short
Before diving into incremental learning approaches, let’s understand the limitations of traditional static models:
- Concept Drift: Real-world data distributions change over time, causing model performance to degrade
- Data Volume Challenges: Retraining on all historical data becomes computationally expensive as data grows
- Latency in Adaptation: Infrequent retraining means systems respond slowly to emerging patterns
- Missed Learning Opportunities: Valuable feedback data is often unused until the next retraining cycle
Consider a demand forecasting model for an e-commerce platform. Consumer behavior shifts due to seasonality, trends, or unexpected events (like a pandemic). A static model can’t adapt to these changes until a manual retraining cycle, potentially leading to weeks or months of suboptimal predictions.
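The effect is easy to demonstrate with synthetic data. The sketch below (illustrative only; the linear drift is an assumption, not a model of real demand) fits a model once, then evaluates the frozen model as the true input-output relationship drifts away from what it was trained on:

```python
import numpy as np

rng = np.random.default_rng(42)

def mae(y_true, y_pred):
    return float(np.mean(np.abs(y_true - y_pred)))

# Fit a "static" model once, on pre-drift data where y ≈ 2x
X_train = rng.uniform(0, 1, 500)
y_train = 2.0 * X_train + rng.normal(0, 0.1, 500)
slope, intercept = np.polyfit(X_train, y_train, 1)

# Evaluate the frozen model as the true relationship drifts to y ≈ (2 + d)x
for d in [0.0, 0.5, 1.0, 2.0]:
    X_new = rng.uniform(0, 1, 500)
    y_new = (2.0 + d) * X_new + rng.normal(0, 0.1, 500)
    print(f"drift={d:.1f}  MAE={mae(y_new, slope * X_new + intercept):.3f}")
```

The error grows roughly in proportion to the drift: the model is not wrong on the day it ships, it becomes wrong as the world moves.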
## Incremental Learning Approaches
Incremental learning systems update models continuously as new data arrives. Here are the key approaches:
### 1. Online Learning
In online learning, models update with each new training example:
```python
# Example using River (formerly Creme), a Python library for online ML
from river import linear_model, metrics, preprocessing

# Initialize the model pipeline
scaler = preprocessing.StandardScaler()
model = linear_model.LinearRegression()
pipeline = scaler | model

# Metric to track
metric = metrics.MAE()

# Online learning loop
for x, y in stream_of_data():  # stream_of_data() yields (features, target) pairs
    # Make a prediction before learning from the example
    y_pred = pipeline.predict_one(x)

    # Update the performance metric
    metric.update(y, y_pred)

    # Learn from the example
    pipeline.learn_one(x, y)

    # Log current performance
    print(f"Current MAE: {metric.get()}")
```
Advantages:
- Immediate adaptation to new patterns
- Minimal memory footprint
- Continuous improvement
Challenges:
- Susceptibility to noisy data or outliers
- Potential for catastrophic forgetting
- Limited to algorithms that support online updates
### 2. Mini-batch Learning
Mini-batch learning updates the model after accumulating a small batch of examples:
```python
# Example using PyTorch for mini-batch incremental learning
import time

import torch
from torch.utils.data import DataLoader, TensorDataset

def mini_batch_training_loop(model, optimizer, new_data_queue, batch_size=32):
    while True:
        # Collect data until we have enough for a mini-batch
        batch_x, batch_y = [], []
        while len(batch_x) < batch_size:
            if new_data_queue.empty():
                # Wait for more data if the queue is empty
                time.sleep(1)
                continue
            x, y = new_data_queue.get()
            batch_x.append(x)
            batch_y.append(y)

        # Convert to tensors
        x_tensor = torch.tensor(batch_x, dtype=torch.float32)
        y_tensor = torch.tensor(batch_y, dtype=torch.float32)

        # Create dataset and dataloader
        dataset = TensorDataset(x_tensor, y_tensor)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        # Update the model with the mini-batch
        for x_batch, y_batch in dataloader:
            # Forward pass
            y_pred = model(x_batch)
            loss = torch.nn.functional.mse_loss(y_pred, y_batch)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Log performance metrics
        log_performance_metrics(model, x_tensor, y_tensor)
```
Advantages:
- More stable than pure online learning
- Allows use of batch-oriented algorithms
- Better utilization of vectorized operations
Challenges:
- Slightly delayed adaptation compared to online learning
- Requires careful batch size selection
- Resource consumption increases with batch size
### 3. Incremental Batch Learning
This approach combines periodic batch training with the existing model as a starting point:
```python
# Example using scikit-learn's partial_fit
from datetime import datetime, timedelta

import numpy as np
from sklearn.linear_model import SGDClassifier

# Initialize the model ("log_loss" in scikit-learn >= 1.1; older releases used "log")
model = SGDClassifier(loss="log_loss", random_state=42)

# Initial training
X_initial, y_initial = get_initial_training_data()
model.partial_fit(X_initial, y_initial, classes=np.unique(y_initial))

# Incremental batch learning loop
last_update = datetime.now()
batch_data_x, batch_data_y = [], []

while True:
    # Collect new data
    new_x, new_y = get_new_data()
    batch_data_x.append(new_x)
    batch_data_y.append(new_y)

    # Check if it's time for a batch update (every hour)
    if datetime.now() - last_update > timedelta(hours=1) and len(batch_data_x) > 0:
        # Convert to numpy arrays
        X_batch = np.array(batch_data_x)
        y_batch = np.array(batch_data_y)

        # Update the model with the new batch
        model.partial_fit(X_batch, y_batch)

        # Clear the batch data
        batch_data_x, batch_data_y = [], []
        last_update = datetime.now()

        # Evaluate and log performance
        performance = evaluate_model(model)
        log_performance(performance)
```
Advantages:
- Balances adaptation speed with stability
- Works with a wider range of algorithms
- Efficient use of computational resources
Challenges:
- Requires algorithms that support warm-starting
- More complex to implement than pure online learning
- Batch timing selection impacts performance
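The warm-starting requirement is worth making concrete. In scikit-learn, for example, incremental batch updates can happen through either `partial_fit` or `warm_start=True`; the sketch below (synthetic data, illustrative only) shows both:

```python
# partial_fit performs one explicit incremental pass per batch; warm_start=True
# makes repeated fit() calls resume from the current coefficients instead of
# reinitializing them.
import numpy as np
from sklearn.linear_model import SGDRegressor

rng = np.random.default_rng(0)
true_w = np.array([1.0, -2.0, 0.5])
X1 = rng.normal(size=(200, 3)); y1 = X1 @ true_w
X2 = rng.normal(size=(200, 3)); y2 = X2 @ true_w

# Option 1: partial_fit - the explicit incremental API
m1 = SGDRegressor(random_state=0)
m1.partial_fit(X1, y1)
m1.partial_fit(X2, y2)  # continues from the previous state

# Option 2: warm_start - fit() resumes from the existing coefficients
m2 = SGDRegressor(warm_start=True, max_iter=5, tol=None, random_state=0)
m2.fit(X1, y1)
coef_before = m2.coef_.copy()
m2.fit(X2, y2)  # starts from coef_before, not from scratch
```

Algorithms without either mechanism (for example, a vanilla decision tree) must be fully retrained, which is exactly what this approach tries to avoid.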
## Implementing Continuous Learning Systems
Now let’s explore how to build a robust continuous learning system in production.
### System Architecture
A well-designed incremental learning system typically includes these components:
```
┌──────────────┐     ┌───────────────┐     ┌───────────────────┐
│ Data Sources │────▶│ Feature Store │────▶│ Training Pipeline │
└──────────────┘     └───────────────┘     └───────────────────┘
                                                     │
                                                     ▼
┌──────────────┐     ┌───────────────┐     ┌───────────────────┐
│  Monitoring  │◀────│  Model Store  │◀────│ Model Evaluation  │
└──────────────┘     └───────────────┘     └───────────────────┘
       │                                             ▲
       ▼                                             │
┌──────────────┐     ┌───────────────┐               │
│Drift Detector│────▶│ Model Registry│───────────────┘
└──────────────┘     └───────────────┘
```
Let’s implement key components of this architecture:
### 1. Feature Store for Incremental Learning
A feature store helps maintain consistency between training and inference:
```python
# Example using Feast for feature management in incremental learning
# Note: Feast's API has changed significantly across releases; this sketch
# follows the older 0.1x-style API (Feature/ValueType rather than Field).
from datetime import datetime, timedelta

from feast import Entity, Feature, FeatureStore, FeatureView, FileSource, ValueType

# Define entities and features
customer = Entity(name="customer_id", value_type=ValueType.INT64)

customer_features = FileSource(
    path="s3://bucket/customer_features.parquet",
    event_timestamp_column="event_timestamp",
)

customer_feature_view = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=90),  # Features are valid for 90 days
    features=[
        Feature(name="total_purchases", dtype=ValueType.FLOAT),
        Feature(name="avg_order_value", dtype=ValueType.FLOAT),
        Feature(name="days_since_last_purchase", dtype=ValueType.INT64),
    ],
    online=True,
    input=customer_features,
)

# Initialize a feature store
store = FeatureStore(repo_path="feature_repo")

# Get training data for incremental learning
def get_incremental_training_data(start_time, end_time):
    # Get historical feature values and labels
    training_df = store.get_historical_features(
        entity_df=get_training_entities(start_time, end_time),
        features=[
            "customer_features:total_purchases",
            "customer_features:avg_order_value",
            "customer_features:days_since_last_purchase",
        ],
    ).to_df()
    return training_df

# Function to continuously update the feature store
def update_feature_store(new_data_stream):
    for batch in new_data_stream:
        # Register (or re-register) the feature view, then materialize the
        # newly arrived offline data into the online store
        store.apply([customer_feature_view])
        store.materialize_incremental(end_date=datetime.now())
```
### 2. Drift Detection and Monitoring
Detecting when data or predictions drift is crucial for incremental learning:
```python
# Example using Alibi Detect for drift detection
from datetime import datetime, timedelta

import numpy as np
from alibi_detect.cd import KSDrift

# Initialize the drift detector with reference data
reference_data = get_reference_data()
drift_detector = KSDrift(
    reference_data,
    p_val=0.05,  # Statistical significance threshold
    alternative='two-sided'
)

# Function to monitor drift in a prediction stream
def monitor_prediction_drift(prediction_stream, alert_webhook=None):
    batch = []
    last_check = datetime.now()

    for prediction in prediction_stream:
        # Collect predictions in a batch
        batch.append(prediction)

        # Check for drift every hour, or when the batch reaches 1000 predictions
        if (datetime.now() - last_check > timedelta(hours=1) or len(batch) >= 1000) and len(batch) > 0:
            # Convert the batch to a numpy array
            prediction_batch = np.array(batch)

            # Run drift detection
            drift_result = drift_detector.predict(prediction_batch)

            # Log drift metrics
            log_drift_metrics(drift_result)

            # Send an alert if drift is detected
            if drift_result['data']['is_drift'] == 1 and alert_webhook:
                send_drift_alert(
                    alert_webhook,
                    f"Prediction drift detected with p-value: {drift_result['data']['p_val']}"
                )

            # Trigger a model update if the drift is significant
            # (KSDrift reports one p-value per feature, so take the minimum)
            if np.min(drift_result['data']['p_val']) < 0.01:
                trigger_model_update()

            # Reset the batch and update the last check time
            batch = []
            last_check = datetime.now()
```
### 3. Versioned Model Registry
A model registry keeps track of model versions in continuous learning:
```python
# Example using MLflow for model versioning in incremental learning
from datetime import datetime

import mlflow
from mlflow.tracking import MlflowClient

# Configure MLflow
mlflow.set_tracking_uri("http://mlflow-server:5000")
client = MlflowClient()

# Function to register and version a model after incremental training
def register_incremental_model(model, metrics, features_used, version_description):
    # Get the current time for versioning
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

    # Start an MLflow run
    with mlflow.start_run(run_name=f"incremental_update_{timestamp}"):
        # Log the model
        mlflow.sklearn.log_model(model, "model")

        # Log metrics
        for metric_name, metric_value in metrics.items():
            mlflow.log_metric(metric_name, metric_value)

        # Log feature information
        mlflow.log_param("features_used", features_used)
        mlflow.log_param("update_time", timestamp)

        run_id = mlflow.active_run().info.run_id

    # Register the model; register_model creates the registered model on
    # first use and auto-increments the version on subsequent calls
    result = mlflow.register_model(f"runs:/{run_id}/model", "recommendation_model")
    new_version = result.version

    # Add a version description
    client.update_model_version(
        name="recommendation_model",
        version=new_version,
        description=version_description
    )

    # Transition to staging
    client.transition_model_version_stage(
        name="recommendation_model",
        version=new_version,
        stage="Staging"
    )
    return new_version

# Function to promote a model to production after validation
def promote_model_to_production(model_name, version, validation_metrics):
    # Log the promotion decision and validation metrics
    with mlflow.start_run(run_name=f"promotion_{model_name}_v{version}"):
        for metric_name, metric_value in validation_metrics.items():
            mlflow.log_metric(metric_name, metric_value)
        mlflow.log_param("promotion_time", datetime.now().strftime("%Y%m%d%H%M%S"))

    # Transition to production
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production"
    )

    # Archive the previous production version
    for mv in client.get_latest_versions(model_name, stages=["Production"]):
        if mv.version != version:
            client.transition_model_version_stage(
                name=model_name,
                version=mv.version,
                stage="Archived"
            )
```
### 4. Continuous Integration/Deployment Pipeline
Automating the entire incremental learning cycle is key:
```python
# Example CI/CD pipeline for incremental ML using Airflow
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# Define the DAG
incremental_ml_dag = DAG(
    'incremental_ml_pipeline',
    default_args={
        'owner': 'data_science',
        'depends_on_past': False,
        'email': ['data_science@example.com'],
        'email_on_failure': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Incremental ML training pipeline',
    schedule_interval=timedelta(hours=6),  # Run every 6 hours
    start_date=days_ago(1),
    catchup=False,
    tags=['ml', 'incremental'],
)

# Define task functions
def extract_new_data(**context):
    execution_date = context['execution_date']
    previous_execution = execution_date - timedelta(hours=6)

    # Get data that arrived between the previous and current execution
    new_data = fetch_data_for_timerange(previous_execution, execution_date)

    # Pass data to the next task via XCom
    return {
        'data_path': store_data_temporarily(new_data),
        'record_count': len(new_data)
    }

def check_data_quality(**context):
    ti = context['ti']
    data_info = ti.xcom_pull(task_ids='extract_new_data')

    # Load the data
    data = load_data(data_info['data_path'])

    # Run quality checks
    quality_metrics = run_data_quality_checks(data)

    # Decide whether to proceed with training
    if quality_metrics['overall_score'] < 0.8:
        raise ValueError(f"Data quality too low: {quality_metrics}")

    return {
        'data_path': data_info['data_path'],
        'quality_metrics': quality_metrics
    }

def train_incremental_model(**context):
    ti = context['ti']
    data_info = ti.xcom_pull(task_ids='check_data_quality')

    # Load the new data
    new_data = load_data(data_info['data_path'])

    # Load the current production model
    current_model = load_production_model()

    # Update the model with the new data
    updated_model, training_metrics = update_model_incrementally(current_model, new_data)

    # Save the updated model
    model_path = save_model_temporarily(updated_model)

    return {
        'model_path': model_path,
        'training_metrics': training_metrics
    }

def evaluate_model_performance(**context):
    ti = context['ti']
    model_info = ti.xcom_pull(task_ids='train_incremental_model')

    # Load the updated model
    updated_model = load_model(model_info['model_path'])

    # Evaluate on a holdout test set
    evaluation_metrics = evaluate_model(updated_model)

    # Compare with the current production model
    current_model = load_production_model()
    current_model_metrics = evaluate_model(current_model)

    # Calculate the improvement
    improvement = calculate_improvement(evaluation_metrics, current_model_metrics)

    return {
        'model_path': model_info['model_path'],
        'evaluation_metrics': evaluation_metrics,
        'improvement': improvement
    }

def register_and_deploy_model(**context):
    ti = context['ti']
    eval_info = ti.xcom_pull(task_ids='evaluate_model_performance')

    # Only deploy on significant improvement, or if a week has passed since the last deployment
    if eval_info['improvement'] > 0.05 or is_week_since_last_deployment():
        # Load the updated model
        updated_model = load_model(eval_info['model_path'])

        # Register in the model registry
        model_version = register_incremental_model(
            updated_model,
            eval_info['evaluation_metrics'],
            get_feature_list(),
            f"Incremental update with {eval_info['improvement']:.2%} improvement"
        )

        # Promote to production
        promote_model_to_production(
            "recommendation_model",
            model_version,
            eval_info['evaluation_metrics']
        )

        # Deploy to serving infrastructure
        deploy_model_to_serving("recommendation_model", model_version)

        return {
            'deployed': True,
            'version': model_version,
            'improvement': eval_info['improvement']
        }
    else:
        return {
            'deployed': False,
            'reason': 'Insufficient improvement'
        }

# Define the tasks (Airflow 2.x passes context to the callables automatically)
extract_task = PythonOperator(
    task_id='extract_new_data',
    python_callable=extract_new_data,
    dag=incremental_ml_dag,
)

quality_check_task = PythonOperator(
    task_id='check_data_quality',
    python_callable=check_data_quality,
    dag=incremental_ml_dag,
)

train_task = PythonOperator(
    task_id='train_incremental_model',
    python_callable=train_incremental_model,
    dag=incremental_ml_dag,
)

evaluate_task = PythonOperator(
    task_id='evaluate_model_performance',
    python_callable=evaluate_model_performance,
    dag=incremental_ml_dag,
)

deploy_task = PythonOperator(
    task_id='register_and_deploy_model',
    python_callable=register_and_deploy_model,
    dag=incremental_ml_dag,
)

# Define task dependencies
extract_task >> quality_check_task >> train_task >> evaluate_task >> deploy_task
```
## Handling Challenges in Incremental Learning
### Catastrophic Forgetting
Incremental models may “forget” previously learned patterns when updated with new data:
```python
# Example: Elastic Weight Consolidation (EWC) to prevent catastrophic forgetting
import copy

import torch
import torch.nn as nn
import torch.optim as optim

class ElasticWeightConsolidation:
    def __init__(self, model, fisher_estimation_sample_size=1024):
        self.model = model
        self.device = next(model.parameters()).device

        # Store a copy of the model before updating
        self.old_model = copy.deepcopy(model)
        self.fisher_matrix = self._calculate_fisher_matrix(fisher_estimation_sample_size)

    def _calculate_fisher_matrix(self, sample_size):
        # Get samples to estimate the Fisher Information Matrix
        data_loader = get_data_loader(sample_size=sample_size)

        # Initialize the Fisher matrix (same shape as the model parameters)
        fisher = {}
        for name, param in self.model.named_parameters():
            fisher[name] = torch.zeros_like(param)

        # Set the model to evaluation mode
        self.model.eval()

        # Accumulate the Fisher Information Matrix
        for input_data, _ in data_loader:
            input_data = input_data.to(self.device)

            # Forward pass
            self.model.zero_grad()
            output = self.model(input_data)

            # Compute log likelihood
            log_probs = nn.functional.log_softmax(output, dim=1)

            # Sample labels from the model's predictions
            sampled_labels = torch.multinomial(torch.exp(log_probs), 1).squeeze()

            # Compute the loss
            loss = nn.functional.nll_loss(log_probs, sampled_labels)

            # Backward pass
            loss.backward()

            # Accumulate squared gradients
            for name, param in self.model.named_parameters():
                fisher[name] += param.grad.pow(2).data / sample_size

        return fisher

    def ewc_loss(self, ewc_lambda=5000):
        """Compute the EWC regularization loss to prevent catastrophic forgetting."""
        loss = 0
        for name, param in self.model.named_parameters():
            # Get the corresponding parameter from the old model
            old_param = self.old_model.state_dict()[name]

            # Get the Fisher information
            fisher_info = self.fisher_matrix[name]

            # Compute the EWC loss: Fisher * (theta - theta_old)^2
            loss += (fisher_info * (param - old_param).pow(2)).sum()
        return ewc_lambda * loss / 2

# Using EWC in incremental training
def train_with_ewc(model, new_data_loader, ewc_lambda=5000):
    # Initialize EWC (snapshots the current weights and Fisher estimates)
    ewc = ElasticWeightConsolidation(model)

    # Set up the optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    model.train()
    for epoch in range(5):
        for inputs, targets in new_data_loader:
            # Forward pass
            outputs = model(inputs)

            # Task loss
            task_loss = nn.functional.cross_entropy(outputs, targets)

            # EWC loss to prevent forgetting
            ewc_loss = ewc.ewc_loss(ewc_lambda)

            # Total loss
            loss = task_loss + ewc_loss

            # Backward pass and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        print(f"Task loss: {task_loss.item()}, EWC loss: {ewc_loss.item()}")
    return model
```
### Concept Drift Detection
Explicitly handling concept drift helps incremental learning adapt appropriately:
```python
# Example of concept drift detection to trigger model adaptation
from datetime import datetime

from river import drift

# Initialize the drift detector
drift_detector = drift.ADWIN(delta=0.002)

# Function to monitor errors and detect drift
def monitor_predictions_for_drift(true_values, predictions):
    drift_detected = False
    for y_true, y_pred in zip(true_values, predictions):
        # Error signal: absolute error for numeric targets, 0/1 otherwise
        error = abs(y_true - y_pred) if isinstance(y_true, (int, float)) else float(y_true != y_pred)

        # Update the drift detector
        drift_detector.update(error)

        # Check if drift is detected; ADWIN discards its outdated window
        # automatically, so no manual reset is required
        if drift_detector.drift_detected:
            drift_detected = True
            print(f"Concept drift detected at {datetime.now()}")
    return drift_detected

# In an online learning loop
def online_learning_with_drift_detection(model, data_stream):
    for x, y_true in data_stream:
        # Make a prediction
        y_pred = model.predict_one(x)

        # Check for drift
        if monitor_predictions_for_drift([y_true], [y_pred]):
            # Drift detected; possible responses (illustrative hooks, not River APIs):
            print("Adapting to detected concept drift...")
            # 1. Adjust the learning rate to adapt to the new concept faster
            model.learning_rate *= 2
            # 2. Forget some history (more aggressive)
            model.reset_weights()
            # 3. Switch to a different model
            model = select_alternative_model()

        # Update the model
        model.learn_one(x, y_true)
```
### Performance Monitoring over Time
Tracking incremental model performance is crucial:
```python
# Example of a performance-tracking system for incremental models
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

class IncrementalPerformanceTracker:
    def __init__(self, metrics_names, window_size=1000):
        self.metrics_names = metrics_names
        self.window_size = window_size
        self.performance_log = []
        self.current_window = {metric: [] for metric in metrics_names}
        self.window_count = 0

    def add_prediction(self, y_true, y_pred, timestamp=None):
        if timestamp is None:
            timestamp = datetime.now()

        # Calculate metrics for this prediction
        metrics = calculate_metrics(y_true, y_pred, self.metrics_names)

        # Add to the current window
        for metric_name, metric_value in metrics.items():
            self.current_window[metric_name].append(metric_value)
        self.window_count += 1

        # If the window is full, calculate averages and log
        if self.window_count >= self.window_size:
            window_averages = {
                metric: sum(values) / len(values)
                for metric, values in self.current_window.items()
            }
            self.performance_log.append({
                'timestamp': timestamp,
                **window_averages
            })

            # Reset the window
            self.current_window = {metric: [] for metric in self.metrics_names}
            self.window_count = 0

    def get_performance_trend(self, metric_name, days=7):
        # Convert the log to a DataFrame
        if not self.performance_log:
            return pd.DataFrame(columns=['timestamp', metric_name])
        df = pd.DataFrame(self.performance_log)

        # Filter for recent data
        cutoff = datetime.now() - timedelta(days=days)
        recent_df = df[df['timestamp'] >= cutoff]
        return recent_df[['timestamp', metric_name]]

    def plot_performance_trend(self, metric_name, days=7):
        trend_data = self.get_performance_trend(metric_name, days)
        if trend_data.empty:
            print(f"No data available for {metric_name} in the last {days} days")
            return

        plt.figure(figsize=(10, 6))
        plt.plot(trend_data['timestamp'], trend_data[metric_name])
        plt.title(f"{metric_name} Performance Trend (Last {days} Days)")
        plt.xlabel("Date")
        plt.ylabel(metric_name)
        plt.grid(True)
        plt.tight_layout()

        # Add a regression line to show the trend
        z = np.polyfit(range(len(trend_data)), trend_data[metric_name], 1)
        p = np.poly1d(z)
        plt.plot(trend_data['timestamp'], p(range(len(trend_data))), "r--", alpha=0.8)
        return plt

# Example usage in a prediction service
performance_tracker = IncrementalPerformanceTracker(
    metrics_names=['accuracy', 'f1_score', 'auc'],
    window_size=100
)

def prediction_service(request):
    # Extract features
    features = extract_features(request)

    # Make a prediction using the current model
    prediction = current_model.predict(features)

    # If ground truth becomes available later
    if 'actual_value' in request:
        performance_tracker.add_prediction(
            request['actual_value'],
            prediction
        )
    return prediction
```
## Practical Incremental Learning Examples
### Example 1: Incremental Recommendation System
```python
# Example of incremental learning for a recommendation system
from collections import defaultdict

import matplotlib.pyplot as plt
import pandas as pd
from surprise import SVD, Dataset, Reader, accuracy
from surprise.model_selection import train_test_split

class IncrementalRecommender:
    def __init__(self, factors=100, epochs=20):
        # Initialize the model
        self.model = SVD(n_factors=factors, n_epochs=epochs)
        self.reader = Reader(rating_scale=(1, 5))

        # Keep track of user-item interactions
        self.user_items = defaultdict(set)

        # Track performance over time
        self.performance_history = []

    def initial_fit(self, ratings_df):
        """Initial training with historical data"""
        # Prepare the data
        data = Dataset.load_from_df(
            ratings_df[['user_id', 'item_id', 'rating']],
            self.reader
        )

        # Split for evaluation
        trainset, testset = train_test_split(data, test_size=0.25)

        # Train the model
        self.model.fit(trainset)

        # Evaluate
        predictions = self.model.test(testset)
        rmse = accuracy.rmse(predictions)

        # Record performance
        self.performance_history.append({
            'timestamp': pd.Timestamp.now(),
            'rmse': rmse,
            'n_samples': trainset.n_ratings
        })

        # Update user-item tracking
        for user_id, item_id, _ in ratings_df.itertuples(index=False, name=None):
            self.user_items[user_id].add(item_id)
        return rmse

    def incremental_fit(self, new_ratings_df):
        """Update the model with new ratings.

        Note: Surprise's SVD has no true partial-fit, so this refits on the
        new batch only; blending in a sample of historical ratings would
        reduce forgetting.
        """
        if new_ratings_df.empty:
            return None

        # Convert to Surprise format
        new_data = Dataset.load_from_df(
            new_ratings_df[['user_id', 'item_id', 'rating']],
            self.reader
        )

        # Build a trainset from the new ratings
        new_trainset = new_data.build_full_trainset()

        # Update the model with the new data
        self.model.fit(new_trainset)

        # Hold out some data for evaluation
        new_testset = [(uid, iid, rat) for uid, iid, rat in
                       new_ratings_df.sample(min(len(new_ratings_df), 100)).itertuples(index=False, name=None)]

        # Evaluate
        predictions = self.model.test(new_testset)
        rmse = accuracy.rmse(predictions)

        # Record performance
        self.performance_history.append({
            'timestamp': pd.Timestamp.now(),
            'rmse': rmse,
            'n_samples': new_trainset.n_ratings
        })

        # Update user-item tracking
        for user_id, item_id, _ in new_ratings_df.itertuples(index=False, name=None):
            self.user_items[user_id].add(item_id)
        return rmse

    def recommend(self, user_id, n=10):
        """Generate recommendations for a user"""
        trainset = self.model.trainset

        # Get items the user hasn't rated yet
        if user_id in self.user_items:
            user_rated_items = self.user_items[user_id]
            # all_items() yields inner ids; convert back to raw ids
            all_items = {trainset.to_raw_iid(i) for i in trainset.all_items()}
            candidate_items = list(all_items - user_rated_items)
        else:
            # New user - recommend popular items
            return self.recommend_popular(n)

        # Generate predictions for candidate items
        predictions = [self.model.predict(user_id, item_id) for item_id in candidate_items]

        # Sort by predicted rating
        predictions.sort(key=lambda x: x.est, reverse=True)

        # Return the top-n recommendations
        return [pred.iid for pred in predictions[:n]]

    def recommend_popular(self, n=10):
        """Recommend popular items for new users"""
        trainset = self.model.trainset

        # Collect ratings per item (trainset.ir maps inner item id -> [(user, rating), ...])
        item_ratings = {
            trainset.to_raw_iid(iid): [r for _, r in ratings]
            for iid, ratings in trainset.ir.items()
        }

        # Calculate the average rating per item with enough ratings
        item_avg = {i: sum(rs) / len(rs) for i, rs in item_ratings.items() if len(rs) >= 10}

        # Sort by average rating
        popular_items = sorted(item_avg.items(), key=lambda x: x[1], reverse=True)

        # Return the top-n popular items
        return [item for item, rating in popular_items[:n]]

    def plot_performance_trend(self):
        """Plot RMSE over time"""
        if not self.performance_history:
            return "No performance data available yet"
        perf_df = pd.DataFrame(self.performance_history)

        plt.figure(figsize=(12, 6))
        plt.subplot(2, 1, 1)
        plt.plot(perf_df['timestamp'], perf_df['rmse'])
        plt.title('RMSE over Time')
        plt.ylabel('RMSE')
        plt.grid(True)

        plt.subplot(2, 1, 2)
        plt.plot(perf_df['timestamp'], perf_df['n_samples'])
        plt.title('Training Set Size over Time')
        plt.ylabel('Number of Samples')
        plt.grid(True)
        plt.tight_layout()
        return plt
```
### Example 2: Incremental Fraud Detection
```python
# Example of an incremental fraud detection system
import time
from datetime import datetime

import pandas as pd
from river import ensemble, metrics, preprocessing

class IncrementalFraudDetector:
    def __init__(self):
        # Feature preprocessing (categorical features would additionally need
        # an encoder such as preprocessing.OneHotEncoder)
        self.preprocessor = preprocessing.StandardScaler()

        # Define the model
        self.model = ensemble.AdaptiveRandomForestClassifier(
            n_models=10,
            seed=42
        )

        # Combined pipeline
        self.pipeline = self.preprocessor | self.model

        # Metrics to track (labels are booleans: True = fraud)
        self.metrics = {
            'accuracy': metrics.Accuracy(),
            'auc': metrics.ROCAUC(),
            'f1': metrics.F1(),
            'precision': metrics.Precision(),
            'recall': metrics.Recall(),
            'confusion_matrix': metrics.ConfusionMatrix()
        }

        # Store performance history
        self.performance_history = []
        self.last_log_time = time.time()
        self.predictions_since_last_log = 0
        self.log_interval = 1000  # Log after every 1000 predictions

    def predict(self, transaction):
        """Make a fraud prediction for a single transaction"""
        # Predict class probabilities (before learning)
        fraud_score = self.pipeline.predict_proba_one(transaction)

        # Extract the probability of fraud (class True)
        fraud_probability = fraud_score.get(True, 0.0)

        # Apply a threshold for the decision
        is_fraud = fraud_probability >= 0.5

        return {
            'is_fraud': bool(is_fraud),
            'fraud_probability': fraud_probability,
            'timestamp': datetime.now().isoformat()
        }

    def update(self, transaction, is_fraud):
        """Update the model with a labeled transaction"""
        # Get predictions before updating
        y_pred = self.pipeline.predict_one(transaction)
        y_prob = self.pipeline.predict_proba_one(transaction).get(True, 0.0)

        # Update metrics (ROC AUC needs a score, the others a label)
        for metric_name, metric in self.metrics.items():
            if metric_name == 'auc':
                metric.update(is_fraud, y_prob)
            else:
                metric.update(is_fraud, y_pred)

        # Learn from this example
        self.pipeline.learn_one(transaction, is_fraud)

        # Increment the counter and log performance periodically
        self.predictions_since_last_log += 1
        if self.predictions_since_last_log >= self.log_interval:
            self._log_performance()
            self.predictions_since_last_log = 0

    def _log_performance(self):
        """Log current performance metrics"""
        current_time = datetime.now()

        # Collect metrics
        current_metrics = {
            name: metric.get() for name, metric in self.metrics.items()
            if name != 'confusion_matrix'  # Special handling for the confusion matrix
        }

        # Add confusion matrix cells (cm.data[actual][predicted])
        cm = self.metrics['confusion_matrix']
        current_metrics['true_positives'] = cm.data[True][True]
        current_metrics['false_positives'] = cm.data[False][True]
        current_metrics['true_negatives'] = cm.data[False][False]
        current_metrics['false_negatives'] = cm.data[True][False]

        # Add a timestamp and store
        current_metrics['timestamp'] = current_time
        self.performance_history.append(current_metrics)

        # Log to the console
        print(f"[{current_time}] Performance metrics:")
        for name, value in current_metrics.items():
            if name != 'timestamp':
                print(f"  {name}: {value}")

    def get_performance_trend(self, metric_name='auc'):
        """Get the performance trend for a specific metric"""
        if not self.performance_history:
            return pd.DataFrame()

        # Convert to a DataFrame
        perf_df = pd.DataFrame(self.performance_history)

        # Ensure the metric exists
        if metric_name not in perf_df.columns:
            raise ValueError(f"Metric {metric_name} not found in performance history")
        return perf_df[['timestamp', metric_name]]

    def save_model(self, filepath):
        """Save the model to disk (River pipelines can be pickled)"""
        import pickle
        with open(filepath, 'wb') as f:
            pickle.dump(self.pipeline, f)

    @classmethod
    def load_model(cls, filepath):
        """Load a model from disk"""
        import pickle
        detector = cls()
        with open(filepath, 'rb') as f:
            detector.pipeline = pickle.load(f)
        return detector

# Example usage in a transaction processing system
fraud_detector = IncrementalFraudDetector()

def process_transaction(transaction_data):
    # Make a prediction
    prediction = fraud_detector.predict(transaction_data)

    # Apply business rules
    if prediction['fraud_probability'] > 0.9:
        # High-confidence fraud - block the transaction
        action = "block"
    elif prediction['fraud_probability'] > 0.7:
        # Suspicious - require additional verification
        action = "verify"
    else:
        # Likely legitimate - approve
        action = "approve"

    # Log the transaction and decision
    log_transaction(transaction_data, prediction, action)

    return {
        'action': action,
        'fraud_score': prediction['fraud_probability'],
        'transaction_id': transaction_data.get('transaction_id')
    }

def process_feedback(transaction_id, actual_fraud):
    """Process feedback when the fraud status is confirmed"""
    # Retrieve the transaction data
    transaction_data = get_transaction_by_id(transaction_id)
    if transaction_data:
        # Update the model with the confirmed label
        fraud_detector.update(transaction_data, actual_fraud)

        # Log feedback
        log_feedback(transaction_id, actual_fraud)
        return {"status": "success", "message": "Feedback processed"}
    else:
        return {"status": "error", "message": "Transaction not found"}
```
## Decision Rules
Use this checklist for incremental ML decisions:
- If your data patterns change frequently, incremental learning usually beats periodic retraining
- If catastrophic forgetting is a concern, add regularization to preserve important weights
- If concept drift is suspected, monitor prediction distributions and trigger retraining when drift exceeds thresholds
- If model updates need validation before deployment, use a staging promotion process
- If online learning is too volatile, use mini-batch updates for stability
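As a rough illustration, the checklist can be encoded as a gating function; the function name and thresholds below are invented for the example, not recommendations:

```python
# Hypothetical decision helper encoding the checklist above. All thresholds
# are illustrative assumptions; calibrate them for your own system.
def suggest_update_strategy(drift_events_per_month: float,
                            label_latency_hours: float,
                            needs_validation: bool) -> str:
    if drift_events_per_month < 0.5:
        return "periodic full retraining"  # patterns are stable enough
    if label_latency_hours < 1 and not needs_validation:
        return "online learning"  # fast feedback, low deployment risk
    if needs_validation:
        return "incremental batch + staging promotion"
    return "mini-batch updates"

print(suggest_update_strategy(4, 0.2, False))   # → online learning
print(suggest_update_strategy(0.1, 24, True))   # → periodic full retraining
```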
Incremental learning adds system complexity. Only use it when the alternative (periodic full retraining) is worse.