Time Series Prediction
Time series prediction is an important application field in machine learning, involving predicting future values based on historical data. This chapter will demonstrate how to build various time series prediction models using TensorFlow.
Project Overview
We will build multiple time series prediction models, including traditional statistical methods and deep learning methods, applied to scenarios such as stock prices, weather data, and sales forecasting.
Project Goals
- Understand time series data characteristics
- Master data preprocessing and feature engineering
- Build various prediction models
- Learn model evaluation and optimization
- Implement real-time prediction systems
python
# Third-party imports: TensorFlow/Keras for modeling, NumPy/pandas for data
# handling, matplotlib/seaborn for plotting, scikit-learn for scaling/metrics.
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import warnings

# Silence library warnings so the notebook output stays readable.
warnings.filterwarnings('ignore')

# Fix both TF and NumPy seeds for reproducible weights and synthetic data.
tf.random.set_seed(42)
np.random.seed(42)
print(f"TensorFlow version: {tf.__version__}")

Data Preparation
Generating Sample Time Series Data
python
def generate_synthetic_timeseries(n_samples=1000, noise_level=0.1):
    """Build a synthetic daily series: 100 + trend + seasonal + cyclical + noise.

    Returns a DataFrame carrying the combined 'value' column together with
    each individual component, so the decomposition can be plotted later.
    """
    t = np.arange(n_samples)

    # Deterministic components: linear trend, yearly + monthly seasonality,
    # and a weekly cycle.
    trend = 0.02 * t
    seasonal = 10 * np.sin(2 * np.pi * t / 365.25) + 5 * np.sin(2 * np.pi * t / 30.44)
    cyclical = 3 * np.sin(2 * np.pi * t / 7)

    # Gaussian noise whose spread scales with noise_level.
    noise = np.random.normal(0, noise_level * 10, n_samples)

    combined = 100 + trend + seasonal + cyclical + noise

    return pd.DataFrame({
        'date': pd.date_range(start='2020-01-01', periods=n_samples, freq='D'),
        'value': combined,
        'trend': trend,
        'seasonal': seasonal,
        'cyclical': cyclical,
        'noise': noise,
    })
def load_stock_data(symbol='AAPL', start_date='2020-01-01', end_date='2023-12-31'):
    """Return a simulated daily price frame for *symbol*.

    Placeholder for a real data feed (e.g. yfinance): prices follow a
    random walk of normally distributed daily returns starting at 100.

    Returns:
        DataFrame with 'date', 'close' and 'volume' columns, one row per
        calendar day in [start_date, end_date].
    """
    dates = pd.date_range(start=start_date, end=end_date, freq='D')

    # Fixed seed so the simulated prices are reproducible across runs.
    np.random.seed(42)
    returns = np.random.normal(0.001, 0.02, len(dates))

    # Random walk: first price is 100, each later price compounds one return,
    # so len(prices) == len(dates) by construction (the original also kept a
    # redundant prices[:len(dates)] slice and an unused n_days variable).
    prices = [100]
    for ret in returns[1:]:
        prices.append(prices[-1] * (1 + ret))

    return pd.DataFrame({
        'date': dates,
        'close': prices,
        'volume': np.random.randint(1000000, 10000000, len(dates)),
    })
# Build the demo datasets used throughout the chapter.
ts_data = generate_synthetic_timeseries(1000)
stock_data = load_stock_data()
print("Time series data shape:", ts_data.shape)
print("Stock data shape:", stock_data.shape)

