Model Compression Techniques for Edge Deployment
Edge devices have limited memory and compute. Full-sized ML models often won't fit in memory, or run too slowly to be practical. Model compression reduces a model's size and computational requirements while preserving as much accuracy as possible.
This article covers compression techniques for edge deployment.
Understanding Edge Deployment Constraints
Before diving into compression techniques, it’s essential to understand the specific constraints that edge deployment presents:
Memory Limitations
Edge devices typically have limited RAM and storage:
| Device Type | Typical RAM | Storage | Example Use Case |
|---|---|---|---|
| Microcontroller | 32KB-1MB | 128KB-16MB | Sensor processing |
| IoT device | 1-4GB | 4-32GB | Smart home devices |
| Smartphone | 4-12GB | 64-512GB | Mobile applications |
| Edge gateway | 8-64GB | 128GB-2TB | Local data processing |
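A quick way to relate these budgets to a model is that the weight footprint is roughly parameter count × bytes per parameter, plus runtime overhead for activations and buffers. A minimal sketch of this fit check (the 2× overhead factor is an illustrative assumption, not a measured value):

```python
def fits_in_memory(num_params, bytes_per_param=4, ram_budget_mb=512, overhead=2.0):
    """Rough check: can a model with num_params fit within a RAM budget?

    overhead is a crude multiplier for activations, buffers, and runtime
    structures on top of the raw weight storage.
    """
    model_mb = num_params * bytes_per_param / (1024 * 1024)
    return model_mb * overhead <= ram_budget_mb, model_mb

# A 25M-parameter float32 model needs ~95 MB for weights alone
ok, size_mb = fits_in_memory(25_000_000, bytes_per_param=4, ram_budget_mb=512)
print(f"{size_mb:.1f} MB, fits: {ok}")
```

The same model quantized to int8 (`bytes_per_param=1`) drops to about a quarter of that footprint, which is often the difference between fitting an IoT device and not.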
Computational Constraints
Edge devices have limited processing power:
| Device Type | Typical Processor | Compute Capability |
|---|---|---|
| Microcontroller | ARM Cortex-M | 100-400 MHz, no GPU |
| IoT device | ARM Cortex-A | 1-2 GHz, basic GPU |
| Smartphone | Mobile SoC | 1.8-3 GHz, mobile GPU/NPU |
| Edge gateway | Low-power CPU | 1.6-3.5 GHz, optional GPU |
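These compute figures translate into a back-of-envelope latency estimate: divide a model's FLOPs by the device's effective throughput. The snippet below is a sketch under stated assumptions (2 FLOPs per MAC, and a utilization factor, since real workloads rarely saturate peak compute); real deployments should profile on the target device instead.

```python
def estimate_latency_ms(macs, clock_hz, flops_per_cycle=2, utilization=0.3):
    """Back-of-envelope inference latency from a model's MAC count.

    Assumes 2 FLOPs per MAC; utilization reflects that real workloads
    rarely reach the device's peak compute.
    """
    flops = macs * 2
    effective_flops_per_s = clock_hz * flops_per_cycle * utilization
    return flops / effective_flops_per_s * 1000

# ~300M MACs (MobileNet-class) on a 200 MHz microcontroller core
print(f"{estimate_latency_ms(300e6, 200e6, flops_per_cycle=1, utilization=0.3):.0f} ms")
```

Even this rough arithmetic shows why an unmodified MobileNet-class model is far from real-time on a microcontroller, motivating the compression techniques below.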
Power Considerations
Edge devices often operate on battery power, making energy efficiency critical:
- Smartphones: Users expect all-day battery life
- IoT sensors: May need to operate for months on a single battery
- Wearables: Must balance functionality with compact battery size
Connectivity Constraints
Edge devices may have intermittent or bandwidth-limited connectivity:
- Rural IoT: Limited network availability
- Mobile devices: Variable connection quality
- Privacy-sensitive applications: Preference for local processing
Core Model Compression Techniques
Let’s explore the primary techniques for compressing ML models:
1. Weight Pruning
Pruning removes less important connections in a neural network, creating a sparse model that requires less memory and computation.
Magnitude-based Pruning
This approach removes weights below a certain threshold:
```python
import torch
import torch.nn as nn

def magnitude_prune(model, pruning_threshold=0.01):
    """Prune model weights below the specified threshold."""
    for name, param in model.named_parameters():
        if 'weight' in name:  # Only prune weights, not biases
            # Mask keeps weights whose absolute value exceeds the threshold
            mask = torch.abs(param.data) > pruning_threshold
            # Zero out the weights below the threshold
            param.data = param.data * mask.float()
    return model
```
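The same idea can be checked in isolation with NumPy: threshold a random weight matrix and measure the resulting sparsity. The matrix shape, distribution, and threshold here are illustrative, not taken from any particular model.

```python
import numpy as np

rng = np.random.default_rng(0)
weights = rng.normal(0, 0.05, size=(64, 64))  # Toy weight matrix

threshold = 0.01
mask = np.abs(weights) > threshold
pruned = weights * mask

# Fraction of weights that were zeroed out
sparsity = 1.0 - mask.mean()
print(f"Sparsity after pruning: {sparsity:.1%}")
```

Note that zeroed weights only save memory and compute if the storage format and kernels exploit sparsity; a dense array of mostly zeros is the same size as before.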
Structured vs. Unstructured Pruning
- Unstructured pruning removes individual weights but yields irregular sparsity patterns
- Structured pruning removes entire channels, filters, or neurons, creating more hardware-friendly models
```python
def structured_channel_pruning(model, prune_ratio=0.3):
    """Zero out entire output channels of Conv2d layers based on L1-norm."""
    for name, module in model.named_modules():
        if isinstance(module, nn.Conv2d):
            # L1-norm of each output channel's weights
            weight = module.weight.data
            num_channels = weight.size(0)
            l1_norm = torch.sum(torch.abs(weight.view(num_channels, -1)), dim=1)
            # Number of channels to keep
            num_keep = int(num_channels * (1 - prune_ratio))
            # Channels with the highest L1-norm survive
            _, indices = torch.topk(l1_norm, num_keep)
            mask = torch.zeros(num_channels, device=weight.device)
            mask[indices] = 1
            # Zero out the pruned channels (layer shapes are unchanged;
            # physically removing channels requires rebuilding the layer)
            module.weight.data = weight * mask.view(-1, 1, 1, 1)
    return model
```
Iterative Pruning
Gradual pruning with retraining often yields better results:
```python
def iterative_pruning(model, train_loader, val_loader, prune_ratio=0.5, steps=5, epochs_per_step=3):
    """Gradually prune the model, retraining between pruning steps."""
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()
    # Per-step pruning rate (note: each step prunes a fraction of the
    # remaining channels, so the cumulative ratio is approximate)
    step_prune_ratio = prune_ratio / steps
    for step in range(steps):
        # Prune
        structured_channel_pruning(model, prune_ratio=step_prune_ratio)
        # Retrain (train_model / evaluate_model are assumed helper functions)
        for epoch in range(epochs_per_step):
            train_model(model, train_loader, optimizer, criterion)
        # Evaluate
        accuracy = evaluate_model(model, val_loader)
        print(f"Step {step+1}/{steps}, Pruned: {(step+1)*step_prune_ratio:.2f}, Accuracy: {accuracy:.2f}%")
    return model
```
2. Quantization
Quantization reduces the precision of model weights and activations, typically from 32-bit floating-point to lower-precision formats.
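At its core, quantization is an affine map: real values are scaled to a small integer grid via a scale (and, in the asymmetric case, a zero point), then de-quantized at compute time. A minimal NumPy sketch of symmetric per-tensor int8 quantization, to make the round-trip error concrete:

```python
import numpy as np

def quantize_int8(x):
    """Symmetric per-tensor int8 quantization: x ≈ scale * q."""
    scale = np.abs(x).max() / 127.0
    q = np.clip(np.round(x / scale), -127, 127).astype(np.int8)
    return q, scale

def dequantize(q, scale):
    return q.astype(np.float32) * scale

x = np.array([0.1, -0.5, 0.25, 0.9], dtype=np.float32)
q, scale = quantize_int8(x)
x_hat = dequantize(q, scale)
print("max abs error:", np.abs(x - x_hat).max())
```

The worst-case error is half a quantization step (scale / 2), which is why calibration on representative data matters: it keeps the scale, and hence the step size, as small as the real value range allows.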
Post-training Quantization
Quantize a model after training without retraining:
```python
import tensorflow as tf

def post_training_quantize(keras_model, train_dataset, quantize_to_int8=True):
    """Apply post-training quantization to a Keras model."""
    converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)
    if quantize_to_int8:
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.target_spec.supported_ops = [
            tf.lite.OpsSet.TFLITE_BUILTINS_INT8
        ]
        converter.inference_input_type = tf.int8
        converter.inference_output_type = tf.int8

        # A representative dataset is required to calibrate int8 ranges
        def representative_dataset():
            for data, _ in train_dataset.take(100):
                yield [data]

        converter.representative_dataset = representative_dataset
    else:
        # Float16 quantization needs no calibration data
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.target_spec.supported_types = [tf.float16]
    return converter.convert()
```
Quantization-Aware Training
Incorporates quantization effects during training:
```python
import tensorflow_model_optimization as tfmot

def quantization_aware_training(model, train_dataset, val_dataset, epochs=5):
    """Train a model with simulated quantization in the forward pass."""
    # Clone the model (and copy its weights) so the original is untouched
    cloned = tf.keras.models.clone_model(model)
    cloned.set_weights(model.get_weights())
    # Wrap the model with fake-quantization ops (default int8 configuration)
    q_aware_model = tfmot.quantization.keras.quantize_model(cloned)
    # Compile the model
    q_aware_model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    # Fine-tune so the weights adapt to quantization noise
    q_aware_model.fit(
        train_dataset,
        validation_data=val_dataset,
        epochs=epochs
    )
    return q_aware_model
```
Mixed-precision Quantization
Different layers may use different precision:
```python
def mixed_precision_quantization(model, layer_quantize_config):
    """Quantize only the layers marked 'int8' in the config; leave the rest in float."""
    annotate = tfmot.quantization.keras.quantize_annotate_layer

    def clone_fn(layer):
        config = layer_quantize_config.get(layer.name, {})
        if config.get('precision') == 'int8':
            # Mark this layer for int8 quantization
            return annotate(layer)
        # No annotation: the layer keeps its floating-point precision
        return layer

    # Rebuild the model with per-layer annotations
    annotated_model = tf.keras.models.clone_model(model, clone_function=clone_fn)
    # quantize_apply replaces annotated layers with quantization-aware versions
    return tfmot.quantization.keras.quantize_apply(annotated_model)
```

Layers that should run in float16 rather than float32 are typically handled at conversion time (for example via the TFLite float16 option shown earlier) rather than per-layer annotation.
3. Knowledge Distillation
Train a smaller “student” model to mimic a larger “teacher” model:
```python
import torch.nn.functional as F

def knowledge_distillation(teacher_model, student_model, train_loader, val_loader,
                           temperature=5.0, alpha=0.5, epochs=10,
                           device='cuda' if torch.cuda.is_available() else 'cpu'):
    """Train a student model to mimic a teacher model's outputs."""
    optimizer = torch.optim.Adam(student_model.parameters())
    # Teacher is frozen in evaluation mode
    teacher_model.eval()
    for epoch in range(epochs):
        student_model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            # Student forward pass
            student_output = student_model(data)
            # Teacher forward pass (no gradients needed)
            with torch.no_grad():
                teacher_output = teacher_model(data)
            # Distillation loss on temperature-softened distributions;
            # the T^2 factor keeps gradient magnitudes comparable
            distillation_loss = nn.KLDivLoss(reduction='batchmean')(
                F.log_softmax(student_output / temperature, dim=1),
                F.softmax(teacher_output / temperature, dim=1)
            ) * (temperature * temperature)
            # Standard cross-entropy on the hard labels
            ce_loss = F.cross_entropy(student_output, target)
            # alpha balances distillation against cross-entropy
            loss = alpha * distillation_loss + (1 - alpha) * ce_loss
            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        # Evaluate after each epoch (evaluate_model is an assumed helper)
        student_model.eval()
        accuracy = evaluate_model(student_model, val_loader)
        print(f"Epoch {epoch+1}/{epochs}, Accuracy: {accuracy:.2f}%")
    return student_model
```
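The role of the temperature can be seen directly: dividing logits by T > 1 flattens the teacher's distribution, exposing the relative probabilities of the wrong classes (the "dark knowledge" the student learns from). A small NumPy illustration with made-up logits:

```python
import numpy as np

def softmax(z):
    e = np.exp(z - z.max())
    return e / e.sum()

logits = np.array([6.0, 2.0, 1.0])  # Illustrative teacher logits
for T in (1.0, 5.0):
    p = softmax(logits / T)
    print(f"T={T}: {np.round(p, 3)}")
```

At T=1 nearly all mass sits on the top class; at T=5 the distribution spreads out, so the distillation loss carries information about how the teacher ranks the alternatives.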
4. Low-Rank Factorization
Decompose weight matrices into products of smaller matrices:
```python
import numpy as np
from sklearn.decomposition import TruncatedSVD

def _replace_module(model, name, new_module):
    """Replace a (possibly nested) submodule by its dotted name."""
    parts = name.split('.')
    parent = model
    for p in parts[:-1]:
        parent = getattr(parent, p)
    setattr(parent, parts[-1], new_module)

def apply_low_rank_factorization(model, rank_ratio=0.5):
    """Factor each fully connected layer into two smaller layers."""
    for name, module in list(model.named_modules()):
        if isinstance(module, nn.Linear):
            # nn.Linear stores its weight as (out_features, in_features)
            weight = module.weight.data.cpu().numpy()
            out_features, in_features = weight.shape
            # Target rank
            target_rank = int(min(in_features, out_features) * rank_ratio)
            # Truncated SVD: weight ≈ (U·S) @ V
            svd = TruncatedSVD(n_components=target_rank)
            US = svd.fit_transform(weight)   # (out_features, rank); singular values already folded in
            V = svd.components_              # (rank, in_features)
            # First layer: in_features -> rank
            first_layer = nn.Linear(in_features, target_rank, bias=False)
            first_layer.weight.data = torch.tensor(V).float().to(module.weight.device)
            # Second layer: rank -> out_features (keeps the original bias)
            second_layer = nn.Linear(target_rank, out_features, bias=module.bias is not None)
            second_layer.weight.data = torch.tensor(US).float().to(module.weight.device)
            if module.bias is not None:
                second_layer.bias.data = module.bias.data
            # Swap in the factored pair
            _replace_module(model, name, nn.Sequential(first_layer, second_layer))
    return model
```
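A quick sanity check of the underlying math in NumPy: a rank-r approximation W ≈ A·B replaces m·n parameters with r·(m+n), and truncated SVD gives the best such approximation in the least-squares sense. The sizes below are illustrative:

```python
import numpy as np

rng = np.random.default_rng(1)
m, n, r = 256, 512, 64
W = rng.normal(size=(m, n))  # Toy weight matrix

# Truncated SVD: keep the top-r singular triplets
U, S, Vt = np.linalg.svd(W, full_matrices=False)
A = U[:, :r] * S[:r]      # (m, r), singular values folded in
B = Vt[:r, :]             # (r, n)

original_params = m * n
factored_params = r * (m + n)
error = np.linalg.norm(W - A @ B) / np.linalg.norm(W)
print(f"params: {original_params} -> {factored_params}, relative error: {error:.2f}")
```

A random matrix is close to the worst case for low-rank approximation; trained weight matrices tend to have faster-decaying spectra, so the error for the same rank is usually much lower in practice.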
5. Neural Architecture Search (NAS)
Automated search for efficient model architectures:
```python
import keras_tuner as kt
from tensorflow import keras

def build_model(hp):
    """Model-building function whose hyperparameters define the search space."""
    model = keras.Sequential()
    # Number of convolution filters
    filters = hp.Int('filters', min_value=16, max_value=128, step=16)
    model.add(keras.layers.Conv2D(filters=filters, kernel_size=3, activation='relu',
                                  input_shape=(32, 32, 3)))
    # Whether to use max pooling
    if hp.Boolean('pooling'):
        model.add(keras.layers.MaxPooling2D())
    # Flatten before the dense head
    model.add(keras.layers.Flatten())
    # Number of dense layers and their widths
    for i in range(hp.Int('num_dense_layers', 1, 3)):
        units = hp.Int(f'units_{i}', min_value=32, max_value=512, step=32)
        model.add(keras.layers.Dense(units=units, activation='relu'))
    # Dropout rate
    dropout_rate = hp.Float('dropout', min_value=0.0, max_value=0.5, step=0.1)
    model.add(keras.layers.Dropout(rate=dropout_rate))
    # Output layer
    model.add(keras.layers.Dense(10, activation='softmax'))
    # Learning rate
    learning_rate = hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling='log')
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model
```
```python
def neural_architecture_search(train_dataset, val_dataset):
    """Run neural architecture search with a model-size constraint."""
    # Define the tuner
    tuner = kt.Hyperband(
        build_model,
        objective=kt.Objective('val_accuracy', direction='max'),
        max_epochs=10,
        factor=3,
        hyperband_iterations=2,
        directory='nas_search',
        project_name='edge_model_search'
    )

    # Callback that rejects models over a size budget
    class SizeConstraint(keras.callbacks.Callback):
        def on_epoch_end(self, epoch, logs=None):
            # Model size in MB, assuming float32 weights
            model_size = self.model.count_params() * 4 / (1024 * 1024)
            if model_size > 5.0:  # Example 5 MB limit
                logs['val_accuracy'] = 0.0  # Penalize oversized models
                self.model.stop_training = True

    # Search for the best model
    tuner.search(
        train_dataset,
        validation_data=val_dataset,
        epochs=30,
        callbacks=[SizeConstraint()]
    )
    best_model = tuner.get_best_models(num_models=1)[0]
    best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
    print(f"Best hyperparameters: {best_hps.values}")
    return best_model
```
Implementation Pipeline for Model Compression
A typical workflow for compressing models for edge deployment:
1. Model Selection and Pre-training
Start with an appropriate base model architecture:
```python
import tensorflow as tf
from tensorflow import keras

# Example: MobileNetV3-Small as the base model
base_model = tf.keras.applications.MobileNetV3Small(
    input_shape=(224, 224, 3),
    include_top=True,
    weights='imagenet',
    classes=1000
)
# Replace the classification head for your task
# (num_classes and the datasets below are task-specific)
x = base_model.layers[-2].output  # Penultimate feature layer
output = keras.layers.Dense(num_classes, activation='softmax')(x)
model = keras.Model(inputs=base_model.input, outputs=output)
# Train the model on your task
model.compile(
    optimizer=keras.optimizers.Adam(1e-4),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)
model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10
)
# Record the baseline for later comparison
baseline_accuracy = model.evaluate(test_dataset)[1]
baseline_size = model.count_params() * 4 / (1024 * 1024)  # MB, assuming float32
print(f"Baseline Model - Accuracy: {baseline_accuracy:.4f}, Size: {baseline_size:.2f} MB")
```
2. Progressive Compression
Apply techniques in a sequence to maintain accuracy:
```python
import os
import tensorflow_model_optimization as tfmot

def compress_model_for_edge(model, train_dataset, val_dataset, test_dataset, target_size_mb=5.0):
    """Apply progressive compression until the target size is reached."""
    # Keep a copy of the original model as a distillation teacher
    teacher_model = tf.keras.models.clone_model(model)
    teacher_model.set_weights(model.get_weights())

    # Step 1: Architecture simplification
    print("Step 1: Applying architecture simplification...")
    model = simplify_architecture(model)  # Assumed helper that reduces model complexity
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    accuracy = model.evaluate(test_dataset)[1]
    current_size = model.count_params() * 4 / (1024 * 1024)  # MB, assuming float32
    print(f"After architecture simplification - Accuracy: {accuracy:.4f}, Size: {current_size:.2f} MB")
    if current_size <= target_size_mb:
        return model

    # Step 2: Pruning
    print("Step 2: Applying pruning...")
    pruning_params = {
        'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(
            initial_sparsity=0.0,
            final_sparsity=0.5,
            begin_step=0,
            end_step=train_dataset.cardinality().numpy() * 5  # 5 epochs worth of steps
        )
    }
    model_for_pruning = tfmot.sparsity.keras.prune_low_magnitude(model, **pruning_params)
    model_for_pruning.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    callbacks = [
        tfmot.sparsity.keras.UpdatePruningStep(),
        tfmot.sparsity.keras.PruningSummaries(log_dir='pruning_logs'),
    ]
    model_for_pruning.fit(
        train_dataset,
        validation_data=val_dataset,
        epochs=5,
        callbacks=callbacks
    )
    # Remove the pruning wrappers to get the final sparse model
    model = tfmot.sparsity.keras.strip_pruning(model_for_pruning)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    accuracy = model.evaluate(test_dataset)[1]
    # Approximate size assuming 0.5 sparsity is exploited by the storage format
    current_size = model.count_params() * 4 * 0.5 / (1024 * 1024)
    print(f"After pruning - Accuracy: {accuracy:.4f}, Size: {current_size:.2f} MB")
    if current_size <= target_size_mb:
        return model

    # Step 3: Quantization-aware training with knowledge distillation.
    # The teacher's soft targets are computed per batch in a custom loop.
    print("Step 3: Applying quantization-aware training...")
    q_aware_model = tfmot.quantization.keras.quantize_model(model)
    temperature, alpha = 5.0, 0.5
    kld = tf.keras.losses.KLDivergence()
    cce = tf.keras.losses.CategoricalCrossentropy()
    optimizer = tf.keras.optimizers.Adam()
    for epoch in range(5):
        for x, y in train_dataset:
            teacher_preds = teacher_model(x, training=False)
            with tf.GradientTape() as tape:
                student_preds = q_aware_model(x, training=True)
                soft_teacher = tf.nn.softmax(teacher_preds / temperature)
                soft_student = tf.nn.softmax(student_preds / temperature)
                # Blend distillation loss with standard cross-entropy
                loss = alpha * kld(soft_teacher, soft_student) + (1 - alpha) * cce(y, student_preds)
            grads = tape.gradient(loss, q_aware_model.trainable_variables)
            optimizer.apply_gradients(zip(grads, q_aware_model.trainable_variables))
    q_aware_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    accuracy = q_aware_model.evaluate(test_dataset)[1]

    # Convert to TFLite to measure the real on-disk size
    converter = tf.lite.TFLiteConverter.from_keras_model(q_aware_model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    quantized_tflite_model = converter.convert()
    with open('quantized_model.tflite', 'wb') as f:
        f.write(quantized_tflite_model)
    current_size = os.path.getsize('quantized_model.tflite') / (1024 * 1024)
    print(f"After quantization - Accuracy: {accuracy:.4f}, Size: {current_size:.2f} MB")
    # Return the final model in TFLite format
    return quantized_tflite_model
```
3. Evaluation Framework
Comprehensive evaluation across key metrics:
```python
import os
import time
import numpy as np

def evaluate_compressed_model(original_model, compressed_model, test_dataset, device='cpu'):
    """Evaluate a compressed model against its original across key metrics."""
    results = {}

    # Accuracy evaluation
    print("Evaluating accuracy...")
    original_accuracy = original_model.evaluate(test_dataset)[1]
    if isinstance(compressed_model, bytes):
        # TFLite models run through an interpreter
        interpreter = tf.lite.Interpreter(model_content=compressed_model)
        interpreter.allocate_tensors()
        input_index = interpreter.get_input_details()[0]["index"]
        output_index = interpreter.get_output_details()[0]["index"]
        correct = 0
        total = 0
        for images, labels in test_dataset:
            for i in range(len(images)):
                interpreter.set_tensor(input_index, images[i:i+1])
                interpreter.invoke()
                predictions = interpreter.get_tensor(output_index)
                if np.argmax(predictions[0]) == np.argmax(labels[i]):
                    correct += 1
                total += 1
        compressed_accuracy = correct / total
    else:
        compressed_accuracy = compressed_model.evaluate(test_dataset)[1]
    results['original_accuracy'] = original_accuracy
    results['compressed_accuracy'] = compressed_accuracy
    results['accuracy_retention'] = compressed_accuracy / original_accuracy

    # Size evaluation
    print("Evaluating model size...")
    if isinstance(original_model, tf.keras.Model):
        original_size = original_model.count_params() * 4 / (1024 * 1024)  # MB, float32
    else:
        original_size = len(original_model) / (1024 * 1024) if isinstance(original_model, bytes) else 0
    if isinstance(compressed_model, bytes):
        compressed_size = len(compressed_model) / (1024 * 1024)
    elif isinstance(compressed_model, tf.keras.Model):
        compressed_size = compressed_model.count_params() * 4 / (1024 * 1024)
    else:
        compressed_size = os.path.getsize(compressed_model) / (1024 * 1024)
    results['original_size_mb'] = original_size
    results['compressed_size_mb'] = compressed_size
    results['compression_ratio'] = original_size / compressed_size

    # Latency evaluation
    print("Evaluating inference latency...")
    original_latency = 0
    compressed_latency = 0
    if device == 'cpu':
        if isinstance(original_model, tf.keras.Model):
            # Warmup, then average over 100 runs
            for _ in range(10):
                _ = original_model.predict(test_dataset.take(1))
            start_time = time.time()
            for _ in range(100):
                _ = original_model.predict(test_dataset.take(1))
            original_latency = (time.time() - start_time) / 100
        if isinstance(compressed_model, bytes):
            interpreter = tf.lite.Interpreter(model_content=compressed_model)
            interpreter.allocate_tensors()
            input_index = interpreter.get_input_details()[0]["index"]
            output_index = interpreter.get_output_details()[0]["index"]
            # One sample input for timing
            for images, _ in test_dataset.take(1):
                sample_input = images[0:1]
                break
            for _ in range(10):  # Warmup
                interpreter.set_tensor(input_index, sample_input)
                interpreter.invoke()
                _ = interpreter.get_tensor(output_index)
            start_time = time.time()
            for _ in range(100):
                interpreter.set_tensor(input_index, sample_input)
                interpreter.invoke()
                _ = interpreter.get_tensor(output_index)
            compressed_latency = (time.time() - start_time) / 100
        elif isinstance(compressed_model, tf.keras.Model):
            for _ in range(10):  # Warmup
                _ = compressed_model.predict(test_dataset.take(1))
            start_time = time.time()
            for _ in range(100):
                _ = compressed_model.predict(test_dataset.take(1))
            compressed_latency = (time.time() - start_time) / 100
    results['original_latency_ms'] = original_latency * 1000
    results['compressed_latency_ms'] = compressed_latency * 1000
    results['speedup_factor'] = original_latency / compressed_latency if compressed_latency > 0 else 0

    # Battery impact estimation (simplified)
    print("Estimating power consumption...")
    if device == 'mobile':
        # Crude size-proportional estimates; real measurements need
        # device-specific power monitoring tools
        results['estimated_battery_impact_original'] = original_size * 0.01
        results['estimated_battery_impact_compressed'] = compressed_size * 0.01
        results['estimated_battery_savings'] = 1 - (results['estimated_battery_impact_compressed'] /
                                                    results['estimated_battery_impact_original'])
    return results
```
Platform-Specific Optimization Techniques
TensorFlow Lite for Mobile
```python
def optimize_for_tflite(model, train_dataset, quantize=True, optimize_for_inference=True):
    """Optimize a Keras model for TensorFlow Lite deployment."""
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    if optimize_for_inference:
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
    if quantize:
        # int8 quantization: restrict to the int8 builtin op set
        converter.target_spec.supported_ops = [
            tf.lite.OpsSet.TFLITE_BUILTINS_INT8
        ]
        # Representative dataset calibrates the int8 ranges
        def representative_dataset():
            for data, _ in train_dataset.take(100):
                yield [data]
        converter.representative_dataset = representative_dataset
    else:
        # Without quantization, allow builtin ops plus TF fallback ops
        converter.target_spec.supported_ops = [
            tf.lite.OpsSet.TFLITE_BUILTINS,
            tf.lite.OpsSet.SELECT_TF_OPS
        ]
    return converter.convert()
```
PyTorch Mobile
```python
from torch.utils.mobile_optimizer import optimize_for_mobile

def optimize_for_pytorch_mobile(model, example_input, quantize=True):
    """Optimize a model for PyTorch Mobile deployment."""
    model.eval()
    if quantize:
        # Dynamic quantization is applied to the eager model before tracing;
        # it currently targets Linear (and recurrent) layers, not Conv2d
        model = torch.quantization.quantize_dynamic(
            model,
            {torch.nn.Linear},
            dtype=torch.qint8
        )
    # Trace the (possibly quantized) model with an example input
    traced_model = torch.jit.trace(model, example_input)
    # Apply mobile-specific graph optimizations (op fusion, freezing, etc.)
    return optimize_for_mobile(traced_model)
```
ONNX Runtime for Cross-Platform Deployment
```python
import torch
import onnxruntime as ort
from onnxruntime.quantization import quantize_dynamic, QuantType

def optimize_for_onnx(model, input_shape, quantize=True):
    """Export a PyTorch model to ONNX and build an optimized runtime session."""
    # Export to ONNX with a dynamic batch dimension
    dummy_input = torch.randn(1, *input_shape)
    torch.onnx.export(
        model,
        dummy_input,
        "model.onnx",
        opset_version=11,
        input_names=["input"],
        output_names=["output"],
        dynamic_axes={
            "input": {0: "batch_size"},
            "output": {0: "batch_size"}
        }
    )
    # Enable all graph-level optimizations in ONNX Runtime
    session_options = ort.SessionOptions()
    session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    # Dynamic int8 weight quantization, if requested
    if quantize:
        quantize_dynamic(
            "model.onnx",
            "model_quantized.onnx",
            weight_type=QuantType.QInt8
        )
        model_path = "model_quantized.onnx"
    else:
        model_path = "model.onnx"
    # Create the inference session
    return ort.InferenceSession(model_path, session_options)
```
TensorRT for NVIDIA Devices
```python
def optimize_for_tensorrt(saved_model_dir, precision='fp16'):
    """Optimize a TensorFlow SavedModel with TensorRT."""
    import numpy as np
    from tensorflow.python.compiler.tensorrt import trt_convert as trt

    # Conversion parameters per precision mode
    conversion_params = trt.DEFAULT_TRT_CONVERSION_PARAMS
    if precision == 'fp16':
        conversion_params = conversion_params._replace(
            precision_mode=trt.TrtPrecisionMode.FP16,
            max_workspace_size_bytes=8_000_000_000
        )
    elif precision == 'int8':
        conversion_params = conversion_params._replace(
            precision_mode=trt.TrtPrecisionMode.INT8,
            max_workspace_size_bytes=8_000_000_000,
            use_calibration=True
        )
    else:  # fp32
        conversion_params = conversion_params._replace(
            max_workspace_size_bytes=8_000_000_000
        )

    # The converter reads the SavedModel directly from disk
    converter = trt.TrtGraphConverterV2(
        input_saved_model_dir=saved_model_dir,
        conversion_params=conversion_params
    )
    if precision == 'int8':
        # int8 requires calibration data; replace the random batches
        # below with real samples from your input pipeline
        def calibration_input_fn():
            for _ in range(50):  # 50 calibration batches
                yield [np.random.uniform(size=(1, 224, 224, 3)).astype(np.float32)]
        converter.convert(calibration_input_fn=calibration_input_fn)
    else:
        converter.convert()

    # Save the converted model
    trt_model_dir = f"{saved_model_dir}_trt_{precision}"
    converter.save(trt_model_dir)
    return trt_model_dir
```
Real-World Case Studies
MobileNet for On-Device Image Classification
Optimizing MobileNetV3 for smartphone deployment:
| Technique | Model Size | Accuracy | Latency | Battery Impact |
|---|---|---|---|---|
| Original MobileNetV3-Small | 9.7MB | 68.5% | 26ms | 100% |
| Pruned (50% weights) | 4.9MB | 67.8% | 20ms | 81% |
| Int8 Quantization | 2.5MB | 67.1% | 12ms | 55% |
| Int8 + Pruning | 1.3MB | 66.2% | 9ms | 38% |
Implementation details:
- Magnitude-based pruning with 50% sparsity
- Dynamic range quantization to int8
- TensorFlow Lite conversion with op fusion
- On-device latency measured on Pixel 4
BERT for Edge NLP Applications
Compressing BERT for resource-constrained devices:
| Technique | Model Size | GLUE Score | Latency |
|---|---|---|---|
| BERT-base | 440MB | 79.5 | 220ms |
| DistilBERT | 265MB | 77.1 | 134ms |
| DistilBERT + Quantization | 69MB | 76.2 | 72ms |
| TinyBERT | 57MB | 73.8 | 40ms |
| TinyBERT + Quantization | 14.5MB | 72.9 | 21ms |
Implementation details:
- Knowledge distillation using teacher-student training
- ONNX Runtime quantization
- Model pruning (30% of attention heads)
- Weight sharing between layers
Vision Models for IoT Devices
Deploying vision models on microcontrollers:
| Technique | Model Size | Accuracy | Power Consumption |
|---|---|---|---|
| MobileNetV2 | 14MB | 71.8% | Not deployable |
| MicroNet | 5.6MB | 63.7% | 540mW |
| MCUNet | 512KB | 61.2% | 290mW |
| MCUNet + Quantization | 256KB | 58.4% | 180mW |
Implementation details:
- Neural architecture search for microcontroller constraints
- Int8 quantization with calibration
- Specific operator fusion for target hardware
- Specialized memory management
Best Practices and Common Pitfalls
Best Practices
- Start with efficient architectures: Begin with models designed for efficiency (MobileNet, EfficientNet)
- Measure what matters: Focus on the metrics most relevant to your deployment scenario
- Progressive compression: Apply techniques in sequence, retraining after each step
- Hardware-aware optimization: Optimize for specific hardware capabilities
- Test on target devices: Benchmark on actual deployment hardware
Common Pitfalls
- Over-compression: Compressing beyond a model’s intrinsic information capacity
- Ignoring hardware specifics: Not considering target hardware acceleration capabilities
- Neglecting accuracy-critical paths: Applying uniform compression to all parts of a model
- Inappropriate quantization: Using quantization without calibration on representative data
- Inadequate testing: Not testing for edge cases and robustness after compression
Emerging Trends in Model Compression
As edge AI continues to evolve, several promising directions are emerging:
Neural Architecture Search for Edge
Automated discovery of efficient architectures:
```python
# Example of NAS directed specifically at edge constraints
def edge_constrained_nas(train_dataset, val_dataset,
                         max_model_size_mb=5,
                         max_latency_ms=50,
                         target_device='pixel4'):
    """Neural architecture search with edge deployment constraints."""
    # Define the search space
    def build_model(hp):
        model = keras.Sequential()
        # Depthwise separable convolutions for efficiency
        for i in range(hp.Int('conv_blocks', 1, 5)):
            filters = hp.Int(f'filters_{i}', 8, 128, step=8)
            model.add(keras.layers.SeparableConv2D(
                filters=filters,
                kernel_size=hp.Choice(f'kernel_{i}', [3, 5]),
                activation='relu',
                padding='same'
            ))
            if hp.Boolean(f'batch_norm_{i}'):
                model.add(keras.layers.BatchNormalization())
            if hp.Boolean(f'pool_{i}'):
                pool_type = hp.Choice(f'pool_type_{i}', ['max', 'avg'])
                if pool_type == 'max':
                    model.add(keras.layers.MaxPooling2D())
                else:
                    model.add(keras.layers.AveragePooling2D())
        model.add(keras.layers.GlobalAveragePooling2D())
        # Final dense layers
        for i in range(hp.Int('dense_blocks', 0, 2)):
            units = hp.Int(f'dense_units_{i}', 32, 256, step=32)
            model.add(keras.layers.Dense(units, activation='relu'))
            model.add(keras.layers.Dropout(hp.Float(f'dropout_{i}', 0, 0.5, step=0.1)))
        model.add(keras.layers.Dense(10, activation='softmax'))
        lr = hp.Float('learning_rate', 1e-4, 1e-2, sampling='log')
        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=lr),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        return model

    # Custom tuner that rejects trials violating edge constraints
    class EdgeConstrainedTuner(kt.Tuner):
        def run_trial(self, trial, *args, **kwargs):
            hp = trial.hyperparameters
            model = self.hypermodel.build(hp)
            # Estimated size in MB, assuming float32 weights
            model_size_mb = model.count_params() * 4 / (1024 * 1024)
            if model_size_mb > max_model_size_mb:
                return {'val_accuracy': float('-inf')}
            # estimate_latency is a placeholder; a real implementation would
            # use a device-specific latency predictor or on-device profiling
            estimated_latency = estimate_latency(model, target_device)
            if estimated_latency > max_latency_ms:
                return {'val_accuracy': float('-inf')}
            # Within constraints: train normally
            return super().run_trial(trial, *args, **kwargs)

    # Set up the tuner (the oracle class is named BayesianOptimizationOracle
    # in recent keras-tuner releases)
    tuner = EdgeConstrainedTuner(
        oracle=kt.oracles.BayesianOptimizationOracle(
            objective=kt.Objective('val_accuracy', direction='max'),
            max_trials=100
        ),
        hypermodel=build_model,
        directory='edge_nas',
        project_name='edge_model_search'
    )
    # Search for the best model
    tuner.search(
        train_dataset,
        validation_data=val_dataset,
        epochs=10
    )
    return tuner.get_best_models(num_models=1)[0]
```
Hardware-Software Co-design
Developing models in tandem with specialized hardware:
# Example of hardware-aware training
def hardware_aware_training(model, train_dataset, val_dataset, target_hardware='edgetpu'):
    """Incorporate hardware-specific constraints during training."""
    # Define hardware-specific constraints
    hardware_constraints = {
        'edgetpu': {
            'supported_ops': ['Conv2D', 'DepthwiseConv2D', 'AveragePooling2D',
                              'MaxPooling2D', 'Reshape', 'Flatten', 'Dense'],
            'preferred_ops': ['DepthwiseConv2D', 'AveragePooling2D'],  # More efficient on EdgeTPU
            'avoided_ops': ['Transpose', 'MatMul'],  # Less efficient
            'quantization': 'int8',
            'memory_constraint': 8 * 1024 * 1024  # 8MB
        },
        'snapdragon': {
            'supported_ops': ['Conv2D', 'DepthwiseConv2D', 'AveragePooling2D',
                              'MaxPooling2D', 'LSTM', 'Dense'],
            'preferred_ops': ['Conv2D+BiasAdd+Relu'],  # Fused operations
            'avoided_ops': ['CustomOp'],
            'quantization': 'fp16',
            'memory_constraint': 32 * 1024 * 1024  # 32MB
        }
    }
    constraints = hardware_constraints[target_hardware]

    # Create a hardware-aware regularizer
    class HardwareAwareRegularizer(tf.keras.regularizers.Regularizer):
        def __init__(self, constraints):
            self.constraints = constraints

        def __call__(self, weights):
            # Base regularization (L2)
            reg_loss = tf.reduce_sum(tf.square(weights))
            # Add hardware-specific penalties, e.g. penalize weights that
            # would cause quantization issues
            if self.constraints['quantization'] == 'int8':
                # Encourage weights to sit near representable int8 levels.
                # tf.round contributes no gradient, so the penalty's gradient
                # flows only through the scaled weights. This is a simplified
                # approach - more sophisticated methods exist.
                scaled_weights = weights * 127.0  # Scale to the int8 range
                quantized = tf.round(scaled_weights)
                quant_error = tf.reduce_mean(tf.square(scaled_weights - quantized))
                reg_loss += 0.1 * quant_error
            return 0.01 * reg_loss

    # Apply the regularizer to Conv2D and Dense layers. Note: assigning
    # layer.kernel_regularizer after a layer is built has no effect on the
    # training loss, so the penalty is attached explicitly via model.add_loss.
    for layer in model.layers:
        if isinstance(layer, (tf.keras.layers.Conv2D, tf.keras.layers.Dense)):
            regularizer = HardwareAwareRegularizer(constraints)
            model.add_loss(lambda layer=layer, reg=regularizer: reg(layer.kernel))

    # Custom callback to monitor hardware compatibility
    class HardwareCompatibilityCallback(tf.keras.callbacks.Callback):
        def __init__(self, constraints):
            super().__init__()
            self.constraints = constraints

        def on_epoch_end(self, epoch, logs=None):
            # Check model size against the memory constraint (float32: 4 bytes/param)
            model_size = self.model.count_params() * 4
            if model_size > self.constraints['memory_constraint']:
                print(f"Warning: Model size ({model_size / 1024 / 1024:.2f}MB) exceeds "
                      f"hardware constraint ({self.constraints['memory_constraint'] / 1024 / 1024}MB)")
            # Additional checks (e.g. unsupported ops) could be implemented here

    # Compile the model
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-3),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    # Train with hardware awareness
    model.fit(
        train_dataset,
        validation_data=val_dataset,
        epochs=10,
        callbacks=[HardwareCompatibilityCallback(constraints)]
    )

    # Apply hardware-specific optimizations
    if constraints['quantization'] == 'int8':
        # Quantize to int8; a representative dataset is required
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
        converter.inference_input_type = tf.int8
        converter.inference_output_type = tf.int8

        def representative_dataset():
            for data, _ in train_dataset.take(100):
                yield [data]

        converter.representative_dataset = representative_dataset
        return converter.convert()
    elif constraints['quantization'] == 'fp16':
        # Quantize to float16
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.target_spec.supported_types = [tf.float16]
        return converter.convert()
    return model
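The int8 branch of HardwareAwareRegularizer penalizes weights that fall between representable quantization levels. The same penalty can be computed offline with NumPy to inspect how quantization-friendly a trained weight tensor is; a sketch mirroring the simplified fixed x * 127 scale used above (function name is illustrative):

```python
import numpy as np

def int8_quant_error(weights):
    """Mean squared distance of scaled weights from the nearest integer,
    using the same fixed x * 127 scale as the regularizer above."""
    scaled = weights * 127.0
    # np.round, like tf.round, rounds halves to even
    return float(np.mean((scaled - np.round(scaled)) ** 2))

# Weights sitting on the int8 grid incur (near-)zero penalty;
# off-grid weights are penalized.
on_grid = np.array([0.0, 1.0, -5.0]) / 127.0
off_grid = np.array([0.5, 1.4]) / 127.0
print(int8_quant_error(on_grid), int8_quant_error(off_grid))
```

A real quantizer learns per-tensor or per-channel scales instead of a fixed 127; the fixed scale here only makes the penalty easy to see.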
Federated Learning with On-Device Compression
Privacy-preserving learning with model compression:
# Federated learning with model compression (simulated below without a
# federated framework; in production a library such as TensorFlow Federated
# would handle client orchestration and aggregation)
import numpy as np
import tensorflow as tf

def federated_compression_training(train_datasets, validation_dataset,
                                   num_clients=10, num_rounds=5):
    """Federated learning with on-device model compression."""
    # Define the model-building function
    def create_keras_model():
        return tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation='relu',
                                   input_shape=(32, 32, 3)),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(10, activation='softmax')
        ])

    # Define model compression for client updates
    def compress_gradients(grads):
        """Apply top-k sparsification to gradients before sending to the server."""
        compressed_grads = []
        for grad in grads:
            if grad is not None:
                # Keep only the 10% largest-magnitude gradient entries
                flattened = tf.reshape(grad, [-1])
                k = tf.cast(tf.math.ceil(0.1 * tf.cast(tf.size(flattened), tf.float32)), tf.int32)
                _, indices = tf.math.top_k(tf.abs(flattened), k=k)
                # Create a sparse representation; indices must be int64 and
                # sorted (tf.sparse.reorder) for tf.sparse.to_dense to work
                sparse_grad = tf.sparse.SparseTensor(
                    indices=tf.expand_dims(tf.cast(indices, tf.int64), 1),
                    values=tf.gather(flattened, indices),
                    dense_shape=tf.shape(flattened, out_type=tf.int64)
                )
                compressed_grads.append(tf.sparse.reorder(sparse_grad))
            else:
                compressed_grads.append(None)
        return compressed_grads

    # Define the client update function with compression (kept eager: wrapping
    # this in tf.function would require tensor-only arguments and return values)
    def client_update(model, dataset, lr):
        """Client training with gradient compression."""
        optimizer = tf.keras.optimizers.SGD(learning_rate=lr)
        loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()

        # Training loop
        for batch in dataset:
            with tf.GradientTape() as tape:
                outputs = model(batch['x'], training=True)
                loss = loss_fn(batch['y'], outputs)
            # Get gradients and compress them
            grads = tape.gradient(loss, model.trainable_variables)
            compressed_grads = compress_gradients(grads)
            # Decompress gradients (in real federated learning, this happens
            # on the server)
            decompressed_grads = []
            for grad in compressed_grads:
                if isinstance(grad, tf.sparse.SparseTensor):
                    decompressed_grads.append(tf.sparse.to_dense(grad))
                else:
                    decompressed_grads.append(grad)
            # Restore the original variable shapes before applying the update
            decompressed_grads = [
                tf.reshape(g, v.shape) if g is not None else None
                for g, v in zip(decompressed_grads, model.trainable_variables)
            ]
            optimizer.apply_gradients(zip(decompressed_grads, model.trainable_variables))
        return model

    # Simulate federated learning with compression
    clients = list(range(num_clients))
    client_datasets = [train_datasets[i] for i in range(num_clients)]

    # Initialize the global model
    global_model = create_keras_model()

    for round_num in range(num_rounds):
        print(f"Round {round_num + 1}/{num_rounds}")
        # Client updates
        client_models = []
        for client_id in clients:
            # Create a client model by copying the global model
            client_model = create_keras_model()
            client_model.set_weights(global_model.get_weights())
            # Perform the client update
            updated_client_model = client_update(
                client_model,
                client_datasets[client_id],
                lr=0.01
            )
            client_models.append(updated_client_model)

        # Aggregate model updates (simple averaging here; in practice more
        # sophisticated aggregation methods may be used)
        client_weight_lists = [m.get_weights() for m in client_models]
        new_weights = [
            np.mean([weights[i] for weights in client_weight_lists], axis=0)
            for i in range(len(global_model.get_weights()))
        ]

        # Update the global model
        global_model.set_weights(new_weights)

        # Evaluate the global model
        test_loss, test_accuracy = global_model.evaluate(validation_dataset)
        print(f"Round {round_num + 1} validation: "
              f"Loss = {test_loss:.4f}, Accuracy = {test_accuracy:.4f}")

    # Once training is complete, apply final model compression for deployment
    final_compressed_model = post_training_quantize(global_model)
    return final_compressed_model
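compress_gradients above relies on top-k sparsification: only the largest-magnitude 10% of gradient entries, plus their indices, are transmitted. The idea is easy to verify without TensorFlow; a NumPy sketch (names are illustrative):

```python
import numpy as np

def top_k_sparsify(grad, fraction=0.1):
    """Keep only the largest-magnitude fraction of gradient entries;
    returns the indices and values that would actually be sent."""
    flat = grad.ravel()
    k = max(1, int(np.ceil(fraction * flat.size)))
    # argpartition finds the k largest-magnitude entries without a full sort
    idx = np.argpartition(np.abs(flat), -k)[-k:]
    return idx, flat[idx]

grad = np.arange(100, dtype=np.float32) - 50.0  # toy "gradient"
idx, vals = top_k_sparsify(grad)
print(len(vals))  # 10: only a tenth of the entries survive
```

Note that at 10% density the upload shrinks roughly 5x rather than 10x, because each surviving float32 value must be shipped together with its int32 index.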
Decision Rules
Use this checklist for model compression decisions:
- If your model doesn’t fit in device memory, quantize first: it is the simplest size reduction with the least accuracy loss
- If latency is the problem, pruning often helps more than quantization
- If you need the best accuracy at a small size, knowledge distillation produces better results than pruning alone
- If your target hardware varies, test on actual devices; simulations often don’t match reality
- If compression degrades accuracy too much, consider whether a smaller model architecture fits your task
Compression trades accuracy for efficiency. Measure the trade-off on your actual task.
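Read top to bottom, the checklist behaves like a small decision procedure. A hypothetical sketch (the 4x size reduction for int8 quantization is a rule of thumb, not a guarantee):

```python
def suggest_compression(model_mb, device_mb, latency_ok):
    """Apply the checklist above in order; returns the suggested steps."""
    steps = []
    if model_mb > device_mb:
        steps.append('quantize')  # simplest size reduction, least accuracy loss
        model_mb /= 4             # float32 -> int8: ~4x smaller (rule of thumb)
    if not latency_ok:
        steps.append('prune')     # latency usually responds better to pruning
    if model_mb > device_mb:
        # still too big after quantization: distill or shrink the architecture
        steps.append('distill or shrink architecture')
    return steps

print(suggest_compression(model_mb=20, device_mb=8, latency_ok=True))
# ['quantize']
```

Whatever such a helper suggests, the final call still rests on measured accuracy and latency for your task on your device.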