PyTorch 时间序列预测

时间序列预测概述

时间序列预测是根据历史数据预测未来值的重要任务。PyTorch提供了强大的工具来构建各种时间序列预测模型,从简单的LSTM到复杂的Transformer架构。

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
from torch.utils.data import Dataset, DataLoader

数据预处理

1. 时间序列数据集类

class TimeSeriesDataset(Dataset):
    def __init__(self, data, sequence_length, prediction_length=1):
        """
        时间序列数据集
        Args:
            data: 时间序列数据 (numpy array)
            sequence_length: 输入序列长度
            prediction_length: 预测长度
        """
        self.data = data
        self.sequence_length = sequence_length
        self.prediction_length = prediction_length
        
    def __len__(self):
        return len(self.data) - self.sequence_length - self.prediction_length + 1
    
    def __getitem__(self, idx):
        # 输入序列
        x = self.data[idx:idx + self.sequence_length]
        # 目标序列
        y = self.data[idx + self.sequence_length:idx + self.sequence_length + self.prediction_length]
        
        return torch.FloatTensor(x), torch.FloatTensor(y)

def create_time_series_data(data, sequence_length=60, prediction_length=1, 
                           train_ratio=0.8, val_ratio=0.1):
    """创建时间序列数据集"""
    
    # 数据标准化
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(data.reshape(-1, 1)).flatten()
    
    # 计算分割点
    total_len = len(scaled_data)
    train_len = int(total_len * train_ratio)
    val_len = int(total_len * val_ratio)
    
    # 分割数据
    train_data = scaled_data[:train_len]
    val_data = scaled_data[train_len:train_len + val_len]
    test_data = scaled_data[train_len + val_len:]
    
    # 创建数据集
    train_dataset = TimeSeriesDataset(train_data, sequence_length, prediction_length)
    val_dataset = TimeSeriesDataset(val_data, sequence_length, prediction_length)
    test_dataset = TimeSeriesDataset(test_data, sequence_length, prediction_length)
    
    return train_dataset, val_dataset, test_dataset, scaler

# 生成示例数据
def generate_sine_wave_data(length=1000, frequency=0.1, noise_level=0.1):
    """生成正弦波数据"""
    t = np.linspace(0, length * frequency, length)
    data = np.sin(2 * np.pi * t) + noise_level * np.random.randn(length)
    return data

def generate_stock_like_data(length=1000, trend=0.001, volatility=0.02):
    """生成类似股价的数据"""
    returns = np.random.normal(trend, volatility, length)
    prices = np.exp(np.cumsum(returns)) * 100
    return prices

# 示例数据
sine_data = generate_sine_wave_data(1000)
stock_data = generate_stock_like_data(1000)

# 创建数据集
train_dataset, val_dataset, test_dataset, scaler = create_time_series_data(
    sine_data, sequence_length=60, prediction_length=1
)

print(f"训练集大小: {len(train_dataset)}")
print(f"验证集大小: {len(val_dataset)}")
print(f"测试集大小: {len(test_dataset)}")

2. 多变量时间序列

class MultiVariateTimeSeriesDataset(Dataset):
    def __init__(self, data, sequence_length, prediction_length=1, target_column=0):
        """
        多变量时间序列数据集
        Args:
            data: 多变量时间序列数据 (numpy array, shape: [time_steps, features])
            sequence_length: 输入序列长度
            prediction_length: 预测长度
            target_column: 目标变量的列索引
        """
        self.data = data
        self.sequence_length = sequence_length
        self.prediction_length = prediction_length
        self.target_column = target_column
        
    def __len__(self):
        return len(self.data) - self.sequence_length - self.prediction_length + 1
    
    def __getitem__(self, idx):
        # 输入序列(所有特征)
        x = self.data[idx:idx + self.sequence_length]
        # 目标序列(只有目标变量)
        y = self.data[idx + self.sequence_length:idx + self.sequence_length + self.prediction_length, 
                     self.target_column]
        
        return torch.FloatTensor(x), torch.FloatTensor(y)

def create_multivariate_data(n_samples=1000, n_features=5):
    """创建多变量时间序列数据"""
    t = np.linspace(0, 10, n_samples)
    
    # 创建相关的多变量数据
    data = np.zeros((n_samples, n_features))
    
    # 主要趋势
    trend = 0.1 * t + np.sin(0.5 * t)
    
    for i in range(n_features):
        # 每个特征都与主要趋势相关,但有不同的相位和噪声
        phase = i * np.pi / n_features
        noise = 0.1 * np.random.randn(n_samples)
        data[:, i] = trend + 0.5 * np.sin(t + phase) + noise
    
    return data

