Skip to content

Time Series Prediction

Time series prediction is an important application field in machine learning, involving predicting future values based on historical data. This chapter will demonstrate how to build various time series prediction models using TensorFlow.

Project Overview

We will build multiple time series prediction models, including traditional statistical methods and deep learning methods, applied to scenarios such as stock prices, weather data, and sales forecasting.

Project Goals

  • Understand time series data characteristics
  • Master data preprocessing and feature engineering
  • Build various prediction models
  • Learn model evaluation and optimization
  • Implement real-time prediction systems
python
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import warnings
warnings.filterwarnings('ignore')

# Set random seeds
tf.random.set_seed(42)
np.random.seed(42)

print(f"TensorFlow version: {tf.__version__}")

Data Preparation

Generating Sample Time Series Data

python
def generate_synthetic_timeseries(n_samples=1000, noise_level=0.1):
    """
    Generate synthetic time series data
    """
    # Time index
    time = np.arange(n_samples)

    # Trend component
    trend = 0.02 * time

    # Seasonal component
    seasonal = 10 * np.sin(2 * np.pi * time / 365.25) + 5 * np.sin(2 * np.pi * time / 30.44)

    # Cyclical component
    cyclical = 3 * np.sin(2 * np.pi * time / 7)

    # Noise
    noise = np.random.normal(0, noise_level * 10, n_samples)

    # Combine all components
    series = 100 + trend + seasonal + cyclical + noise

    # Create DataFrame
    dates = pd.date_range(start='2020-01-01', periods=n_samples, freq='D')
    df = pd.DataFrame({
        'date': dates,
        'value': series,
        'trend': trend,
        'seasonal': seasonal,
        'cyclical': cyclical,
        'noise': noise
    })

    return df

def load_stock_data(symbol='AAPL', start_date='2020-01-01', end_date='2023-12-31'):
    """
    Load stock data (example function, actual requires API)
    """
    # Here using simulated data, actual applications can use yfinance and other libraries
    n_days = (pd.to_datetime(end_date) - pd.to_datetime(start_date)).days
    dates = pd.date_range(start=start_date, end=end_date, freq='D')

    # Simulate stock prices
    np.random.seed(42)
    returns = np.random.normal(0.001, 0.02, len(dates))
    prices = [100]

    for ret in returns[1:]:
        prices.append(prices[-1] * (1 + ret))

    df = pd.DataFrame({
        'date': dates,
        'close': prices[:len(dates)],
        'volume': np.random.randint(1000000, 10000000, len(dates))
    })

    return df

# Generate sample data
ts_data = generate_synthetic_timeseries(1000)
stock_data = load_stock_data()

print("Time series data shape:", ts_data.shape)
print("Stock data shape:", stock_data.shape)

Data Visualization

