Overview

Deploying ML models to production requires careful consideration of serving infrastructure, scalability, monitoring, and reliability. This guide covers end-to-end deployment strategies for AQI prediction systems.

Deployment Architecture

A batch architecture is suitable for scheduled AQI forecasts (a minimal batch-job sketch follows the use-case list):
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│   Data      │────▶│   Model      │────▶│   Results   │
│   Store     │     │   Service    │     │   Database  │
└─────────────┘     └──────────────┘     └─────────────┘
      │                    │                     │
      │                    │                     │
      ▼                    ▼                     ▼
Daily/Hourly         Batch Process         API/Dashboard

Use cases:
  • Daily AQI forecasts
  • Historical reprocessing
  • Bulk predictions for multiple locations
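
A minimal sketch of the batch process above, assuming a joblib-serialized model (as used later in this guide) and illustrative Parquet paths:

import joblib
import pandas as pd

def run_batch_forecast(model_path: str, input_path: str, output_path: str) -> None:
    """Score a feature table in bulk and persist the results."""
    model = joblib.load(model_path)          # trained AQI model
    features = pd.read_parquet(input_path)   # one row per location/timestamp
    # Assumes the column order matches the model's training schema
    results = features.copy()
    results["predicted_aqi"] = model.predict(features)
    results.to_parquet(output_path)          # consumed by the API/dashboard

if __name__ == "__main__":
    # Paths are illustrative; in practice they come from the scheduler config
    run_batch_forecast(
        "models/production/model.pkl",
        "data/latest_features.parquet",
        "results/aqi_forecasts.parquet",
    )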

Containerization

Docker Setup

Create a production-ready Docker image:
FROM python:3.10-slim

# Set working directory
WORKDIR /app

# Install system dependencies (curl is required by the HEALTHCHECK below;
# the slim base image does not include it)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY src/ ./src/
COPY models/ ./models/
COPY config/ ./config/

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run application
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]
Use multi-stage builds to reduce image size and separate build-time from runtime dependencies.
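
A minimal multi-stage sketch of that idea: the build stage compiles wheels with the compilers installed, and the runtime stage stays slim without them:

# ---- Build stage: compile dependencies into wheels ----
FROM python:3.10-slim AS builder
RUN apt-get update && apt-get install -y gcc g++ && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip wheel --no-cache-dir --wheel-dir /wheels -r requirements.txt

# ---- Runtime stage: no compilers, smaller image ----
FROM python:3.10-slim
WORKDIR /app
COPY --from=builder /wheels /wheels
RUN pip install --no-cache-dir /wheels/* && rm -rf /wheels
COPY src/ ./src/
COPY models/ ./models/
COPY config/ ./config/
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]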

Building and Testing

# Build image
docker build -t aqi-predictor:latest .

# Run tests in container
docker run --rm aqi-predictor:latest pytest tests/

# Start services
docker-compose up -d

# View logs
docker-compose logs -f api

# Scale services
docker-compose up -d --scale api=5

# Stop services
docker-compose down
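
The compose commands above assume a docker-compose.yml along these lines (a minimal sketch; service names and ports are illustrative):

version: "3.8"
services:
  api:
    build: .
    image: aqi-predictor:latest
    environment:
      - LOG_LEVEL=info
    ports:
      - "8000:8000"   # drop the fixed host port (or front with a proxy) when using --scale
    restart: unless-stopped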

REST API Implementation

FastAPI Service

from fastapi import FastAPI, HTTPException, Response
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from pydantic import BaseModel, Field
import joblib
import numpy as np
import logging
from typing import List
import time

# Initialize app
app = FastAPI(
    title="AQI Predictor API",
    description="Production API for Air Quality Index predictions",
    version="1.0.0"
)

# CORS middleware (restrict allow_origins in production; browsers reject
# wildcard origins when credentials are allowed)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Metrics
prediction_counter = Counter('predictions_total', 'Total predictions made')
prediction_duration = Histogram('prediction_duration_seconds', 'Prediction latency')
error_counter = Counter('prediction_errors_total', 'Total prediction errors')

# Load model at startup
model = None
feature_names = None

# Note: newer FastAPI versions prefer lifespan handlers over on_event
@app.on_event("startup")
async def load_model():
    global model, feature_names
    try:
        model = joblib.load('/app/models/production/model.pkl')
        feature_names = joblib.load('/app/models/production/features.pkl')
        logging.info("Model loaded successfully")
    except Exception as e:
        logging.error(f"Failed to load model: {e}")
        raise

# Request/Response models
class PredictionRequest(BaseModel):
    pm25: float = Field(..., ge=0, le=500, description="PM2.5 concentration")
    pm10: float = Field(..., ge=0, le=600, description="PM10 concentration")
    no2: float = Field(..., ge=0, le=200, description="NO2 concentration")
    so2: float = Field(..., ge=0, le=100, description="SO2 concentration")
    co: float = Field(..., ge=0, le=50, description="CO concentration")
    o3: float = Field(..., ge=0, le=300, description="O3 concentration")
    temperature: float = Field(..., ge=-50, le=60, description="Temperature (°C)")
    humidity: float = Field(..., ge=0, le=100, description="Relative humidity (%)")
    wind_speed: float = Field(..., ge=0, le=50, description="Wind speed (m/s)")
    pressure: float = Field(..., ge=900, le=1100, description="Pressure (hPa)")
    hour: int = Field(..., ge=0, le=23, description="Hour of day")
    day_of_week: int = Field(..., ge=0, le=6, description="Day of week")
    month: int = Field(..., ge=1, le=12, description="Month")

    class Config:
        json_schema_extra = {
            "example": {
                "pm25": 35.5,
                "pm10": 50.2,
                "no2": 25.3,
                "so2": 8.1,
                "co": 0.6,
                "o3": 45.7,
                "temperature": 22.5,
                "humidity": 65.0,
                "wind_speed": 3.2,
                "pressure": 1013.25,
                "hour": 14,
                "day_of_week": 2,
                "month": 6
            }
        }

class PredictionResponse(BaseModel):
    aqi: float = Field(..., description="Predicted AQI value")
    category: str = Field(..., description="AQI category")
    confidence: float = Field(..., description="Prediction confidence score")
    processing_time_ms: float = Field(..., description="Processing time")

class BatchPredictionRequest(BaseModel):
    instances: List[PredictionRequest]

class BatchPredictionResponse(BaseModel):
    predictions: List[PredictionResponse]
    total_processing_time_ms: float

# Helper functions
def get_aqi_category(aqi: float) -> str:
    """Convert AQI value to category."""
    if aqi <= 50:
        return "Good"
    elif aqi <= 100:
        return "Moderate"
    elif aqi <= 150:
        return "Unhealthy for Sensitive Groups"
    elif aqi <= 200:
        return "Unhealthy"
    elif aqi <= 300:
        return "Very Unhealthy"
    else:
        return "Hazardous"

def calculate_confidence(prediction: float, features: np.ndarray) -> float:
    """Calculate prediction confidence based on feature values."""
    # Simplified confidence calculation
    # In production, use prediction intervals or model uncertainty
    return 0.85 if 0 <= prediction <= 500 else 0.60
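
# A sketch of a model-based alternative to the placeholder above (assumption:
# the model is a tree ensemble such as RandomForestRegressor) using the spread
# of per-tree predictions as an uncertainty signal:
#   per_tree = np.array([t.predict(features)[0] for t in model.estimators_])
#   confidence = float(1.0 / (1.0 + per_tree.std()))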

# Endpoints
@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "model_loaded": model is not None,
        "timestamp": time.time()
    }

@app.get("/ready")
async def readiness_check():
    """Readiness check endpoint."""
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    return {"status": "ready"}

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Single prediction endpoint."""
    start_time = time.time()
    
    try:
        # Prepare features
        features = np.array([[
            request.pm25, request.pm10, request.no2, request.so2,
            request.co, request.o3, request.temperature, request.humidity,
            request.wind_speed, request.pressure, request.hour,
            request.day_of_week, request.month
        ]])
        
        # Make prediction
        aqi = float(model.predict(features)[0])
        category = get_aqi_category(aqi)
        confidence = calculate_confidence(aqi, features)
        
        # Record metrics
        prediction_counter.inc()
        processing_time = (time.time() - start_time) * 1000
        prediction_duration.observe(processing_time / 1000)
        
        return PredictionResponse(
            aqi=round(aqi, 2),
            category=category,
            confidence=round(confidence, 3),
            processing_time_ms=round(processing_time, 2)
        )
        
    except Exception as e:
        error_counter.inc()
        logging.error(f"Prediction error: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
    """Batch prediction endpoint."""
    start_time = time.time()
    
    try:
        predictions = []
        
        for instance in request.instances:
            features = np.array([[
                instance.pm25, instance.pm10, instance.no2, instance.so2,
                instance.co, instance.o3, instance.temperature, instance.humidity,
                instance.wind_speed, instance.pressure, instance.hour,
                instance.day_of_week, instance.month
            ]])
            
            aqi = float(model.predict(features)[0])
            category = get_aqi_category(aqi)
            confidence = calculate_confidence(aqi, features)
            
            predictions.append(PredictionResponse(
                aqi=round(aqi, 2),
                category=category,
                confidence=round(confidence, 3),
                processing_time_ms=0  # per-item latency is not tracked in batch mode
            ))
        
        prediction_counter.inc(len(predictions))
        total_time = (time.time() - start_time) * 1000
        
        return BatchPredictionResponse(
            predictions=predictions,
            total_processing_time_ms=round(total_time, 2)
        )
        
    except Exception as e:
        error_counter.inc()
        logging.error(f"Batch prediction error: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint."""
    return generate_latest()

@app.get("/model/info")
async def model_info():
    """Model metadata endpoint."""
    return {
        "model_type": type(model).__name__,
        "features": feature_names,
        "n_features": len(feature_names) if feature_names else 0
    }
Always implement authentication and rate limiting in production. Use API keys, OAuth, or JWT tokens.
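
A minimal API-key sketch using FastAPI's security utilities; the header name and environment-variable key source are assumptions (in production, load keys from a secret store):

from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader
import os

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

async def require_api_key(api_key: str = Security(api_key_header)) -> str:
    """Reject requests without a valid API key."""
    expected = os.environ.get("API_KEY")  # e.g. injected via a Kubernetes Secret
    if not expected or api_key != expected:
        raise HTTPException(status_code=401, detail="Invalid or missing API key")
    return api_key

# Attach to protected endpoints:
# @app.post("/predict", dependencies=[Depends(require_api_key)])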

Kubernetes Deployment

Kubernetes Manifests

apiVersion: apps/v1
kind: Deployment
metadata:
  name: aqi-predictor
  namespace: ml-services
  labels:
    app: aqi-predictor
    version: v1
spec:
  replicas: 3
  selector:
    matchLabels:
      app: aqi-predictor
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  template:
    metadata:
      labels:
        app: aqi-predictor
        version: v1
    spec:
      containers:
      - name: api
        image: your-registry/aqi-predictor:v1.0.0
        imagePullPolicy: Always
        ports:
        - containerPort: 8000
          name: http
          protocol: TCP
        env:
        - name: MODEL_PATH
          value: /models/production
        - name: LOG_LEVEL
          value: info
        - name: WORKERS
          value: "4"
        resources:
          requests:
            cpu: 1000m
            memory: 2Gi
          limits:
            cpu: 2000m
            memory: 4Gi
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 3
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 5
          timeoutSeconds: 3
          failureThreshold: 2
        volumeMounts:
        - name: model-volume
          mountPath: /models
          readOnly: true
      volumes:
      - name: model-volume
        persistentVolumeClaim:
          claimName: model-pvc
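
The service.yaml applied in the commands below exposes the Deployment inside the cluster; a minimal sketch:

apiVersion: v1
kind: Service
metadata:
  name: aqi-predictor
  namespace: ml-services
spec:
  selector:
    app: aqi-predictor
  ports:
  - name: http
    port: 80
    targetPort: 8000
  type: ClusterIP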

Deployment Commands

# Create namespace
kubectl create namespace ml-services

# Apply manifests
kubectl apply -f deployment.yaml
kubectl apply -f service.yaml
kubectl apply -f ingress.yaml
kubectl apply -f hpa.yaml

# Verify deployment
kubectl get pods -n ml-services
kubectl get svc -n ml-services
kubectl get ingress -n ml-services

# View logs
kubectl logs -f deployment/aqi-predictor -n ml-services

# Rolling update
kubectl set image deployment/aqi-predictor \
  api=your-registry/aqi-predictor:v1.1.0 \
  -n ml-services

# Rollback
kubectl rollout undo deployment/aqi-predictor -n ml-services

# Scale manually
kubectl scale deployment/aqi-predictor --replicas=5 -n ml-services
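
The hpa.yaml applied above is not shown; a minimal HorizontalPodAutoscaler sketch (the 70% CPU target and replica bounds are assumptions to tune per workload):

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: aqi-predictor
  namespace: ml-services
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: aqi-predictor
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70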

Model Serving Platforms

The FastAPI service above works for any joblib-serialized model; for TensorFlow models, TensorFlow Serving offers an off-the-shelf, high-performance alternative:

# Export the model in SavedModel format
import tensorflow as tf

tf.saved_model.save(model, '/models/aqi_predictor/1')

# Run TF Serving
docker run -p 8501:8501 \
  --mount type=bind,source=/models/aqi_predictor,target=/models/aqi_predictor \
  -e MODEL_NAME=aqi_predictor \
  -t tensorflow/serving

# Make prediction
curl -X POST http://localhost:8501/v1/models/aqi_predictor:predict \
  -H 'Content-Type: application/json' \
  -d '{"instances": [[35.5, 50.2, 25.3, 8.1, 0.6, 45.7, 22.5, 65.0, 3.2, 1013.25, 14, 2, 6]]}'

CI/CD Pipeline

The following GitHub Actions workflow runs tests, builds and pushes the container image, and deploys to Kubernetes on tagged releases:

name: Build and Deploy

on:
  push:
    branches:
      - main
    tags:
      - 'v*'

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      
      - name: Run tests
        run: pytest tests/ --cov=src --cov-report=xml
      
      - name: Upload coverage
        uses: codecov/codecov-action@v3

  build:
    needs: test
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
    steps:
      - uses: actions/checkout@v3
      
      - name: Log in to registry
        uses: docker/login-action@v2
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      
      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v4
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
      
      - name: Build and push
        uses: docker/build-push-action@v4
        with:
          context: .
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: startsWith(github.ref, 'refs/tags/v')
    steps:
      - uses: actions/checkout@v3
      
      - name: Configure kubectl
        uses: azure/k8s-set-context@v3
        with:
          kubeconfig: ${{ secrets.KUBECONFIG }}
      
      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/aqi-predictor \
            api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.ref_name }} \
            -n ml-services
          
          kubectl rollout status deployment/aqi-predictor -n ml-services

Load Testing

from locust import HttpUser, task, between
import random

class AQIPredictorUser(HttpUser):
    wait_time = between(1, 3)
    
    def generate_sample(self):
        return {
            "pm25": random.uniform(0, 200),
            "pm10": random.uniform(0, 300),
            "no2": random.uniform(0, 100),
            "so2": random.uniform(0, 50),
            "co": random.uniform(0, 10),
            "o3": random.uniform(0, 150),
            "temperature": random.uniform(-10, 40),
            "humidity": random.uniform(20, 95),
            "wind_speed": random.uniform(0, 20),
            "pressure": random.uniform(990, 1030),
            "hour": random.randint(0, 23),
            "day_of_week": random.randint(0, 6),
            "month": random.randint(1, 12)
        }
    
    @task(10)
    def predict_single(self):
        self.client.post("/predict", json=self.generate_sample())
    
    @task(1)
    def predict_batch(self):
        instances = [self.generate_sample() for _ in range(10)]
        self.client.post("/predict/batch", json={"instances": instances})
    
    @task(1)
    def health_check(self):
        self.client.get("/health")

# Run: locust -f load_test.py --host=http://localhost:8000

Best Practices

Reliability:
  • Deploy multiple replicas across availability zones
  • Implement health checks and readiness probes
  • Use circuit breakers for downstream dependencies
  • Configure automatic restarts and rollbacks

Performance:
  • Use model caching (Redis, Memcached)
  • Batch predictions when possible
  • Optimize model size (quantization, pruning)
  • Enable GPU acceleration for large models
  • Use async processing for non-critical predictions

Security:
  • Implement authentication (API keys, OAuth)
  • Use TLS/SSL for all communications
  • Rate limit API requests
  • Validate and sanitize all inputs
  • Scan container images for vulnerabilities
  • Use non-root users in containers

Cost:
  • Right-size compute resources
  • Use horizontal pod autoscaling
  • Implement request batching
  • Cache frequent predictions (see the caching sketch after this list)
  • Use spot instances for batch workloads
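
A minimal sketch of the prediction-caching idea above, assuming a Redis instance and the redis-py client (the key layout and TTL are illustrative):

import hashlib
import json

import redis

cache = redis.Redis(host="localhost", port=6379, decode_responses=True)

def cached_predict(model, payload: dict, ttl_seconds: int = 300) -> float:
    """Return a cached AQI prediction, computing and storing it on a miss."""
    # Derive a stable cache key from the request payload
    key = "aqi:" + hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode()
    ).hexdigest()
    hit = cache.get(key)
    if hit is not None:
        return float(hit)
    # Cache miss: score and store with a TTL so stale entries expire.
    # Assumes the payload's value order matches the model's feature order.
    aqi = float(model.predict([list(payload.values())])[0])
    cache.setex(key, ttl_seconds, aqi)
    return aqi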

Next Steps