# 创建多变量数据
multivar_data = create_multivariate_data(1000, 5)
multivar_dataset = MultiVariateTimeSeriesDataset(
    multivar_data, sequence_length=60, prediction_length=1, target_column=0
)

LSTM时间序列模型

1. 基础LSTM模型

class LSTMPredictor(nn.Module):
    def __init__(self, input_size=1, hidden_size=50, num_layers=2, 
                 output_size=1, dropout=0.2):
        super(LSTMPredictor, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # LSTM层
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        
        # 输出层
        self.fc = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        # LSTM前向传播
        lstm_out, (hidden, cell) = self.lstm(x)
        
        # 使用最后一个时间步的输出
        last_output = lstm_out[:, -1, :]
        
        # 应用dropout和全连接层
        output = self.dropout(last_output)
        output = self.fc(output)
        
        return output

# 创建模型
model = LSTMPredictor(input_size=1, hidden_size=50, num_layers=2, output_size=1)

# 测试模型
sample_input = torch.randn(32, 60, 1)  # (batch_size, sequence_length, input_size)
sample_output = model(sample_input)
print(f"输入形状: {sample_input.shape}")
print(f"输出形状: {sample_output.shape}")

2. 双向LSTM模型

class BiLSTMPredictor(nn.Module):
    def __init__(self, input_size=1, hidden_size=50, num_layers=2, 
                 output_size=1, dropout=0.2):
        super(BiLSTMPredictor, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # 双向LSTM
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )
        
        # 注意力机制
        self.attention = nn.Linear(hidden_size * 2, 1)
        
        # 输出层
        self.fc = nn.Linear(hidden_size * 2, output_size)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        # 双向LSTM
        lstm_out, _ = self.lstm(x)  # (batch_size, seq_len, hidden_size * 2)
        
        # 注意力机制
        attention_weights = torch.softmax(self.attention(lstm_out), dim=1)
        context_vector = torch.sum(attention_weights * lstm_out, dim=1)
        
        # 输出
        output = self.dropout(context_vector)
        output = self.fc(output)
        
        return output

3. 多步预测LSTM

class MultiStepLSTM(nn.Module):
    def __init__(self, input_size=1, hidden_size=50, num_layers=2, 
                 output_size=10, dropout=0.2):
        super(MultiStepLSTM, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        
        # 编码器LSTM
        self.encoder_lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        
        # 解码器LSTM
        self.decoder_lstm = nn.LSTM(
            input_size=1,  # 解码器输入维度
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        
        # 输出层
        self.fc = nn.Linear(hidden_size, 1)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, target_length=None):
        batch_size = x.size(0)
        
        # 编码器
        encoder_out, (hidden, cell) = self.encoder_lstm(x)
        
        # 解码器
        if target_length is None:
            target_length = self.output_size
        
        decoder_input = torch.zeros(batch_size, 1, 1, device=x.device)
        decoder_hidden = (hidden, cell)
        
        outputs = []
        
        for _ in range(target_length):
            decoder_out, decoder_hidden = self.decoder_lstm(decoder_input, decoder_hidden)
            output = self.fc(self.dropout(decoder_out))
            outputs.append(output)
            
            # 使用当前输出作为下一步的输入
            decoder_input = output
        
        # 拼接所有输出
        outputs = torch.cat(outputs, dim=1)  # (batch_size, target_length, 1)
        
        return outputs.squeeze(-1)  # (batch_size, target_length)

GRU时间序列模型

1. GRU预测器

class GRUPredictor(nn.Module):
    def __init__(self, input_size=1, hidden_size=50, num_layers=2, 
                 output_size=1, dropout=0.2):
        super(GRUPredictor, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # GRU层
        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        
        # 输出层
        self.fc = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        # GRU前向传播
        gru_out, hidden = self.gru(x)
        
        # 使用最后一个时间步的输出
        last_output = gru_out[:, -1, :]
        
        # 输出
        output = self.dropout(last_output)
        output = self.fc(output)
        
        return output

Transformer时间序列模型

1. 时间序列Transformer

class TimeSeriesTransformer(nn.Module):
    def __init__(self, input_size=1, d_model=64, nhead=8, num_layers=6, 
                 output_size=1, max_seq_length=1000, dropout=0.1):
        super(TimeSeriesTransformer, self).__init__()
        
        self.d_model = d_model
        self.input_projection = nn.Linear(input_size, d_model)
        
        # 位置编码
        self.pos_encoding = PositionalEncoding(d_model, max_seq_length)
        
        # Transformer编码器
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=d_model * 4,
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        
        # 输出层
        self.output_projection = nn.Linear(d_model, output_size)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        # 输入投影
        x = self.input_projection(x) * math.sqrt(self.d_model)
        
        # 位置编码
        x = self.pos_encoding(x)
        
        # Transformer编码
        transformer_out = self.transformer(x)
        
        # 使用最后一个时间步的输出
        last_output = transformer_out[:, -1, :]
        
        # 输出投影
        output = self.dropout(last_output)
        output = self.output_projection(output)
        
        return output

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * 
                           (-math.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        return x + self.pe[:x.size(1), :].transpose(0, 1)

训练框架

1. 时间序列训练器

class TimeSeriesTrainer:
    def __init__(self, model, train_loader, val_loader, device, learning_rate=0.001):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.device = device
        
        # 损失函数和优化器
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='min', patience=10, factor=0.5
        )
        
        # 训练历史
        self.train_losses = []
        self.val_losses = []
        self.best_val_loss = float('inf')
        
    def train_epoch(self):
        """训练一个epoch"""
        self.model.train()
        total_loss = 0
        
        for batch_idx, (data, target) in enumerate(self.train_loader):
            data, target = data.to(self.device), target.to(self.device)
            
            self.optimizer.zero_grad()
            
            # 前向传播
            output = self.model(data)
            
            # 确保输出和目标的形状匹配
            if output.dim() != target.dim():
                if target.dim() == 1:
                    target = target.unsqueeze(-1)
                elif output.dim() == 1:
                    output = output.unsqueeze(-1)
            
            loss = self.criterion(output, target)
            
            # 反向传播
            loss.backward()
            
            # 梯度裁剪
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            
            self.optimizer.step()
            
            total_loss += loss.item()
        
        return total_loss / len(self.train_loader)
    
    def validate_epoch(self):
        """验证一个epoch"""
        self.model.eval()
        total_loss = 0
        
        with torch.no_grad():
            for data, target in self.val_loader:
                data, target = data.to(self.device), target.to(self.device)
                
                output = self.model(data)
                
                # 确保形状匹配
                if output.dim() != target.dim():
                    if target.dim() == 1:
                        target = target.unsqueeze(-1)
                    elif output.dim() == 1:
                        output = output.unsqueeze(-1)
                
                loss = self.criterion(output, target)
                total_loss += loss.item()
        
        return total_loss / len(self.val_loader)
    
    def train(self, num_epochs):
        """完整训练流程"""
        print(f"开始训练,共{num_epochs}个epoch")
        
        for epoch in range(num_epochs):
            # 训练
            train_loss = self.train_epoch()
            
            # 验证
            val_loss = self.validate_epoch()
            
            # 更新学习率
            self.scheduler.step(val_loss)
            
            # 记录历史
            self.train_losses.append(train_loss)
            self.val_losses.append(val_loss)
            
            # 保存最佳模型
            if val_loss < self.best_val_loss:
                self.best_val_loss = val_loss
                torch.save(self.model.state_dict(), 'best_model.pth')
            
            # 打印进度
            if (epoch + 1) % 10 == 0:
                print(f'Epoch {epoch+1}/{num_epochs}:')
                print(f'  Train Loss: {train_loss:.6f}')
                print(f'  Val Loss: {val_loss:.6f}')
                print(f'  LR: {self.optimizer.param_groups[0]["lr"]:.8f}')
        
        print(f'训练完成! 最佳验证损失: {self.best_val_loss:.6f}')
        
        return self.train_losses, self.val_losses

# 使用示例
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# 创建模型
model = LSTMPredictor(input_size=1, hidden_size=50, num_layers=2, output_size=1)

# 创建训练器
trainer = TimeSeriesTrainer(model, train_loader, val_loader, device)

# 训练模型
train_losses, val_losses = trainer.train(num_epochs=100)

模型评估

1. 预测和评估

def evaluate_model(model, test_loader, scaler, device):
    """评估模型性能"""
    model.eval()
    predictions = []
    actuals = []
    
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            
            output = model(data)
            
            # 转换回CPU并添加到列表
            predictions.extend(output.cpu().numpy())
            actuals.extend(target.cpu().numpy())
    
    # 转换为numpy数组
    predictions = np.array(predictions)
    actuals = np.array(actuals)
    
    # 反标准化
    predictions = scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()
    actuals = scaler.inverse_transform(actuals.reshape(-1, 1)).flatten()
    
    # 计算指标
    mse = mean_squared_error(actuals, predictions)
    mae = mean_absolute_error(actuals, predictions)
    rmse = np.sqrt(mse)
    
    # 计算MAPE
    mape = np.mean(np.abs((actuals - predictions) / actuals)) * 100
    
    print(f"评估结果:")
    print(f"  MSE: {mse:.6f}")
    print(f"  MAE: {mae:.6f}")
    print(f"  RMSE: {rmse:.6f}")
    print(f"  MAPE: {mape:.2f}%")
    
    return predictions, actuals, {
        'mse': mse, 'mae': mae, 'rmse': rmse, 'mape': mape
    }

# 评估模型
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
model.load_state_dict(torch.load('best_model.pth'))

predictions, actuals, metrics = evaluate_model(model, test_loader, scaler, device)

2. 可视化结果

def plot_predictions(actuals, predictions, title="时间序列预测结果"):
    """可视化预测结果"""
    plt.figure(figsize=(15, 6))
    
    # 只显示前200个点以便观察
    n_points = min(200, len(actuals))
    
    plt.plot(range(n_points), actuals[:n_points], label='实际值', color='blue', alpha=0.7)
    plt.plot(range(n_points), predictions[:n_points], label='预测值', color='red', alpha=0.7)
    
    plt.title(title)
    plt.xlabel('时间步')
    plt.ylabel('值')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

def plot_training_history(train_losses, val_losses):
    """可视化训练历史"""
    plt.figure(figsize=(12, 4))
    
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='训练损失')
    plt.plot(val_losses, label='验证损失')
    plt.title('训练历史')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.subplot(1, 2, 2)
    plt.plot(train_losses, label='训练损失')
    plt.plot(val_losses, label='验证损失')
    plt.title('训练历史 (对数尺度)')
    plt.xlabel('Epoch')
    plt.ylabel('Loss (log scale)')
    plt.yscale('log')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

# 可视化结果
plot_predictions(actuals, predictions)
plot_training_history(train_losses, val_losses)

高级技术

1. 注意力机制

class AttentionLSTM(nn.Module):
    def __init__(self, input_size=1, hidden_size=50, num_layers=2, 
                 output_size=1, dropout=0.2):
        super(AttentionLSTM, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # LSTM层
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        
        # 注意力机制
        self.attention = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, 1)
        )
        
        # 输出层
        self.fc = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        # LSTM输出
        lstm_out, _ = self.lstm(x)  # (batch_size, seq_len, hidden_size)
        
        # 计算注意力权重
        attention_scores = self.attention(lstm_out)  # (batch_size, seq_len, 1)
        attention_weights = torch.softmax(attention_scores, dim=1)
        
        # 加权求和
        context_vector = torch.sum(attention_weights * lstm_out, dim=1)  # (batch_size, hidden_size)
        
        # 输出
        output = self.dropout(context_vector)
        output = self.fc(output)
        
        return output, attention_weights

2. 残差连接

class ResidualLSTM(nn.Module):
    def __init__(self, input_size=1, hidden_size=50, num_layers=4, 
                 output_size=1, dropout=0.2):
        super(ResidualLSTM, self).__init__()
        
        self.input_projection = nn.Linear(input_size, hidden_size)
        
        # 多个LSTM层与残差连接
        self.lstm_layers = nn.ModuleList()
        for i in range(num_layers):
            self.lstm_layers.append(
                nn.LSTM(hidden_size, hidden_size, 1, batch_first=True, dropout=0)
            )
        
        self.layer_norms = nn.ModuleList([
            nn.LayerNorm(hidden_size) for _ in range(num_layers)
        ])
        
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x):
        # 输入投影
        x = self.input_projection(x)
        
        # 通过多个LSTM层
        for i, (lstm, norm) in enumerate(zip(self.lstm_layers, self.layer_norms)):
            residual = x
            lstm_out, _ = lstm(x)
            
            # 残差连接和层归一化
            x = norm(lstm_out + residual)
            x = self.dropout(x)
        
        # 输出
        last_output = x[:, -1, :]
        output = self.fc(last_output)
        
        return output