Data Visualization
python
def plot_timeseries_components(df, value_col='value'):
    """
    Visualize time series components.

    Draws a 2x2 grid: the raw series, plus the 'trend', 'seasonal' and
    'noise' columns when present (as produced by the synthetic generator).
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    # Original time series
    axes[0, 0].plot(df['date'], df[value_col])
    axes[0, 0].set_title('Original Time Series')
    axes[0, 0].set_xlabel('Date')
    axes[0, 0].set_ylabel('Value')
    # Trend component (only present for synthetic data)
    if 'trend' in df.columns:
        axes[0, 1].plot(df['date'], df['trend'])
        axes[0, 1].set_title('Trend Component')
        axes[0, 1].set_xlabel('Date')
        axes[0, 1].set_ylabel('Trend')
    # Seasonal component
    if 'seasonal' in df.columns:
        axes[1, 0].plot(df['date'], df['seasonal'])
        axes[1, 0].set_title('Seasonal Component')
        axes[1, 0].set_xlabel('Date')
        axes[1, 0].set_ylabel('Seasonal')
    # Noise component
    if 'noise' in df.columns:
        axes[1, 1].plot(df['date'], df['noise'])
        axes[1, 1].set_title('Noise Component')
        axes[1, 1].set_xlabel('Date')
        axes[1, 1].set_ylabel('Noise')
    plt.tight_layout()
    plt.show()
def plot_timeseries_statistics(df, value_col='value', window=30):
    """
    Plot rolling statistics, autocorrelation and the value distribution.

    Args:
        df: frame with 'date' and *value_col* columns. Not modified —
            rolling columns are added to an internal copy only.
        value_col: column to analyze.
        window: rolling window size in rows (days for daily data).
    """
    # Work on a copy: the original added 'rolling_mean'/'rolling_std'
    # columns to the caller's DataFrame as a side effect.
    df = df.copy()
    df['rolling_mean'] = df[value_col].rolling(window=window).mean()
    df['rolling_std'] = df[value_col].rolling(window=window).std()
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    # Original data and moving average
    axes[0, 0].plot(df['date'], df[value_col], label='Original Data', alpha=0.7)
    axes[0, 0].plot(df['date'], df['rolling_mean'], label=f'{window}-day Moving Average', color='red')
    axes[0, 0].set_title('Time Series and Moving Average')
    axes[0, 0].legend()
    # Moving standard deviation
    axes[0, 1].plot(df['date'], df['rolling_std'])
    axes[0, 1].set_title(f'{window}-day Moving Standard Deviation')
    # Autocorrelation (statsmodels imported lazily: optional dependency)
    from statsmodels.tsa.stattools import acf
    lags = 40
    autocorr = acf(df[value_col].dropna(), nlags=lags)
    axes[1, 0].bar(range(lags + 1), autocorr)
    axes[1, 0].set_title('Autocorrelation Function')
    axes[1, 0].set_xlabel('Lag')
    # Distribution histogram
    axes[1, 1].hist(df[value_col].dropna(), bins=50, alpha=0.7)
    axes[1, 1].set_title('Value Distribution')
    axes[1, 1].set_xlabel('Value')
    axes[1, 1].set_ylabel('Frequency')
    plt.tight_layout()
    plt.show()
# Render the component dashboard for the synthetic data.
plot_timeseries_components(ts_data)
plot_timeseries_statistics(ts_data)

Data Preprocessing
python
def create_sequences(data, sequence_length, prediction_length=1):
    """Slide a window over *data* to build (input, target) pairs.

    Each sample holds ``sequence_length`` consecutive points; its target
    is the following ``prediction_length`` points.
    """
    n_windows = len(data) - sequence_length - prediction_length + 1
    inputs = [data[start:start + sequence_length] for start in range(n_windows)]
    targets = [
        data[start + sequence_length:start + sequence_length + prediction_length]
        for start in range(n_windows)
    ]
    return np.array(inputs), np.array(targets)
def prepare_timeseries_data(df, value_col='value', sequence_length=60,
                            prediction_length=1, test_size=0.2, scale=True):
    """Turn a value column into scaled, windowed train/test arrays.

    Returns ((X_train, y_train), (X_test, y_test), scaler); scaler is
    None when *scale* is False. The split is chronological (no shuffle),
    as required for time series.
    """
    raw = df[value_col].values.reshape(-1, 1)

    # Scale into [0, 1] so the recurrent nets train stably.
    scaler = MinMaxScaler() if scale else None
    series = scaler.fit_transform(raw) if scale else raw

    X, y = create_sequences(series, sequence_length, prediction_length)

    # Chronological split: first (1 - test_size) fraction is for training.
    boundary = int(len(X) * (1 - test_size))
    X_train, y_train = X[:boundary], y[:boundary]
    X_test, y_test = X[boundary:], y[boundary:]

    print(f"Training set shape: X_train={X_train.shape}, y_train={y_train.shape}")
    print(f"Test set shape: X_test={X_test.shape}, y_test={y_test.shape}")
    return (X_train, y_train), (X_test, y_test), scaler
def add_features(df, value_col='value'):
    """Return a copy of *df* enriched with standard time-series features.

    Adds lagged values, rolling mean/std, calendar fields derived from
    'date', plain differences and percentage changes. Rows near the start
    contain NaNs where a lag or window is not yet available.
    """
    out = df.copy()

    # Lagged copies of the target.
    for lag in (1, 2, 3, 7, 14, 30):
        out[f'lag_{lag}'] = out[value_col].shift(lag)

    # Rolling mean and standard deviation over several windows.
    for window in (3, 7, 14, 30):
        rolling = out[value_col].rolling(window=window)
        out[f'ma_{window}'] = rolling.mean()
        out[f'std_{window}'] = rolling.std()

    # Calendar features from the date column.
    dates = out['date'].dt
    out['day_of_week'] = dates.dayofweek
    out['day_of_month'] = dates.day
    out['month'] = dates.month
    out['quarter'] = dates.quarter
    out['year'] = dates.year

    # First differences at 1 and 7 steps.
    out['diff_1'] = out[value_col].diff(1)
    out['diff_7'] = out[value_col].diff(7)

    # Percentage changes at 1 and 7 steps.
    out['pct_change_1'] = out[value_col].pct_change(1)
    out['pct_change_7'] = out[value_col].pct_change(7)

    return out
# Windowing configuration: 60 past observations predict the next 1 value.
sequence_length = 60
prediction_length = 1
(X_train, y_train), (X_test, y_test), scaler = prepare_timeseries_data(
    ts_data, sequence_length=sequence_length, prediction_length=prediction_length
)
# Feature-engineered view of the same series (for inspection below).
enhanced_data = add_features(ts_data)
print("Enhanced features:", enhanced_data.columns.tolist())

Model Building
LSTM Model
python
def create_lstm_model(sequence_length, n_features=1, lstm_units=50,
                      dense_units=25, dropout_rate=0.2, prediction_length=1):
    """Build a two-layer LSTM regressor for windowed series data.

    Input shape is (sequence_length, n_features); the head is a single
    linear layer emitting *prediction_length* values. Returned uncompiled.
    """
    model = keras.Sequential()
    model.add(keras.layers.LSTM(lstm_units, return_sequences=True,
                                input_shape=(sequence_length, n_features)))
    model.add(keras.layers.Dropout(dropout_rate))
    # Second LSTM collapses the sequence to one feature vector.
    model.add(keras.layers.LSTM(lstm_units, return_sequences=False))
    model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(dense_units, activation='relu'))
    model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(prediction_length))
    return model
def create_bidirectional_lstm_model(sequence_length, n_features=1, lstm_units=50,
                                    dense_units=25, dropout_rate=0.2, prediction_length=1):
    """Build a two-layer bidirectional LSTM regressor.

    Each recurrent layer reads the window forwards and backwards; the
    head is a single linear layer with *prediction_length* outputs.
    """
    model = keras.Sequential()
    model.add(keras.layers.Bidirectional(
        keras.layers.LSTM(lstm_units, return_sequences=True),
        input_shape=(sequence_length, n_features)
    ))
    model.add(keras.layers.Dropout(dropout_rate))
    # Second bidirectional layer returns only the final feature vector.
    model.add(keras.layers.Bidirectional(
        keras.layers.LSTM(lstm_units, return_sequences=False)
    ))
    model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(dense_units, activation='relu'))
    model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(prediction_length))
    return model
# Instantiate both recurrent variants; summary() prints layer shapes.
lstm_model = create_lstm_model(sequence_length, n_features=1)
bilstm_model = create_bidirectional_lstm_model(sequence_length, n_features=1)
print("LSTM model structure:")
lstm_model.summary()

GRU Model
python
def create_gru_model(sequence_length, n_features=1, gru_units=50,
                     dense_units=25, dropout_rate=0.2, prediction_length=1):
    """Build a two-layer GRU regressor for windowed series data.

    Mirrors create_lstm_model but with the cheaper GRU cell. Returned
    uncompiled.
    """
    model = keras.Sequential()
    model.add(keras.layers.GRU(gru_units, return_sequences=True,
                               input_shape=(sequence_length, n_features)))
    model.add(keras.layers.Dropout(dropout_rate))
    # Second GRU collapses the sequence to one feature vector.
    model.add(keras.layers.GRU(gru_units, return_sequences=False))
    model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(dense_units, activation='relu'))
    model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(prediction_length))
    return model
def create_stacked_gru_model(sequence_length, n_features=1, gru_units=None,
                             dropout_rate=0.2, prediction_length=1):
    """Build a stack of GRU layers followed by a linear output head.

    Args:
        gru_units: list of layer widths, first to last; defaults to
            [50, 50, 25]. The default is now created per call — the
            original used a mutable list literal as the default value,
            a classic Python pitfall.
    """
    if gru_units is None:
        gru_units = [50, 50, 25]
    model = keras.Sequential()
    # First GRU layer fixes the input shape and keeps the full sequence.
    model.add(keras.layers.GRU(
        gru_units[0],
        return_sequences=True,
        input_shape=(sequence_length, n_features)
    ))
    model.add(keras.layers.Dropout(dropout_rate))
    # Middle layers also return sequences so the next GRU sees all steps.
    for units in gru_units[1:-1]:
        model.add(keras.layers.GRU(units, return_sequences=True))
        model.add(keras.layers.Dropout(dropout_rate))
    # Final GRU collapses the sequence to a single vector.
    model.add(keras.layers.GRU(gru_units[-1], return_sequences=False))
    model.add(keras.layers.Dropout(dropout_rate))
    # Output layer
    model.add(keras.layers.Dense(prediction_length))
    return model
# Build the plain GRU variant for later comparison.
gru_model = create_gru_model(sequence_length, n_features=1)
stacked_gru_model = create_stacked_gru_model(sequence_length, n_features=1)

CNN-LSTM Hybrid Model
python
def create_cnn_lstm_model(sequence_length, n_features=1, cnn_filters=64,
                          kernel_size=3, lstm_units=50, dense_units=25,
                          dropout_rate=0.2, prediction_length=1):
    """Build a CNN front-end + LSTM back-end hybrid regressor.

    Two Conv1D/MaxPool stages extract and downsample local patterns;
    an LSTM then models the remaining temporal structure.
    """
    model = keras.Sequential()
    # Convolutional feature extractor over the raw window.
    model.add(keras.layers.Conv1D(cnn_filters, kernel_size, activation='relu',
                                  input_shape=(sequence_length, n_features)))
    model.add(keras.layers.MaxPooling1D(pool_size=2))
    model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Conv1D(cnn_filters//2, kernel_size, activation='relu'))
    model.add(keras.layers.MaxPooling1D(pool_size=2))
    model.add(keras.layers.Dropout(dropout_rate))
    # Recurrent layer captures temporal dependencies in the feature maps.
    model.add(keras.layers.LSTM(lstm_units, return_sequences=False))
    model.add(keras.layers.Dropout(dropout_rate))
    # Regression head.
    model.add(keras.layers.Dense(dense_units, activation='relu'))
    model.add(keras.layers.Dropout(dropout_rate))
    model.add(keras.layers.Dense(prediction_length))
    return model
def create_attention_lstm_model(sequence_length, n_features=1, lstm_units=50,
                                attention_units=32, dense_units=25,
                                dropout_rate=0.2, prediction_length=1):
    """
    Create LSTM model with an additive (Bahdanau-style) attention mechanism.

    The final LSTM hidden state queries the full sequence of first-layer
    LSTM outputs; the attended context vector feeds the regression head.
    """
    # Additive attention: score(query, values) -> context vector + weights
    class AttentionLayer(keras.layers.Layer):
        def __init__(self, units):
            super(AttentionLayer, self).__init__()
            self.units = units
            self.W1 = keras.layers.Dense(units)
            self.W2 = keras.layers.Dense(units)
            self.V = keras.layers.Dense(1)
        def call(self, query, values):
            # query: (batch_size, hidden_size)
            # values: (batch_size, time_steps, hidden_size)
            # Add a time axis so query broadcasts against every step
            query_with_time_axis = tf.expand_dims(query, 1)
            # Additive score per step: V(tanh(W1 q + W2 v))
            score = self.V(tf.nn.tanh(
                self.W1(query_with_time_axis) + self.W2(values)
            ))
            # Normalize scores over the time axis
            attention_weights = tf.nn.softmax(score, axis=1)
            # Weighted sum of values -> context vector
            context_vector = attention_weights * values
            context_vector = tf.reduce_sum(context_vector, axis=1)
            return context_vector, attention_weights
    # Build model with the functional API
    inputs = keras.layers.Input(shape=(sequence_length, n_features))
    # First LSTM returns the full sequence for attention to look over
    lstm_out = keras.layers.LSTM(lstm_units, return_sequences=True)(inputs)
    # return_state=True yields [last_output, state_h, state_c]
    lstm_final = keras.layers.LSTM(lstm_units, return_state=True)(lstm_out)
    # Query with the final hidden state (lstm_final[1] is state_h)
    attention = AttentionLayer(attention_units)
    context_vector, attention_weights = attention(lstm_final[1], lstm_out)
    # Regression head on the attended context (attention_weights unused here)
    dense = keras.layers.Dense(dense_units, activation='relu')(context_vector)
    dropout = keras.layers.Dropout(dropout_rate)(dense)
    outputs = keras.layers.Dense(prediction_length)(dropout)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model
# Build the hybrid models and show the CNN-LSTM architecture.
cnn_lstm_model = create_cnn_lstm_model(sequence_length, n_features=1)
attention_lstm_model = create_attention_lstm_model(sequence_length, n_features=1)
print("CNN-LSTM model structure:")
cnn_lstm_model.summary()

Transformer Model
python
def create_transformer_model(sequence_length, n_features=1, d_model=64,
                             num_heads=4, ff_dim=128, num_layers=2,
                             dropout_rate=0.1, prediction_length=1):
    """
    Create Transformer time series prediction model.

    Encoder-only architecture: input projection to d_model, learned
    positional embeddings, num_layers blocks of self-attention + FFN
    with residual connections (post-norm), then global average pooling
    and a small regression head.
    """
    inputs = keras.layers.Input(shape=(sequence_length, n_features))
    # Project the raw features up to the model width d_model
    x = keras.layers.Dense(d_model)(inputs)
    # Learned positional encoding, broadcast-added to every sample
    positions = tf.range(start=0, limit=sequence_length, delta=1)
    position_embedding = keras.layers.Embedding(sequence_length, d_model)(positions)
    x = x + position_embedding
    # Transformer encoder layers
    for _ in range(num_layers):
        # Multi-head self-attention.
        # NOTE(review): key_dim=d_model gives each head a full d_model-wide
        # projection; the conventional per-head size is d_model // num_heads
        # — confirm this is intentional.
        attention_output = keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=d_model
        )(x, x)
        # Residual connection and layer normalization
        x = keras.layers.LayerNormalization()(x + attention_output)
        # Position-wise feed-forward network
        ffn_output = keras.layers.Dense(ff_dim, activation='relu')(x)
        ffn_output = keras.layers.Dense(d_model)(ffn_output)
        # Residual connection and layer normalization
        x = keras.layers.LayerNormalization()(x + ffn_output)
        # Dropout between encoder blocks
        x = keras.layers.Dropout(dropout_rate)(x)
    # Collapse the time dimension by averaging over steps
    x = keras.layers.GlobalAveragePooling1D()(x)
    # Regression head
    x = keras.layers.Dense(64, activation='relu')(x)
    x = keras.layers.Dropout(dropout_rate)(x)
    outputs = keras.layers.Dense(prediction_length)(x)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model
# Build the Transformer and print its layer summary.
transformer_model = create_transformer_model(sequence_length, n_features=1)
print("Transformer model structure:")
transformer_model.summary()

Model Training
Training Configuration
python
def compile_timeseries_model(model, learning_rate=0.001):
    """Compile *model* for regression: Adam optimizer, MSE loss,
    MAE/MAPE tracked as metrics. Returns the same model for chaining.
    """
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss='mse',
        metrics=['mae', 'mape'],
    )
    return model
def create_timeseries_callbacks(model_name, patience=10):
    """Standard callback set: early stopping, LR reduction, checkpointing.

    Args:
        model_name: used to name the checkpoint file 'best_<name>.h5'.
        patience: epochs without val_loss improvement before stopping.
    """
    # Stop when validation loss stalls, restoring the best weights.
    early_stop = keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=patience,
        restore_best_weights=True,
        verbose=1
    )
    # Halve the learning rate on a shorter plateau.
    reduce_lr = keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-7,
        verbose=1
    )
    # Keep only the best weights seen so far on disk.
    checkpoint = keras.callbacks.ModelCheckpoint(
        f'best_{model_name}.h5',
        monitor='val_loss',
        save_best_only=True,
        verbose=1
    )
    return [early_stop, reduce_lr, checkpoint]
def train_timeseries_model(model, X_train, y_train, X_test, y_test,
                           epochs=100, batch_size=32, model_name='timeseries'):
    """Compile and fit *model*, using the test split for validation.

    Returns the Keras History object produced by fit().
    """
    compiled = compile_timeseries_model(model)
    return compiled.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_test, y_test),
        callbacks=create_timeseries_callbacks(model_name),
        verbose=1
    )
# Train the LSTM on the prepared windows (test split doubles as validation).
print("Training LSTM model...")
lstm_history = train_timeseries_model(
    lstm_model, X_train, y_train, X_test, y_test,
    epochs=50, model_name='lstm_timeseries'
)

Training Visualization
python
def plot_training_history(history, title='Model Training History'):
    """
    Plot training history: loss, MAE, MAPE and (if logged) learning rate.

    Args:
        history: the History object returned by model.fit(); expects the
            'mae' and 'mape' metrics to have been compiled in.
        title: figure-level title.
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    # Loss (MSE)
    axes[0, 0].plot(history.history['loss'], label='Training Loss')
    axes[0, 0].plot(history.history['val_loss'], label='Validation Loss')
    axes[0, 0].set_title('Model Loss')
    axes[0, 0].set_xlabel('Epoch')
    axes[0, 0].set_ylabel('MSE')
    axes[0, 0].legend()
    # MAE
    axes[0, 1].plot(history.history['mae'], label='Training MAE')
    axes[0, 1].plot(history.history['val_mae'], label='Validation MAE')
    axes[0, 1].set_title('Mean Absolute Error')
    axes[0, 1].set_xlabel('Epoch')
    axes[0, 1].set_ylabel('MAE')
    axes[0, 1].legend()
    # MAPE
    axes[1, 0].plot(history.history['mape'], label='Training MAPE')
    axes[1, 0].plot(history.history['val_mape'], label='Validation MAPE')
    axes[1, 0].set_title('Mean Absolute Percentage Error')
    axes[1, 0].set_xlabel('Epoch')
    axes[1, 0].set_ylabel('MAPE')
    axes[1, 0].legend()
    # Learning rate (only present when a callback logs 'lr')
    if 'lr' in history.history:
        axes[1, 1].plot(history.history['lr'])
        axes[1, 1].set_title('Learning Rate')
        axes[1, 1].set_xlabel('Epoch')
        axes[1, 1].set_ylabel('Learning Rate')
        axes[1, 1].set_yscale('log')
    plt.suptitle(title)
    plt.tight_layout()
    plt.show()
# Visualize training history
plot_training_history(lstm_history, 'LSTM Model Training History')

Model Evaluation
Prediction and Evaluation Metrics
python
def evaluate_timeseries_model(model, X_test, y_test, scaler=None):
    """Predict on *X_test* and report regression metrics in original units.

    Args:
        model: fitted Keras model (anything with .predict works).
        scaler: the fitted scaler applied to the training data, or None.

    Returns:
        dict with mse/mae/rmse/mape/r2 plus the unscaled true ('y_true')
        and predicted ('y_pred') arrays for plotting.
    """
    y_pred = model.predict(X_test)

    # Undo the scaling so errors are reported in data units.
    if scaler is not None:
        y_test_orig = scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()
        y_pred_orig = scaler.inverse_transform(y_pred.reshape(-1, 1)).flatten()
    else:
        y_test_orig = y_test.flatten()
        y_pred_orig = y_pred.flatten()

    mse = mean_squared_error(y_test_orig, y_pred_orig)
    mae = mean_absolute_error(y_test_orig, y_pred_orig)
    rmse = np.sqrt(mse)
    # NOTE(review): MAPE divides by the true values and is undefined when
    # any true value is 0 — fine for this synthetic data (values near 100).
    mape = np.mean(np.abs((y_test_orig - y_pred_orig) / y_test_orig)) * 100
    r2 = r2_score(y_test_orig, y_pred_orig)

    print("Evaluation metrics:")  # plain string: the original f-prefix had no placeholders
    print(f"MSE: {mse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAPE: {mape:.2f}%")
    print(f"R²: {r2:.4f}")

    return {
        'mse': mse,
        'mae': mae,
        'rmse': rmse,
        'mape': mape,
        'r2': r2,
        'y_true': y_test_orig,
        'y_pred': y_pred_orig
    }
def plot_predictions(y_true, y_pred, title='Prediction Results', n_samples=200):
    """
    Plot prediction diagnostics: overlay, scatter, residuals, residual histogram.

    Args:
        y_true, y_pred: aligned 1-D arrays in original units.
        n_samples: max points shown in the overlay (randomly subsampled,
            so the first panel varies between calls on large inputs).
    """
    # Subsample (sorted to keep chronological order) when the series is long
    if len(y_true) > n_samples:
        indices = np.random.choice(len(y_true), n_samples, replace=False)
        indices = np.sort(indices)
        y_true_plot = y_true[indices]
        y_pred_plot = y_pred[indices]
    else:
        y_true_plot = y_true
        y_pred_plot = y_pred
        indices = np.arange(len(y_true))
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    # Time series comparison
    axes[0, 0].plot(indices, y_true_plot, label='True Values', alpha=0.7)
    axes[0, 0].plot(indices, y_pred_plot, label='Predicted Values', alpha=0.7)
    axes[0, 0].set_title('Predicted vs True Values')
    axes[0, 0].set_xlabel('Time')
    axes[0, 0].set_ylabel('Value')
    axes[0, 0].legend()
    # Scatter plot with the ideal y = x reference line
    axes[0, 1].scatter(y_true, y_pred, alpha=0.5)
    min_val = min(y_true.min(), y_pred.min())
    max_val = max(y_true.max(), y_pred.max())
    axes[0, 1].plot([min_val, max_val], [min_val, max_val], 'r--', lw=2)
    axes[0, 1].set_xlabel('True Values')
    axes[0, 1].set_ylabel('Predicted Values')
    axes[0, 1].set_title('Predicted vs True Values Scatter Plot')
    # Residuals vs predictions (should show no structure for a good fit)
    residuals = y_true - y_pred
    axes[1, 0].scatter(y_pred, residuals, alpha=0.5)
    axes[1, 0].axhline(y=0, color='r', linestyle='--')
    axes[1, 0].set_xlabel('Predicted Values')
    axes[1, 0].set_ylabel('Residuals')
    axes[1, 0].set_title('Residual Plot')
    # Residual distribution
    axes[1, 1].hist(residuals, bins=30, alpha=0.7)
    axes[1, 1].set_xlabel('Residuals')
    axes[1, 1].set_ylabel('Frequency')
    axes[1, 1].set_title('Residual Distribution')
    plt.suptitle(title)
    plt.tight_layout()
    plt.show()
# Evaluate the trained LSTM on the held-out windows.
lstm_results = evaluate_timeseries_model(lstm_model, X_test, y_test, scaler)
plot_predictions(lstm_results['y_true'], lstm_results['y_pred'], 'LSTM Model Prediction Results')

Multi-step Prediction
python
def multi_step_prediction(model, initial_sequence, n_steps, scaler=None):
    """Recursively forecast *n_steps* values from *initial_sequence*.

    Each forecast is appended to the window (sliding it forward one step)
    and fed back in, so errors compound with the horizon.
    """
    window = initial_sequence.copy()
    forecast = []
    for _ in range(n_steps):
        step = model.predict(window.reshape(1, -1, 1), verbose=0)[0, 0]
        forecast.append(step)
        # Slide the window: drop the oldest point, append the forecast.
        window = np.append(window[1:], step)

    forecast = np.array(forecast)
    # Map back to original units when the model was trained on scaled data.
    if scaler is not None:
        forecast = scaler.inverse_transform(forecast.reshape(-1, 1)).flatten()
    return forecast
def plot_multi_step_prediction(model, X_test, y_test, scaler, n_steps=30, sample_idx=0):
    """
    Visualize a recursive multi-step forecast against the actual values.

    Args:
        sample_idx: which test window seeds the forecast. Consecutive
            entries of y_test are consecutive one-step targets, so
            y_test[sample_idx:sample_idx + n_steps] lines up with the
            forecast horizon.
    """
    # Select a test sample as the seed window
    initial_sequence = X_test[sample_idx].flatten()
    # Perform multi-step prediction (recursive feedback)
    predictions = multi_step_prediction(model, initial_sequence, n_steps, scaler)
    # Get actual subsequent values when enough of y_test remains
    if sample_idx + n_steps < len(y_test):
        true_values = y_test[sample_idx:sample_idx + n_steps]
        if scaler is not None:
            true_values = scaler.inverse_transform(true_values.reshape(-1, 1)).flatten()
    else:
        true_values = None
    # Plot
    plt.figure(figsize=(12, 6))
    # Unscale the seed window so history and forecast share units
    if scaler is not None:
        history = scaler.inverse_transform(initial_sequence.reshape(-1, 1)).flatten()
    else:
        history = initial_sequence
    # Negative x for history, non-negative x for the forecast horizon
    history_x = np.arange(-len(history), 0)
    pred_x = np.arange(0, n_steps)
    plt.plot(history_x, history, label='Historical Data', color='blue')
    plt.plot(pred_x, predictions, label='Predicted Values', color='red', marker='o')
    if true_values is not None:
        plt.plot(pred_x[:len(true_values)], true_values,
                 label='True Values', color='green', marker='s')
    # Vertical line marks the boundary between history and forecast
    plt.axvline(x=0, color='black', linestyle='--', alpha=0.5)
    plt.xlabel('Time Step')
    plt.ylabel('Value')
    plt.title(f'{n_steps}-step Prediction')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
# Multi-step prediction example
plot_multi_step_prediction(lstm_model, X_test, y_test, scaler, n_steps=30)

Model Comparison
Multi-model Comparison
python
def compare_models(models, model_names, X_train, y_train, X_test, y_test, scaler=None):
    """Train each model briefly and collect its test metrics.

    Uses a short 20-epoch run per model so the comparison stays cheap —
    a rough ranking, not a final fit.

    Returns:
        dict mapping each name to the metrics dict produced by
        evaluate_timeseries_model.
    """
    results = {}
    for model, name in zip(models, model_names):
        print(f"\nTraining {name} model...")
        model = compile_timeseries_model(model)
        # fit() return value intentionally discarded (the original bound it
        # to an unused 'history' local).
        model.fit(
            X_train, y_train,
            epochs=20,
            batch_size=32,
            validation_data=(X_test, y_test),
            verbose=0
        )
        results[name] = evaluate_timeseries_model(model, X_test, y_test, scaler)
    return results
def plot_model_comparison(results):
    """
    Visualize model comparison results as one bar chart per metric.

    Args:
        results: dict of {model_name: metrics_dict} as returned by
            compare_models; each metrics dict must contain the keys in
            ``metrics`` below.
    """
    model_names = list(results.keys())
    metrics = ['mse', 'mae', 'rmse', 'mape', 'r2']
    fig, axes = plt.subplots(2, 3, figsize=(18, 10))
    axes = axes.flatten()
    for i, metric in enumerate(metrics):
        values = [results[name][metric] for name in model_names]
        bars = axes[i].bar(model_names, values)
        axes[i].set_title(f'{metric.upper()}')
        axes[i].set_ylabel(metric.upper())
        # Annotate each bar with its numeric value
        for bar, value in zip(bars, values):
            height = bar.get_height()
            axes[i].text(bar.get_x() + bar.get_width()/2., height,
                         f'{value:.3f}', ha='center', va='bottom')
        # Rotate x-axis labels so long model names stay readable
        axes[i].tick_params(axis='x', rotation=45)
    # Hide the unused sixth subplot (5 metrics in a 2x3 grid)
    axes[-1].axis('off')
    plt.tight_layout()
    plt.show()
# Candidate architectures for a quick head-to-head comparison.
models_to_compare = [
    create_lstm_model(sequence_length, n_features=1),
    create_gru_model(sequence_length, n_features=1),
    create_cnn_lstm_model(sequence_length, n_features=1),
]
model_names = ['LSTM', 'GRU', 'CNN-LSTM']
# Left commented out: this retrains every model, which is slow.
# comparison_results = compare_models(models_to_compare, model_names, X_train, y_train, X_test, y_test, scaler)
# plot_model_comparison(comparison_results)

Real-time Prediction System
Online Prediction
python
class TimeSeriesPredictor:
    """Stateful one-step-ahead predictor wrapping a trained model.

    Feed observations in with add_data_point(); once sequence_length
    points are buffered, predict_next() returns the model's forecast in
    original (unscaled) units.
    """

    def __init__(self, model, scaler, sequence_length):
        self.model = model
        self.scaler = scaler
        self.sequence_length = sequence_length
        self.history = []

    def add_data_point(self, value):
        """Append a new observation, trimming old history."""
        self.history.append(value)
        # Keep at most twice the window so memory stays bounded.
        limit = self.sequence_length * 2
        if len(self.history) > limit:
            self.history = self.history[-limit:]

    def predict_next(self):
        """Forecast the next value from the most recent window."""
        if len(self.history) < self.sequence_length:
            raise ValueError(f"Need at least {self.sequence_length} historical data points")

        window = np.array(self.history[-self.sequence_length:])

        # Scale the window exactly as the training data was scaled.
        if self.scaler is not None:
            scaled = self.scaler.transform(window.reshape(-1, 1)).flatten()
        else:
            scaled = window

        raw = self.model.predict(
            scaled.reshape(1, self.sequence_length, 1),
            verbose=0
        )[0, 0]

        # Map the prediction back to original units.
        if self.scaler is not None:
            return self.scaler.inverse_transform([[raw]])[0, 0]
        return raw

    def predict_multiple(self, n_steps):
        """Recursive multi-step forecast; each forecast is fed back as history."""
        forecast = []
        for _ in range(n_steps):
            value = self.predict_next()
            forecast.append(value)
            self.add_data_point(value)  # treat the forecast as observed
        return forecast
def simulate_real_time_prediction(predictor, test_data, n_predictions=50):
    """Replay *test_data* through the predictor one point at a time.

    The first sequence_length points seed the history; after every
    one-step forecast the actual value is fed in, mimicking live data
    arrival. Returns (predictions, true_values) as arrays.
    """
    forecasts = []
    actuals = []

    # Seed the rolling window with the first sequence_length observations.
    for value in test_data[:predictor.sequence_length]:
        predictor.add_data_point(value)

    stop = min(len(test_data), predictor.sequence_length + n_predictions)
    for i in range(predictor.sequence_length, stop):
        forecasts.append(predictor.predict_next())
        observed = test_data[i]
        actuals.append(observed)
        # Feed the real observation back so the next forecast uses it.
        predictor.add_data_point(observed)

    return np.array(forecasts), np.array(actuals)
# Create the on-line predictor around the trained LSTM.
predictor = TimeSeriesPredictor(lstm_model, scaler, sequence_length)

# Simulate real-time prediction on the last 200 raw observations.
# BUG FIX: ts_data['value'] was never scaled (prepare_timeseries_data scaled
# an internal copy), so the original call to scaler.inverse_transform() here
# distorted the data. The raw values are already in original units.
test_data_orig = ts_data['value'].values[-200:]

rt_predictions, rt_true_values = simulate_real_time_prediction(
    predictor, test_data_orig, n_predictions=50
)

