Model Compression Techniques for Edge Deployment

Simor Consulting | 22 Aug, 2024 | 13 Mins read

Edge devices have limited memory and compute. Full-sized ML models often do not fit within these limits, or run too slowly to be useful. Model compression reduces model size and computational requirements while preserving as much accuracy as possible.

This article covers compression techniques for edge deployment.

Understanding Edge Deployment Constraints

Before diving into compression techniques, it’s essential to understand the specific constraints that edge deployment presents:

Memory Limitations

Edge devices typically have limited RAM and storage:

| Device Type | Typical RAM | Storage | Example Use Case |
|---|---|---|---|
| Microcontroller | 32KB-1MB | 128KB-16MB | Sensor processing |
| IoT device | 1-4GB | 4-32GB | Smart home devices |
| Smartphone | 4-12GB | 64-512GB | Mobile applications |
| Edge gateway | 8-64GB | 128GB-2TB | Local data processing |

Computational Constraints

Edge devices have limited processing power:

| Device Type | Typical Processor | Compute Capability |
|---|---|---|
| Microcontroller | ARM Cortex-M | 100-400 MHz, no GPU |
| IoT device | ARM Cortex-A | 1-2 GHz, basic GPU |
| Smartphone | Mobile SoC | 1.8-3 GHz, mobile GPU/NPU |
| Edge gateway | Low-power CPU | 1.6-3.5 GHz, optional GPU |

Power Considerations

Edge devices often operate on battery power, making energy efficiency critical:

  • Smartphones: Users expect all-day battery life
  • IoT sensors: May need to operate for months on a single battery
  • Wearables: Must balance functionality with compact battery size
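
A back-of-envelope calculation makes these stakes concrete. The sketch below is illustrative only; the capacity, idle draw, and per-inference energy figures are hypothetical, not measurements from any particular device:

```python
def battery_life_hours(capacity_mwh, idle_mw, inferences_per_hour,
                       energy_per_inference_mwh):
    """Estimate battery life for a periodic on-device inference workload.

    Average power draw = idle draw + inference energy amortized per hour.
    """
    avg_power_mw = idle_mw + inferences_per_hour * energy_per_inference_mwh
    return capacity_mwh / avg_power_mw

# A hypothetical sensor node: 2000 mWh battery, 1 mW idle draw,
# 60 inferences/hour at 0.05 mWh each -> 500 hours (about 3 weeks).
life = battery_life_hours(2000, 1.0, 60, 0.05)
```

Halving per-inference energy through compression directly extends the deployment lifetime in this model, which is why energy is often the binding constraint rather than accuracy.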

Connectivity Constraints

Edge devices may have intermittent or bandwidth-limited connectivity:

  • Rural IoT: Limited network availability
  • Mobile devices: Variable connection quality
  • Privacy-sensitive applications: Preference for local processing

Core Model Compression Techniques

Let’s explore the primary techniques for compressing ML models:

1. Weight Pruning

Pruning removes less important connections in a neural network, creating a sparse model that requires less memory and computation.

Magnitude-based Pruning

This approach removes weights below a certain threshold:

import torch
import torch.nn as nn

def magnitude_prune(model, pruning_threshold=0.01):
    """Prune model weights below the specified threshold"""
    for name, param in model.named_parameters():
        if 'weight' in name:  # Only prune weights, not biases
            # Keep only weights whose absolute value exceeds the threshold
            mask = torch.abs(param.data) > pruning_threshold
            # Zero out the weights below the threshold
            param.data = param.data * mask.float()

    return model
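
After pruning, it is worth verifying how sparse the model actually became. The helper below is an illustrative addition (not part of any library API) that reports the fraction of zeroed weight entries:

```python
import torch
import torch.nn as nn

def weight_sparsity(model):
    """Fraction of zero-valued entries among a model's weight tensors."""
    total, zeros = 0, 0
    for name, param in model.named_parameters():
        if 'weight' in name:  # skip biases, matching the pruning above
            total += param.numel()
            zeros += (param == 0).sum().item()
    return zeros / total

# Quick check on a toy layer: zero 2 of its 8 weights -> 25% sparsity
layer = nn.Linear(4, 2)
with torch.no_grad():
    layer.weight.fill_(0.5)
    layer.weight[0, :2] = 0.0
print(f"Sparsity: {weight_sparsity(layer):.1%}")  # prints "Sparsity: 25.0%"
```

Note that this measures nominal sparsity; actual memory savings require a sparse storage format or hardware that exploits zeros.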

Structured vs. Unstructured Pruning

  • Unstructured pruning removes individual weights but yields irregular sparsity patterns
  • Structured pruning removes entire channels, filters, or neurons, creating more hardware-friendly models

def structured_channel_pruning(model, prune_ratio=0.3):
    """Prune entire channels based on L1-norm"""
    for name, module in model.named_modules():
        if isinstance(module, nn.Conv2d):
            # Calculate L1-norm for each channel
            weight = module.weight.data
            num_channels = weight.size(0)
            l1_norm = torch.sum(torch.abs(weight.view(num_channels, -1)), dim=1)

            # Determine number of channels to keep
            num_keep = int(num_channels * (1 - prune_ratio))

            # Find channels with highest L1-norm
            _, indices = torch.topk(l1_norm, num_keep)
            mask = torch.zeros(num_channels, device=weight.device)
            mask[indices] = 1

            # Apply mask to keep important channels
            module.weight.data = module.weight.data * mask.view(-1, 1, 1, 1).expand_as(module.weight.data)

    return model

Iterative Pruning

Gradual pruning with retraining often yields better results:

def iterative_pruning(model, train_loader, val_loader, prune_ratio=0.5, steps=5, epochs_per_step=3):
    """Gradually prune model with retraining between pruning steps"""
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()

    # Per-step pruning rate; since each step prunes a fraction of the
    # remaining channels, total sparsity compounds to slightly below prune_ratio
    step_prune_ratio = prune_ratio / steps

    for step in range(steps):
        # Prune model
        structured_channel_pruning(model, prune_ratio=step_prune_ratio)

        # Retrain model
        for epoch in range(epochs_per_step):
            train_model(model, train_loader, optimizer, criterion)

        # Evaluate model
        accuracy = evaluate_model(model, val_loader)
        print(f"Step {step+1}/{steps}, Pruned: {(step+1)*step_prune_ratio:.2f}, Accuracy: {accuracy:.2f}%")

    return model

2. Quantization

Quantization reduces the precision of model weights and activations, typically from 32-bit floating-point to lower-precision formats.
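
Conceptually, int8 quantization maps each float value onto a 256-level grid defined by a scale and a zero point. A minimal self-contained sketch of the affine scheme (framework-agnostic, for intuition only):

```python
import numpy as np

def quantize_int8(x):
    """Affine quantization: map floats in [min, max] onto int8 in [-128, 127]."""
    scale = (x.max() - x.min()) / 255.0
    zero_point = np.round(-x.min() / scale) - 128
    q = np.clip(np.round(x / scale) + zero_point, -128, 127).astype(np.int8)
    return q, scale, zero_point

def dequantize_int8(q, scale, zero_point):
    """Recover approximate float values from the int8 representation."""
    return (q.astype(np.float32) - zero_point) * scale

x = np.array([-1.0, 0.0, 0.5, 2.0], dtype=np.float32)
q, scale, zp = quantize_int8(x)
x_hat = dequantize_int8(q, scale, zp)
# Per-element reconstruction error is at most scale/2 (~0.006 here)
```

This is why calibration matters: the scale and zero point are derived from the observed value range, so unrepresentative calibration data yields a poor grid and larger reconstruction error.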

Post-training Quantization

Quantize a model after training without retraining:

import tensorflow as tf

def post_training_quantize(keras_model, train_dataset, quantize_to_int8=True):
    """Apply post-training quantization to a Keras model"""
    converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)

    if quantize_to_int8:
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.target_spec.supported_ops = [
            tf.lite.OpsSet.TFLITE_BUILTINS_INT8
        ]
        converter.inference_input_type = tf.int8
        converter.inference_output_type = tf.int8

        # Representative dataset is required for int8 quantization
        def representative_dataset():
            # Generate representative data for calibration
            for data, _ in train_dataset.take(100):
                yield [data]

        converter.representative_dataset = representative_dataset
    else:
        # Apply float16 quantization
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.target_spec.supported_types = [tf.float16]

    quantized_model = converter.convert()
    return quantized_model

Quantization-Aware Training

Incorporates quantization effects during training:

import tensorflow_model_optimization as tfmot

def quantization_aware_training(model, train_dataset, val_dataset, epochs=5):
    """Train model with quantization awareness"""

    # Clone the model and copy weights so the original is left untouched
    # (clone_model alone would reinitialize the weights)
    quantized_model = tf.keras.models.clone_model(model)
    quantized_model.set_weights(model.get_weights())

    # Wrap the model with fake-quantization nodes for QAT
    quantized_model = tfmot.quantization.keras.quantize_model(quantized_model)

    # Compile the model
    quantized_model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    # Train the model with quantization awareness
    quantized_model.fit(
        train_dataset,
        validation_data=val_dataset,
        epochs=epochs
    )

    return quantized_model

Mixed-precision Quantization

Different layers may use different precision:

def mixed_precision_quantization(model, layer_quantize_config):
    """Apply different quantization schemes to different layers"""

    def annotate(layer):
        """Annotate layers marked for int8 in the configuration"""
        config = layer_quantize_config.get(layer.name, {})
        if config.get('precision') == 'int8':
            return tfmot.quantization.keras.quantize_annotate_layer(layer)
        # float16 layers are typically handled at conversion time instead
        # (via converter.target_spec.supported_types), so leave them as-is
        return layer

    # Clone the model, annotating the selected layers along the way
    annotated_model = tf.keras.models.clone_model(model, clone_function=annotate)
    annotated_model.set_weights(model.get_weights())

    # Build the quantization-aware model from the annotations
    quantized_model = tfmot.quantization.keras.quantize_apply(annotated_model)

    return quantized_model

3. Knowledge Distillation

Train a smaller “student” model to mimic a larger “teacher” model:

import torch
import torch.nn as nn
import torch.nn.functional as F

def knowledge_distillation(teacher_model, student_model, train_loader, val_loader,
                          temperature=5.0, alpha=0.5, epochs=10, device='cpu'):
    """Train a student model to mimic a teacher model's outputs"""

    optimizer = torch.optim.Adam(student_model.parameters())

    # Set teacher model to evaluation mode
    teacher_model.eval()

    for epoch in range(epochs):
        student_model.train()

        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)

            # Forward pass through student model
            student_output = student_model(data)

            # Forward pass through teacher model (no grad needed)
            with torch.no_grad():
                teacher_output = teacher_model(data)

            # Knowledge distillation loss (batchmean matches the KL definition)
            distillation_loss = nn.KLDivLoss(reduction='batchmean')(
                F.log_softmax(student_output / temperature, dim=1),
                F.softmax(teacher_output / temperature, dim=1)
            ) * (temperature * temperature)

            # Standard cross-entropy loss
            ce_loss = F.cross_entropy(student_output, target)

            # Combine losses: alpha controls the balance between distillation and CE
            loss = alpha * distillation_loss + (1 - alpha) * ce_loss

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Evaluate
        student_model.eval()
        accuracy = evaluate_model(student_model, val_loader)
        print(f"Epoch {epoch+1}/{epochs}, Accuracy: {accuracy:.2f}%")

    return student_model

4. Low-Rank Factorization

Decompose weight matrices into products of smaller matrices:

import numpy as np
import torch
import torch.nn as nn
from sklearn.decomposition import TruncatedSVD

def apply_low_rank_factorization(model, rank_ratio=0.5):
    """Apply low-rank factorization to fully connected layers"""

    for name, module in list(model.named_modules()):
        if isinstance(module, nn.Linear):
            # nn.Linear stores weights as (out_features, in_features)
            weight = module.weight.data.cpu().numpy()
            out_features, in_features = weight.shape

            # Calculate target rank
            target_rank = max(1, int(min(in_features, out_features) * rank_ratio))

            # Truncated SVD: W ≈ (U·S) · V, with V of shape (rank, in_features)
            svd = TruncatedSVD(n_components=target_rank)
            US = svd.fit_transform(weight)   # (out_features, rank), already U·S
            V = svd.components_              # (rank, in_features)

            # First layer: in_features -> target_rank
            first_layer = nn.Linear(in_features, target_rank, bias=False)
            first_layer.weight.data = torch.tensor(V, dtype=torch.float32).to(module.weight.device)

            # Second layer: target_rank -> out_features
            second_layer = nn.Linear(target_rank, out_features, bias=module.bias is not None)
            second_layer.weight.data = torch.tensor(US, dtype=torch.float32).to(module.weight.device)

            if module.bias is not None:
                second_layer.bias.data = module.bias.data.clone()

            # Replace the original module with the factorized pair,
            # handling nested module names like "block.fc"
            parent = model
            *path, attr = name.split('.')
            for p in path:
                parent = getattr(parent, p)
            setattr(parent, attr, nn.Sequential(first_layer, second_layer))

    return model

5. Neural Architecture Search (NAS)

Automated search for efficient model architectures:

import keras_tuner as kt
from tensorflow import keras

def build_model(hp):
    """Define a model-building function with hyperparameters"""
    model = keras.Sequential()

    # Tune the number of convolution filters
    filters = hp.Int('filters', min_value=16, max_value=128, step=16)
    model.add(keras.layers.Conv2D(filters=filters, kernel_size=3, activation='relu',
                                 input_shape=(32, 32, 3)))

    # Tune whether to use max pooling
    if hp.Boolean('pooling'):
        model.add(keras.layers.MaxPooling2D())

    # Tune number of dense layers and units
    for i in range(hp.Int('num_dense_layers', 1, 3)):
        units = hp.Int(f'units_{i}', min_value=32, max_value=512, step=32)
        model.add(keras.layers.Dense(units=units, activation='relu'))

    # Tune dropout rate
    dropout_rate = hp.Float('dropout', min_value=0.0, max_value=0.5, step=0.1)
    model.add(keras.layers.Dropout(rate=dropout_rate))

    # Output layer
    model.add(keras.layers.Dense(10, activation='softmax'))

    # Tune learning rate
    learning_rate = hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling='log')
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

def neural_architecture_search(train_dataset, val_dataset):
    """Run neural architecture search with model size constraints"""

    # Define the tuner
    tuner = kt.Hyperband(
        build_model,
        objective=kt.Objective('val_accuracy', direction='max'),
        max_epochs=10,
        factor=3,
        hyperband_iterations=2,
        directory='nas_search',
        project_name='edge_model_search'
    )

    # Add callback to restrict model size
    class SizeConstraint(keras.callbacks.Callback):
        def on_epoch_end(self, epoch, logs=None):
            # Get model size in MB
            model_size = self.model.count_params() * 4 / (1024 * 1024)  # assuming float32

            # If model is too large, stop training with penalty
            if model_size > 5.0:  # 5MB limit example
                logs['val_accuracy'] = 0.0  # Penalize
                self.model.stop_training = True

    # Search for best model
    tuner.search(
        train_dataset,
        validation_data=val_dataset,
        epochs=30,
        callbacks=[SizeConstraint()]
    )

    # Get best model
    best_model = tuner.get_best_models(num_models=1)[0]
    best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

    print(f"Best hyperparameters: {best_hps.values}")

    return best_model

Implementation Pipeline for Model Compression

A typical workflow for compressing models for edge deployment:

1. Model Selection and Pre-training

Start with an appropriate base model architecture:

# Example MobileNetV3 small model
base_model = tf.keras.applications.MobileNetV3Small(
    input_shape=(224, 224, 3),
    include_top=True,
    weights='imagenet',
    classes=1000
)

# Modify for your specific task (num_classes is your task's class count)
x = base_model.layers[-2].output  # Take the feature layer
output = keras.layers.Dense(num_classes, activation='softmax')(x)
model = keras.Model(inputs=base_model.input, outputs=output)

# Train the model on your task
model.compile(
    optimizer=keras.optimizers.Adam(1e-4),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10
)

# Evaluate baseline model
baseline_accuracy = model.evaluate(test_dataset)[1]
baseline_size = model.count_params() * 4 / (1024 * 1024)  # Size in MB assuming float32
print(f"Baseline Model - Accuracy: {baseline_accuracy:.4f}, Size: {baseline_size:.2f} MB")

2. Progressive Compression

Apply techniques in a sequence to maintain accuracy:

import os
import tensorflow_model_optimization as tfmot

def compress_model_for_edge(model, train_dataset, val_dataset, test_dataset, target_size_mb=5.0):
    """Apply progressive compression until target size is achieved"""

    # Save original model for knowledge distillation
    teacher_model = tf.keras.models.clone_model(model)
    teacher_model.set_weights(model.get_weights())

    # Step 1: Architecture optimization - simplify layers
    print("Step 1: Applying architecture simplification...")
    model = simplify_architecture(model)  # Custom function to reduce model complexity

    # Evaluate after architecture simplification
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    accuracy = model.evaluate(test_dataset)[1]
    current_size = model.count_params() * 4 / (1024 * 1024)  # Size in MB assuming float32
    print(f"After architecture simplification - Accuracy: {accuracy:.4f}, Size: {current_size:.2f} MB")

    # If we've reached the target size, stop here
    if current_size <= target_size_mb:
        return model

    # Step 2: Apply pruning
    print("Step 2: Applying pruning...")
    pruning_params = {
        'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(
            initial_sparsity=0.0,
            final_sparsity=0.5,
            begin_step=0,
            end_step=train_dataset.cardinality().numpy() * 5  # 5 epochs worth of steps
        )
    }

    model_for_pruning = tfmot.sparsity.keras.prune_low_magnitude(model, **pruning_params)

    # Compile pruned model
    model_for_pruning.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    # Train pruned model
    callbacks = [
        tfmot.sparsity.keras.UpdatePruningStep(),
        tfmot.sparsity.keras.PruningSummaries(log_dir='pruning_logs'),
    ]

    model_for_pruning.fit(
        train_dataset,
        validation_data=val_dataset,
        epochs=5,
        callbacks=callbacks
    )

    # Apply mask to get final pruned model
    model = tfmot.sparsity.keras.strip_pruning(model_for_pruning)

    # Evaluate after pruning
    accuracy = model.evaluate(test_dataset)[1]

    # Estimate compressed size (assuming 0.5 sparsity)
    current_size = model.count_params() * 4 * 0.5 / (1024 * 1024)  # Approximate pruned size
    print(f"After pruning - Accuracy: {accuracy:.4f}, Size: {current_size:.2f} MB")

    # If we've reached the target size, stop here
    if current_size <= target_size_mb:
        return model

    # Step 3: Apply quantization-aware training
    print("Step 3: Applying quantization-aware training...")
    q_aware_model = tfmot.quantization.keras.quantize_model(model)

    # Compile quantized model
    q_aware_model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    # Apply knowledge distillation during quantization-aware training.
    # A Keras loss only sees (y_true, y_pred), so the teacher's softened
    # predictions are folded into the training targets with a dataset map.
    temperature = 5.0
    alpha = 0.5

    def with_teacher_targets(x, y):
        soft = tf.nn.softmax(teacher_model(x, training=False) / temperature)
        return x, tf.concat([y, soft], axis=-1)

    def distillation_loss(y_combined, y_pred):
        num_classes = tf.shape(y_pred)[-1]
        y_true = y_combined[:, :num_classes]
        soft_teacher = y_combined[:, num_classes:]
        # Soften student predictions with the same temperature
        soft_student = tf.nn.softmax(y_pred / temperature)
        # KL divergence loss
        distill_loss = tf.keras.losses.KLDivergence()(soft_teacher, soft_student)
        # Regular cross-entropy loss
        ce_loss = tf.keras.losses.CategoricalCrossentropy()(y_true, y_pred)
        # Combine losses
        return alpha * distill_loss + (1 - alpha) * ce_loss

    # Train with quantization awareness and distillation
    q_aware_model.compile(optimizer='adam', loss=distillation_loss)

    q_aware_model.fit(
        train_dataset.map(with_teacher_targets),
        validation_data=val_dataset.map(with_teacher_targets),
        epochs=5
    )

    # Recompile with a standard loss so evaluation sees plain labels
    q_aware_model.compile(optimizer='adam', loss='categorical_crossentropy',
                          metrics=['accuracy'])
    accuracy = q_aware_model.evaluate(test_dataset)[1]

    # Convert to TFLite to get actual size
    converter = tf.lite.TFLiteConverter.from_keras_model(q_aware_model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    quantized_tflite_model = converter.convert()

    # Save to file to check size
    with open('quantized_model.tflite', 'wb') as f:
        f.write(quantized_tflite_model)

    current_size = os.path.getsize('quantized_model.tflite') / (1024 * 1024)
    print(f"After quantization - Accuracy: {accuracy:.4f}, Size: {current_size:.2f} MB")

    # Return the final model in TFLite format
    return quantized_tflite_model

3. Evaluation Framework

Comprehensive evaluation across key metrics:

import os
import time
import numpy as np
import tensorflow as tf

def evaluate_compressed_model(original_model, compressed_model, test_dataset, device='cpu'):
    """Evaluate compressed model across multiple dimensions"""

    results = {}

    # Accuracy evaluation
    print("Evaluating accuracy...")
    original_accuracy = original_model.evaluate(test_dataset)[1]

    # For TFLite models
    if isinstance(compressed_model, bytes):
        interpreter = tf.lite.Interpreter(model_content=compressed_model)
        interpreter.allocate_tensors()

        input_index = interpreter.get_input_details()[0]["index"]
        output_index = interpreter.get_output_details()[0]["index"]

        correct = 0
        total = 0

        for images, labels in test_dataset:
            for i in range(len(images)):
                interpreter.set_tensor(input_index, images[i:i+1].numpy())
                interpreter.invoke()
                predictions = interpreter.get_tensor(output_index)
                predicted_label = np.argmax(predictions[0])
                true_label = np.argmax(labels[i])
                if predicted_label == true_label:
                    correct += 1
                total += 1

        compressed_accuracy = correct / total
    else:
        compressed_accuracy = compressed_model.evaluate(test_dataset)[1]

    results['original_accuracy'] = original_accuracy
    results['compressed_accuracy'] = compressed_accuracy
    results['accuracy_retention'] = compressed_accuracy / original_accuracy

    # Size evaluation
    print("Evaluating model size...")
    if isinstance(original_model, tf.keras.Model):
        original_size = original_model.count_params() * 4 / (1024 * 1024)  # float32 size in MB
    else:
        original_size = len(original_model) / (1024 * 1024) if isinstance(original_model, bytes) else 0

    if isinstance(compressed_model, bytes):
        compressed_size = len(compressed_model) / (1024 * 1024)
    elif isinstance(compressed_model, tf.keras.Model):
        compressed_size = compressed_model.count_params() * 4 / (1024 * 1024)
    else:
        compressed_size = os.path.getsize(compressed_model) / (1024 * 1024)

    results['original_size_mb'] = original_size
    results['compressed_size_mb'] = compressed_size
    results['compression_ratio'] = original_size / compressed_size

    # Latency evaluation
    print("Evaluating inference latency...")

    if device == 'cpu':
        # CPU latency test
        if isinstance(original_model, tf.keras.Model):
            # Warmup
            for _ in range(10):
                _ = original_model.predict(test_dataset.take(1))

            # Measure
            start_time = time.time()
            for _ in range(100):
                _ = original_model.predict(test_dataset.take(1))
            original_latency = (time.time() - start_time) / 100
        else:
            original_latency = 0  # Unable to measure if not Keras model

        if isinstance(compressed_model, bytes):
            interpreter = tf.lite.Interpreter(model_content=compressed_model)
            interpreter.allocate_tensors()

            input_index = interpreter.get_input_details()[0]["index"]
            output_index = interpreter.get_output_details()[0]["index"]

            # Get a sample input
            for images, _ in test_dataset.take(1):
                sample_input = images[0:1].numpy()
                break

            # Warmup
            for _ in range(10):
                interpreter.set_tensor(input_index, sample_input)
                interpreter.invoke()
                _ = interpreter.get_tensor(output_index)

            # Measure
            start_time = time.time()
            for _ in range(100):
                interpreter.set_tensor(input_index, sample_input)
                interpreter.invoke()
                _ = interpreter.get_tensor(output_index)
            compressed_latency = (time.time() - start_time) / 100
        elif isinstance(compressed_model, tf.keras.Model):
            # Warmup
            for _ in range(10):
                _ = compressed_model.predict(test_dataset.take(1))

            # Measure
            start_time = time.time()
            for _ in range(100):
                _ = compressed_model.predict(test_dataset.take(1))
            compressed_latency = (time.time() - start_time) / 100
        else:
            compressed_latency = 0  # Unable to measure

        results['original_latency_ms'] = original_latency * 1000
        results['compressed_latency_ms'] = compressed_latency * 1000
        results['speedup_factor'] = original_latency / compressed_latency if compressed_latency > 0 else 0

    # Battery impact estimation (simplified)
    print("Estimating power consumption...")
    if device == 'mobile':
        # These are rough estimates based on model size and complexity
        # For actual measurements, device-specific power monitoring is needed
        results['estimated_battery_impact_original'] = original_size * 0.01  # Simplified estimate
        results['estimated_battery_impact_compressed'] = compressed_size * 0.01  # Simplified estimate
        results['estimated_battery_savings'] = 1 - (results['estimated_battery_impact_compressed'] /
                                                  results['estimated_battery_impact_original'])

    return results

Platform-Specific Optimization Techniques

TensorFlow Lite for Mobile

def optimize_for_tflite(model, train_dataset, quantize=True, optimize_for_inference=True):
    """Optimize model for TensorFlow Lite deployment"""
    converter = tf.lite.TFLiteConverter.from_keras_model(model)

    if quantize:
        converter.optimizations = [tf.lite.Optimize.DEFAULT]

        # Optionally, set to int8 quantization
        converter.target_spec.supported_ops = [
            tf.lite.OpsSet.TFLITE_BUILTINS_INT8
        ]

        # Representative dataset for int8 quantization
        def representative_dataset():
            for data, _ in train_dataset.take(100):
                yield [data]

        converter.representative_dataset = representative_dataset

    if optimize_for_inference and not quantize:
        # Allow falling back to TF ops for layers TFLite can't express.
        # Skipped under int8 quantization, which requires the
        # TFLITE_BUILTINS_INT8 op set configured above to remain in effect.
        converter.target_spec.supported_ops = [
            tf.lite.OpsSet.TFLITE_BUILTINS,
            tf.lite.OpsSet.SELECT_TF_OPS
        ]

    tflite_model = converter.convert()

    return tflite_model

PyTorch Mobile

import torch
from torch.utils.mobile_optimizer import optimize_for_mobile

def optimize_for_pytorch_mobile(model, example_input, quantize=True):
    """Optimize model for PyTorch Mobile deployment"""
    model.eval()

    if quantize:
        # Dynamic quantization must run on the eager model before tracing;
        # it supports Linear (and recurrent) layers, not Conv2d
        model = torch.quantization.quantize_dynamic(
            model,
            {torch.nn.Linear},
            dtype=torch.qint8
        )

    # Trace the model with example input
    traced_model = torch.jit.trace(model, example_input)

    # Apply mobile-specific graph optimizations (operator fusion, etc.)
    return optimize_for_mobile(traced_model)

ONNX Runtime for Cross-Platform Deployment

import torch

def optimize_for_onnx(model, input_shape, quantize=True):
    """Optimize model for ONNX Runtime deployment"""
    # Export to ONNX format
    dummy_input = torch.randn(1, *input_shape)
    torch.onnx.export(
        model,
        dummy_input,
        "model.onnx",
        opset_version=11,
        input_names=["input"],
        output_names=["output"],
        dynamic_axes={
            "input": {0: "batch_size"},
            "output": {0: "batch_size"}
        }
    )

    # Optimize with ONNX Runtime
    import onnxruntime as ort
    from onnxruntime.quantization import quantize_dynamic, QuantType

    # Basic model optimization
    session_options = ort.SessionOptions()
    session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

    # Quantize if requested
    if quantize:
        quantize_dynamic(
            "model.onnx",
            "model_quantized.onnx",
            weight_type=QuantType.QInt8
        )
        model_path = "model_quantized.onnx"
    else:
        model_path = "model.onnx"

    # Create inference session
    session = ort.InferenceSession(model_path, session_options)

    return session

TensorRT for NVIDIA Devices

def optimize_for_tensorrt(saved_model_dir, precision='fp16'):
    """Optimize TensorFlow model for TensorRT acceleration"""
    import numpy as np

    # Convert to TensorRT
    from tensorflow.python.compiler.tensorrt import trt_convert as trt

    # Set conversion parameters based on precision
    if precision == 'fp16':
        conversion_params = trt.DEFAULT_TRT_CONVERSION_PARAMS
        conversion_params = conversion_params._replace(
            precision_mode=trt.TrtPrecisionMode.FP16,
            max_workspace_size_bytes=8000000000
        )
    elif precision == 'int8':
        conversion_params = trt.DEFAULT_TRT_CONVERSION_PARAMS
        conversion_params = conversion_params._replace(
            precision_mode=trt.TrtPrecisionMode.INT8,
            max_workspace_size_bytes=8000000000,
            use_calibration=True
        )
    else:  # fp32
        conversion_params = trt.DEFAULT_TRT_CONVERSION_PARAMS
        conversion_params = conversion_params._replace(
            max_workspace_size_bytes=8000000000
        )

    # Create converter
    converter = trt.TrtGraphConverterV2(
        input_saved_model_dir=saved_model_dir,
        conversion_params=conversion_params
    )

    # Convert the model
    if precision == 'int8':
        # Define calibration data generator
        def calibration_input_fn():
            for i in range(50):  # Provide 50 batches for calibration
                # Provide a batch of sample data for calibration
                yield [np.random.uniform(size=(1, 224, 224, 3)).astype(np.float32)]

        # Perform conversion with calibration
        converter.convert(calibration_input_fn=calibration_input_fn)
    else:
        converter.convert()

    # Save the converted model
    trt_model_dir = f"{saved_model_dir}_trt_{precision}"
    converter.save(trt_model_dir)

    return trt_model_dir

Real-World Case Studies

MobileNet for On-Device Image Classification

Optimizing MobileNetV3 for smartphone deployment:

| Technique | Model Size | Accuracy | Latency | Battery Impact |
|---|---|---|---|---|
| Original MobileNetV3-Small | 9.7MB | 68.5% | 26ms | 100% |
| Pruned (50% weights) | 4.9MB | 67.8% | 20ms | 81% |
| Int8 Quantization | 2.5MB | 67.1% | 12ms | 55% |
| Int8 + Pruning | 1.3MB | 66.2% | 9ms | 38% |

Implementation details:

  • Magnitude-based pruning with 50% sparsity
  • Dynamic range quantization to int8
  • TensorFlow Lite conversion with op fusion
  • On-device latency measured on Pixel 4

BERT for Edge NLP Applications

Compressing BERT for resource-constrained devices:

| Technique | Model Size | GLUE Score | Latency |
|---|---|---|---|
| BERT-base | 440MB | 79.5 | 220ms |
| DistilBERT | 265MB | 77.1 | 134ms |
| DistilBERT + Quantization | 69MB | 76.2 | 72ms |
| TinyBERT | 57MB | 73.8 | 40ms |
| TinyBERT + Quantization | 14.5MB | 72.9 | 21ms |

Implementation details:

  • Knowledge distillation using teacher-student training
  • ONNX Runtime quantization
  • Model pruning (30% of attention heads)
  • Weight sharing between layers
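
Of these, weight sharing is the one technique not illustrated earlier in the article. A minimal ALBERT-style sketch (hypothetical dimensions, in PyTorch for consistency with earlier snippets): a single transformer layer is stored once and reused at every depth, so the encoder's parameter count stays that of one layer regardless of depth:

```python
import torch
import torch.nn as nn

class SharedLayerEncoder(nn.Module):
    """Cross-layer weight sharing: one encoder layer reused at every depth."""
    def __init__(self, d_model=256, nhead=4, depth=6):
        super().__init__()
        self.shared_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, batch_first=True
        )
        self.depth = depth

    def forward(self, x):
        for _ in range(self.depth):
            x = self.shared_layer(x)  # same parameters applied at every depth
        return x

# Parameters are stored once, so a depth-6 encoder costs one layer's worth
# of memory -- roughly a 6x reduction versus six distinct layers here.
encoder = SharedLayerEncoder()
shared_params = sum(p.numel() for p in encoder.parameters())
```

Sharing trades some accuracy for memory: the layers can no longer specialize by depth, which is part of why the distilled models above lose a few GLUE points.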

Vision Models for IoT Devices

Deploying vision models on microcontrollers:

| Technique | Model Size | Accuracy | Power Consumption |
|---|---|---|---|
| MobileNetV2 | 14MB | 71.8% | Not deployable |
| MicroNet | 5.6MB | 63.7% | 540mW |
| MCUNet | 512KB | 61.2% | 290mW |
| MCUNet + Quantization | 256KB | 58.4% | 180mW |

Implementation details:

  • Neural architecture search for microcontroller constraints
  • Int8 quantization with calibration
  • Specific operator fusion for target hardware
  • Specialized memory management
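
The calibration step above derives a scale and zero point from activations observed on representative data, so floats map well onto the [-128, 127] int8 range. A minimal NumPy sketch of asymmetric (affine) calibration (real toolchains also handle per-channel scales and histogram-based range clipping):

```python
import numpy as np

def calibrate_int8(calibration_activations):
    """Derive an affine int8 mapping (scale, zero_point) from the min/max
    range observed across calibration batches."""
    lo = min(float(a.min()) for a in calibration_activations)
    hi = max(float(a.max()) for a in calibration_activations)
    lo, hi = min(lo, 0.0), max(hi, 0.0)  # keep 0 exactly representable
    scale = (hi - lo) / 255.0
    zero_point = int(round(-128 - lo / scale))
    return scale, zero_point

def quantize_int8(x, scale, zero_point):
    """Quantize float values to int8 using the calibrated mapping."""
    return np.clip(np.round(x / scale) + zero_point, -128, 127).astype(np.int8)
```

Poor calibration data widens the observed range, wasting int8 resolution, which is why representative inputs matter.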

Best Practices and Common Pitfalls

Best Practices

  1. Start with efficient architectures: Begin with models designed for efficiency (MobileNet, EfficientNet)
  2. Measure what matters: Focus on the metrics most relevant to your deployment scenario
  3. Progressive compression: Apply techniques in sequence, retraining after each step
  4. Hardware-aware optimization: Optimize for specific hardware capabilities
  5. Test on target devices: Benchmark on actual deployment hardware
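
Progressive compression (practice 3) can be organized as a simple loop: compress, retrain, then check accuracy against a budget before accepting the stage. A hypothetical scaffold (the `stages`, `evaluate`, and `max_accuracy_drop` names are illustrative; you supply the callables for your framework):

```python
def progressive_compress(model, stages, evaluate, max_accuracy_drop=0.01):
    """Apply compression stages in sequence, retraining after each step,
    and stop before any stage whose accuracy drop exceeds the budget.

    stages: list of (compress_fn, finetune_fn) pairs, applied in order.
    evaluate: callable returning the model's validation accuracy.
    """
    baseline = evaluate(model)
    for compress_fn, finetune_fn in stages:
        candidate = finetune_fn(compress_fn(model))
        if baseline - evaluate(candidate) > max_accuracy_drop:
            break  # this stage over-compressed; keep the previous model
        model = candidate
    return model
```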

Common Pitfalls

  1. Over-compression: Compressing beyond a model’s intrinsic information capacity
  2. Ignoring hardware specifics: Not considering target hardware acceleration capabilities
  3. Neglecting accuracy-critical paths: Applying uniform compression to all parts of a model
  4. Inappropriate quantization: Using quantization without calibration on representative data
  5. Inadequate testing: Not testing for edge cases and robustness after compression

Future Directions

As edge AI continues to evolve, several promising directions are emerging:

Neural Architecture Search for Edge

Automated discovery of efficient architectures:

# Example of NAS directed specifically at edge constraints
import keras_tuner as kt
from tensorflow import keras

def edge_constrained_nas(train_dataset, val_dataset,
                         max_model_size_mb=5,
                         max_latency_ms=50,
                         target_device='pixel4'):
    """Neural architecture search with edge deployment constraints"""

    # Define search space
    def build_model(hp):
        model = keras.Sequential()

        # Use depthwise separable convolutions for efficiency
        for i in range(hp.Int('conv_blocks', 1, 5)):
            filters = hp.Int(f'filters_{i}', 8, 128, step=8)

            # Depthwise separable convolution
            model.add(keras.layers.SeparableConv2D(
                filters=filters,
                kernel_size=hp.Choice(f'kernel_{i}', [3, 5]),
                activation='relu',
                padding='same'
            ))

            if hp.Boolean(f'batch_norm_{i}'):
                model.add(keras.layers.BatchNormalization())

            if hp.Boolean(f'pool_{i}'):
                pool_type = hp.Choice(f'pool_type_{i}', ['max', 'avg'])
                if pool_type == 'max':
                    model.add(keras.layers.MaxPooling2D())
                else:
                    model.add(keras.layers.AveragePooling2D())

        model.add(keras.layers.GlobalAveragePooling2D())

        # Add final dense layers
        for i in range(hp.Int('dense_blocks', 0, 2)):
            units = hp.Int(f'dense_units_{i}', 32, 256, step=32)
            model.add(keras.layers.Dense(units, activation='relu'))
            model.add(keras.layers.Dropout(hp.Float(f'dropout_{i}', 0, 0.5, step=0.1)))

        model.add(keras.layers.Dense(10, activation='softmax'))

        # Compile with appropriate optimizer
        lr = hp.Float('learning_rate', 1e-4, 1e-2, sampling='log')
        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=lr),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

        return model

    # Create a custom tuner that considers edge constraints
    class EdgeConstrainedTuner(kt.Tuner):
        def run_trial(self, trial, *args, **kwargs):
            hp = trial.hyperparameters
            model = self.hypermodel.build(hp)

            # Estimate model size
            model_size_mb = model.count_params() * 4 / (1024 * 1024)  # float32 size

            # If model exceeds size constraints, skip training
            if model_size_mb > max_model_size_mb:
                return {'val_accuracy': float('-inf')}

            # Estimate latency (could be a more sophisticated model)
            # This is a simplified placeholder - real implementation would use
            # a device-specific latency prediction model
            estimated_latency = estimate_latency(model, target_device)

            if estimated_latency > max_latency_ms:
                return {'val_accuracy': float('-inf')}

            # If within constraints, train normally
            return super(EdgeConstrainedTuner, self).run_trial(trial, *args, **kwargs)

    # Set up the tuner
    tuner = EdgeConstrainedTuner(
        oracle=kt.oracles.BayesianOptimizationOracle(  # named BayesianOptimization in older keras-tuner releases
            objective=kt.Objective('val_accuracy', direction='max'),
            max_trials=100
        ),
        hypermodel=build_model,
        directory='edge_nas',
        project_name='edge_model_search'
    )

    # Search for best model
    tuner.search(
        train_dataset,
        validation_data=val_dataset,
        epochs=10
    )

    # Get best model
    best_model = tuner.get_best_models(num_models=1)[0]

    return best_model
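
The `estimate_latency` helper referenced inside `run_trial` is left undefined above. A real implementation would use a lookup table or regression model fit to on-device benchmarks; the crude parameter-count proxy below is purely illustrative (the ms-per-million-parameters coefficients are made up, not measured):

```python
def estimate_latency(model, target_device='pixel4'):
    """Very rough latency proxy from parameter count alone.
    Replace with a device-specific predictor fit to real benchmarks."""
    ms_per_million_params = {'pixel4': 4.0}  # illustrative coefficient only
    coeff = ms_per_million_params.get(target_device, 8.0)  # pessimistic default
    return model.count_params() / 1e6 * coeff
```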

Hardware-Software Co-design

Developing models in tandem with specialized hardware:

# Example of hardware-aware training
import tensorflow as tf

def hardware_aware_training(model, train_dataset, val_dataset, target_hardware='edgetpu'):
    """Incorporate hardware-specific constraints during training"""

    # Define hardware-specific constraints
    hardware_constraints = {
        'edgetpu': {
            'supported_ops': ['Conv2D', 'DepthwiseConv2D', 'AveragePooling2D', 'MaxPooling2D',
                             'Reshape', 'Flatten', 'Dense'],
            'preferred_ops': ['DepthwiseConv2D', 'AveragePooling2D'],  # More efficient on EdgeTPU
            'avoided_ops': ['Transpose', 'MatMul'],  # Less efficient
            'quantization': 'int8',
            'memory_constraint': 8 * 1024 * 1024  # 8MB
        },
        'snapdragon': {
            'supported_ops': ['Conv2D', 'DepthwiseConv2D', 'AveragePooling2D', 'MaxPooling2D',
                             'LSTM', 'Dense'],
            'preferred_ops': ['Conv2D+BiasAdd+Relu'],  # Fused operations
            'avoided_ops': ['CustomOp'],
            'quantization': 'fp16',
            'memory_constraint': 32 * 1024 * 1024  # 32MB
        }
    }

    constraints = hardware_constraints[target_hardware]

    # Create hardware-aware regularizer
    class HardwareAwareRegularizer(tf.keras.regularizers.Regularizer):
        def __init__(self, constraints):
            self.constraints = constraints

        def __call__(self, weights):
            # Base regularization (e.g., L2)
            reg_loss = tf.reduce_sum(tf.square(weights))

            # Add hardware-specific penalties
            # For example, penalize large weights that would cause quantization issues
            if self.constraints['quantization'] == 'int8':
                # Encourage weights to have values that quantize well to int8
                # This is a simplified approach - more sophisticated methods exist
                scaled_weights = weights * 127.0  # Scale to int8 range
                quantized = tf.round(scaled_weights)
                quant_error = tf.reduce_mean(tf.square(scaled_weights - quantized))
                reg_loss += 0.1 * quant_error

            return 0.01 * reg_loss

    # Apply hardware-aware constraints to the model
    # Note: assigning a regularizer to an already-built layer only takes effect
    # once the model is rebuilt, because Keras registers regularization losses
    # at build time; in practice, attach the regularizer at layer construction.
    for layer in model.layers:
        if isinstance(layer, (tf.keras.layers.Conv2D, tf.keras.layers.Dense)):
            # Add hardware-aware regularization
            layer.kernel_regularizer = HardwareAwareRegularizer(constraints)

    # Custom callback to monitor hardware compatibility
    class HardwareCompatibilityCallback(tf.keras.callbacks.Callback):
        def __init__(self, constraints):
            super(HardwareCompatibilityCallback, self).__init__()
            self.constraints = constraints

        def on_epoch_end(self, epoch, logs=None):
            # Check model size against memory constraints
            model_size = self.model.count_params() * 4  # Assuming float32
            if model_size > self.constraints['memory_constraint']:
                print(f"Warning: Model size ({model_size / 1024 / 1024:.2f}MB) exceeds "
                      f"hardware constraint ({self.constraints['memory_constraint'] / 1024 / 1024}MB)")

            # Additional checks could be implemented here

    # Compile model
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-3),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    # Train with hardware awareness
    model.fit(
        train_dataset,
        validation_data=val_dataset,
        epochs=10,
        callbacks=[HardwareCompatibilityCallback(constraints)]
    )

    # Apply hardware-specific optimizations
    if constraints['quantization'] == 'int8':
        # Quantize to int8
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
        converter.inference_input_type = tf.int8
        converter.inference_output_type = tf.int8

        # Representative dataset is required for int8 quantization
        def representative_dataset():
            for data, _ in train_dataset.take(100):
                yield [data]

        converter.representative_dataset = representative_dataset

        quantized_model = converter.convert()
        return quantized_model

    elif constraints['quantization'] == 'fp16':
        # Quantize to float16
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.target_spec.supported_types = [tf.float16]

        quantized_model = converter.convert()
        return quantized_model

    return model

Federated Learning with On-Device Compression

Privacy-preserving learning with model compression:

# Federated learning with model compression
# (simulated in-process; the rounds below do not require a federated framework)
import numpy as np
import tensorflow as tf

def federated_compression_training(train_datasets, validation_dataset, num_clients=10, num_rounds=5):
    """Federated learning with on-device model compression"""

    # Define the model-building function
    def create_keras_model():
        return tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation='relu',
                                  input_shape=(32, 32, 3)),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(10, activation='softmax')
        ])

    # Define model compression for client updates
    def compress_gradients(grads):
        """Apply compression to gradients before sending to server"""
        compressed_grads = []
        for grad in grads:
            if grad is not None:
                # Apply top-k sparsification (keep only the 10% largest gradients)
                flattened = tf.reshape(grad, [-1])
                k = tf.cast(tf.math.ceil(0.1 * tf.cast(tf.size(flattened), tf.float32)), tf.int32)
                _, indices = tf.math.top_k(tf.abs(flattened), k=k)
                indices = tf.sort(indices)  # SparseTensor indices must be ordered

                # Create sparse representation (indices must be int64, rank 2)
                sparse_grad = tf.sparse.SparseTensor(
                    indices=tf.expand_dims(tf.cast(indices, tf.int64), 1),
                    values=tf.gather(flattened, indices),
                    dense_shape=tf.shape(flattened, out_type=tf.int64)
                )

                compressed_grads.append(sparse_grad)
            else:
                compressed_grads.append(None)
        return compressed_grads

    # Define client update function with compression
    # (left as a plain Python function: the SGD optimizer is created inside,
    # and wrapping in tf.function would re-create variables on each trace)
    def client_update(model, dataset, lr):
        """Client training with gradient compression"""
        optimizer = tf.keras.optimizers.SGD(learning_rate=lr)

        # Define loss function
        def loss_fn(y_true, y_pred):
            return tf.keras.losses.SparseCategoricalCrossentropy()(y_true, y_pred)

        # Training loop
        for batch in dataset:
            with tf.GradientTape() as tape:
                outputs = model(batch['x'])
                loss = loss_fn(batch['y'], outputs)

            # Get gradients and compress them
            grads = tape.gradient(loss, model.trainable_variables)
            compressed_grads = compress_gradients(grads)

            # Decompress gradients (in real federated learning, this happens on the server)
            decompressed_grads = []
            for grad, var in zip(compressed_grads, model.trainable_variables):
                if grad is None:
                    decompressed_grads.append(None)
                elif isinstance(grad, tf.sparse.SparseTensor):
                    # Densify and restore the variable's original shape
                    dense = tf.sparse.to_dense(grad)
                    decompressed_grads.append(tf.reshape(dense, tf.shape(var)))
                else:
                    decompressed_grads.append(grad)

            # Apply gradients
            optimizer.apply_gradients(zip(decompressed_grads, model.trainable_variables))

        return model

    # Simulate federated learning with compression
    clients = list(range(num_clients))
    client_datasets = [train_datasets[i] for i in range(num_clients)]

    # Initialize global model (compiled so it can be evaluated each round)
    global_model = create_keras_model()
    global_model.compile(
        optimizer='sgd',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    for round_num in range(num_rounds):
        print(f"Round {round_num+1}/{num_rounds}")

        # Client updates
        client_models = []
        for client_id in clients:
            # Create client model by copying global model
            client_model = create_keras_model()
            client_model.set_weights(global_model.get_weights())

            # Perform client update
            updated_client_model = client_update(
                client_model,
                client_datasets[client_id],
                lr=0.01
            )

            client_models.append(updated_client_model)

        # Aggregate model updates (simple averaging in this example)
        # In practice, more sophisticated aggregation methods may be used
        new_weights = []
        for i in range(len(global_model.get_weights())):
            client_weights = [model.get_weights()[i] for model in client_models]
            new_weights.append(np.mean(client_weights, axis=0))

        # Update global model
        global_model.set_weights(new_weights)

        # Evaluate global model
        test_loss, test_accuracy = global_model.evaluate(validation_dataset)
        print(f"Round {round_num+1} validation: Loss = {test_loss:.4f}, Accuracy = {test_accuracy:.4f}")

    # Once training is complete, apply final model compression for deployment
    final_compressed_model = post_training_quantize(global_model)

    return final_compressed_model
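
The top-k selection used in `compress_gradients` can be checked in isolation: keeping 10% of entries cuts the transmitted payload roughly tenfold before index overhead. A standalone NumPy version of the same selection:

```python
import numpy as np

def topk_sparsify(x, fraction=0.1):
    """Keep only the largest-magnitude fraction of entries.
    Returns (sorted indices, values): the sparse representation a client
    would transmit instead of the dense tensor."""
    flat = x.ravel()
    k = max(1, int(np.ceil(fraction * flat.size)))
    idx = np.sort(np.argsort(np.abs(flat))[-k:])
    return idx, flat[idx]
```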

Decision Rules

Use this checklist for model compression decisions:

  1. If your model doesn’t fit in device memory, quantize first - simplest reduction with least accuracy loss
  2. If latency is the problem, pruning often helps more than quantization
  3. If you need the best accuracy at small size, knowledge distillation produces better results than pruning alone
  4. If your target hardware varies, test on actual devices - simulations often don’t match reality
  5. If compression degrades accuracy too much, consider whether a smaller model architecture fits your task

Compression trades accuracy for efficiency. Measure the trade-off on your actual task.
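
Measuring that trade-off needs only three numbers per candidate model: size on disk, latency, and task accuracy. A minimal harness sketch (`run_inference` and `eval_accuracy` are callables you supply for your own task; this is not tied to any particular framework):

```python
import os
import time

def measure_tradeoff(model_path, run_inference, eval_accuracy):
    """Report the three metrics that drive compression decisions."""
    size_mb = os.path.getsize(model_path) / (1024 * 1024)
    start = time.perf_counter()
    run_inference()  # one representative inference call
    latency_ms = (time.perf_counter() - start) * 1000
    return {'size_mb': size_mb, 'latency_ms': latency_ms,
            'accuracy': eval_accuracy()}
```

In practice, average latency over many runs after a warm-up, and measure on the actual target device.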