3. 多尺度特征提取

class MultiScaleLSTM(nn.Module):
    def __init__(self, input_size=1, hidden_size=50, output_size=1, 
                 scales=[1, 3, 5], dropout=0.2):
        super(MultiScaleLSTM, self).__init__()
        
        self.scales = scales
        
        # 不同尺度的LSTM
        self.lstm_layers = nn.ModuleList()
        for scale in scales:
            self.lstm_layers.append(
                nn.LSTM(input_size, hidden_size, 2, batch_first=True, dropout=dropout)
            )
        
        # 融合层
        self.fusion = nn.Linear(len(scales) * hidden_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        scale_outputs = []
        
        for i, (scale, lstm) in enumerate(zip(self.scales, self.lstm_layers)):
            # 多尺度采样
            if scale > 1:
                # 简单的下采样
                sampled_x = x[:, ::scale, :]
            else:
                sampled_x = x
            
            # LSTM处理
            lstm_out, _ = lstm(sampled_x)
            last_output = lstm_out[:, -1, :]
            scale_outputs.append(last_output)
        
        # 融合不同尺度的特征
        fused = torch.cat(scale_outputs, dim=1)
        fused = self.fusion(fused)
        fused = torch.relu(fused)
        fused = self.dropout(fused)
        
        # 输出
        output = self.fc(fused)
        
        return output

实际应用示例

1. 股价预测

def create_stock_prediction_pipeline():
    """创建股价预测管道"""
    
    # 生成模拟股价数据
    stock_data = generate_stock_like_data(2000, trend=0.0005, volatility=0.02)
    
    # 创建数据集
    train_dataset, val_dataset, test_dataset, scaler = create_time_series_data(
        stock_data, sequence_length=60, prediction_length=1
    )
    
    # 创建数据加载器
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
    
    # 创建模型
    model = AttentionLSTM(input_size=1, hidden_size=64, num_layers=3, output_size=1)
    
    # 训练
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    trainer = TimeSeriesTrainer(model, train_loader, val_loader, device, learning_rate=0.001)
    
    train_losses, val_losses = trainer.train(num_epochs=100)
    
    # 评估
    model.load_state_dict(torch.load('best_model.pth'))
    predictions, actuals, metrics = evaluate_model(model, test_loader, scaler, device)
    
    return model, predictions, actuals, metrics

# 运行股价预测示例
# model, predictions, actuals, metrics = create_stock_prediction_pipeline()

2. 多步预测

def multi_step_prediction_example():
    """多步预测示例"""
    
    # 创建多步预测数据集
    class MultiStepDataset(Dataset):
        def __init__(self, data, input_length, output_length):
            self.data = data
            self.input_length = input_length
            self.output_length = output_length
            
        def __len__(self):
            return len(self.data) - self.input_length - self.output_length + 1
        
        def __getitem__(self, idx):
            x = self.data[idx:idx + self.input_length]
            y = self.data[idx + self.input_length:idx + self.input_length + self.output_length]
            return torch.FloatTensor(x), torch.FloatTensor(y)
    
    # 生成数据
    data = generate_sine_wave_data(1000)
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(data.reshape(-1, 1)).flatten()
    
    # 创建数据集
    dataset = MultiStepDataset(scaled_data, input_length=60, output_length=10)
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
    
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    
    # 创建多步预测模型
    model = MultiStepLSTM(input_size=1, hidden_size=64, num_layers=2, output_size=10)
    
    # 训练
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    trainer = TimeSeriesTrainer(model, train_loader, val_loader, device)
    
    train_losses, val_losses = trainer.train(num_epochs=50)
    
    return model, scaler

# 运行多步预测示例
# multi_step_model, multi_step_scaler = multi_step_prediction_example()

总结

时间序列预测是PyTorch的重要应用领域,本章介绍了:

  1. 数据预处理:时间序列数据集的创建和预处理技术
  2. 经典模型:LSTM、GRU等循环神经网络模型
  3. 现代架构:Transformer等注意力机制模型
  4. 训练框架:完整的训练、验证、评估流程
  5. 高级技术:注意力机制、残差连接、多尺度特征提取
  6. 实际应用:股价预测、多步预测等具体案例

掌握这些技术将帮助你在金融预测、需求预测、异常检测等时间序列相关任务中取得成功!