# Visualize real-time prediction results
plt.figure(figsize=(12, 6))
plt.plot(rt_true_values, label='True Values', marker='o')
plt.plot(rt_predictions, label='Predicted Values', marker='s')
plt.xlabel('Time Step')
plt.ylabel('Value')
plt.title('Real-time Prediction Results')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# Calculate real-time prediction error
rt_mae = mean_absolute_error(rt_true_values, rt_predictions)
rt_mape = np.mean(np.abs((rt_true_values - rt_predictions) / rt_true_values)) * 100
print(f"Real-time prediction MAE: {rt_mae:.4f}")
print(f"Real-time prediction MAPE: {rt_mape:.2f}%")

Model Deployment
Saving and Loading Models
python
def save_timeseries_model(model, scaler, model_path):
    """Persist the Keras model (HDF5) and its scaler (pickle) side by side."""
    import pickle

    model.save(f'{model_path}.h5')

    # The scaler must travel with the model so inference inputs are
    # scaled identically to the training data.
    if scaler is not None:
        with open(f'{model_path}_scaler.pkl', 'wb') as f:
            pickle.dump(scaler, f)

    print(f"Model saved to: {model_path}")
def load_timeseries_model(model_path):
    """Load the model and, when present, its companion scaler.

    Returns:
        (model, scaler) — scaler is None when no pickle file exists.
    """
    import pickle

    model = keras.models.load_model(f'{model_path}.h5')

    # The scaler file is optional: models trained on unscaled data lack it.
    try:
        with open(f'{model_path}_scaler.pkl', 'rb') as f:
            scaler = pickle.load(f)
    except FileNotFoundError:
        scaler = None

    return model, scaler
# Save model
save_timeseries_model(lstm_model, scaler, 'lstm_timeseries_model')

Web API Deployment
python
def create_timeseries_api(model, scaler, sequence_length):
    """
    Create time series prediction Web API (Flask).

    Exposes /predict (one step), /predict_multiple (recursive multi-step)
    and /health. All routes share ONE TimeSeriesPredictor whose history is
    overwritten per request.
    NOTE(review): the shared mutable predictor.history is not thread-safe —
    confirm a single-worker deployment or add locking.
    """
    from flask import Flask, request, jsonify
    app = Flask(__name__)
    predictor = TimeSeriesPredictor(model, scaler, sequence_length)
    @app.route('/predict', methods=['POST'])
    def predict():
        """POST {"sequence": [...]} -> {"prediction": float}."""
        try:
            data = request.get_json()
            if 'sequence' not in data:
                return jsonify({'error': 'Missing sequence field'}), 400
            sequence = data['sequence']
            if len(sequence) < sequence_length:
                return jsonify({
                    'error': f'Sequence length needs at least {sequence_length}'
                }), 400
            # Overwrite the predictor history with the client's window
            predictor.history = sequence[-sequence_length:]
            prediction = predictor.predict_next()
            return jsonify({
                'prediction': float(prediction),
                'sequence_length': len(sequence)
            })
        except Exception as e:
            # Any failure is surfaced as a 500 with the error text
            return jsonify({'error': str(e)}), 500
    @app.route('/predict_multiple', methods=['POST'])
    def predict_multiple():
        """POST {"sequence": [...], "n_steps": k} -> {"predictions": [...]}."""
        try:
            data = request.get_json()
            if 'sequence' not in data or 'n_steps' not in data:
                return jsonify({'error': 'Missing required fields'}), 400
            sequence = data['sequence']
            n_steps = data['n_steps']
            if len(sequence) < sequence_length:
                return jsonify({
                    'error': f'Sequence length needs at least {sequence_length}'
                }), 400
            # Overwrite history, then forecast recursively
            predictor.history = sequence[-sequence_length:]
            predictions = predictor.predict_multiple(n_steps)
            return jsonify({
                'predictions': [float(p) for p in predictions],
                'n_steps': n_steps
            })
        except Exception as e:
            return jsonify({'error': str(e)}), 500
    @app.route('/health', methods=['GET'])
    def health():
        """Liveness probe."""
        return jsonify({'status': 'healthy'})
    return app
# Create API
# api_app = create_timeseries_api(lstm_model, scaler, sequence_length)
# api_app.run(host='0.0.0.0', port=5000, debug=True)

Summary
This chapter demonstrated application of deep learning in time series data analysis through a complete time series prediction project:
Key Points:
- Data Understanding: Trend, seasonality, cyclicity analysis of time series
- Feature Engineering: Lag features, moving averages, time features
- Model Selection: LSTM, GRU, CNN-LSTM, Transformer, etc.
- Evaluation Metrics: MSE, MAE, MAPE, R², etc., time series specific metrics
- Multi-step Prediction: Recursive and direct prediction strategies
- Real-time Systems: Online prediction and streaming processing
- Model Deployment: API services and production environment considerations
Best Practices:
- Fully analyze time series characteristics
- Choose appropriate sequence length and prediction window
- Use appropriate data scaling and normalization
- Consider model computational complexity and prediction latency
- Implement model monitoring and automatic retraining
- Handle missing values and outliers
- Consider external factors and multivariate prediction
Time series prediction has wide applications in finance, retail, manufacturing, and other fields. Mastering these techniques is very important for data scientists.