
Pandas 性能优化

概述

在处理大规模数据时,Pandas 的性能优化变得至关重要。本章将深入探讨各种优化技巧,包括内存优化、计算优化、数据类型优化、并行处理等,帮助你构建高效的数据处理流程。

1. 内存优化

1.1 数据类型优化

python
import pandas as pd
import numpy as np
import time
import psutil
import os
from memory_profiler import profile

print("=== 数据类型优化 ===")

def memory_usage_analysis(n=1_000_000):
    """Compare a DataFrame's memory footprint before and after dtype optimization.

    Args:
        n: Number of rows in the demo DataFrame (default 1,000,000).

    Returns:
        Tuple ``(df_original, df_optimized)`` of the unoptimized and
        dtype-optimized DataFrames.
    """
    np.random.seed(42)

    # Original data with default dtypes (int64 / float64 / object)
    df_original = pd.DataFrame({
        'int_col': np.random.randint(0, 100, n),
        'float_col': np.random.random(n),
        'category_col': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
        'bool_col': np.random.choice([True, False], n),
        'date_col': pd.date_range('2020-01-01', periods=n, freq='1min')
    })

    print("原始数据内存使用:")
    # BUGFIX: DataFrame.info() prints directly and returns None, so wrapping
    # it in print() used to emit a stray "None" line.
    df_original.info(memory_usage='deep')
    original_memory = df_original.memory_usage(deep=True).sum() / 1024**2
    print(f"总内存使用: {original_memory:.2f} MB\n")

    df_optimized = df_original.copy()

    # 1. Downcast integers to the smallest type that holds the values
    df_optimized['int_col'] = pd.to_numeric(df_optimized['int_col'], downcast='integer')

    # 2. Downcast float64 -> float32
    df_optimized['float_col'] = pd.to_numeric(df_optimized['float_col'], downcast='float')

    # 3. Low-cardinality strings -> category (stores codes + one copy of labels)
    df_optimized['category_col'] = df_optimized['category_col'].astype('category')

    # 4. The boolean column is already optimal (1 byte per value)

    print("优化后数据内存使用:")
    df_optimized.info(memory_usage='deep')
    optimized_memory = df_optimized.memory_usage(deep=True).sum() / 1024**2
    print(f"总内存使用: {optimized_memory:.2f} MB")
    print(f"内存节省: {((original_memory - optimized_memory) / original_memory * 100):.1f}%\n")

    return df_original, df_optimized

df_orig, df_opt = memory_usage_analysis()

1.2 分块处理大文件

python
print("=== 分块处理大文件 ===")