python
def plot_timeseries_components(df, value_col='value'):
    """
    Visualize time series components
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # Original time series
    axes[0, 0].plot(df['date'], df[value_col])
    axes[0, 0].set_title('Original Time Series')
    axes[0, 0].set_xlabel('Date')
    axes[0, 0].set_ylabel('Value')

    # Trend component
    if 'trend' in df.columns:
        axes[0, 1].plot(df['date'], df['trend'])
        axes[0, 1].set_title('Trend Component')
        axes[0, 1].set_xlabel('Date')
        axes[0, 1].set_ylabel('Trend')

    # Seasonal component
    if 'seasonal' in df.columns:
        axes[1, 0].plot(df['date'], df['seasonal'])
        axes[1, 0].set_title('Seasonal Component')
        axes[1, 0].set_xlabel('Date')
        axes[1, 0].set_ylabel('Seasonal')

    # Noise component
    if 'noise' in df.columns:
        axes[1, 1].plot(df['date'], df['noise'])
        axes[1, 1].set_title('Noise Component')
        axes[1, 1].set_xlabel('Date')
        axes[1, 1].set_ylabel('Noise')

    plt.tight_layout()
    plt.show()

def plot_timeseries_statistics(df, value_col='value', window=30):
    """
    Plot time series statistical characteristics
    """
    # Calculate moving average and standard deviation
    df['rolling_mean'] = df[value_col].rolling(window=window).mean()
    df['rolling_std'] = df[value_col].rolling(window=window).std()

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # Original data and moving average
    axes[0, 0].plot(df['date'], df[value_col], label='Original Data', alpha=0.7)
    axes[0, 0].plot(df['date'], df['rolling_mean'], label=f'{window}-day Moving Average', color='red')
    axes[0, 0].set_title('Time Series and Moving Average')
    axes[0, 0].legend()

    # Moving standard deviation
    axes[0, 1].plot(df['date'], df['rolling_std'])
    axes[0, 1].set_title(f'{window}-day Moving Standard Deviation')

    # Autocorrelation plot
    from statsmodels.tsa.stattools import acf
    lags = 40
    autocorr = acf(df[value_col].dropna(), nlags=lags)
    axes[1, 0].bar(range(lags+1), autocorr)
    axes[1, 0].set_title('Autocorrelation Function')
    axes[1, 0].set_xlabel('Lag')

    # Distribution histogram
    axes[1, 1].hist(df[value_col].dropna(), bins=50, alpha=0.7)
    axes[1, 1].set_title('Value Distribution')
    axes[1, 1].set_xlabel('Value')
    axes[1, 1].set_ylabel('Frequency')

    plt.tight_layout()
    plt.show()

# Visualize data
plot_timeseries_components(ts_data)
plot_timeseries_statistics(ts_data)

Data Preprocessing

python
def create_sequences(data, sequence_length, prediction_length=1):
    """
    Create sequence data for training
    """
    X, y = [], []

    for i in range(len(data) - sequence_length - prediction_length + 1):
        # Input sequence
        X.append(data[i:(i + sequence_length)])
        # Target value
        y.append(data[i + sequence_length:i + sequence_length + prediction_length])

    return np.array(X), np.array(y)

def prepare_timeseries_data(df, value_col='value', sequence_length=60,
                           prediction_length=1, test_size=0.2, scale=True):
    """
    Prepare time series data for training
    """
    # Extract values
    values = df[value_col].values.reshape(-1, 1)

    # Data scaling
    scaler = None
    if scale:
        scaler = MinMaxScaler()
        values = scaler.fit_transform(values)

    # Create sequences
    X, y = create_sequences(values, sequence_length, prediction_length)

    # Split training and test sets
    split_idx = int(len(X) * (1 - test_size))

    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]

    print(f"Training set shape: X_train={X_train.shape}, y_train={y_train.shape}")
    print(f"Test set shape: X_test={X_test.shape}, y_test={y_test.shape}")

    return (X_train, y_train), (X_test, y_test), scaler

def add_features(df, value_col='value'):
    """
    Add time series features
    """
    df = df.copy()

    # Lag features
    for lag in [1, 2, 3, 7, 14, 30]:
        df[f'lag_{lag}'] = df[value_col].shift(lag)

    # Moving average features
    for window in [3, 7, 14, 30]:
        df[f'ma_{window}'] = df[value_col].rolling(window=window).mean()
        df[f'std_{window}'] = df[value_col].rolling(window=window).std()

    # Time features
    df['day_of_week'] = df['date'].dt.dayofweek
    df['day_of_month'] = df['date'].dt.day
    df['month'] = df['date'].dt.month
    df['quarter'] = df['date'].dt.quarter
    df['year'] = df['date'].dt.year

    # Difference features
    df['diff_1'] = df[value_col].diff(1)
    df['diff_7'] = df[value_col].diff(7)

    # Percentage change
    df['pct_change_1'] = df[value_col].pct_change(1)
    df['pct_change_7'] = df[value_col].pct_change(7)

    return df

# Prepare data
sequence_length = 60
prediction_length = 1

(X_train, y_train), (X_test, y_test), scaler = prepare_timeseries_data(
    ts_data, sequence_length=sequence_length, prediction_length=prediction_length
)

# Add features
enhanced_data = add_features(ts_data)
print("Enhanced features:", enhanced_data.columns.tolist())

Model Building

LSTM Model

python
def create_lstm_model(sequence_length, n_features=1, lstm_units=50,
                     dense_units=25, dropout_rate=0.2, prediction_length=1):
    """
    Create LSTM time series prediction model
    """
    model = keras.Sequential([
        keras.layers.LSTM(lstm_units, return_sequences=True,
                         input_shape=(sequence_length, n_features)),
        keras.layers.Dropout(dropout_rate),

        keras.layers.LSTM(lstm_units, return_sequences=False),
        keras.layers.Dropout(dropout_rate),

        keras.layers.Dense(dense_units, activation='relu'),
        keras.layers.Dropout(dropout_rate),

        keras.layers.Dense(prediction_length)
    ])

    return model

def create_bidirectional_lstm_model(sequence_length, n_features=1, lstm_units=50,
                                   dense_units=25, dropout_rate=0.2, prediction_length=1):
    """
    Create bidirectional LSTM model
    """
    model = keras.Sequential([
        keras.layers.Bidirectional(
            keras.layers.LSTM(lstm_units, return_sequences=True),
            input_shape=(sequence_length, n_features)
        ),
        keras.layers.Dropout(dropout_rate),

        keras.layers.Bidirectional(
            keras.layers.LSTM(lstm_units, return_sequences=False)
        ),
        keras.layers.Dropout(dropout_rate),

        keras.layers.Dense(dense_units, activation='relu'),
        keras.layers.Dropout(dropout_rate),

        keras.layers.Dense(prediction_length)
    ])

    return model

# Create LSTM model
lstm_model = create_lstm_model(sequence_length, n_features=1)
bilstm_model = create_bidirectional_lstm_model(sequence_length, n_features=1)

print("LSTM model structure:")
lstm_model.summary()

GRU Model

python
def create_gru_model(sequence_length, n_features=1, gru_units=50,
                    dense_units=25, dropout_rate=0.2, prediction_length=1):
    """
    Create GRU time series prediction model
    """
    model = keras.Sequential([
        keras.layers.GRU(gru_units, return_sequences=True,
                        input_shape=(sequence_length, n_features)),
        keras.layers.Dropout(dropout_rate),

        keras.layers.GRU(gru_units, return_sequences=False),
        keras.layers.Dropout(dropout_rate),

        keras.layers.Dense(dense_units, activation='relu'),
        keras.layers.Dropout(dropout_rate),

        keras.layers.Dense(prediction_length)
    ])

    return model

def create_stacked_gru_model(sequence_length, n_features=1, gru_units=[50, 50, 25],
                            dropout_rate=0.2, prediction_length=1):
    """
    Create stacked GRU model
    """
    model = keras.Sequential()

    # First GRU layer
    model.add(keras.layers.GRU(
        gru_units[0],
        return_sequences=True,
        input_shape=(sequence_length, n_features)
    ))
    model.add(keras.layers.Dropout(dropout_rate))

    # Middle GRU layers
    for units in gru_units[1:-1]:
        model.add(keras.layers.GRU(units, return_sequences=True))
        model.add(keras.layers.Dropout(dropout_rate))

    # Last GRU layer
    model.add(keras.layers.GRU(gru_units[-1], return_sequences=False))
    model.add(keras.layers.Dropout(dropout_rate))

    # Output layer
    model.add(keras.layers.Dense(prediction_length))

    return model

# Create GRU model
gru_model = create_gru_model(sequence_length, n_features=1)
stacked_gru_model = create_stacked_gru_model(sequence_length, n_features=1)

CNN-LSTM Hybrid Model

python
def create_cnn_lstm_model(sequence_length, n_features=1, cnn_filters=64,
                         kernel_size=3, lstm_units=50, dense_units=25,
                         dropout_rate=0.2, prediction_length=1):
    """
    Create CNN-LSTM hybrid model
    """
    model = keras.Sequential([
        # CNN layers to extract local features
        keras.layers.Conv1D(cnn_filters, kernel_size, activation='relu',
                           input_shape=(sequence_length, n_features)),
        keras.layers.MaxPooling1D(pool_size=2),
        keras.layers.Dropout(dropout_rate),

        keras.layers.Conv1D(cnn_filters//2, kernel_size, activation='relu'),
        keras.layers.MaxPooling1D(pool_size=2),
        keras.layers.Dropout(dropout_rate),

        # LSTM layer to capture temporal dependencies
        keras.layers.LSTM(lstm_units, return_sequences=False),
        keras.layers.Dropout(dropout_rate),

        # Fully connected layers
        keras.layers.Dense(dense_units, activation='relu'),
        keras.layers.Dropout(dropout_rate),

        keras.layers.Dense(prediction_length)
    ])

    return model

def create_attention_lstm_model(sequence_length, n_features=1, lstm_units=50,
                               attention_units=32, dense_units=25,
                               dropout_rate=0.2, prediction_length=1):
    """
    Create LSTM model with attention mechanism
    """
    # Attention layer
    class AttentionLayer(keras.layers.Layer):
        def __init__(self, units):
            super(AttentionLayer, self).__init__()
            self.units = units
            self.W1 = keras.layers.Dense(units)
            self.W2 = keras.layers.Dense(units)
            self.V = keras.layers.Dense(1)

        def call(self, query, values):
            # query: (batch_size, hidden_size)
            # values: (batch_size, time_steps, hidden_size)

            # Expand query dimension to match values
            query_with_time_axis = tf.expand_dims(query, 1)

            # Calculate attention scores
            score = self.V(tf.nn.tanh(
                self.W1(query_with_time_axis) + self.W2(values)
            ))

            # Calculate attention weights
            attention_weights = tf.nn.softmax(score, axis=1)

            # Calculate context vector
            context_vector = attention_weights * values
            context_vector = tf.reduce_sum(context_vector, axis=1)

            return context_vector, attention_weights

    # Build model
    inputs = keras.layers.Input(shape=(sequence_length, n_features))

    # LSTM layer
    lstm_out = keras.layers.LSTM(lstm_units, return_sequences=True)(inputs)
    lstm_final = keras.layers.LSTM(lstm_units, return_state=True)(lstm_out)

    # Attention mechanism
    attention = AttentionLayer(attention_units)
    context_vector, attention_weights = attention(lstm_final[1], lstm_out)

    # Output layer
    dense = keras.layers.Dense(dense_units, activation='relu')(context_vector)
    dropout = keras.layers.Dropout(dropout_rate)(dense)
    outputs = keras.layers.Dense(prediction_length)(dropout)

    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

# Create hybrid models
cnn_lstm_model = create_cnn_lstm_model(sequence_length, n_features=1)
attention_lstm_model = create_attention_lstm_model(sequence_length, n_features=1)

print("CNN-LSTM model structure:")
cnn_lstm_model.summary()

Transformer Model

python
def create_transformer_model(sequence_length, n_features=1, d_model=64,
                           num_heads=4, ff_dim=128, num_layers=2,
                           dropout_rate=0.1, prediction_length=1):
    """
    Create Transformer time series prediction model
    """
    inputs = keras.layers.Input(shape=(sequence_length, n_features))

    # Project to d_model dimension
    x = keras.layers.Dense(d_model)(inputs)

    # Positional encoding
    positions = tf.range(start=0, limit=sequence_length, delta=1)
    position_embedding = keras.layers.Embedding(sequence_length, d_model)(positions)
    x = x + position_embedding

    # Transformer encoder layers
    for _ in range(num_layers):
        # Multi-head self-attention
        attention_output = keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=d_model
        )(x, x)

        # Residual connection and layer normalization
        x = keras.layers.LayerNormalization()(x + attention_output)

        # Feed-forward network
        ffn_output = keras.layers.Dense(ff_dim, activation='relu')(x)
        ffn_output = keras.layers.Dense(d_model)(ffn_output)

        # Residual connection and layer normalization
        x = keras.layers.LayerNormalization()(x + ffn_output)

        # Dropout
        x = keras.layers.Dropout(dropout_rate)(x)

    # Global average pooling
    x = keras.layers.GlobalAveragePooling1D()(x)

    # Output layer
    x = keras.layers.Dense(64, activation='relu')(x)
    x = keras.layers.Dropout(dropout_rate)(x)
    outputs = keras.layers.Dense(prediction_length)(x)

    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

# Create Transformer model
transformer_model = create_transformer_model(sequence_length, n_features=1)
print("Transformer model structure:")
transformer_model.summary()

Model Training

Training Configuration

python
def compile_timeseries_model(model, learning_rate=0.001):
    """
    Compile time series model
    """
    optimizer = keras.optimizers.Adam(learning_rate=learning_rate)

    model.compile(
        optimizer=optimizer,
        loss='mse',
        metrics=['mae', 'mape']
    )

    return model

def create_timeseries_callbacks(model_name, patience=10):
    """
    Create time series training callbacks
    """
    callbacks = [
        keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=patience,
            restore_best_weights=True,
            verbose=1
        ),

        keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-7,
            verbose=1
        ),

        keras.callbacks.ModelCheckpoint(
            f'best_{model_name}.h5',
            monitor='val_loss',
            save_best_only=True,
            verbose=1
        )
    ]

    return callbacks

def train_timeseries_model(model, X_train, y_train, X_test, y_test,
                          epochs=100, batch_size=32, model_name='timeseries'):
    """
    Train time series model
    """
    # Compile model
    model = compile_timeseries_model(model)

    # Create callbacks
    callbacks = create_timeseries_callbacks(model_name)

    # Train model
    history = model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_test, y_test),
        callbacks=callbacks,
        verbose=1
    )

    return history

# Train LSTM model
print("Training LSTM model...")
lstm_history = train_timeseries_model(
    lstm_model, X_train, y_train, X_test, y_test,
    epochs=50, model_name='lstm_timeseries'
)

Training Visualization

python
def plot_training_history(history, title='Model Training History'):
    """
    Plot training history
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # Loss
    axes[0, 0].plot(history.history['loss'], label='Training Loss')
    axes[0, 0].plot(history.history['val_loss'], label='Validation Loss')
    axes[0, 0].set_title('Model Loss')
    axes[0, 0].set_xlabel('Epoch')
    axes[0, 0].set_ylabel('MSE')
    axes[0, 0].legend()

    # MAE
    axes[0, 1].plot(history.history['mae'], label='Training MAE')
    axes[0, 1].plot(history.history['val_mae'], label='Validation MAE')
    axes[0, 1].set_title('Mean Absolute Error')
    axes[0, 1].set_xlabel('Epoch')
    axes[0, 1].set_ylabel('MAE')
    axes[0, 1].legend()

    # MAPE
    axes[1, 0].plot(history.history['mape'], label='Training MAPE')
    axes[1, 0].plot(history.history['val_mape'], label='Validation MAPE')
    axes[1, 0].set_title('Mean Absolute Percentage Error')
    axes[1, 0].set_xlabel('Epoch')
    axes[1, 0].set_ylabel('MAPE')
    axes[1, 0].legend()

    # Learning rate (if recorded)
    if 'lr' in history.history:
        axes[1, 1].plot(history.history['lr'])
        axes[1, 1].set_title('Learning Rate')
        axes[1, 1].set_xlabel('Epoch')
        axes[1, 1].set_ylabel('Learning Rate')
        axes[1, 1].set_yscale('log')

    plt.suptitle(title)
    plt.tight_layout()
    plt.show()

