
Overview

Proper data preparation is crucial for accurate AQI predictions. This guide covers loading, cleaning, validating, and transforming environmental data into features suitable for machine learning models.

Data Requirements

AQI prediction models typically require hourly or daily measurements of pollutants and meteorological data over an extended period (minimum 6-12 months for reliable training).

Required Features

Your dataset should include (a quick sanity check is sketched after this list):
  • Pollutant concentrations: PM2.5, PM10, NO2, SO2, CO, O3
  • Meteorological data: Temperature, humidity, wind speed, wind direction, pressure
  • Temporal features: Timestamp, day of week, season
  • Target variable: Historical AQI values
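As a quick sanity check, the sketch below (assuming data has already been loaded into a DataFrame df with a parsed timestamp column, as done in Step 1, and using column names that may differ in your dataset) confirms that the required features are present and that the record spans enough time:
REQUIRED_COLS = ['pm25', 'pm10', 'no2', 'so2', 'co', 'o3',
                 'temperature', 'humidity', 'wind_speed',
                 'wind_direction', 'pressure', 'timestamp', 'aqi']

missing_cols = [c for c in REQUIRED_COLS if c not in df.columns]
if missing_cols:
    raise ValueError(f"Dataset is missing required columns: {missing_cols}")

# Check temporal coverage (6-12+ months recommended)
span_days = (df['timestamp'].max() - df['timestamp'].min()).days
if span_days < 180:
    print(f"Warning: only {span_days} days of data; 6-12 months or more is recommended")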

Step-by-Step Data Preparation

Step 1: Load Raw Data

Start by loading your environmental data and parsing timestamps so records are in chronological order.
import pandas as pd
import numpy as np
from datetime import datetime

# Load raw data
df = pd.read_csv('environmental_data.csv')

# Parse timestamps
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.sort_values('timestamp').reset_index(drop=True)

print(f"Loaded {len(df)} records")
print(df.head())

Step 2: Handle Missing Values

Environmental sensors often produce incomplete data. Handle missing values appropriately.
# Check missing data
missing_summary = df.isnull().sum()
missing_pct = (missing_summary / len(df)) * 100
print("Missing data percentage:")
print(missing_pct[missing_pct > 0])

# Strategy 1: Forward fill for short gaps (< 3 hours)
df_filled = df.copy()
for col in ['pm25', 'pm10', 'no2', 'so2', 'co', 'o3']:
    # Only fill gaps of 1-2 consecutive missing values
    df_filled[col] = df_filled[col].ffill(limit=2)

# Strategy 2: Time-weighted interpolation for moderate gaps
# (method='time' requires a DatetimeIndex, so index by timestamp temporarily)
df_filled = df_filled.set_index('timestamp')
numeric_cols = df_filled.select_dtypes(include=[np.number]).columns
df_filled[numeric_cols] = df_filled[numeric_cols].interpolate(
    method='time',
    limit=6  # Max 6 consecutive hours
)
df_filled = df_filled.reset_index()

# Strategy 3: Drop rows with remaining missing critical values
critical_features = ['pm25', 'pm10', 'no2', 'temperature', 'humidity']
df_clean = df_filled.dropna(subset=critical_features)

print(f"Records after cleaning: {len(df_clean)} ({len(df_clean)/len(df)*100:.1f}% retained)")
Avoid filling large gaps with interpolation, as this can introduce artificial patterns. Consider dropping records with more than 20% missing features.
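The 20% guideline above can be enforced directly. A minimal sketch, intended to slot in before Strategy 3 and assuming df_filled as defined earlier:
# Drop rows where more than 20% of feature columns are still missing
feature_subset = [c for c in df_filled.columns if c != 'timestamp']
row_missing_pct = df_filled[feature_subset].isnull().mean(axis=1)
print(f"{(row_missing_pct > 0.20).sum()} rows exceed the 20% missing-feature threshold")
df_filled = df_filled[row_missing_pct <= 0.20]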

Step 3: Detect and Handle Outliers

Identify and handle anomalous sensor readings.
def detect_outliers_iqr(data, column, factor=3.0):
    """Detect outliers using Interquartile Range method"""
    Q1 = data[column].quantile(0.25)
    Q3 = data[column].quantile(0.75)
    IQR = Q3 - Q1
    
    lower_bound = Q1 - factor * IQR
    upper_bound = Q3 + factor * IQR
    
    outliers = (data[column] < lower_bound) | (data[column] > upper_bound)
    return outliers

# Apply physical constraints
constraints = {
    'pm25': (0, 500),
    'pm10': (0, 600),
    'no2': (0, 400),
    'so2': (0, 300),
    'co': (0, 50),
    'o3': (0, 400),
    'temperature': (-40, 60),
    'humidity': (0, 100),
    'wind_speed': (0, 50)
}

df_clean = df_clean.copy()
for col, (min_val, max_val) in constraints.items():
    if col in df_clean.columns:
        # Flag values outside physical limits
        invalid = (df_clean[col] < min_val) | (df_clean[col] > max_val)
        print(f"{col}: {invalid.sum()} invalid values")
        df_clean.loc[invalid, col] = np.nan

# Detect statistical outliers
for col in ['pm25', 'pm10', 'no2', 'so2']:
    outliers = detect_outliers_iqr(df_clean, col, factor=3.0)
    print(f"{col}: {outliers.sum()} statistical outliers")
    # Cap high-tail outliers at the 99th percentile instead of removing them
    upper_cap = df_clean[col].quantile(0.99)
    df_clean.loc[outliers & (df_clean[col] > upper_cap), col] = upper_cap

# Fill the NaNs introduced by the constraint check
# (method='time' requires a DatetimeIndex, so index by timestamp temporarily)
df_clean = df_clean.set_index('timestamp')
num_cols = df_clean.select_dtypes(include=[np.number]).columns
df_clean[num_cols] = df_clean[num_cols].interpolate(method='time', limit=3)
df_clean = df_clean.reset_index()

Step 4: Feature Engineering

Create derived features that capture temporal patterns and interactions.
def engineer_features(df):
    """Create temporal and interaction features"""
    df = df.copy()
    
    # Temporal features
    df['hour'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek
    df['month'] = df['timestamp'].dt.month
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
    
    # Cyclical encoding for hour
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    
    # Season encoding
    df['season'] = df['month'].map({
        12: 0, 1: 0, 2: 0,  # Winter
        3: 1, 4: 1, 5: 1,   # Spring
        6: 2, 7: 2, 8: 2,   # Summer
        9: 3, 10: 3, 11: 3  # Fall
    })
    
    # Lagged features from previous hours (assumes hourly, contiguous records)
    for lag in [1, 3, 6, 12, 24]:
        df[f'pm25_lag_{lag}h'] = df['pm25'].shift(lag)
        df[f'pm10_lag_{lag}h'] = df['pm10'].shift(lag)
    
    # Rolling statistics (moving averages)
    for window in [3, 6, 12, 24]:
        df[f'pm25_rolling_mean_{window}h'] = df['pm25'].rolling(window).mean()
        df[f'pm25_rolling_std_{window}h'] = df['pm25'].rolling(window).std()
    
    # Interaction features
    df['pm_ratio'] = df['pm25'] / (df['pm10'] + 1e-5)
    df['temp_humidity'] = df['temperature'] * df['humidity']
    
    # Composite pollutant index (weights here are heuristic, not an official AQI formula)
    df['pollutant_index'] = (
        df['pm25'] * 0.3 + 
        df['pm10'] * 0.2 + 
        df['no2'] * 0.2 + 
        df['so2'] * 0.15 + 
        df['co'] * 0.15
    )
    
    return df

df_features = engineer_features(df_clean)

# Drop rows with NaN from lagged/rolling features
df_features = df_features.dropna()

print(f"Feature engineering complete: {len(df_features.columns)} features")
Lagged features and rolling statistics are particularly important for time series prediction, as they capture recent pollution trends.
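A practical consequence is that the same lags and rolling windows must be reproducible at prediction time. A minimal sketch, where latest_obs is a hypothetical DataFrame holding the most recent 24+ hours of cleaned hourly records:
# Build features for the newest timestamp from recent history (latest_obs is hypothetical)
latest_features = engineer_features(latest_obs).iloc[[-1]]

# Lag/rolling columns will be NaN if fewer than 24 hours of history are available
history_cols = [c for c in latest_features.columns if '_lag_' in c or '_rolling_' in c]
assert not latest_features[history_cols].isna().any().any(), "insufficient history for lag features"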

Step 5: Normalize Features

Scale features to ensure consistent ranges for model training.
from sklearn.preprocessing import RobustScaler
import joblib
import os

# Separate features and target
feature_cols = [col for col in df_features.columns 
                if col not in ['timestamp', 'aqi']]
X = df_features[feature_cols]
y = df_features['aqi']

# Use RobustScaler (less sensitive to outliers)
scaler = RobustScaler()
X_scaled = scaler.fit_transform(X)
X_scaled = pd.DataFrame(X_scaled, columns=feature_cols, index=X.index)

# Save the fitted scaler for use at inference time
os.makedirs('models', exist_ok=True)
joblib.dump(scaler, 'models/feature_scaler.pkl')
print("Feature scaling complete")

# Verify scaling
print("\nScaled feature statistics:")
print(X_scaled.describe())
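At inference time, reload the saved scaler and apply the same transform; refitting a scaler on new data would shift the feature distribution relative to training. A minimal sketch, where new_features is a hypothetical DataFrame containing the same feature columns:
# Load the scaler fitted on training data and transform incoming observations
scaler = joblib.load('models/feature_scaler.pkl')
new_scaled = pd.DataFrame(
    scaler.transform(new_features[feature_cols]),
    columns=feature_cols,
    index=new_features.index
)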

Step 6: Split Data

Create training, validation, and test sets with temporal awareness.
def temporal_train_test_split(X, y, timestamps, train_size=0.7, val_size=0.15):
    """Split time series data chronologically"""
    n = len(X)
    train_end = int(n * train_size)
    val_end = int(n * (train_size + val_size))
    
    X_train = X.iloc[:train_end]
    y_train = y.iloc[:train_end]
    
    X_val = X.iloc[train_end:val_end]
    y_val = y.iloc[train_end:val_end]
    
    X_test = X.iloc[val_end:]
    y_test = y.iloc[val_end:]
    
    print(f"Train: {len(X_train)} samples ({timestamps.iloc[0]} to {timestamps.iloc[train_end-1]})")
    print(f"Val: {len(X_val)} samples ({timestamps.iloc[train_end]} to {timestamps.iloc[val_end-1]})")
    print(f"Test: {len(X_test)} samples ({timestamps.iloc[val_end]} to {timestamps.iloc[-1]})")
    
    return X_train, X_val, X_test, y_train, y_val, y_test

X_train, X_val, X_test, y_train, y_val, y_test = temporal_train_test_split(
    X_scaled, y, df_features['timestamp']
)
Never use random shuffling for time series data. Always split chronologically to avoid data leakage from future to past.
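If you also need cross-validation, scikit-learn's TimeSeriesSplit keeps the same chronological discipline: each fold trains on an expanding window and validates on the period that follows it. A brief sketch over the training set:
from sklearn.model_selection import TimeSeriesSplit

tscv = TimeSeriesSplit(n_splits=5)
for fold, (train_idx, val_idx) in enumerate(tscv.split(X_train)):
    # Validation indices always come after the training indices
    print(f"Fold {fold}: train={len(train_idx)} samples, val={len(val_idx)} samples")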

Step 7: Save Processed Data

Export the prepared datasets for model training.
import os

# Create output directory
os.makedirs('data/processed', exist_ok=True)

# Save datasets
train_data = pd.DataFrame(X_train, columns=feature_cols)
train_data['aqi'] = y_train.values
train_data.to_parquet('data/processed/train.parquet', index=False)

val_data = pd.DataFrame(X_val, columns=feature_cols)
val_data['aqi'] = y_val.values
val_data.to_parquet('data/processed/val.parquet', index=False)

test_data = pd.DataFrame(X_test, columns=feature_cols)
test_data['aqi'] = y_test.values
test_data.to_parquet('data/processed/test.parquet', index=False)

# Save feature names
with open('data/processed/feature_names.txt', 'w') as f:
    f.write('\n'.join(feature_cols))

print("\nData preparation complete!")
print(f"Train: {len(train_data)} samples")
print(f"Val: {len(val_data)} samples")
print(f"Test: {len(test_data)} samples")

Data Validation Checklist

Before proceeding to training, verify (an automated version of these checks is sketched after the list):
  • No missing values in critical features
  • All values within physically realistic ranges
  • Sufficient data volume (minimum 10,000 samples recommended)
  • Temporal coverage includes all seasons
  • No data leakage between train/val/test splits
  • Features are properly scaled
  • Target variable (AQI) distribution is reasonable
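Several of these checks can be automated before training starts. A minimal sketch using the datasets produced above (thresholds are illustrative and match the recommendations in this checklist):
def validate_prepared_data(train_data, val_data, test_data, critical_features):
    """Lightweight pre-training validation of the processed datasets."""
    full = pd.concat([train_data, val_data, test_data])

    # No missing values in critical features
    assert full[critical_features].notna().all().all(), "Missing values in critical features"

    # Sufficient data volume
    assert len(full) >= 10_000, f"Only {len(full)} samples; 10,000+ recommended"

    # Target distribution is plausible (non-negative, not constant)
    assert (full['aqi'] >= 0).all() and full['aqi'].nunique() > 1, "Suspicious AQI distribution"

    print("Validation checks passed")

validate_prepared_data(train_data, val_data, test_data,
                       ['pm25', 'pm10', 'no2', 'temperature', 'humidity'])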

Next Steps

With your data prepared, you’re ready to: