Overview
This guide covers how to load trained models, prepare input data, and make AQI predictions in various deployment scenarios, from batch processing to real-time API endpoints.
Prerequisites
Ensure you have:
- Trained models saved from the training guide
- Feature scaler from data preparation
- Clear understanding of required input features
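Before continuing, it is worth confirming that these artifacts are actually on disk. A minimal sanity check, assuming the models/ paths used throughout this guide (adjust to match your training output):
from pathlib import Path

# Paths are illustrative; use whatever your training pipeline saved
artifacts = [
    'models/xgboost_model.json',  # trained model
    'models/feature_scaler.pkl',  # fitted scaler
    'models/metadata.json',       # feature names and training config
]
missing = [p for p in artifacts if not Path(p).exists()]
if missing:
    raise FileNotFoundError(f"Missing artifacts: {missing}")
print("All artifacts found.")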
Loading Trained Models
Load Model and Artifacts
Load your saved model and preprocessing artifacts.
import xgboost as xgb
import joblib
import json
# Load XGBoost model
xgb_model = xgb.Booster()
xgb_model.load_model('models/xgboost_model.json')
# Load feature scaler
scaler = joblib.load('models/feature_scaler.pkl')
# Load metadata
with open('models/metadata.json', 'r') as f:
metadata = json.load(f)
feature_names = metadata['features']
print(f"Model loaded: {len(feature_names)} features")
print(f"Expected features: {feature_names[:5]}...")
Create Prediction Pipeline
Build a reusable pipeline for preprocessing and prediction.
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class AQIPredictor:
def __init__(self, model, scaler, feature_names):
self.model = model
self.scaler = scaler
self.feature_names = feature_names
self.model_type = self._detect_model_type()
def _detect_model_type(self):
"""Detect model framework"""
if hasattr(self.model, 'get_score'):
return 'xgboost'
elif hasattr(self.model, 'num_trees'):
return 'lightgbm'
elif hasattr(self.model, 'get_params'):  # sklearn regressors lack predict_proba
return 'sklearn'
elif hasattr(self.model, 'layers'):
return 'keras'
else:
return 'unknown'
def prepare_features(self, raw_data, historical_data=None):
"""
Transform raw input into model features
Args:
raw_data: Dict with current readings
historical_data: DataFrame with past readings for lag features
"""
df = pd.DataFrame([raw_data])
# Add temporal features
if 'timestamp' in raw_data:
ts = pd.to_datetime(raw_data['timestamp'])
else:
ts = pd.Timestamp.now()
df['hour'] = ts.hour
df['day_of_week'] = ts.dayofweek
df['month'] = ts.month
df['is_weekend'] = int(ts.dayofweek in [5, 6])
# Cyclical encoding
df['hour_sin'] = np.sin(2 * np.pi * ts.hour / 24)
df['hour_cos'] = np.cos(2 * np.pi * ts.hour / 24)
# Season
season_map = {
12: 0, 1: 0, 2: 0,
3: 1, 4: 1, 5: 1,
6: 2, 7: 2, 8: 2,
9: 3, 10: 3, 11: 3
}
df['season'] = season_map[ts.month]
# Add lag features if historical data provided
if historical_data is not None:
for lag in [1, 3, 6, 12, 24]:
if len(historical_data) >= lag:
df[f'pm25_lag_{lag}h'] = historical_data['pm25'].iloc[-lag]
df[f'pm10_lag_{lag}h'] = historical_data['pm10'].iloc[-lag]
else:
# Use current value if insufficient history
df[f'pm25_lag_{lag}h'] = raw_data.get('pm25', 0)
df[f'pm10_lag_{lag}h'] = raw_data.get('pm10', 0)
# Rolling statistics
for window in [3, 6, 12, 24]:
if len(historical_data) >= window:
df[f'pm25_rolling_mean_{window}h'] = historical_data['pm25'].tail(window).mean()
df[f'pm25_rolling_std_{window}h'] = historical_data['pm25'].tail(window).std()
else:
df[f'pm25_rolling_mean_{window}h'] = raw_data.get('pm25', 0)
df[f'pm25_rolling_std_{window}h'] = 0
else:
# Initialize with current values
for lag in [1, 3, 6, 12, 24]:
df[f'pm25_lag_{lag}h'] = raw_data.get('pm25', 0)
df[f'pm10_lag_{lag}h'] = raw_data.get('pm10', 0)
for window in [3, 6, 12, 24]:
df[f'pm25_rolling_mean_{window}h'] = raw_data.get('pm25', 0)
df[f'pm25_rolling_std_{window}h'] = 0
# Interaction features
df['pm_ratio'] = raw_data.get('pm25', 0) / (raw_data.get('pm10', 0) + 1e-5)
df['temp_humidity'] = raw_data.get('temperature', 20) * raw_data.get('humidity', 50)
df['pollutant_index'] = (
raw_data.get('pm25', 0) * 0.3 +
raw_data.get('pm10', 0) * 0.2 +
raw_data.get('no2', 0) * 0.2 +
raw_data.get('so2', 0) * 0.15 +
raw_data.get('co', 0) * 0.15
)
# Ensure all expected features are present
for feat in self.feature_names:
if feat not in df.columns:
df[feat] = 0
# Select and order features
df = df[self.feature_names]
return df
def predict(self, raw_data, historical_data=None, return_details=False):
"""
Make AQI prediction
Args:
raw_data: Dict with current sensor readings
historical_data: Optional DataFrame with historical readings
return_details: If True, return additional info
Returns:
Predicted AQI value or dict with details
"""
# Prepare features
features_df = self.prepare_features(raw_data, historical_data)
# Scale features
features_scaled = self.scaler.transform(features_df)
# Predict based on model type
if self.model_type == 'xgboost':
dmatrix = xgb.DMatrix(features_scaled, feature_names=self.feature_names)
prediction = self.model.predict(dmatrix)[0]
elif self.model_type == 'lightgbm':
prediction = self.model.predict(features_scaled)[0]
elif self.model_type in ['sklearn', 'keras']:
prediction = self.model.predict(features_scaled)[0]
if isinstance(prediction, np.ndarray):
prediction = prediction[0]
else:
raise ValueError(f"Unsupported model type: {self.model_type}")
# Clip to valid AQI range
prediction = np.clip(prediction, 0, 500)
if return_details:
return {
'aqi': float(prediction),
'category': self._get_aqi_category(prediction),
'timestamp': datetime.now().isoformat(),
'input_features': raw_data
}
return float(prediction)
@staticmethod
def _get_aqi_category(aqi_value):
"""Map AQI value to category"""
if aqi_value <= 50:
return {'level': 'Good', 'color': 'green'}
elif aqi_value <= 100:
return {'level': 'Moderate', 'color': 'yellow'}
elif aqi_value <= 150:
return {'level': 'Unhealthy for Sensitive Groups', 'color': 'orange'}
elif aqi_value <= 200:
return {'level': 'Unhealthy', 'color': 'red'}
elif aqi_value <= 300:
return {'level': 'Very Unhealthy', 'color': 'purple'}
else:
return {'level': 'Hazardous', 'color': 'maroon'}
# Initialize predictor
predictor = AQIPredictor(xgb_model, scaler, feature_names)
print("Prediction pipeline ready!")
Making Predictions
Single Prediction
Predict AQI for current conditions.
# Current sensor readings
current_data = {
'timestamp': '2024-03-15 14:00:00',
'pm25': 35.2,
'pm10': 58.7,
'no2': 42.1,
'so2': 12.3,
'co': 0.8,
'o3': 65.4,
'temperature': 22.5,
'humidity': 55.0,
'wind_speed': 3.2,
'wind_direction': 180,
'pressure': 1013.2
}
# Make prediction
aqi_prediction = predictor.predict(current_data, return_details=True)
print(f"Predicted AQI: {aqi_prediction['aqi']:.1f}")
print(f"Category: {aqi_prediction['category']['level']}")
print(f"Color: {aqi_prediction['category']['color']}")
Batch Predictions
Process multiple predictions efficiently.
def batch_predict(predictor, data_list):
"""
Make predictions for multiple data points
Args:
predictor: AQIPredictor instance
data_list: List of dicts with sensor readings
Returns:
List of predictions
"""
predictions = []
for i, data in enumerate(data_list):
try:
pred = predictor.predict(data, return_details=True)
predictions.append(pred)
except Exception as e:
print(f"Error predicting sample {i}: {str(e)}")
predictions.append(None)
return predictions
# Example: Load data from CSV
input_df = pd.read_csv('data/new_readings.csv')
data_list = input_df.to_dict('records')
# Batch predict
predictions = batch_predict(predictor, data_list)
# Add predictions to dataframe
input_df['predicted_aqi'] = [p['aqi'] if p else None for p in predictions]
input_df['aqi_category'] = [p['category']['level'] if p else None for p in predictions]
# Save results
input_df.to_csv('data/predictions.csv', index=False)
print(f"Processed {len(predictions)} predictions")
Time Series Forecasting
Predict future AQI values.
def forecast_aqi(predictor, initial_data, historical_data, hours_ahead=24):
"""
Forecast AQI for multiple hours ahead
Args:
predictor: AQIPredictor instance
initial_data: Current sensor readings
historical_data: Recent historical readings
hours_ahead: Number of hours to forecast
Returns:
List of forecasted AQI values
"""
forecasts = []
current_data = initial_data.copy()
history = historical_data.copy()
for hour in range(hours_ahead):
# Predict next hour
aqi_pred = predictor.predict(current_data, history)
# Store forecast
forecast_time = pd.to_datetime(current_data['timestamp']) + timedelta(hours=1)
forecasts.append({
'timestamp': forecast_time,
'aqi': aqi_pred
})
# Update for next iteration
# (In production, you'd update with actual new readings or use predicted values)
current_data['timestamp'] = forecast_time
# Append prediction to history
new_row = pd.DataFrame([{
'pm25': current_data['pm25'],
'pm10': current_data['pm10']
}])
history = pd.concat([history, new_row], ignore_index=True)
return pd.DataFrame(forecasts)
# Load recent historical data
historical = pd.read_parquet('data/processed/recent_readings.parquet')
# Forecast next 24 hours
forecast_df = forecast_aqi(predictor, current_data, historical, hours_ahead=24)
print("24-Hour Forecast:")
print(forecast_df.head(10))
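To inspect the forecast horizon visually, a quick matplotlib sketch:
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 4))
plt.plot(forecast_df['timestamp'], forecast_df['aqi'], marker='o')
plt.xlabel('Time')
plt.ylabel('Predicted AQI')
plt.title('24-Hour AQI Forecast')
plt.tight_layout()
plt.show()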
Deployment Scenarios
REST API Endpoint
Deploy as a web service using Flask or FastAPI.
from flask import Flask, request, jsonify
import traceback
app = Flask(__name__)
# Initialize predictor globally
predictor = AQIPredictor(xgb_model, scaler, feature_names)
@app.route('/predict', methods=['POST'])
def predict():
"""
Predict AQI from JSON input
Request body:
{
"pm25": 35.2,
"pm10": 58.7,
"no2": 42.1,
...
}
"""
try:
data = request.get_json()
# Validate input
required_fields = ['pm25', 'pm10', 'no2', 'so2', 'co', 'o3',
'temperature', 'humidity']
missing = [f for f in required_fields if f not in data]
if missing:
return jsonify({
'error': f'Missing required fields: {missing}'
}), 400
# Make prediction
result = predictor.predict(data, return_details=True)
return jsonify(result), 200
except Exception as e:
return jsonify({
'error': str(e),
'traceback': traceback.format_exc()
}), 500
@app.route('/health', methods=['GET'])
def health():
return jsonify({'status': 'healthy'}), 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
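If you prefer FastAPI, a minimal equivalent might look like the sketch below (the app is named api to avoid clashing with the Flask app above; run with uvicorn). The field list mirrors the Flask validation:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

api = FastAPI()

class SensorReading(BaseModel):
    pm25: float
    pm10: float
    no2: float
    so2: float
    co: float
    o3: float
    temperature: float
    humidity: float

@api.post('/predict')
def predict_aqi(reading: SensorReading):
    try:
        # .dict() is deprecated in Pydantic v2; use .model_dump() there
        return predictor.predict(reading.dict(), return_details=True)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))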
Streaming Predictions
Process real-time sensor data streams.
import time
from collections import deque
class StreamingPredictor:
def __init__(self, predictor, buffer_size=24):
self.predictor = predictor
self.buffer = deque(maxlen=buffer_size)
def process_reading(self, reading):
"""
Process incoming sensor reading
"""
# Add to buffer
self.buffer.append(reading)
# Create historical dataframe
if len(self.buffer) > 1:
history = pd.DataFrame(list(self.buffer)[:-1])
else:
history = None
# Make prediction
prediction = self.predictor.predict(reading, history, return_details=True)
return prediction
# Initialize streaming predictor
streaming = StreamingPredictor(predictor, buffer_size=24)
# Simulate real-time stream
def simulate_sensor_stream():
"""Simulate real-time sensor readings"""
while True:
# Get new reading (from sensor API, message queue, etc.)
reading = {
'timestamp': datetime.now().isoformat(),
'pm25': np.random.uniform(20, 80),
'pm10': np.random.uniform(30, 120),
'no2': np.random.uniform(10, 60),
'so2': np.random.uniform(5, 30),
'co': np.random.uniform(0.3, 2.0),
'o3': np.random.uniform(30, 100),
'temperature': np.random.uniform(15, 30),
'humidity': np.random.uniform(40, 80)
}
# Process and predict
result = streaming.process_reading(reading)
print(f"[{result['timestamp']}] AQI: {result['aqi']:.1f} - {result['category']['level']}")
time.sleep(3600) # Wait 1 hour
# Run simulation
# simulate_sensor_stream()
For production deployments, consider a message queue (Kafka, RabbitMQ) to handle high-volume streaming data, as in the sketch below.
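A sketch of the consumer side using kafka-python, assuming readings arrive as JSON messages (the topic name and broker address are illustrative):
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'aqi-readings',                      # illustrative topic name
    bootstrap_servers='localhost:9092',  # illustrative broker address
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
for message in consumer:
    result = streaming.process_reading(message.value)
    print(f"AQI: {result['aqi']:.1f} - {result['category']['level']}")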
Error Handling and Validation
class ValidationError(Exception):
pass
def validate_sensor_reading(data):
"""
Validate sensor reading data
"""
# Check required fields
required = ['pm25', 'pm10', 'no2', 'so2', 'co', 'o3', 'temperature', 'humidity']
missing = [f for f in required if f not in data or data[f] is None]
if missing:
raise ValidationError(f"Missing required fields: {missing}")
# Check value ranges
ranges = {
'pm25': (0, 500),
'pm10': (0, 600),
'no2': (0, 400),
'so2': (0, 300),
'co': (0, 50),
'o3': (0, 400),
'temperature': (-50, 60),
'humidity': (0, 100)
}
for field, (min_val, max_val) in ranges.items():
if field in data:
value = data[field]
if not min_val <= value <= max_val:
raise ValidationError(
f"{field}={value} outside valid range [{min_val}, {max_val}]"
)
return True
# Use in prediction
try:
validate_sensor_reading(current_data)
prediction = predictor.predict(current_data)
print(f"AQI: {prediction}")
except ValidationError as e:
print(f"Validation error: {e}")
except Exception as e:
print(f"Prediction error: {e}")
Always validate input data before making predictions. Invalid sensor readings can produce nonsensical predictions.
Performance Optimization
import time
def benchmark_prediction(predictor, test_data, n_iterations=1000):
"""
Benchmark prediction latency
"""
times = []
for _ in range(n_iterations):
start = time.time()
predictor.predict(test_data)
elapsed = time.time() - start
times.append(elapsed)
times = np.array(times) * 1000 # Convert to ms
print(f"Prediction Latency:")
print(f" Mean: {times.mean():.2f} ms")
print(f" Median: {np.median(times):.2f} ms")
print(f" P95: {np.percentile(times, 95):.2f} ms")
print(f" P99: {np.percentile(times, 99):.2f} ms")
benchmark_prediction(predictor, current_data)
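Per-prediction latency is dominated by feature preparation and fixed framework overhead, so batching amortizes both. A quick comparison sketch, reusing batch_predict_vectorized from the batch section above (absolute numbers will vary with hardware and model size):
n = 500
batch = [current_data] * n

# Per-row loop: one predict() call per reading
start = time.time()
for d in batch:
    predictor.predict(d)
loop_s = time.time() - start

# Single vectorized call for the whole batch
start = time.time()
batch_predict_vectorized(predictor, batch)
vec_s = time.time() - start

print(f"Per-row loop: {loop_s:.2f}s ({n / loop_s:.0f} preds/s)")
print(f"Single batch: {vec_s:.2f}s ({n / vec_s:.0f} preds/s)")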