# Visualize training history
plot_training_history(lstm_history, 'LSTM Model Training History')

Model Evaluation

Prediction and Evaluation Metrics

python
def evaluate_timeseries_model(model, X_test, y_test, scaler=None):
    """
    Evaluate time series model
    """
    # Predictions
    y_pred = model.predict(X_test)

    # Inverse scaling (if scaling was used)
    if scaler is not None:
        y_test_orig = scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()
        y_pred_orig = scaler.inverse_transform(y_pred.reshape(-1, 1)).flatten()
    else:
        y_test_orig = y_test.flatten()
        y_pred_orig = y_pred.flatten()

    # Calculate evaluation metrics
    mse = mean_squared_error(y_test_orig, y_pred_orig)
    mae = mean_absolute_error(y_test_orig, y_pred_orig)
    rmse = np.sqrt(mse)

    # MAPE (Mean Absolute Percentage Error)
    mape = np.mean(np.abs((y_test_orig - y_pred_orig) / y_test_orig)) * 100

    # R² score
    r2 = r2_score(y_test_orig, y_pred_orig)

    print(f"Evaluation metrics:")
    print(f"MSE: {mse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAPE: {mape:.2f}%")
    print(f"R²: {r2:.4f}")

    return {
        'mse': mse,
        'mae': mae,
        'rmse': rmse,
        'mape': mape,
        'r2': r2,
        'y_true': y_test_orig,
        'y_pred': y_pred_orig
    }