def chunked_processing_demo(total_rows=500_000, chunk_size=50_000,
                            read_chunksize=10_000, large_file='large_dataset.csv'):
    """Demonstrate chunked CSV processing versus loading the whole file at once.

    Args:
        total_rows: Total number of rows in the generated demo CSV.
        chunk_size: Batch size used when writing the demo CSV.
        read_chunksize: Chunk size used when reading the CSV back.
        large_file: Path of the temporary CSV (always removed before returning).

    Returns:
        Series with the per-category sum of 'value1' from the chunked run.
    """
    print("生成大型数据集...")

    try:
        # Write the file in batches so the full dataset never lives in memory
        for i in range(0, total_rows, chunk_size):
            rows = min(chunk_size, total_rows - i)
            chunk_data = pd.DataFrame({
                'id': range(i, i + rows),
                'value1': np.random.randint(1, 1000, rows),
                'value2': np.random.random(rows),
                'category': np.random.choice(['A', 'B', 'C', 'D'], rows),
                'date': pd.date_range('2020-01-01', periods=rows, freq='1h')
            })

            # First batch writes the header; later batches append without it
            chunk_data.to_csv(large_file, mode='w' if i == 0 else 'a',
                              header=(i == 0), index=False)

        print(f"生成文件大小: {os.path.getsize(large_file) / 1024**2:.2f} MB")

        # Method 1: load everything at once (memory-hungry)
        print("\n方法1: 一次性读取")
        start_time = time.time()
        try:
            df_full = pd.read_csv(large_file)
            full_load_time = time.time() - start_time
            print(f"读取时间: {full_load_time:.2f} 秒")
            print(f"内存使用: {df_full.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

            result_full = df_full.groupby('category')['value1'].sum()
            print(f"聚合结果: {result_full.to_dict()}")

        except MemoryError:
            print("内存不足,无法一次性读取")

        # Method 2: chunked reading (memory-friendly)
        print("\n方法2: 分块读取")
        start_time = time.time()

        chunk_results = []
        chunk_reader = pd.read_csv(large_file, chunksize=read_chunksize)

        for i, chunk in enumerate(chunk_reader):
            # Aggregate each chunk independently
            chunk_result = chunk.groupby('category')['value1'].sum()
            chunk_results.append(chunk_result)

            if i % 10 == 0:
                # BUGFIX: progress used a hard-coded 10000 regardless of the
                # actual read chunk size
                print(f"处理了 {(i + 1) * read_chunksize} 行")

        # Combine the per-chunk partial aggregates into the final result
        final_result = pd.concat(chunk_results).groupby(level=0).sum()
        chunk_load_time = time.time() - start_time

        print(f"分块处理时间: {chunk_load_time:.2f} 秒")
        print(f"聚合结果: {final_result.to_dict()}")
    finally:
        # Always remove the temporary file, even if processing failed midway
        if os.path.exists(large_file):
            os.remove(large_file)

    return final_result

chunk_result = chunked_processing_demo()

1.3 内存监控和分析

python
print("\n=== 内存监控和分析 ===")

def memory_monitoring():
    """Memory monitoring demo: track process RSS and break down DataFrame memory per column.

    Returns the dtype-optimized sample DataFrame.

    NOTE(review): relies on the module-level ``psutil`` import for process RSS.
    """
    
    def get_memory_usage():
        """Return the current process's resident set size (RSS) in MB."""
        process = psutil.Process(os.getpid())
        memory_info = process.memory_info()
        return memory_info.rss / 1024**2  # MB
    
    def memory_profiler_demo():
        """Print RSS before/after creating, optimizing, and deleting a DataFrame."""
        print(f"初始内存使用: {get_memory_usage():.2f} MB")
        
        # Build a moderately large DataFrame (100k rows, one string column)
        n = 100000
        df = pd.DataFrame({
            'A': np.random.randint(0, 1000, n),
            'B': np.random.random(n),
            'C': ['category_' + str(i % 100) for i in range(n)]
        })
        
        print(f"创建DataFrame后内存使用: {get_memory_usage():.2f} MB")
        
        # Downcast numerics and convert low-cardinality strings to category
        df['A'] = pd.to_numeric(df['A'], downcast='integer')
        df['B'] = pd.to_numeric(df['B'], downcast='float')
        df['C'] = df['C'].astype('category')
        
        print(f"优化后内存使用: {get_memory_usage():.2f} MB")
        
        # Drop the DataFrame. NOTE(review): RSS may not shrink immediately —
        # the allocator can retain freed pages.
        del df
        
        print(f"删除DataFrame后内存使用: {get_memory_usage():.2f} MB")
        
        return True
    
    # Per-DataFrame memory breakdown helper
    def analyze_dataframe_memory(df, name="DataFrame"):
        """Print total, per-row, and per-column memory usage plus dtypes of *df*."""
        print(f"\n{name} 内存分析:")
        print("-" * 40)
        
        memory_usage = df.memory_usage(deep=True)
        total_memory = memory_usage.sum() / 1024**2
        
        print(f"总内存使用: {total_memory:.2f} MB")
        print(f"行数: {len(df):,}")
        print(f"列数: {len(df.columns)}")
        print(f"平均每行内存: {total_memory * 1024 / len(df):.2f} KB")
        
        print("\n各列内存使用:")
        for col in df.columns:
            col_memory = df[col].memory_usage(deep=True) / 1024**2
            print(f"  {col}: {col_memory:.2f} MB ({col_memory/total_memory*100:.1f}%)")
        
        print("\n数据类型:")
        for col in df.columns:
            print(f"  {col}: {df[col].dtype}")
    
    # Run the RSS demo
    memory_profiler_demo()
    
    # Build a mixed-dtype sample DataFrame for the breakdown analysis
    sample_df = pd.DataFrame({
        'integers': np.random.randint(0, 100, 10000),
        'floats': np.random.random(10000),
        'categories': np.random.choice(['A', 'B', 'C'], 10000),
        'strings': ['string_' + str(i) for i in range(10000)],
        'dates': pd.date_range('2020-01-01', periods=10000, freq='1H')
    })
    
    analyze_dataframe_memory(sample_df, "原始数据")
    
    # Optimize dtypes and compare the breakdowns
    sample_df_opt = sample_df.copy()
    sample_df_opt['integers'] = pd.to_numeric(sample_df_opt['integers'], downcast='integer')
    sample_df_opt['floats'] = pd.to_numeric(sample_df_opt['floats'], downcast='float')
    sample_df_opt['categories'] = sample_df_opt['categories'].astype('category')
    
    analyze_dataframe_memory(sample_df_opt, "优化后数据")
    
    return sample_df_opt

optimized_df = memory_monitoring()

2. 计算性能优化

2.1 向量化操作优化

python
print("\n=== 计算性能优化 ===")

def vectorization_optimization(n=1_000_000):
    """Benchmark vectorized alternatives to row-wise apply for conditional
    computation and aggregation.

    Args:
        n: Number of rows in the benchmark DataFrame (default 1,000,000).

    Returns:
        The benchmark DataFrame.
    """
    np.random.seed(42)

    df = pd.DataFrame({
        'A': np.random.randint(1, 100, n),
        'B': np.random.randint(1, 100, n),
        'C': np.random.random(n),
        'D': np.random.choice(['X', 'Y', 'Z'], n)
    })

    print(f"测试数据: {len(df):,} 行")

    # Timing helper: prints wall-clock time and returns the function's result
    def time_operation(func, name, *args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{name}: {end_time - start_time:.4f} 秒")
        return result

    print("\n=== 条件计算性能对比 ===")

    # Method 1: row-wise apply (slow — Python-level call per row)
    def apply_method(df):
        return df.apply(lambda row: row['A'] * row['C'] if row['A'] > row['B'] else row['B'] * row['C'], axis=1)

    # Method 2: numpy.where (fast — single vectorized pass)
    def numpy_where_method(df):
        return np.where(df['A'] > df['B'], df['A'] * df['C'], df['B'] * df['C'])

    # Method 3: Series.where (fast, preserves the pandas index)
    def pandas_where_method(df):
        return (df['A'] * df['C']).where(df['A'] > df['B'], df['B'] * df['C'])

    # Method 4: boolean-mask assignment (fast)
    def boolean_indexing_method(df):
        result = pd.Series(index=df.index, dtype=float)
        mask = df['A'] > df['B']
        result[mask] = df.loc[mask, 'A'] * df.loc[mask, 'C']
        result[~mask] = df.loc[~mask, 'B'] * df.loc[~mask, 'C']
        return result

    result1 = time_operation(apply_method, "Apply方法", df)
    result2 = time_operation(numpy_where_method, "NumPy where", df)
    result3 = time_operation(pandas_where_method, "Pandas where", df)
    result4 = time_operation(boolean_indexing_method, "布尔索引", df)

    print("\n=== 聚合操作性能对比 ===")

    # Method 1: plain groupby/agg
    def standard_groupby(df):
        return df.groupby('D').agg({'A': 'sum', 'B': 'mean', 'C': 'std'})

    # Method 2: transform broadcasts group stats to rows, then dedup
    def transform_method(df):
        df_copy = df.copy()
        df_copy['A_sum'] = df_copy.groupby('D')['A'].transform('sum')
        df_copy['B_mean'] = df_copy.groupby('D')['B'].transform('mean')
        df_copy['C_std'] = df_copy.groupby('D')['C'].transform('std')
        return df_copy[['D', 'A_sum', 'B_mean', 'C_std']].drop_duplicates()

    # Method 3: pivot_table with per-column aggfuncs
    def pivot_table_method(df):
        return pd.pivot_table(df, values=['A', 'B', 'C'], index='D',
                              aggfunc={'A': 'sum', 'B': 'mean', 'C': 'std'})

    agg_result1 = time_operation(standard_groupby, "标准GroupBy", df)
    agg_result2 = time_operation(transform_method, "Transform方法", df)
    agg_result3 = time_operation(pivot_table_method, "Pivot Table", df)

    print("\nGroupBy结果:")
    print(agg_result1)

    return df

perf_df = vectorization_optimization()

2.2 索引优化

python
print("\n=== 索引优化 ===")

def index_optimization(n=500_000):
    """Compare query performance with and without (sorted) indexes.

    Args:
        n: Number of rows in the demo DataFrame (default 500,000).

    Returns:
        The category-indexed, sorted DataFrame.
    """
    np.random.seed(42)

    df = pd.DataFrame({
        'id': range(n),
        'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
        'value': np.random.randint(1, 1000, n),
        'date': pd.date_range('2020-01-01', periods=n, freq='1min')
    })

    print(f"数据集大小: {len(df):,} 行")

    # Timing helper: prints elapsed time and the result's row count
    def time_query(func, name, *args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{name}: {end_time - start_time:.4f} 秒 (结果: {len(result)} 行)")
        return result

    print("\n=== 查询性能对比 ===")

    # Full boolean-mask scan (no index)
    def query_without_index(df, category):
        return df[df['category'] == category]

    # BUGFIX: sort after set_index — .loc on an unsorted index degrades to a
    # scan, and label slicing on an unsorted MultiIndex raises
    # UnsortedIndexError, so the original multi-index query below crashed.
    df_indexed = df.set_index('category').sort_index()

    def query_with_index(df, category):
        return df.loc[category]

    # Lexsorted MultiIndex (category, date) so partial/range slicing works
    df_multi_indexed = df.set_index(['category', 'date']).sort_index()

    def query_multi_index(df, category, start_date, end_date):
        return df.loc[(category, slice(start_date, end_date)), :]

    target_category = 'A'
    start_date = '2020-01-01'
    end_date = '2020-01-02'

    print("1. 单列查询性能:")
    result1 = time_query(query_without_index, "无索引查询", df, target_category)
    result2 = time_query(query_with_index, "有索引查询", df_indexed, target_category)

    print("\n2. 范围查询性能:")

    def range_query_without_index(df, start_date, end_date):
        return df[(df['date'] >= start_date) & (df['date'] <= end_date)]

    # Dates were generated in increasing order, so this index is already sorted
    df_date_indexed = df.set_index('date')

    def range_query_with_index(df, start_date, end_date):
        return df.loc[start_date:end_date]

    result3 = time_query(range_query_without_index, "日期范围查询(无索引)", df, start_date, end_date)
    result4 = time_query(range_query_with_index, "日期范围查询(有索引)", df_date_indexed, start_date, end_date)

    print("\n3. 复合查询性能:")
    result5 = time_query(query_multi_index, "多级索引查询", df_multi_indexed, target_category, start_date, end_date)

    print("\n=== 索引内存使用分析 ===")

    def analyze_index_memory(df, name):
        """Print total vs. index-only memory for *df*."""
        total_memory = df.memory_usage(deep=True).sum() / 1024**2
        index_memory = df.index.memory_usage(deep=True) / 1024**2
        print(f"{name}:")
        print(f"  总内存: {total_memory:.2f} MB")
        print(f"  索引内存: {index_memory:.2f} MB ({index_memory/total_memory*100:.1f}%)")

    analyze_index_memory(df, "原始DataFrame")
    analyze_index_memory(df_indexed, "单级索引DataFrame")
    analyze_index_memory(df_multi_indexed, "多级索引DataFrame")

    return df_indexed

indexed_df = index_optimization()

2.3 数据类型和存储优化

python
print("\n=== 数据类型和存储优化 ===")

def dtype_storage_optimization(n=100_000, test_file_base='test_data'):
    """Demonstrate dtype downcasting and compare CSV/Parquet/Pickle storage.

    Args:
        n: Number of rows in the demo DataFrame (default 100,000).
        test_file_base: Base path (without extension) for the temporary files.

    Returns:
        The dtype-optimized DataFrame.

    Note:
        Parquet I/O requires a parquet engine (pyarrow or fastparquet).
    """
    np.random.seed(42)

    # Unoptimized dtypes — everything defaults to int64 / float64 / object
    df_original = pd.DataFrame({
        'small_int': np.random.randint(0, 10, n),  # fits in int8
        'medium_int': np.random.randint(0, 1000, n),  # fits in int16
        'large_int': np.random.randint(0, 100000, n),  # needs int32
        'float_data': np.random.random(n),  # float32 is enough here
        'category_data': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),  # low cardinality -> category
        'boolean_data': np.random.choice([True, False], n),  # already optimal
        'sparse_data': np.random.choice([0, 1, 2, np.nan], n, p=[0.7, 0.1, 0.1, 0.1])  # mostly NaN/0 -> sparse
    })

    print("原始数据类型和内存使用:")
    print(df_original.dtypes)
    print(f"总内存使用: {df_original.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n")

    df_optimized = df_original.copy()

    # 1. Integer downcasting
    df_optimized['small_int'] = df_optimized['small_int'].astype('int8')
    df_optimized['medium_int'] = df_optimized['medium_int'].astype('int16')
    df_optimized['large_int'] = df_optimized['large_int'].astype('int32')

    # 2. Float downcasting
    df_optimized['float_data'] = df_optimized['float_data'].astype('float32')

    # 3. Categorical conversion
    df_optimized['category_data'] = df_optimized['category_data'].astype('category')

    # 4. Sparse representation (stores only the non-fill values)
    df_optimized['sparse_data'] = pd.arrays.SparseArray(df_optimized['sparse_data'])

    print("优化后数据类型和内存使用:")
    print(df_optimized.dtypes)
    optimized_memory = df_optimized.memory_usage(deep=True).sum() / 1024**2
    original_memory = df_original.memory_usage(deep=True).sum() / 1024**2
    print(f"总内存使用: {optimized_memory:.2f} MB")
    print(f"内存节省: {((original_memory - optimized_memory) / original_memory * 100):.1f}%\n")

    print("=== 存储格式优化 ===")

    try:
        # CSV: human-readable, largest, slowest to parse
        start_time = time.time()
        df_optimized.to_csv(f'{test_file_base}.csv', index=False)
        csv_write_time = time.time() - start_time
        csv_size = os.path.getsize(f'{test_file_base}.csv') / 1024**2

        start_time = time.time()
        df_csv = pd.read_csv(f'{test_file_base}.csv')
        csv_read_time = time.time() - start_time

        # Parquet: columnar, compressed, preserves dtypes
        start_time = time.time()
        df_optimized.to_parquet(f'{test_file_base}.parquet', index=False)
        parquet_write_time = time.time() - start_time
        parquet_size = os.path.getsize(f'{test_file_base}.parquet') / 1024**2

        start_time = time.time()
        df_parquet = pd.read_parquet(f'{test_file_base}.parquet')
        parquet_read_time = time.time() - start_time

        # Pickle: fast, Python-only, not portable across versions
        start_time = time.time()
        df_optimized.to_pickle(f'{test_file_base}.pkl')
        pickle_write_time = time.time() - start_time
        pickle_size = os.path.getsize(f'{test_file_base}.pkl') / 1024**2

        start_time = time.time()
        df_pickle = pd.read_pickle(f'{test_file_base}.pkl')
        pickle_read_time = time.time() - start_time

        print("存储格式性能对比:")
        print(f"{'格式':<10} {'写入时间':<10} {'读取时间':<10} {'文件大小':<10} {'压缩比':<10}")
        print("-" * 60)
        print(f"{'CSV':<10} {csv_write_time:<10.4f} {csv_read_time:<10.4f} {csv_size:<10.2f} {'1.00x':<10}")
        print(f"{'Parquet':<10} {parquet_write_time:<10.4f} {parquet_read_time:<10.4f} {parquet_size:<10.2f} {csv_size/parquet_size:<10.2f}")
        print(f"{'Pickle':<10} {pickle_write_time:<10.4f} {pickle_read_time:<10.4f} {pickle_size:<10.2f} {csv_size/pickle_size:<10.2f}")
    finally:
        # IMPROVEMENT: cleanup now runs even when a format fails midway
        for ext in ['.csv', '.parquet', '.pkl']:
            file_path = f'{test_file_base}{ext}'
            if os.path.exists(file_path):
                os.remove(file_path)

    return df_optimized

optimized_storage_df = dtype_storage_optimization()

3. 并行处理和多线程

3.1 多进程处理

python
print("\n=== 并行处理优化 ===")

def _complex_calculation(group_data, work_iters=1000):
    """Compute summary stats for one (group_id, DataFrame) pair, plus busy work.

    Defined at module level — not nested inside the demo — so it can be
    pickled and shipped to multiprocessing workers under any start method,
    including 'spawn' (the default on Windows/macOS), where the original
    nested function failed to pickle.
    """
    group_id, data = group_data

    result = {
        'group': group_id,
        'sum_A': data['A'].sum(),
        'mean_B': data['B'].mean(),
        'std_C': data['C'].std(),
        'correlation_AB': data['A'].corr(data['B']),
        'percentile_95_A': data['A'].quantile(0.95)
    }

    # Artificial CPU load so the parallel speedup is visible
    for _ in range(work_iters):
        temp = data['A'].values ** 2 + data['B'].values ** 0.5

    return result


def parallel_processing_demo(n=1_000_000, n_groups=100, work_iters=1000):
    """Compare serial vs. multiprocessing group-wise computation.

    Args:
        n: Number of rows in the demo DataFrame.
        n_groups: Number of distinct groups.
        work_iters: Iterations of artificial CPU work per group.

    Returns:
        DataFrame of per-group results computed in parallel.
    """
    from multiprocessing import Pool, cpu_count
    from functools import partial

    np.random.seed(42)

    df = pd.DataFrame({
        'A': np.random.randint(1, 100, n),
        'B': np.random.randint(1, 100, n),
        'C': np.random.random(n),
        'group': np.random.randint(0, n_groups, n)
    })

    print(f"数据集大小: {len(df):,} 行")
    print(f"CPU核心数: {cpu_count()}")

    # Materialize (group_id, sub-frame) pairs once, shared by both runs
    grouped_data = [(name, group) for name, group in df.groupby('group')]

    print(f"分组数量: {len(grouped_data)}")

    # Bind work_iters so the worker callable takes one argument; a partial of
    # a module-level function remains picklable
    calc = partial(_complex_calculation, work_iters=work_iters)

    # Method 1: serial
    print("\n1. 串行处理:")
    start_time = time.time()
    serial_results = [calc(group_data) for group_data in grouped_data]
    serial_time = time.time() - start_time
    print(f"串行处理时间: {serial_time:.2f} 秒")

    # Method 2: parallel
    print("\n2. 并行处理:")
    start_time = time.time()

    with Pool(processes=cpu_count()) as pool:
        parallel_results = pool.map(calc, grouped_data)

    parallel_time = time.time() - start_time
    print(f"并行处理时间: {parallel_time:.2f} 秒")
    print(f"加速比: {serial_time / parallel_time:.2f}x")

    # Sanity check: both strategies must produce identical results
    serial_df = pd.DataFrame(serial_results)
    parallel_df = pd.DataFrame(parallel_results)

    print(f"\n结果一致性检查: {serial_df.equals(parallel_df)}")

    return parallel_df

parallel_result = parallel_processing_demo()

3.2 Dask并行计算

python
print("\n=== Dask并行计算 ===")

def dask_parallel_demo():
    """Dask并行计算演示"""
    
    try:
        import dask.dataframe as dd
        from dask.multiprocessing import get
        
        # 创建大型数据集
        n = 1000000
        np.random.seed(42)
        
        # 创建Pandas DataFrame
        df_pandas = pd.DataFrame({
            'A': np.random.randint(1, 1000, n),
            'B': np.random.random(n),
            'C': np.random.choice(['X', 'Y', 'Z'], n),
            'D': pd.date_range('2020-01-01', periods=n, freq='1min')
        })
        
        print(f"数据集大小: {len(df_pandas):,} 行")
        
        # 转换为Dask DataFrame
        df_dask = dd.from_pandas(df_pandas, npartitions=8)
        
        print(f"Dask分区数: {df_dask.npartitions}")
        
        # 性能对比测试
        print("\n=== 性能对比测试 ===")
        
        # 1. 聚合操作对比
        print("1. 聚合操作:")
        
        # Pandas聚合
        start_time = time.time()
        pandas_agg = df_pandas.groupby('C').agg({
            'A': ['sum', 'mean', 'std'],
            'B': ['min', 'max', 'median']
        })
        pandas_agg_time = time.time() - start_time
        
        # Dask聚合
        start_time = time.time()
        dask_agg = df_dask.groupby('C').agg({
            'A': ['sum', 'mean', 'std'],
            'B': ['min', 'max']
        }).compute()
        dask_agg_time = time.time() - start_time
        
        print(f"Pandas聚合时间: {pandas_agg_time:.2f} 秒")
        print(f"Dask聚合时间: {dask_agg_time:.2f} 秒")
        
        # 2. 复杂计算对比
        print("\n2. 复杂计算:")
        
        # Pandas复杂计算
        start_time = time.time()
        pandas_complex = df_pandas.assign(
            A_squared=df_pandas['A'] ** 2,
            B_log=np.log(df_pandas['B'] + 1),
            A_B_ratio=df_pandas['A'] / (df_pandas['B'] + 0.001)
        )
        pandas_complex_time = time.time() - start_time
        
        # Dask复杂计算
        start_time = time.time()
        dask_complex = df_dask.assign(
            A_squared=df_dask['A'] ** 2,
            B_log=dd.log(df_dask['B'] + 1),
            A_B_ratio=df_dask['A'] / (df_dask['B'] + 0.001)
        ).compute()
        dask_complex_time = time.time() - start_time
        
        print(f"Pandas复杂计算时间: {pandas_complex_time:.2f} 秒")
        print(f"Dask复杂计算时间: {dask_complex_time:.2f} 秒")
        
        # 3. 内存使用对比
        print("\n3. 内存使用分析:")
        pandas_memory = df_pandas.memory_usage(deep=True).sum() / 1024**2
        print(f"Pandas内存使用: {pandas_memory:.2f} MB")
        print(f"Dask延迟计算,内存使用更高效")
        
        return dask_agg
        
    except ImportError:
        print("Dask未安装,跳过Dask演示")
        print("安装命令: pip install dask[complete]")
        return None

dask_result = dask_parallel_demo()

4. 大数据处理策略

4.1 内存映射和懒加载

python
print("\n=== 大数据处理策略 ===")

def big_data_strategies(n=1_000_000):
    """Demonstrate sampling, date partitioning, and incremental aggregation.

    Args:
        n: Number of rows in the demo dataset (default 1,000,000).

    Returns:
        Tuple ``(sample_random, monthly_partitions)`` — the random sample and
        a dict of per-month DataFrames.
    """
    # 1. Sampling strategies
    print("1. 数据采样策略:")

    np.random.seed(42)

    large_df = pd.DataFrame({
        'id': range(n),
        'value1': np.random.randint(1, 1000, n),
        'value2': np.random.random(n),
        'category': np.random.choice(['A', 'B', 'C', 'D'], n),
        'date': pd.date_range('2020-01-01', periods=n, freq='1min')
    })

    print(f"原始数据大小: {len(large_df):,} 行")

    # Random sampling. BUGFIX: cap at the population size — sample(n=10000)
    # raised ValueError for datasets smaller than 10000 rows.
    sample_random = large_df.sample(n=min(10000, len(large_df)), random_state=42)
    print(f"随机采样: {len(sample_random):,} 行")

    # Stratified sampling: up to 2500 rows per category
    sample_stratified = large_df.groupby('category', group_keys=False).apply(
        lambda x: x.sample(min(len(x), 2500), random_state=42)
    )
    print(f"分层采样: {len(sample_stratified):,} 行")

    # Systematic sampling: every 100th row
    sample_systematic = large_df.iloc[::100]
    print(f"系统采样: {len(sample_systematic):,} 行")

    # 2. Partitioning strategies
    print("\n2. 数据分区策略:")

    def partition_data_by_date(df, partition_freq='M'):
        """Split *df* into a dict keyed by period string (default: monthly)."""
        # IMPROVEMENT: work on a copy so the caller's DataFrame is not
        # mutated with a temporary 'partition' column
        df = df.copy()
        df['partition'] = df['date'].dt.to_period(partition_freq)
        partitions = {}

        for partition_name, partition_data in df.groupby('partition'):
            partitions[str(partition_name)] = partition_data.drop('partition', axis=1)

        return partitions

    monthly_partitions = partition_data_by_date(large_df, 'M')
    print(f"月度分区数量: {len(monthly_partitions)}")

    for partition_name, partition_df in list(monthly_partitions.items())[:3]:
        print(f"  {partition_name}: {len(partition_df):,} 行")

    # 3. Incremental processing
    print("\n3. 增量处理策略:")

    def incremental_processing_demo():
        """Update a running aggregate with a new batch, then verify it."""

        # IMPROVEMENT: 80/20 split derived from the actual length instead of
        # the hard-coded 800000, so it works for any n
        split = int(len(large_df) * 0.8)
        historical_data = large_df.iloc[:split].copy()
        new_data = large_df.iloc[split:].copy()

        print(f"历史数据: {len(historical_data):,} 行")
        print(f"新增数据: {len(new_data):,} 行")

        # Aggregate of the historical data (assumed precomputed in practice)
        historical_agg = historical_data.groupby('category').agg({
            'value1': 'sum',
            'value2': 'mean'
        })

        print("历史聚合结果:")
        print(historical_agg)

        # Aggregate of the new batch alone
        new_agg = new_data.groupby('category').agg({
            'value1': 'sum',
            'value2': 'mean'
        })

        def incremental_update(historical_agg, new_agg, historical_count, new_count):
            """Merge the new batch's aggregates into the historical ones."""
            updated_agg = historical_agg.copy()

            for category in new_agg.index:
                if category in updated_agg.index:
                    # Sums simply add
                    updated_agg.loc[category, 'value1'] += new_agg.loc[category, 'value1']

                    # Means combine as a count-weighted average
                    hist_count = historical_count.get(category, 0)
                    new_count_cat = new_count.get(category, 0)
                    total_count = hist_count + new_count_cat

                    if total_count > 0:
                        updated_agg.loc[category, 'value2'] = (
                            (updated_agg.loc[category, 'value2'] * hist_count +
                             new_agg.loc[category, 'value2'] * new_count_cat) / total_count
                        )
                else:
                    # Category seen for the first time in the new batch
                    updated_agg.loc[category] = new_agg.loc[category]

            return updated_agg

        historical_count = historical_data['category'].value_counts().to_dict()
        new_count = new_data['category'].value_counts().to_dict()

        updated_result = incremental_update(historical_agg, new_agg, historical_count, new_count)

        # Verify against a full recomputation over all rows
        full_agg = large_df.groupby('category').agg({
            'value1': 'sum',
            'value2': 'mean'
        })

        print("\n增量更新结果:")
        print(updated_result)

        print("\n完整计算结果:")
        print(full_agg)

        print(f"\n结果一致性: {np.allclose(updated_result.values, full_agg.values)}")

    incremental_processing_demo()

    return sample_random, monthly_partitions

sample_data, partitions = big_data_strategies()

4.2 性能监控和调优

python
print("\n=== 性能监控和调优 ===")

def performance_monitoring(n=100_000):
    """Profile slow vs. fast implementations with cProfile and tracemalloc.

    Args:
        n: Number of rows in the benchmark DataFrame (default 100,000).

    Returns:
        The benchmark DataFrame.
    """
    import cProfile
    import pstats
    from io import StringIO

    # Decorator: run the wrapped call under cProfile, print top-10 hot spots
    def profile_performance(func):
        """Profiling decorator (prints cumulative-time stats)."""
        def wrapper(*args, **kwargs):
            pr = cProfile.Profile()
            pr.enable()

            result = func(*args, **kwargs)

            pr.disable()
            s = StringIO()
            ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
            ps.print_stats(10)  # top 10 functions by cumulative time

            print(f"\n{func.__name__} 性能分析:")
            print(s.getvalue())

            return result
        return wrapper

    np.random.seed(42)

    df = pd.DataFrame({
        'A': np.random.randint(1, 1000, n),
        'B': np.random.random(n),
        'C': np.random.choice(['X', 'Y', 'Z'], n),
        'D': pd.date_range('2020-01-01', periods=n, freq='1min')
    })

    @profile_performance
    def inefficient_operation(df):
        """Anti-example: per-row .iloc access inside a Python loop."""
        result = []
        for i in range(len(df)):
            if df.iloc[i]['A'] > 500:
                result.append(df.iloc[i]['A'] * df.iloc[i]['B'])
            else:
                result.append(0)
        return result

    @profile_performance
    def efficient_operation(df):
        """Vectorized equivalent using a boolean mask and np.where."""
        mask = df['A'] > 500
        result = np.where(mask, df['A'] * df['B'], 0)
        return result

    print("性能对比测试:")

    print("\n=== 低效操作 ===")
    start_time = time.time()
    result1 = inefficient_operation(df)
    inefficient_time = time.time() - start_time
    print(f"低效操作耗时: {inefficient_time:.4f} 秒")

    print("\n=== 高效操作 ===")
    start_time = time.time()
    result2 = efficient_operation(df)
    efficient_time = time.time() - start_time
    print(f"高效操作耗时: {efficient_time:.4f} 秒")

    print(f"\n性能提升: {inefficient_time / efficient_time:.1f}x")

    # Allocation tracking with tracemalloc
    def memory_usage_monitor(func, *args, **kwargs):
        """Run *func* under tracemalloc and print current/peak allocations."""
        import tracemalloc

        tracemalloc.start()

        result = func(*args, **kwargs)

        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()

        print(f"{func.__name__} 内存使用:")
        print(f"  当前内存: {current / 1024**2:.2f} MB")
        print(f"  峰值内存: {peak / 1024**2:.2f} MB")

        return result

    print("\n=== 内存使用监控 ===")

    def memory_intensive_operation(df):
        """Anti-example: five full copies of the DataFrame."""
        df_copies = [df.copy() for _ in range(5)]

        results = []
        for df_copy in df_copies:
            df_copy['new_col'] = df_copy['A'] * df_copy['B']
            results.append(df_copy.groupby('C')['new_col'].sum())

        return pd.concat(results, axis=1)

    def memory_efficient_operation(df):
        """Single working copy reused across all five iterations."""
        df_temp = df.copy()
        df_temp['new_col'] = df_temp['A'] * df_temp['B']

        results = []
        for i in range(5):
            result = df_temp.groupby('C')['new_col'].sum()
            results.append(result)

        return pd.concat(results, axis=1)

    memory_usage_monitor(memory_intensive_operation, df)
    memory_usage_monitor(memory_efficient_operation, df)

    return df

本章小结

本章全面介绍了 Pandas 的性能优化技巧:

核心内容回顾

  1. 内存优化:数据类型优化、分块处理、内存监控
  2. 计算优化:向量化操作、索引优化、存储格式选择
  3. 并行处理:多进程、Dask并行计算
  4. 大数据策略:采样、分区、增量处理
  5. 性能监控:性能分析、内存监控、调优技巧

关键优化原则

  • 优先使用向量化操作替代循环
  • 合理选择数据类型减少内存使用
  • 建立适当索引提升查询性能
  • 分块处理大文件避免内存溢出
  • 使用并行计算充分利用多核CPU

性能优化检查清单

内存优化

  • [ ] 使用合适的数据类型(int8, int16, category等)
  • [ ] 利用稀疏数组处理稀疏数据
  • [ ] 分块读取大文件
  • [ ] 及时删除不需要的变量

计算优化

  • [ ] 使用向量化操作替代apply
  • [ ] 建立合适的索引
  • [ ] 选择高效的存储格式(Parquet, HDF5)
  • [ ] 避免不必要的数据复制

并行优化

  • [ ] 使用多进程处理CPU密集型任务
  • [ ] 考虑使用Dask处理超大数据集
  • [ ] 合理设置并行度
  • [ ] 注意并行开销

实际应用建议

  1. 开发阶段:使用小数据集快速原型开发
  2. 测试阶段:使用采样数据验证算法正确性
  3. 生产阶段:应用所有优化技巧处理完整数据
  4. 监控阶段:持续监控性能指标,及时调优

常见性能陷阱

  • 过度使用apply函数
  • 频繁的数据类型转换
  • 不必要的数据复制
  • 缺乏适当的索引
  • 忽视内存使用监控

掌握这些性能优化技巧,能够帮助你:

  • 处理更大规模的数据集
  • 显著提升数据处理速度
  • 优化内存使用效率
  • 构建可扩展的数据处理系统

练习题

  1. 优化一个包含1000万行数据的DataFrame的内存使用
  2. 实现一个高性能的数据聚合系统
  3. 设计一个大文件的分块处理流程
  4. 开发一个并行数据处理框架
  5. 构建一个性能监控和调优工具

下一章我们将整理 Pandas 的学习资源,为你的进一步学习提供指导。