# Pandas 性能优化

## 概述
在处理大规模数据时,Pandas 的性能优化变得至关重要。本章将深入探讨各种优化技巧,包括内存与数据类型优化、计算优化、并行处理以及大数据处理策略,帮助你构建高效的数据处理流程。
## 1. 内存优化

### 1.1 数据类型优化

```python
import pandas as pd
import numpy as np
import time
import psutil
import os
print("=== 数据类型优化 ===")
def memory_usage_analysis():
"""内存使用分析"""
# 创建示例数据
n = 1000000
np.random.seed(42)
# 原始数据(未优化)
df_original = pd.DataFrame({
'int_col': np.random.randint(0, 100, n),
'float_col': np.random.random(n),
'category_col': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
'bool_col': np.random.choice([True, False], n),
'date_col': pd.date_range('2020-01-01', periods=n, freq='1min')
})
print("原始数据内存使用:")
print(df_original.info(memory_usage='deep'))
original_memory = df_original.memory_usage(deep=True).sum() / 1024**2
print(f"总内存使用: {original_memory:.2f} MB\n")
# 优化后的数据
df_optimized = df_original.copy()
# 1. 整数类型优化
df_optimized['int_col'] = pd.to_numeric(df_optimized['int_col'], downcast='integer')
# 2. 浮点数类型优化
df_optimized['float_col'] = pd.to_numeric(df_optimized['float_col'], downcast='float')
# 3. 分类数据优化
df_optimized['category_col'] = df_optimized['category_col'].astype('category')
# 4. 布尔类型已经是最优的
print("优化后数据内存使用:")
print(df_optimized.info(memory_usage='deep'))
optimized_memory = df_optimized.memory_usage(deep=True).sum() / 1024**2
print(f"总内存使用: {optimized_memory:.2f} MB")
print(f"内存节省: {((original_memory - optimized_memory) / original_memory * 100):.1f}%\n")
return df_original, df_optimized
df_orig, df_opt = memory_usage_analysis()
```
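
实际项目中列数往往很多,可以把上面的手动转换封装成一个通用的降级函数。下面是一个最小示意(`category_threshold` 这个阈值参数是本文为演示引入的假设):

```python
def downcast_dataframe(df, category_threshold=0.5):
    """逐列尝试降级数据类型的最小示意"""
    out = df.copy()
    for col in out.columns:
        dtype = out[col].dtype
        if pd.api.types.is_integer_dtype(dtype):
            out[col] = pd.to_numeric(out[col], downcast='integer')
        elif pd.api.types.is_float_dtype(dtype):
            out[col] = pd.to_numeric(out[col], downcast='float')
        elif dtype == object:
            # 唯一值占比低的字符串列转为 category 收益最大
            if out[col].nunique() / len(out) < category_threshold:
                out[col] = out[col].astype('category')
    return out

df_auto = downcast_dataframe(df_orig)
print(df_auto.dtypes)
```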
### 1.2 分块处理大文件

```python
print("=== 分块处理大文件 ===")
def chunked_processing_demo():
"""分块处理演示"""
# 创建大型CSV文件用于演示
large_file = 'large_dataset.csv'
# 生成大型数据集
print("生成大型数据集...")
chunk_size = 50000
total_rows = 500000
# 分批写入大文件
for i in range(0, total_rows, chunk_size):
chunk_data = pd.DataFrame({
'id': range(i, min(i + chunk_size, total_rows)),
'value1': np.random.randint(1, 1000, min(chunk_size, total_rows - i)),
'value2': np.random.random(min(chunk_size, total_rows - i)),
'category': np.random.choice(['A', 'B', 'C', 'D'], min(chunk_size, total_rows - i)),
'date': pd.date_range('2020-01-01', periods=min(chunk_size, total_rows - i), freq='1H')
})
# 第一次写入包含header,后续追加
chunk_data.to_csv(large_file, mode='w' if i == 0 else 'a',
header=(i == 0), index=False)
print(f"生成文件大小: {os.path.getsize(large_file) / 1024**2:.2f} MB")
# 方法1: 一次性读取(内存密集)
print("\n方法1: 一次性读取")
start_time = time.time()
try:
df_full = pd.read_csv(large_file)
full_load_time = time.time() - start_time
print(f"读取时间: {full_load_time:.2f} 秒")
print(f"内存使用: {df_full.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
# 执行聚合操作
result_full = df_full.groupby('category')['value1'].sum()
print(f"聚合结果: {result_full.to_dict()}")
except MemoryError:
print("内存不足,无法一次性读取")
# 方法2: 分块读取(内存友好)
print("\n方法2: 分块读取")
start_time = time.time()
chunk_results = []
chunk_reader = pd.read_csv(large_file, chunksize=10000)
for i, chunk in enumerate(chunk_reader):
# 对每个块进行处理
chunk_result = chunk.groupby('category')['value1'].sum()
chunk_results.append(chunk_result)
if i % 10 == 0:
print(f"处理了 {(i+1)*10000} 行")
# 合并所有块的结果
final_result = pd.concat(chunk_results).groupby(level=0).sum()
chunk_load_time = time.time() - start_time
print(f"分块处理时间: {chunk_load_time:.2f} 秒")
print(f"聚合结果: {final_result.to_dict()}")
# 清理临时文件
if os.path.exists(large_file):
os.remove(large_file)
return final_result
chunk_result = chunked_processing_demo()
```
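
除了分块,还可以在读取阶段就用 `usecols` 和 `dtype` 参数砍掉不需要的列、指定轻量类型,两者可以叠加使用。下面是一个示意(假设上文生成的 large_dataset.csv 还在磁盘上;demo 末尾的清理代码会删除它):

```python
# 只读两列,并在解析时直接落到轻量类型上
reader = pd.read_csv(
    'large_dataset.csv',
    usecols=['category', 'value1'],
    dtype={'category': 'category', 'value1': 'int32'},
    chunksize=10000,
)
partial_sums = [chunk.groupby('category', observed=True)['value1'].sum()
                for chunk in reader]
total = pd.concat(partial_sums).groupby(level=0).sum()
print(total)
```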
### 1.3 内存监控和分析

```python
print("\n=== 内存监控和分析 ===")
def memory_monitoring():
"""内存监控工具"""
def get_memory_usage():
"""获取当前内存使用情况"""
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
return memory_info.rss / 1024**2 # MB
def memory_profiler_demo():
"""内存分析演示"""
print(f"初始内存使用: {get_memory_usage():.2f} MB")
# 创建大型DataFrame
n = 100000
df = pd.DataFrame({
'A': np.random.randint(0, 1000, n),
'B': np.random.random(n),
'C': ['category_' + str(i % 100) for i in range(n)]
})
print(f"创建DataFrame后内存使用: {get_memory_usage():.2f} MB")
# 数据类型优化
df['A'] = pd.to_numeric(df['A'], downcast='integer')
df['B'] = pd.to_numeric(df['B'], downcast='float')
df['C'] = df['C'].astype('category')
print(f"优化后内存使用: {get_memory_usage():.2f} MB")
# 删除DataFrame释放内存
del df
print(f"删除DataFrame后内存使用: {get_memory_usage():.2f} MB")
return True
# 内存使用分析工具
def analyze_dataframe_memory(df, name="DataFrame"):
"""分析DataFrame内存使用"""
print(f"\n{name} 内存分析:")
print("-" * 40)
memory_usage = df.memory_usage(deep=True)
total_memory = memory_usage.sum() / 1024**2
print(f"总内存使用: {total_memory:.2f} MB")
print(f"行数: {len(df):,}")
print(f"列数: {len(df.columns)}")
print(f"平均每行内存: {total_memory * 1024 / len(df):.2f} KB")
print("\n各列内存使用:")
for col in df.columns:
col_memory = df[col].memory_usage(deep=True) / 1024**2
print(f" {col}: {col_memory:.2f} MB ({col_memory/total_memory*100:.1f}%)")
print("\n数据类型:")
for col in df.columns:
print(f" {col}: {df[col].dtype}")
# 演示内存分析
memory_profiler_demo()
# 创建示例数据进行分析
sample_df = pd.DataFrame({
'integers': np.random.randint(0, 100, 10000),
'floats': np.random.random(10000),
'categories': np.random.choice(['A', 'B', 'C'], 10000),
'strings': ['string_' + str(i) for i in range(10000)],
'dates': pd.date_range('2020-01-01', periods=10000, freq='1H')
})
analyze_dataframe_memory(sample_df, "原始数据")
# 优化数据类型
sample_df_opt = sample_df.copy()
sample_df_opt['integers'] = pd.to_numeric(sample_df_opt['integers'], downcast='integer')
sample_df_opt['floats'] = pd.to_numeric(sample_df_opt['floats'], downcast='float')
sample_df_opt['categories'] = sample_df_opt['categories'].astype('category')
analyze_dataframe_memory(sample_df_opt, "优化后数据")
return sample_df_opt
optimized_df = memory_monitoring()
```
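
在排查"内存都花在哪了"时,按数据类型汇总往往比逐列查看更直观。下面是一个小工具函数的示意:

```python
def memory_by_dtype(df):
    """按数据类型汇总各列内存占用(单位 MB)"""
    usage = df.memory_usage(deep=True, index=False)  # 不统计索引
    return (usage.groupby(df.dtypes.astype(str).values).sum() / 1024**2).round(2)

print(memory_by_dtype(optimized_df))
```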
## 2. 计算性能优化

### 2.1 向量化操作优化

```python
print("\n=== 计算性能优化 ===")
def vectorization_optimization():
"""向量化操作优化"""
# 创建测试数据
n = 1000000
np.random.seed(42)
df = pd.DataFrame({
'A': np.random.randint(1, 100, n),
'B': np.random.randint(1, 100, n),
'C': np.random.random(n),
'D': np.random.choice(['X', 'Y', 'Z'], n)
})
print(f"测试数据: {len(df):,} 行")
# 性能测试函数
def time_operation(func, name, *args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{name}: {end_time - start_time:.4f} 秒")
return result
print("\n=== 条件计算性能对比 ===")
# 方法1: 使用apply(慢)
def apply_method(df):
return df.apply(lambda row: row['A'] * row['C'] if row['A'] > row['B'] else row['B'] * row['C'], axis=1)
# 方法2: 使用numpy.where(快)
def numpy_where_method(df):
return np.where(df['A'] > df['B'], df['A'] * df['C'], df['B'] * df['C'])
# 方法3: 使用pandas.where(快)
def pandas_where_method(df):
return (df['A'] * df['C']).where(df['A'] > df['B'], df['B'] * df['C'])
# 方法4: 使用布尔索引(快)
def boolean_indexing_method(df):
result = pd.Series(index=df.index, dtype=float)
mask = df['A'] > df['B']
result[mask] = df.loc[mask, 'A'] * df.loc[mask, 'C']
result[~mask] = df.loc[~mask, 'B'] * df.loc[~mask, 'C']
return result
# 性能测试
result1 = time_operation(apply_method, "Apply方法", df)
result2 = time_operation(numpy_where_method, "NumPy where", df)
result3 = time_operation(pandas_where_method, "Pandas where", df)
result4 = time_operation(boolean_indexing_method, "布尔索引", df)
print("\n=== 聚合操作性能对比 ===")
# 方法1: 标准groupby
def standard_groupby(df):
return df.groupby('D').agg({'A': 'sum', 'B': 'mean', 'C': 'std'})
# 方法2: 使用transform进行组内计算
def transform_method(df):
df_copy = df.copy()
df_copy['A_sum'] = df_copy.groupby('D')['A'].transform('sum')
df_copy['B_mean'] = df_copy.groupby('D')['B'].transform('mean')
df_copy['C_std'] = df_copy.groupby('D')['C'].transform('std')
return df_copy[['D', 'A_sum', 'B_mean', 'C_std']].drop_duplicates()
# 方法3: 使用pivot_table
def pivot_table_method(df):
return pd.pivot_table(df, values=['A', 'B', 'C'], index='D',
aggfunc={'A': 'sum', 'B': 'mean', 'C': 'std'})
agg_result1 = time_operation(standard_groupby, "标准GroupBy", df)
agg_result2 = time_operation(transform_method, "Transform方法", df)
agg_result3 = time_operation(pivot_table_method, "Pivot Table", df)
print("\nGroupBy结果:")
print(agg_result1)
return df
perf_df = vectorization_optimization()
```
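
在向量化之外,如果环境中安装了 numexpr,`DataFrame.eval` 和 `query` 可以把多列算术/布尔表达式编译成一次遍历求值,减少中间临时数组(未安装时会回退到普通求值)。一个简单示意:

```python
# eval 一次性求值整个表达式,避免先算 A*C、再加 B 这类中间结果
perf_df['score'] = perf_df.eval('A * C + B')
# query 适合书写复杂的布尔过滤条件
subset = perf_df.query('A > B and D == "X"')
print(len(subset))
```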
### 2.2 索引优化

```python
print("\n=== 索引优化 ===")
def index_optimization():
"""索引优化演示"""
# 创建大型数据集
n = 500000
np.random.seed(42)
df = pd.DataFrame({
'id': range(n),
'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
'value': np.random.randint(1, 1000, n),
'date': pd.date_range('2020-01-01', periods=n, freq='1min')
})
print(f"数据集大小: {len(df):,} 行")
# 性能测试函数
def time_query(func, name, *args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{name}: {end_time - start_time:.4f} 秒 (结果: {len(result)} 行)")
return result
print("\n=== 查询性能对比 ===")
# 无索引查询
def query_without_index(df, category):
return df[df['category'] == category]
# 设置索引后查询
    # 将查询列设为索引并排序: 有序索引才能走二分查找,否则标签查找仍是线性扫描
    df_indexed = df.set_index('category').sort_index()
def query_with_index(df, category):
return df.loc[category]
# 多级索引查询
    # 多级索引必须先排序,否则切片会抛出 UnsortedIndexError
    df_multi_indexed = df.set_index(['category', 'date']).sort_index()
def query_multi_index(df, category, start_date, end_date):
return df.loc[(category, slice(start_date, end_date)), :]
# 性能测试
target_category = 'A'
start_date = '2020-01-01'
end_date = '2020-01-02'
print("1. 单列查询性能:")
result1 = time_query(query_without_index, "无索引查询", df, target_category)
result2 = time_query(query_with_index, "有索引查询", df_indexed, target_category)
print("\n2. 范围查询性能:")
def range_query_without_index(df, start_date, end_date):
return df[(df['date'] >= start_date) & (df['date'] <= end_date)]
df_date_indexed = df.set_index('date')
def range_query_with_index(df, start_date, end_date):
return df.loc[start_date:end_date]
result3 = time_query(range_query_without_index, "日期范围查询(无索引)", df, start_date, end_date)
result4 = time_query(range_query_with_index, "日期范围查询(有索引)", df_date_indexed, start_date, end_date)
print("\n3. 复合查询性能:")
result5 = time_query(query_multi_index, "多级索引查询", df_multi_indexed, target_category, start_date, end_date)
# 索引内存使用分析
print("\n=== 索引内存使用分析 ===")
def analyze_index_memory(df, name):
total_memory = df.memory_usage(deep=True).sum() / 1024**2
index_memory = df.index.memory_usage(deep=True) / 1024**2
print(f"{name}:")
print(f" 总内存: {total_memory:.2f} MB")
print(f" 索引内存: {index_memory:.2f} MB ({index_memory/total_memory*100:.1f}%)")
analyze_index_memory(df, "原始DataFrame")
analyze_index_memory(df_indexed, "单级索引DataFrame")
analyze_index_memory(df_multi_indexed, "多级索引DataFrame")
return df_indexed
indexed_df = index_optimization()
```
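
需要强调的是,基于标签的范围切片依赖有序索引:索引有序时查找可以走二分,无序时会退化为全表扫描,多级索引的切片甚至会直接抛出 UnsortedIndexError。可以这样快速检查:

```python
# indexed_df 在上文已按索引排序
print("索引是否有序:", indexed_df.index.is_monotonic_increasing)
print(len(indexed_df.loc['A':'C']))  # 有序索引上的连续类别切片
```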
### 2.3 数据类型和存储优化

```python
print("\n=== 数据类型和存储优化 ===")
def dtype_storage_optimization():
"""数据类型和存储优化"""
# 创建测试数据
n = 100000
np.random.seed(42)
# 原始数据(未优化的数据类型)
df_original = pd.DataFrame({
'small_int': np.random.randint(0, 10, n), # 可以用int8
'medium_int': np.random.randint(0, 1000, n), # 可以用int16
'large_int': np.random.randint(0, 100000, n), # 需要int32
'float_data': np.random.random(n), # 可以用float32
'category_data': np.random.choice(['A', 'B', 'C', 'D', 'E'], n), # 可以用category
'boolean_data': np.random.choice([True, False], n), # 已经是最优
'sparse_data': np.random.choice([0, 1, 2, np.nan], n, p=[0.7, 0.1, 0.1, 0.1]) # 可以用sparse
})
print("原始数据类型和内存使用:")
print(df_original.dtypes)
print(f"总内存使用: {df_original.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n")
# 优化数据类型
df_optimized = df_original.copy()
# 1. 整数类型优化
df_optimized['small_int'] = df_optimized['small_int'].astype('int8')
df_optimized['medium_int'] = df_optimized['medium_int'].astype('int16')
df_optimized['large_int'] = df_optimized['large_int'].astype('int32')
# 2. 浮点数类型优化
df_optimized['float_data'] = df_optimized['float_data'].astype('float32')
# 3. 分类数据优化
df_optimized['category_data'] = df_optimized['category_data'].astype('category')
# 4. 稀疏数据优化
df_optimized['sparse_data'] = pd.arrays.SparseArray(df_optimized['sparse_data'])
print("优化后数据类型和内存使用:")
print(df_optimized.dtypes)
optimized_memory = df_optimized.memory_usage(deep=True).sum() / 1024**2
original_memory = df_original.memory_usage(deep=True).sum() / 1024**2
print(f"总内存使用: {optimized_memory:.2f} MB")
print(f"内存节省: {((original_memory - optimized_memory) / original_memory * 100):.1f}%\n")
# 存储格式优化
print("=== 存储格式优化 ===")
# 测试不同存储格式的性能
    # 注意: pyarrow 不支持直接写入稀疏扩展类型,存储测试前先转回稠密列
    df_optimized['sparse_data'] = df_optimized['sparse_data'].sparse.to_dense()
    test_file_base = 'test_data'
# CSV格式
start_time = time.time()
df_optimized.to_csv(f'{test_file_base}.csv', index=False)
csv_write_time = time.time() - start_time
csv_size = os.path.getsize(f'{test_file_base}.csv') / 1024**2
start_time = time.time()
df_csv = pd.read_csv(f'{test_file_base}.csv')
csv_read_time = time.time() - start_time
    # Parquet格式(需要已安装 pyarrow 或 fastparquet)
start_time = time.time()
df_optimized.to_parquet(f'{test_file_base}.parquet', index=False)
parquet_write_time = time.time() - start_time
parquet_size = os.path.getsize(f'{test_file_base}.parquet') / 1024**2
start_time = time.time()
df_parquet = pd.read_parquet(f'{test_file_base}.parquet')
parquet_read_time = time.time() - start_time
# Pickle格式
start_time = time.time()
df_optimized.to_pickle(f'{test_file_base}.pkl')
pickle_write_time = time.time() - start_time
pickle_size = os.path.getsize(f'{test_file_base}.pkl') / 1024**2
start_time = time.time()
df_pickle = pd.read_pickle(f'{test_file_base}.pkl')
pickle_read_time = time.time() - start_time
# 结果对比
print("存储格式性能对比:")
print(f"{'格式':<10} {'写入时间':<10} {'读取时间':<10} {'文件大小':<10} {'压缩比':<10}")
print("-" * 60)
print(f"{'CSV':<10} {csv_write_time:<10.4f} {csv_read_time:<10.4f} {csv_size:<10.2f} {'1.00x':<10}")
print(f"{'Parquet':<10} {parquet_write_time:<10.4f} {parquet_read_time:<10.4f} {parquet_size:<10.2f} {csv_size/parquet_size:<10.2f}")
print(f"{'Pickle':<10} {pickle_write_time:<10.4f} {pickle_read_time:<10.4f} {pickle_size:<10.2f} {csv_size/pickle_size:<10.2f}")
# 清理临时文件
for ext in ['.csv', '.parquet', '.pkl']:
file_path = f'{test_file_base}{ext}'
if os.path.exists(file_path):
os.remove(file_path)
return df_optimized
optimized_storage_df = dtype_storage_optimization()
```
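
Parquet 的列式布局还意味着读取时可以只加载需要的列,进一步降低 IO 和内存。示意如下(假设 test_data.parquet 还在磁盘上;上文 demo 末尾会清理它,且需要 pyarrow):

```python
# 只读两列,其余列完全不会被载入内存
df_cols = pd.read_parquet('test_data.parquet',
                          columns=['category_data', 'float_data'])
print(f"{df_cols.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
```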
## 3. 并行处理和多线程

### 3.1 多进程处理

```python
print("\n=== 并行处理优化 ===")
def parallel_processing_demo():
"""并行处理演示"""
from multiprocessing import Pool, cpu_count
# 创建大型数据集
n = 1000000
np.random.seed(42)
df = pd.DataFrame({
'A': np.random.randint(1, 100, n),
'B': np.random.randint(1, 100, n),
'C': np.random.random(n),
'group': np.random.randint(0, 100, n)
})
print(f"数据集大小: {len(df):,} 行")
print(f"CPU核心数: {cpu_count()}")
    # 定义计算密集型函数
    # 注意: multiprocessing 需要 pickle 工作函数。Windows/macOS 默认的 spawn 启动方式
    # 要求函数定义在模块顶层,此处的嵌套写法假定 Linux 的 fork 启动方式
def complex_calculation(group_data):
"""复杂计算函数"""
group_id, data = group_data
# 模拟复杂计算
result = {
'group': group_id,
'sum_A': data['A'].sum(),
'mean_B': data['B'].mean(),
'std_C': data['C'].std(),
'correlation_AB': data['A'].corr(data['B']),
'percentile_95_A': data['A'].quantile(0.95)
}
# 添加一些计算密集型操作
for _ in range(1000):
temp = data['A'].values ** 2 + data['B'].values ** 0.5
return result
# 准备分组数据
grouped_data = [(name, group) for name, group in df.groupby('group')]
print(f"分组数量: {len(grouped_data)}")
# 方法1: 串行处理
print("\n1. 串行处理:")
start_time = time.time()
serial_results = [complex_calculation(group_data) for group_data in grouped_data]
serial_time = time.time() - start_time
print(f"串行处理时间: {serial_time:.2f} 秒")
# 方法2: 并行处理
print("\n2. 并行处理:")
start_time = time.time()
with Pool(processes=cpu_count()) as pool:
parallel_results = pool.map(complex_calculation, grouped_data)
parallel_time = time.time() - start_time
print(f"并行处理时间: {parallel_time:.2f} 秒")
print(f"加速比: {serial_time / parallel_time:.2f}x")
# 验证结果一致性
serial_df = pd.DataFrame(serial_results)
parallel_df = pd.DataFrame(parallel_results)
print(f"\n结果一致性检查: {serial_df.equals(parallel_df)}")
return parallel_df
parallel_result = parallel_processing_demo()
```
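
除了按分组切分,也可以简单地按行把 DataFrame 均分给各进程,各自做局部聚合后再合并。下面是一个自包含的最小示意(数据为演示构造):

```python
from multiprocessing import Pool, cpu_count

# 与上文 demo 同构的演示数据
df_demo = pd.DataFrame({
    'A': np.random.randint(1, 100, 200_000),
    'group': np.random.randint(0, 100, 200_000),
})

def chunk_sum(chunk):
    """每个进程只聚合自己的行块"""
    return chunk.groupby('group')['A'].sum()

if __name__ == '__main__':  # spawn 启动方式下必须加此保护
    n = cpu_count()
    bounds = np.linspace(0, len(df_demo), n + 1, dtype=int)
    chunks = [df_demo.iloc[bounds[i]:bounds[i + 1]] for i in range(n)]
    with Pool(processes=n) as pool:
        parts = pool.map(chunk_sum, chunks)
    # 各块局部和相加,结果与整体 groupby 一致
    total = pd.concat(parts).groupby(level=0).sum()
    print(total.head())
```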
### 3.2 Dask并行计算

```python
print("\n=== Dask并行计算 ===")
def dask_parallel_demo():
"""Dask并行计算演示"""
try:
        import dask.dataframe as dd
# 创建大型数据集
n = 1000000
np.random.seed(42)
# 创建Pandas DataFrame
df_pandas = pd.DataFrame({
'A': np.random.randint(1, 1000, n),
'B': np.random.random(n),
'C': np.random.choice(['X', 'Y', 'Z'], n),
'D': pd.date_range('2020-01-01', periods=n, freq='1min')
})
print(f"数据集大小: {len(df_pandas):,} 行")
# 转换为Dask DataFrame
df_dask = dd.from_pandas(df_pandas, npartitions=8)
print(f"Dask分区数: {df_dask.npartitions}")
# 性能对比测试
print("\n=== 性能对比测试 ===")
# 1. 聚合操作对比
print("1. 聚合操作:")
# Pandas聚合
start_time = time.time()
pandas_agg = df_pandas.groupby('C').agg({
'A': ['sum', 'mean', 'std'],
'B': ['min', 'max', 'median']
})
pandas_agg_time = time.time() - start_time
# Dask聚合
start_time = time.time()
dask_agg = df_dask.groupby('C').agg({
'A': ['sum', 'mean', 'std'],
            'B': ['min', 'max']  # 注: Dask 的 groupby.agg 不直接支持 median,此处省略该项
}).compute()
dask_agg_time = time.time() - start_time
print(f"Pandas聚合时间: {pandas_agg_time:.2f} 秒")
print(f"Dask聚合时间: {dask_agg_time:.2f} 秒")
# 2. 复杂计算对比
print("\n2. 复杂计算:")
# Pandas复杂计算
start_time = time.time()
pandas_complex = df_pandas.assign(
A_squared=df_pandas['A'] ** 2,
B_log=np.log(df_pandas['B'] + 1),
A_B_ratio=df_pandas['A'] / (df_pandas['B'] + 0.001)
)
pandas_complex_time = time.time() - start_time
# Dask复杂计算
start_time = time.time()
dask_complex = df_dask.assign(
A_squared=df_dask['A'] ** 2,
            B_log=np.log(df_dask['B'] + 1),  # dask.dataframe 没有 log 函数,NumPy ufunc 可直接作用于 Dask 列
A_B_ratio=df_dask['A'] / (df_dask['B'] + 0.001)
).compute()
dask_complex_time = time.time() - start_time
print(f"Pandas复杂计算时间: {pandas_complex_time:.2f} 秒")
print(f"Dask复杂计算时间: {dask_complex_time:.2f} 秒")
# 3. 内存使用对比
print("\n3. 内存使用分析:")
pandas_memory = df_pandas.memory_usage(deep=True).sum() / 1024**2
print(f"Pandas内存使用: {pandas_memory:.2f} MB")
print(f"Dask延迟计算,内存使用更高效")
return dask_agg
except ImportError:
print("Dask未安装,跳过Dask演示")
print("安装命令: pip install dask[complete]")
return None
dask_result = dask_parallel_demo()
```
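
Dask 还可以直接从磁盘惰性读取,省去先在内存中构造 Pandas DataFrame 的一步。下面的路径与参数仅为示意:

```python
import dask.dataframe as dd

# 通配符匹配多个文件,每约 64MB 切成一个分区,读取是惰性的
df_lazy = dd.read_csv('data/part-*.csv', blocksize='64MB')  # 示意路径
result = df_lazy.groupby('C')['A'].sum().compute()  # 真正的计算发生在 compute()
```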
## 4. 大数据处理策略

### 4.1 采样、分区与增量处理

```python
print("\n=== 大数据处理策略 ===")
def big_data_strategies():
"""大数据处理策略"""
# 1. 数据采样策略
print("1. 数据采样策略:")
# 创建大型数据集
n = 1000000
np.random.seed(42)
large_df = pd.DataFrame({
'id': range(n),
'value1': np.random.randint(1, 1000, n),
'value2': np.random.random(n),
'category': np.random.choice(['A', 'B', 'C', 'D'], n),
'date': pd.date_range('2020-01-01', periods=n, freq='1min')
})
print(f"原始数据大小: {len(large_df):,} 行")
# 随机采样
sample_random = large_df.sample(n=10000, random_state=42)
print(f"随机采样: {len(sample_random):,} 行")
# 分层采样
sample_stratified = large_df.groupby('category', group_keys=False).apply(
lambda x: x.sample(min(len(x), 2500), random_state=42)
)
print(f"分层采样: {len(sample_stratified):,} 行")
# 系统采样
sample_systematic = large_df.iloc[::100] # 每100行取1行
print(f"系统采样: {len(sample_systematic):,} 行")
# 2. 数据分区策略
print("\n2. 数据分区策略:")
def partition_data_by_date(df, partition_freq='M'):
"""按日期分区数据"""
df['partition'] = df['date'].dt.to_period(partition_freq)
partitions = {}
for partition_name, partition_data in df.groupby('partition'):
partitions[str(partition_name)] = partition_data.drop('partition', axis=1)
return partitions
# 按月分区
monthly_partitions = partition_data_by_date(large_df, 'M')
print(f"月度分区数量: {len(monthly_partitions)}")
for partition_name, partition_df in list(monthly_partitions.items())[:3]:
print(f" {partition_name}: {len(partition_df):,} 行")
# 3. 增量处理策略
print("\n3. 增量处理策略:")
def incremental_processing_demo():
"""增量处理演示"""
# 模拟历史数据
historical_data = large_df.iloc[:800000].copy()
# 模拟新增数据
new_data = large_df.iloc[800000:].copy()
print(f"历史数据: {len(historical_data):,} 行")
print(f"新增数据: {len(new_data):,} 行")
# 历史数据聚合结果(假设已经计算过)
historical_agg = historical_data.groupby('category').agg({
'value1': 'sum',
'value2': 'mean'
})
print("历史聚合结果:")
print(historical_agg)
# 新增数据聚合
new_agg = new_data.groupby('category').agg({
'value1': 'sum',
'value2': 'mean'
})
# 增量更新聚合结果
def incremental_update(historical_agg, new_agg, historical_count, new_count):
"""增量更新聚合结果"""
updated_agg = historical_agg.copy()
for category in new_agg.index:
if category in updated_agg.index:
# 更新sum
updated_agg.loc[category, 'value1'] += new_agg.loc[category, 'value1']
# 更新mean(加权平均)
hist_count = historical_count.get(category, 0)
new_count_cat = new_count.get(category, 0)
total_count = hist_count + new_count_cat
if total_count > 0:
updated_agg.loc[category, 'value2'] = (
(updated_agg.loc[category, 'value2'] * hist_count +
new_agg.loc[category, 'value2'] * new_count_cat) / total_count
)
else:
# 新类别
updated_agg.loc[category] = new_agg.loc[category]
return updated_agg
# 计算记录数
historical_count = historical_data['category'].value_counts().to_dict()
new_count = new_data['category'].value_counts().to_dict()
# 增量更新
updated_result = incremental_update(historical_agg, new_agg, historical_count, new_count)
# 验证结果
full_agg = large_df.groupby('category').agg({
'value1': 'sum',
'value2': 'mean'
})
print("\n增量更新结果:")
print(updated_result)
print("\n完整计算结果:")
print(full_agg)
print(f"\n结果一致性: {np.allclose(updated_result.values, full_agg.values)}")
incremental_processing_demo()
return sample_random, monthly_partitions
sample_data, partitions = big_data_strategies()
```
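
分区的价值在于可以独立落盘、按需加载。下面把上文得到的月度分区写成一个个独立的 Parquet 文件(目录名为示意,需要 pyarrow):

```python
os.makedirs('partitioned', exist_ok=True)
for name, part in partitions.items():
    # 文件名形如 partitioned/2020-01.parquet,之后可只读所需月份
    part.to_parquet(f'partitioned/{name}.parquet', index=False)
print(sorted(os.listdir('partitioned'))[:3])
```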
### 4.2 性能监控和调优

```python
print("\n=== 性能监控和调优 ===")
def performance_monitoring():
"""性能监控和调优"""
import cProfile
import pstats
from io import StringIO
# 性能分析装饰器
def profile_performance(func):
"""性能分析装饰器"""
def wrapper(*args, **kwargs):
pr = cProfile.Profile()
pr.enable()
result = func(*args, **kwargs)
pr.disable()
s = StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(10) # 显示前10个最耗时的函数
print(f"\n{func.__name__} 性能分析:")
print(s.getvalue())
return result
return wrapper
# 创建测试数据
n = 100000
np.random.seed(42)
df = pd.DataFrame({
'A': np.random.randint(1, 1000, n),
'B': np.random.random(n),
'C': np.random.choice(['X', 'Y', 'Z'], n),
'D': pd.date_range('2020-01-01', periods=n, freq='1min')
})
# 定义测试函数
@profile_performance
def inefficient_operation(df):
"""低效操作示例"""
result = []
for i in range(len(df)):
if df.iloc[i]['A'] > 500:
result.append(df.iloc[i]['A'] * df.iloc[i]['B'])
else:
result.append(0)
return result
@profile_performance
def efficient_operation(df):
"""高效操作示例"""
mask = df['A'] > 500
result = np.where(mask, df['A'] * df['B'], 0)
return result
print("性能对比测试:")
# 测试低效操作
print("\n=== 低效操作 ===")
start_time = time.time()
result1 = inefficient_operation(df)
inefficient_time = time.time() - start_time
print(f"低效操作耗时: {inefficient_time:.4f} 秒")
# 测试高效操作
print("\n=== 高效操作 ===")
start_time = time.time()
result2 = efficient_operation(df)
efficient_time = time.time() - start_time
print(f"高效操作耗时: {efficient_time:.4f} 秒")
print(f"\n性能提升: {inefficient_time / efficient_time:.1f}x")
# 内存使用监控
def memory_usage_monitor(func, *args, **kwargs):
"""内存使用监控"""
import tracemalloc
tracemalloc.start()
# 执行函数
result = func(*args, **kwargs)
# 获取内存使用情况
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"{func.__name__} 内存使用:")
print(f" 当前内存: {current / 1024**2:.2f} MB")
print(f" 峰值内存: {peak / 1024**2:.2f} MB")
return result
# 内存使用测试
print("\n=== 内存使用监控 ===")
def memory_intensive_operation(df):
"""内存密集型操作"""
# 创建多个副本
df_copies = [df.copy() for _ in range(5)]
# 执行计算
results = []
for df_copy in df_copies:
df_copy['new_col'] = df_copy['A'] * df_copy['B']
results.append(df_copy.groupby('C')['new_col'].sum())
return pd.concat(results, axis=1)
def memory_efficient_operation(df):
"""内存高效操作"""
# 避免创建多个副本
df_temp = df.copy()
df_temp['new_col'] = df_temp['A'] * df_temp['B']
results = []
for i in range(5):
result = df_temp.groupby('C')['new_col'].sum()
results.append(result)
return pd.concat(results, axis=1)
memory_usage_monitor(memory_intensive_operation, df)
memory_usage_monitor(memory_efficient_operation, df)
return df
monitored_df = performance_monitoring()
```
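
对单个操作做微基准时,用 `timeit` 取多次重复中的最小值,通常比单次 `time.time()` 计时更抗系统噪声。一个简单示意:

```python
import timeit

best = min(timeit.repeat(
    lambda: np.where(monitored_df['A'] > 500,
                     monitored_df['A'] * monitored_df['B'], 0),
    repeat=5, number=10))
print(f"10 次调用的最佳总耗时: {best:.4f} 秒")
```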
## 本章小结

本章全面介绍了 Pandas 的性能优化技巧:

### 核心内容回顾
- 内存优化:数据类型优化、分块处理、内存监控
- 计算优化:向量化操作、索引优化、存储格式选择
- 并行处理:多进程、Dask并行计算
- 大数据策略:采样、分区、增量处理
- 性能监控:性能分析、内存监控、调优技巧
### 关键优化原则
- 优先使用向量化操作替代循环
- 合理选择数据类型减少内存使用
- 建立适当索引提升查询性能
- 分块处理大文件避免内存溢出
- 使用并行计算充分利用多核CPU
### 性能优化检查清单

#### 内存优化
- [ ] 使用合适的数据类型(int8, int16, category等)
- [ ] 利用稀疏数组处理稀疏数据
- [ ] 分块读取大文件
- [ ] 及时删除不需要的变量
#### 计算优化
- [ ] 使用向量化操作替代apply
- [ ] 建立合适的索引
- [ ] 选择高效的存储格式(Parquet, HDF5)
- [ ] 避免不必要的数据复制
#### 并行优化
- [ ] 使用多进程处理CPU密集型任务
- [ ] 考虑使用Dask处理超大数据集
- [ ] 合理设置并行度
- [ ] 注意并行开销
### 实际应用建议
- 开发阶段:使用小数据集快速原型开发
- 测试阶段:使用采样数据验证算法正确性
- 生产阶段:应用所有优化技巧处理完整数据
- 监控阶段:持续监控性能指标,及时调优
### 常见性能陷阱
- 过度使用apply函数
- 频繁的数据类型转换
- 不必要的数据复制
- 缺乏适当的索引
- 忽视内存使用监控
掌握这些性能优化技巧,能够帮助你:
- 处理更大规模的数据集
- 显著提升数据处理速度
- 优化内存使用效率
- 构建可扩展的数据处理系统
## 练习题
- 优化一个包含1000万行数据的DataFrame的内存使用
- 实现一个高性能的数据聚合系统
- 设计一个大文件的分块处理流程
- 开发一个并行数据处理框架
- 构建一个性能监控和调优工具
下一章我们将整理 Pandas 的学习资源,为你的进一步学习提供指导。