def plot_predictions(y_true, y_pred, title='Prediction Results', n_samples=200):
    """
    Plot prediction results
    """
    # Only show partial samples for observation
    if len(y_true) > n_samples:
        indices = np.random.choice(len(y_true), n_samples, replace=False)
        indices = np.sort(indices)
        y_true_plot = y_true[indices]
        y_pred_plot = y_pred[indices]
    else:
        y_true_plot = y_true
        y_pred_plot = y_pred
        indices = np.arange(len(y_true))

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # Time series comparison
    axes[0, 0].plot(indices, y_true_plot, label='True Values', alpha=0.7)
    axes[0, 0].plot(indices, y_pred_plot, label='Predicted Values', alpha=0.7)
    axes[0, 0].set_title('Predicted vs True Values')
    axes[0, 0].set_xlabel('Time')
    axes[0, 0].set_ylabel('Value')
    axes[0, 0].legend()

    # Scatter plot
    axes[0, 1].scatter(y_true, y_pred, alpha=0.5)
    min_val = min(y_true.min(), y_pred.min())
    max_val = max(y_true.max(), y_pred.max())
    axes[0, 1].plot([min_val, max_val], [min_val, max_val], 'r--', lw=2)
    axes[0, 1].set_xlabel('True Values')
    axes[0, 1].set_ylabel('Predicted Values')
    axes[0, 1].set_title('Predicted vs True Values Scatter Plot')

    # Residual plot
    residuals = y_true - y_pred
    axes[1, 0].scatter(y_pred, residuals, alpha=0.5)
    axes[1, 0].axhline(y=0, color='r', linestyle='--')
    axes[1, 0].set_xlabel('Predicted Values')
    axes[1, 0].set_ylabel('Residuals')
    axes[1, 0].set_title('Residual Plot')

    # Residual distribution
    axes[1, 1].hist(residuals, bins=30, alpha=0.7)
    axes[1, 1].set_xlabel('Residuals')
    axes[1, 1].set_ylabel('Frequency')
    axes[1, 1].set_title('Residual Distribution')

    plt.suptitle(title)
    plt.tight_layout()
    plt.show()

# Evaluate LSTM model
lstm_results = evaluate_timeseries_model(lstm_model, X_test, y_test, scaler)
plot_predictions(lstm_results['y_true'], lstm_results['y_pred'], 'LSTM Model Prediction Results')

Multi-step Prediction

python
def multi_step_prediction(model, initial_sequence, n_steps, scaler=None):
    """
    Multi-step prediction
    """
    predictions = []
    current_sequence = initial_sequence.copy()

    for _ in range(n_steps):
        # Predict next value
        next_pred = model.predict(current_sequence.reshape(1, -1, 1), verbose=0)
        predictions.append(next_pred[0, 0])

        # Update sequence (sliding window)
        current_sequence = np.append(current_sequence[1:], next_pred[0, 0])

    predictions = np.array(predictions)

    # Inverse scaling
    if scaler is not None:
        predictions = scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()

    return predictions

def plot_multi_step_prediction(model, X_test, y_test, scaler, n_steps=30, sample_idx=0):
    """
    Visualize multi-step prediction
    """
    # Select a test sample
    initial_sequence = X_test[sample_idx].flatten()

    # Perform multi-step prediction
    predictions = multi_step_prediction(model, initial_sequence, n_steps, scaler)

    # Get actual subsequent values
    if sample_idx + n_steps < len(y_test):
        true_values = y_test[sample_idx:sample_idx + n_steps]
        if scaler is not None:
            true_values = scaler.inverse_transform(true_values.reshape(-1, 1)).flatten()
    else:
        true_values = None

    # Plot
    plt.figure(figsize=(12, 6))

    # Historical data
    if scaler is not None:
        history = scaler.inverse_transform(initial_sequence.reshape(-1, 1)).flatten()
    else:
        history = initial_sequence

    history_x = np.arange(-len(history), 0)
    pred_x = np.arange(0, n_steps)

    plt.plot(history_x, history, label='Historical Data', color='blue')
    plt.plot(pred_x, predictions, label='Predicted Values', color='red', marker='o')

    if true_values is not None:
        plt.plot(pred_x[:len(true_values)], true_values,
                label='True Values', color='green', marker='s')

    plt.axvline(x=0, color='black', linestyle='--', alpha=0.5)
    plt.xlabel('Time Step')
    plt.ylabel('Value')
    plt.title(f'{n_steps}-step Prediction')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

# Multi-step prediction example
plot_multi_step_prediction(lstm_model, X_test, y_test, scaler, n_steps=30)

Model Comparison

Multi-model Comparison

python
def compare_models(models, model_names, X_train, y_train, X_test, y_test, scaler=None):
    """
    Compare performance of multiple models
    """
    results = {}

    for model, name in zip(models, model_names):
        print(f"\nTraining {name} model...")

        # Compile and train model
        model = compile_timeseries_model(model)

        # Simplified training (reduce epochs to save time)
        history = model.fit(
            X_train, y_train,
            epochs=20,
            batch_size=32,
            validation_data=(X_test, y_test),
            verbose=0
        )

        # Evaluate model
        model_results = evaluate_timeseries_model(model, X_test, y_test, scaler)
        results[name] = model_results

    return results

def plot_model_comparison(results):
    """
    Visualize model comparison results
    """
    model_names = list(results.keys())
    metrics = ['mse', 'mae', 'rmse', 'mape', 'r2']

    fig, axes = plt.subplots(2, 3, figsize=(18, 10))
    axes = axes.flatten()

    for i, metric in enumerate(metrics):
        values = [results[name][metric] for name in model_names]

        bars = axes[i].bar(model_names, values)
        axes[i].set_title(f'{metric.upper()}')
        axes[i].set_ylabel(metric.upper())

        # Add value labels
        for bar, value in zip(bars, values):
            height = bar.get_height()
            axes[i].text(bar.get_x() + bar.get_width()/2., height,
                        f'{value:.3f}', ha='center', va='bottom')

        # Rotate x-axis labels
        axes[i].tick_params(axis='x', rotation=45)

    # Hide last subplot
    axes[-1].axis('off')

    plt.tight_layout()
    plt.show()

# Create multiple models for comparison
models_to_compare = [
    create_lstm_model(sequence_length, n_features=1),
    create_gru_model(sequence_length, n_features=1),
    create_cnn_lstm_model(sequence_length, n_features=1),
]

model_names = ['LSTM', 'GRU', 'CNN-LSTM']

# Compare models (Note: This will retrain models)
# comparison_results = compare_models(models_to_compare, model_names, X_train, y_train, X_test, y_test, scaler)
# plot_model_comparison(comparison_results)

Real-time Prediction System

Online Prediction

python
class TimeSeriesPredictor:
    """
    Time series predictor class
    """
    def __init__(self, model, scaler, sequence_length):
        self.model = model
        self.scaler = scaler
        self.sequence_length = sequence_length
        self.history = []

    def add_data_point(self, value):
        """
        Add new data point
        """
        self.history.append(value)

        # Maintain history data length
        if len(self.history) > self.sequence_length * 2:
            self.history = self.history[-self.sequence_length * 2:]

    def predict_next(self):
        """
        Predict next value
        """
        if len(self.history) < self.sequence_length:
            raise ValueError(f"Need at least {self.sequence_length} historical data points")

        # Prepare input sequence
        sequence = np.array(self.history[-self.sequence_length:])

        # Scale
        if self.scaler is not None:
            sequence_scaled = self.scaler.transform(sequence.reshape(-1, 1)).flatten()
        else:
            sequence_scaled = sequence

        # Predict
        prediction_scaled = self.model.predict(
            sequence_scaled.reshape(1, self.sequence_length, 1),
            verbose=0
        )[0, 0]

        # Inverse scaling
        if self.scaler is not None:
            prediction = self.scaler.inverse_transform([[prediction_scaled]])[0, 0]
        else:
            prediction = prediction_scaled

        return prediction

    def predict_multiple(self, n_steps):
        """
        Predict multiple steps
        """
        predictions = []

        for _ in range(n_steps):
            pred = self.predict_next()
            predictions.append(pred)
            self.add_data_point(pred)  # Add predicted value to history

        return predictions

def simulate_real_time_prediction(predictor, test_data, n_predictions=50):
    """
    Simulate real-time prediction
    """
    predictions = []
    true_values = []

    # Initialize history data
    for i in range(predictor.sequence_length):
        predictor.add_data_point(test_data[i])

    # Perform real-time prediction
    for i in range(predictor.sequence_length,
                  min(len(test_data), predictor.sequence_length + n_predictions)):

        # Predict
        pred = predictor.predict_next()
        predictions.append(pred)

        # True value
        true_val = test_data[i]
        true_values.append(true_val)

        # Add true value to history (simulate real-time data arrival)
        predictor.add_data_point(true_val)

    return np.array(predictions), np.array(true_values)

# Create predictor
predictor = TimeSeriesPredictor(lstm_model, scaler, sequence_length)

# Simulate real-time prediction
if scaler is not None:
    test_data_orig = scaler.inverse_transform(
        ts_data['value'].values[-200:].reshape(-1, 1)
    ).flatten()
else:
    test_data_orig = ts_data['value'].values[-200:]

rt_predictions, rt_true_values = simulate_real_time_prediction(
    predictor, test_data_orig, n_predictions=50
)

# Visualize real-time prediction results
plt.figure(figsize=(12, 6))
plt.plot(rt_true_values, label='True Values', marker='o')
plt.plot(rt_predictions, label='Predicted Values', marker='s')
plt.xlabel('Time Step')
plt.ylabel('Value')
plt.title('Real-time Prediction Results')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# Calculate real-time prediction error
rt_mae = mean_absolute_error(rt_true_values, rt_predictions)
rt_mape = np.mean(np.abs((rt_true_values - rt_predictions) / rt_true_values)) * 100
print(f"Real-time prediction MAE: {rt_mae:.4f}")
print(f"Real-time prediction MAPE: {rt_mape:.2f}%")

Model Deployment

Saving and Loading Models

python
def save_timeseries_model(model, scaler, model_path):
    """
    Save time series model and preprocessor
    """
    import pickle

    # Save model
    model.save(f'{model_path}.h5')

    # Save scaler
    if scaler is not None:
        with open(f'{model_path}_scaler.pkl', 'wb') as f:
            pickle.dump(scaler, f)

    print(f"Model saved to: {model_path}")

def load_timeseries_model(model_path):
    """
    Load time series model and preprocessor
    """
    import pickle

    # Load model
    model = keras.models.load_model(f'{model_path}.h5')

    # Load scaler
    try:
        with open(f'{model_path}_scaler.pkl', 'rb') as f:
            scaler = pickle.load(f)
    except FileNotFoundError:
        scaler = None

    return model, scaler

# Save model
save_timeseries_model(lstm_model, scaler, 'lstm_timeseries_model')

Web API Deployment

python
def create_timeseries_api(model, scaler, sequence_length):
    """
    Create time series prediction Web API
    """
    from flask import Flask, request, jsonify

    app = Flask(__name__)
    predictor = TimeSeriesPredictor(model, scaler, sequence_length)

    @app.route('/predict', methods=['POST'])
    def predict():
        try:
            data = request.get_json()

            if 'sequence' not in data:
                return jsonify({'error': 'Missing sequence field'}), 400

            sequence = data['sequence']

            if len(sequence) < sequence_length:
                return jsonify({
                    'error': f'Sequence length needs at least {sequence_length}'
                }), 400

            # Set history data
            predictor.history = sequence[-sequence_length:]

            # Predict
            prediction = predictor.predict_next()

            return jsonify({
                'prediction': float(prediction),
                'sequence_length': len(sequence)
            })

        except Exception as e:
            return jsonify({'error': str(e)}), 500

    @app.route('/predict_multiple', methods=['POST'])
    def predict_multiple():
        try:
            data = request.get_json()

            if 'sequence' not in data or 'n_steps' not in data:
                return jsonify({'error': 'Missing required fields'}), 400

            sequence = data['sequence']
            n_steps = data['n_steps']

            if len(sequence) < sequence_length:
                return jsonify({
                    'error': f'Sequence length needs at least {sequence_length}'
                }), 400

            # Set history data
            predictor.history = sequence[-sequence_length:]

            # Multi-step prediction
            predictions = predictor.predict_multiple(n_steps)

            return jsonify({
                'predictions': [float(p) for p in predictions],
                'n_steps': n_steps
            })

        except Exception as e:
            return jsonify({'error': str(e)}), 500

    @app.route('/health', methods=['GET'])
    def health():
        return jsonify({'status': 'healthy'})

    return app

# Create API
# api_app = create_timeseries_api(lstm_model, scaler, sequence_length)
# api_app.run(host='0.0.0.0', port=5000, debug=True)

Summary

This chapter demonstrated application of deep learning in time series data analysis through a complete time series prediction project:

Key Points:

  1. Data Understanding: Trend, seasonality, cyclicity analysis of time series
  2. Feature Engineering: Lag features, moving averages, time features
  3. Model Selection: LSTM, GRU, CNN-LSTM, Transformer, etc.
  4. Evaluation Metrics: MSE, MAE, MAPE, R², etc., time series specific metrics
  5. Multi-step Prediction: Recursive and direct prediction strategies
  6. Real-time Systems: Online prediction and streaming processing
  7. Model Deployment: API services and production environment considerations

Best Practices:

  • Fully analyze time series characteristics
  • Choose appropriate sequence length and prediction window
  • Use appropriate data scaling and normalization
  • Consider model computational complexity and prediction latency
  • Implement model monitoring and automatic retraining
  • Handle missing values and outliers
  • Consider external factors and multivariate prediction

Time series prediction has wide applications in finance, retail, manufacturing, and other fields. Mastering these techniques is very important for data scientists.

Content is for learning and research only.