Pandas Performance Optimization
Overview
When processing large-scale data, Pandas performance optimization becomes crucial. This chapter explores various optimization techniques in depth, including memory optimization, computational optimization, data type optimization, parallel processing, and more, helping you build efficient data processing workflows.
1. Memory Optimization
1.1 Data Type Optimization
python
import pandas as pd
import numpy as np
import time
import psutil
import os
from memory_profiler import profile
print("=== Data Type Optimization ===")
def memory_usage_analysis(n=1000000):
    """Compare memory usage of a DataFrame before and after dtype optimization.

    Parameters
    ----------
    n : int, default 1_000_000
        Number of rows in the generated sample DataFrame.

    Returns
    -------
    tuple[pd.DataFrame, pd.DataFrame]
        (original, optimized) — same values, but the optimized copy uses
        downcast integers/floats and a categorical string column.
    """
    np.random.seed(42)
    # Original data (not optimized): default int64/float64/object dtypes
    df_original = pd.DataFrame({
        'int_col': np.random.randint(0, 100, n),
        'float_col': np.random.random(n),
        'category_col': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
        'bool_col': np.random.choice([True, False], n),
        'date_col': pd.date_range('2020-01-01', periods=n, freq='1min')
    })
    print("Original data memory usage:")
    # DataFrame.info() prints its report and returns None, so it must not be
    # wrapped in print() — the original printed a stray "None" line here.
    df_original.info(memory_usage='deep')
    original_memory = df_original.memory_usage(deep=True).sum() / 1024**2
    print(f"Total memory usage: {original_memory:.2f} MB\n")
    # Optimized copy
    df_optimized = df_original.copy()
    # 1. Integer downcast: values are 0-99, so int8 suffices
    df_optimized['int_col'] = pd.to_numeric(df_optimized['int_col'], downcast='integer')
    # 2. Float downcast: float64 -> float32 (half the memory, less precision)
    df_optimized['float_col'] = pd.to_numeric(df_optimized['float_col'], downcast='float')
    # 3. Low-cardinality strings -> category (integer codes + small lookup table)
    df_optimized['category_col'] = df_optimized['category_col'].astype('category')
    # 4. Boolean type is already 1 byte per value — nothing to do
    print("Optimized data memory usage:")
    df_optimized.info(memory_usage='deep')
    optimized_memory = df_optimized.memory_usage(deep=True).sum() / 1024**2
    print(f"Total memory usage: {optimized_memory:.2f} MB")
    print(f"Memory saved: {((original_memory - optimized_memory) / original_memory * 100):.1f}%\n")
    return df_original, df_optimized
df_orig, df_opt = memory_usage_analysis()
1.2 Chunk Processing Large Files
python
print("=== Chunk Processing Large Files ===")
def chunked_processing_demo(total_rows=500000, chunk_size=50000, read_chunksize=10000):
    """Compare loading a large CSV all at once versus in chunks.

    A throw-away CSV is generated on disk, aggregated both ways, and the
    file is removed afterwards.

    Parameters
    ----------
    total_rows : int, default 500_000
        Number of rows written to the temporary CSV.
    chunk_size : int, default 50_000
        Batch size used when *writing* the file.
    read_chunksize : int, default 10_000
        Chunk size used when *reading* the file back.

    Returns
    -------
    pd.Series
        Sum of 'value1' per 'category', computed chunk-wise.
    """
    large_file = 'large_dataset.csv'
    print("Generating large dataset...")
    # Write the file in batches so the full dataset is never held in memory
    for i in range(0, total_rows, chunk_size):
        rows = min(chunk_size, total_rows - i)  # last batch may be short
        chunk_data = pd.DataFrame({
            'id': range(i, i + rows),
            'value1': np.random.randint(1, 1000, rows),
            'value2': np.random.random(rows),
            'category': np.random.choice(['A', 'B', 'C', 'D'], rows),
            'date': pd.date_range('2020-01-01', periods=rows, freq='1H')
        })
        # First write includes header, subsequent appends do not
        chunk_data.to_csv(large_file, mode='w' if i == 0 else 'a',
                          header=(i == 0), index=False)
    print(f"Generated file size: {os.path.getsize(large_file) / 1024**2:.2f} MB")
    # Method 1: load everything at once (memory intensive)
    print("\nMethod 1: Load all at once")
    start_time = time.time()
    try:
        df_full = pd.read_csv(large_file)
        full_load_time = time.time() - start_time
        print(f"Load time: {full_load_time:.2f} seconds")
        print(f"Memory usage: {df_full.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
        result_full = df_full.groupby('category')['value1'].sum()
        print(f"Aggregation result: {result_full.to_dict()}")
    except MemoryError:
        print("Insufficient memory, cannot load all at once")
    # Method 2: chunked loading (memory friendly)
    print("\nMethod 2: Chunk loading")
    start_time = time.time()
    chunk_results = []
    chunk_reader = pd.read_csv(large_file, chunksize=read_chunksize)
    for i, chunk in enumerate(chunk_reader):
        # Partial aggregate per chunk; merged below
        chunk_results.append(chunk.groupby('category')['value1'].sum())
        if i % 10 == 0:
            # Progress tied to the actual read chunk size (the original
            # hard-coded 10000 here, which drifted if chunksize changed)
            print(f"Processed {(i + 1) * read_chunksize} rows")
    # Merge per-chunk partial sums into the final aggregate
    final_result = pd.concat(chunk_results).groupby(level=0).sum()
    chunk_load_time = time.time() - start_time
    print(f"Chunk processing time: {chunk_load_time:.2f} seconds")
    print(f"Aggregation result: {final_result.to_dict()}")
    # Clean up temporary file
    if os.path.exists(large_file):
        os.remove(large_file)
    return final_result
chunk_result = chunked_processing_demo()
1.3 Memory Monitoring and Analysis
python
print("\n=== Memory Monitoring and Analysis ===")
def memory_monitoring():
    """Demonstrate process-level and DataFrame-level memory monitoring.

    Uses psutil for process RSS tracking and DataFrame.memory_usage(deep=True)
    for per-column accounting.

    Returns
    -------
    pd.DataFrame
        The dtype-optimized sample DataFrame.
    """
    def get_memory_usage():
        """Return the current process resident set size (RSS) in MB."""
        process = psutil.Process(os.getpid())
        memory_info = process.memory_info()
        return memory_info.rss / 1024**2  # MB
    def memory_profiler_demo():
        """Show RSS before/after creating, optimizing and deleting a DataFrame."""
        print(f"Initial memory usage: {get_memory_usage():.2f} MB")
        # Create a large DataFrame
        n = 100000
        df = pd.DataFrame({
            'A': np.random.randint(0, 1000, n),
            'B': np.random.random(n),
            'C': ['category_' + str(i % 100) for i in range(n)]
        })
        print(f"Memory usage after creating DataFrame: {get_memory_usage():.2f} MB")
        # Dtype optimization
        df['A'] = pd.to_numeric(df['A'], downcast='integer')
        df['B'] = pd.to_numeric(df['B'], downcast='float')
        df['C'] = df['C'].astype('category')
        print(f"Memory usage after optimization: {get_memory_usage():.2f} MB")
        # Delete the DataFrame to release memory (NOTE: RSS may not drop
        # immediately — the allocator can retain freed pages)
        del df
        print(f"Memory usage after deleting DataFrame: {get_memory_usage():.2f} MB")
        return True
    # Memory usage analysis tool
    def analyze_dataframe_memory(df, name="DataFrame"):
        """Print total, per-row and per-column memory usage of *df*."""
        print(f"\n{name} Memory Analysis:")
        print("-" * 40)
        memory_usage = df.memory_usage(deep=True)
        total_memory = memory_usage.sum() / 1024**2
        print(f"Total memory usage: {total_memory:.2f} MB")
        print(f"Rows: {len(df):,}")
        print(f"Columns: {len(df.columns)}")
        print(f"Average memory per row: {total_memory * 1024 / len(df):.2f} KB")
        print("\nMemory usage per column:")
        for col in df.columns:
            # index=False: Series.memory_usage includes the index by default,
            # which would double-count it in every column's percentage.
            col_memory = df[col].memory_usage(deep=True, index=False) / 1024**2
            print(f" {col}: {col_memory:.2f} MB ({col_memory/total_memory*100:.1f}%)")
        print("\nData types:")
        for col in df.columns:
            print(f" {col}: {df[col].dtype}")
    # Demonstrate memory analysis
    memory_profiler_demo()
    # Create sample data for analysis
    sample_df = pd.DataFrame({
        'integers': np.random.randint(0, 100, 10000),
        'floats': np.random.random(10000),
        'categories': np.random.choice(['A', 'B', 'C'], 10000),
        'strings': ['string_' + str(i) for i in range(10000)],
        'dates': pd.date_range('2020-01-01', periods=10000, freq='1H')
    })
    analyze_dataframe_memory(sample_df, "Original Data")
    # Optimize dtypes and re-measure
    sample_df_opt = sample_df.copy()
    sample_df_opt['integers'] = pd.to_numeric(sample_df_opt['integers'], downcast='integer')
    sample_df_opt['floats'] = pd.to_numeric(sample_df_opt['floats'], downcast='float')
    sample_df_opt['categories'] = sample_df_opt['categories'].astype('category')
    analyze_dataframe_memory(sample_df_opt, "Optimized Data")
    return sample_df_opt
optimized_df = memory_monitoring()
2. Computational Performance Optimization
2.1 Vectorization Optimization
python
print("\n=== Computational Performance Optimization ===")
def vectorization_optimization(n=1000000):
    """Benchmark vectorized alternatives to row-wise apply().

    Parameters
    ----------
    n : int, default 1_000_000
        Number of rows in the benchmark DataFrame.

    Returns
    -------
    pd.DataFrame
        The generated benchmark DataFrame.
    """
    np.random.seed(42)
    df = pd.DataFrame({
        'A': np.random.randint(1, 100, n),
        'B': np.random.randint(1, 100, n),
        'C': np.random.random(n),
        'D': np.random.choice(['X', 'Y', 'Z'], n)
    })
    print(f"Test data: {len(df):,} rows")
    def time_operation(func, name, *args, **kwargs):
        """Run func(*args, **kwargs), print its wall time, return its result."""
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{name}: {end_time - start_time:.4f} seconds")
        return result
    print("\n=== Conditional Calculation Performance Comparison ===")
    # Method 1: row-wise apply (slow — Python-level loop over rows)
    def apply_method(df):
        return df.apply(lambda row: row['A'] * row['C'] if row['A'] > row['B'] else row['B'] * row['C'], axis=1)
    # Method 2: numpy.where (fast — single vectorized pass)
    def numpy_where_method(df):
        return np.where(df['A'] > df['B'], df['A'] * df['C'], df['B'] * df['C'])
    # Method 3: pandas .where (fast, preserves the index)
    def pandas_where_method(df):
        return (df['A'] * df['C']).where(df['A'] > df['B'], df['B'] * df['C'])
    # Method 4: boolean mask assignment (fast)
    def boolean_indexing_method(df):
        result = pd.Series(index=df.index, dtype=float)
        mask = df['A'] > df['B']
        result[mask] = df.loc[mask, 'A'] * df.loc[mask, 'C']
        result[~mask] = df.loc[~mask, 'B'] * df.loc[~mask, 'C']
        return result
    # Performance tests
    result1 = time_operation(apply_method, "Apply method", df)
    result2 = time_operation(numpy_where_method, "NumPy where", df)
    result3 = time_operation(pandas_where_method, "Pandas where", df)
    result4 = time_operation(boolean_indexing_method, "Boolean indexing", df)
    # Sanity check: all four strategies must produce identical values
    consistent = (np.allclose(result1, result2) and
                  np.allclose(result1, result3) and
                  np.allclose(result1, result4))
    print(f"Results consistent: {consistent}")
    print("\n=== Aggregation Operation Performance Comparison ===")
    # Method 1: standard groupby
    def standard_groupby(df):
        return df.groupby('D').agg({'A': 'sum', 'B': 'mean', 'C': 'std'})
    # Method 2: transform broadcasts group statistics back to every row,
    # then drop_duplicates reduces to one row per group
    def transform_method(df):
        df_copy = df.copy()
        df_copy['A_sum'] = df_copy.groupby('D')['A'].transform('sum')
        df_copy['B_mean'] = df_copy.groupby('D')['B'].transform('mean')
        df_copy['C_std'] = df_copy.groupby('D')['C'].transform('std')
        return df_copy[['D', 'A_sum', 'B_mean', 'C_std']].drop_duplicates()
    # Method 3: pivot_table with per-column aggfuncs
    def pivot_table_method(df):
        return pd.pivot_table(df, values=['A', 'B', 'C'], index='D',
                              aggfunc={'A': 'sum', 'B': 'mean', 'C': 'std'})
    agg_result1 = time_operation(standard_groupby, "Standard GroupBy", df)
    agg_result2 = time_operation(transform_method, "Transform method", df)
    agg_result3 = time_operation(pivot_table_method, "Pivot Table", df)
    print("\nGroupBy result:")
    print(agg_result1)
    return df
perf_df = vectorization_optimization()
2.2 Index Optimization
python
print("\n=== Index Optimization ===")
def index_optimization(n=500000):
    """Benchmark lookups with and without (Multi)Indexes.

    Parameters
    ----------
    n : int, default 500_000
        Number of rows in the benchmark DataFrame.

    Returns
    -------
    pd.DataFrame
        The DataFrame indexed by 'category'.
    """
    np.random.seed(42)
    df = pd.DataFrame({
        'id': range(n),
        'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
        'value': np.random.randint(1, 1000, n),
        'date': pd.date_range('2020-01-01', periods=n, freq='1min')
    })
    print(f"Dataset size: {len(df):,} rows")
    def time_query(func, name, *args, **kwargs):
        """Run a query function, print wall time and result size, return result."""
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{name}: {end_time - start_time:.4f} seconds (Result: {len(result)} rows)")
        return result
    print("\n=== Query Performance Comparison ===")
    # Linear scan over an unindexed column
    def query_without_index(df, category):
        return df[df['category'] == category]
    # Label lookup on the index
    df_indexed = df.set_index('category')
    def query_with_index(df, category):
        return df.loc[category]
    # MultiIndex lookup — the index MUST be lexsorted: partial slicing on an
    # unsorted MultiIndex raises UnsortedIndexError (the original omitted
    # the sort_index() call and crashed here).
    df_multi_indexed = df.set_index(['category', 'date']).sort_index()
    def query_multi_index(df, category, start_date, end_date):
        return df.loc[(category, slice(start_date, end_date)), :]
    # Performance tests
    target_category = 'A'
    start_date = '2020-01-01'
    end_date = '2020-01-02'
    print("1. Single column query performance:")
    result1 = time_query(query_without_index, "Query without index", df, target_category)
    result2 = time_query(query_with_index, "Query with index", df_indexed, target_category)
    print("\n2. Range query performance:")
    def range_query_without_index(df, start_date, end_date):
        return df[(df['date'] >= start_date) & (df['date'] <= end_date)]
    df_date_indexed = df.set_index('date')
    def range_query_with_index(df, start_date, end_date):
        # DatetimeIndex supports string-based slicing directly
        return df.loc[start_date:end_date]
    result3 = time_query(range_query_without_index, "Date range query (no index)", df, start_date, end_date)
    result4 = time_query(range_query_with_index, "Date range query (with index)", df_date_indexed, start_date, end_date)
    print("\n3. Composite query performance:")
    result5 = time_query(query_multi_index, "MultiIndex query", df_multi_indexed, target_category, start_date, end_date)
    # Index memory usage analysis
    print("\n=== Index Memory Usage Analysis ===")
    def analyze_index_memory(df, name):
        """Print total memory vs. the share consumed by the index."""
        total_memory = df.memory_usage(deep=True).sum() / 1024**2
        index_memory = df.index.memory_usage(deep=True) / 1024**2
        print(f"{name}:")
        print(f" Total memory: {total_memory:.2f} MB")
        print(f" Index memory: {index_memory:.2f} MB ({index_memory/total_memory*100:.1f}%)")
    analyze_index_memory(df, "Original DataFrame")
    analyze_index_memory(df_indexed, "Single Index DataFrame")
    analyze_index_memory(df_multi_indexed, "MultiIndex DataFrame")
    return df_indexed
indexed_df = index_optimization()
2.3 Data Type and Storage Optimization
python
print("\n=== Data Type and Storage Optimization ===")
def dtype_storage_optimization(n=100000):
    """Demonstrate dtype optimization and compare on-disk storage formats.

    Parameters
    ----------
    n : int, default 100_000
        Number of rows in the sample DataFrame.

    Returns
    -------
    pd.DataFrame
        The dtype-optimized DataFrame.
    """
    np.random.seed(42)
    # Original data with default (wide) dtypes; comments note the target dtype
    df_original = pd.DataFrame({
        'small_int': np.random.randint(0, 10, n),  # Can use int8
        'medium_int': np.random.randint(0, 1000, n),  # Can use int16
        'large_int': np.random.randint(0, 100000, n),  # Needs int32
        'float_data': np.random.random(n),  # Can use float32
        'category_data': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),  # Can use category
        'boolean_data': np.random.choice([True, False], n),  # Already optimal
        'sparse_data': np.random.choice([0, 1, 2, np.nan], n, p=[0.7, 0.1, 0.1, 0.1])  # Can use sparse
    })
    print("Original data types and memory usage:")
    print(df_original.dtypes)
    print(f"Total memory usage: {df_original.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n")
    # Optimize data types
    df_optimized = df_original.copy()
    # 1. Integer downcasts
    df_optimized['small_int'] = df_optimized['small_int'].astype('int8')
    df_optimized['medium_int'] = df_optimized['medium_int'].astype('int16')
    df_optimized['large_int'] = df_optimized['large_int'].astype('int32')
    # 2. Float downcast
    df_optimized['float_data'] = df_optimized['float_data'].astype('float32')
    # 3. Category
    df_optimized['category_data'] = df_optimized['category_data'].astype('category')
    # 4. Sparse (stores only the non-fill values)
    df_optimized['sparse_data'] = pd.arrays.SparseArray(df_optimized['sparse_data'])
    print("Optimized data types and memory usage:")
    print(df_optimized.dtypes)
    optimized_memory = df_optimized.memory_usage(deep=True).sum() / 1024**2
    original_memory = df_original.memory_usage(deep=True).sum() / 1024**2
    print(f"Total memory usage: {optimized_memory:.2f} MB")
    print(f"Memory saved: {((original_memory - optimized_memory) / original_memory * 100):.1f}%\n")
    # Storage format optimization
    print("=== Storage Format Optimization ===")
    test_file_base = 'test_data'
    # CSV format
    start_time = time.time()
    df_optimized.to_csv(f'{test_file_base}.csv', index=False)
    csv_write_time = time.time() - start_time
    csv_size = os.path.getsize(f'{test_file_base}.csv') / 1024**2
    start_time = time.time()
    df_csv = pd.read_csv(f'{test_file_base}.csv')
    csv_read_time = time.time() - start_time
    # Parquet format — requires a parquet engine (pyarrow/fastparquet), and
    # sparse dtypes are not supported by the parquet writers; skip gracefully
    # instead of letting the whole demo crash (as the original did).
    parquet_ok = True
    try:
        start_time = time.time()
        df_optimized.to_parquet(f'{test_file_base}.parquet', index=False)
        parquet_write_time = time.time() - start_time
        parquet_size = os.path.getsize(f'{test_file_base}.parquet') / 1024**2
        start_time = time.time()
        df_parquet = pd.read_parquet(f'{test_file_base}.parquet')
        parquet_read_time = time.time() - start_time
    except Exception as exc:
        parquet_ok = False
        print(f"Parquet test skipped: {exc}")
    # Pickle format
    start_time = time.time()
    df_optimized.to_pickle(f'{test_file_base}.pkl')
    pickle_write_time = time.time() - start_time
    pickle_size = os.path.getsize(f'{test_file_base}.pkl') / 1024**2
    start_time = time.time()
    df_pickle = pd.read_pickle(f'{test_file_base}.pkl')
    pickle_read_time = time.time() - start_time
    # Results comparison
    print("Storage format performance comparison:")
    print(f"{'Format':<10} {'Write Time':<12} {'Read Time':<12} {'File Size':<12} {'Compression':<10}")
    print("-" * 60)
    print(f"{'CSV':<10} {csv_write_time:<12.4f} {csv_read_time:<12.4f} {csv_size:<12.2f} {'1.00x':<10}")
    if parquet_ok:
        print(f"{'Parquet':<10} {parquet_write_time:<12.4f} {parquet_read_time:<12.4f} {parquet_size:<12.2f} {csv_size/parquet_size:<10.2f}")
    print(f"{'Pickle':<10} {pickle_write_time:<12.4f} {pickle_read_time:<12.4f} {pickle_size:<12.2f} {csv_size/pickle_size:<10.2f}")
    # Clean up temporary files
    for ext in ['.csv', '.parquet', '.pkl']:
        file_path = f'{test_file_base}{ext}'
        if os.path.exists(file_path):
            os.remove(file_path)
    return df_optimized
optimized_storage_df = dtype_storage_optimization()
3. Parallel Processing and Multithreading
3.1 Multiprocessing
python
print("\n=== Parallel Processing Optimization ===")
def _complex_calculation(group_data):
    """Worker: compute per-group statistics for one (group_id, frame) pair.

    Defined at module level because multiprocessing.Pool pickles the callable
    it maps — a function nested inside the demo (as in the original) is not
    picklable and makes Pool.map raise.
    """
    group_id, data = group_data
    result = {
        'group': group_id,
        'sum_A': data['A'].sum(),
        'mean_B': data['B'].mean(),
        'std_C': data['C'].std(),
        'correlation_AB': data['A'].corr(data['B']),
        'percentile_95_A': data['A'].quantile(0.95)
    }
    # Simulate a CPU-intensive workload so parallelism has something to win
    for _ in range(1000):
        temp = data['A'].values ** 2 + data['B'].values ** 0.5
    return result


def parallel_processing_demo(n=1000000):
    """Compare serial vs. multiprocessing group computation.

    Parameters
    ----------
    n : int, default 1_000_000
        Number of rows in the synthetic dataset.

    Returns
    -------
    pd.DataFrame
        Per-group statistics computed by the worker pool.
    """
    from multiprocessing import Pool, cpu_count
    np.random.seed(42)
    df = pd.DataFrame({
        'A': np.random.randint(1, 100, n),
        'B': np.random.randint(1, 100, n),
        'C': np.random.random(n),
        'group': np.random.randint(0, 100, n)
    })
    print(f"Dataset size: {len(df):,} rows")
    print(f"CPU cores: {cpu_count()}")
    # Prepare grouped data as picklable (id, frame) pairs
    grouped_data = [(name, group) for name, group in df.groupby('group')]
    print(f"Number of groups: {len(grouped_data)}")
    # Method 1: serial processing
    print("\n1. Serial processing:")
    start_time = time.time()
    serial_results = [_complex_calculation(group_data) for group_data in grouped_data]
    serial_time = time.time() - start_time
    print(f"Serial processing time: {serial_time:.2f} seconds")
    # Method 2: parallel processing
    print("\n2. Parallel processing:")
    start_time = time.time()
    with Pool(processes=cpu_count()) as pool:
        parallel_results = pool.map(_complex_calculation, grouped_data)
    parallel_time = time.time() - start_time
    print(f"Parallel processing time: {parallel_time:.2f} seconds")
    print(f"Speedup: {serial_time / parallel_time:.2f}x")
    # Verify result consistency (equals() treats NaN == NaN as equal)
    serial_df = pd.DataFrame(serial_results)
    parallel_df = pd.DataFrame(parallel_results)
    print(f"\nResult consistency check: {serial_df.equals(parallel_df)}")
    return parallel_df
parallel_result = parallel_processing_demo()
3.2 Dask Parallel Computing
python
print("\n=== Dask Parallel Computing ===")
def dask_parallel_demo():
"""Dask parallel computing demonstration"""
try:
import dask.dataframe as dd
from dask.multiprocessing import get
# Create large dataset
n = 1000000
np.random.seed(42)
# Create Pandas DataFrame
df_pandas = pd.DataFrame({
'A': np.random.randint(1, 1000, n),
'B': np.random.random(n),
'C': np.random.choice(['X', 'Y', 'Z'], n),
'D': pd.date_range('2020-01-01', periods=n, freq='1min')
})
print(f"Dataset size: {len(df_pandas):,} rows")
# Convert to Dask DataFrame
df_dask = dd.from_pandas(df_pandas, npartitions=8)
print(f"Dask partitions: {df_dask.npartitions}")
# Performance comparison tests
print("\n=== Performance Comparison Tests ===")
# 1. Aggregation operation comparison
print("1. Aggregation operations:")
# Pandas aggregation
start_time = time.time()
pandas_agg = df_pandas.groupby('C').agg({
'A': ['sum', 'mean', 'std'],
'B': ['min', 'max', 'median']
})
pandas_agg_time = time.time() - start_time
# Dask aggregation
start_time = time.time()
dask_agg = df_dask.groupby('C').agg({
'A': ['sum', 'mean', 'std'],
'B': ['min', 'max']
}).compute()
dask_agg_time = time.time() - start_time
print(f"Pandas aggregation time: {pandas_agg_time:.2f} seconds")
print(f"Dask aggregation time: {dask_agg_time:.2f} seconds")
# 2. Complex calculation comparison
print("\n2. Complex calculations:")
# Pandas complex calculation
start_time = time.time()
pandas_complex = df_pandas.assign(
A_squared=df_pandas['A'] ** 2,
B_log=np.log(df_pandas['B'] + 1),
A_B_ratio=df_pandas['A'] / (df_pandas['B'] + 0.001)
)
pandas_complex_time = time.time() - start_time
# Dask complex calculation
start_time = time.time()
dask_complex = df_dask.assign(
A_squared=df_dask['A'] ** 2,
B_log=dd.log(df_dask['B'] + 1),
A_B_ratio=df_dask['A'] / (df_dask['B'] + 0.001)
).compute()
dask_complex_time = time.time() - start_time
print(f"Pandas complex calculation time: {pandas_complex_time:.2f} seconds")
print(f"Dask complex calculation time: {dask_complex_time:.2f} seconds")
# 3. Memory usage comparison
print("\n3. Memory usage analysis:")
pandas_memory = df_pandas.memory_usage(deep=True).sum() / 1024**2
print(f"Pandas memory usage: {pandas_memory:.2f} MB")
print(f"Dask uses lazy evaluation for more efficient memory usage")
return dask_agg
except ImportError:
print("Dask not installed, skipping Dask demonstration")
print("Install command: pip install dask[complete]")
return None
dask_result = dask_parallel_demo()
4. Big Data Processing Strategies
4.1 Memory Mapping and Lazy Loading
python
print("\n=== Big Data Processing Strategies ===")
def big_data_strategies(n=1000000):
    """Demonstrate sampling, partitioning and incremental-processing strategies.

    Parameters
    ----------
    n : int, default 1_000_000
        Number of rows in the synthetic dataset.

    Returns
    -------
    tuple[pd.DataFrame, dict[str, pd.DataFrame]]
        (random sample, monthly partitions keyed by period string).
    """
    # 1. Data sampling strategy
    print("1. Data sampling strategies:")
    np.random.seed(42)
    large_df = pd.DataFrame({
        'id': range(n),
        'value1': np.random.randint(1, 1000, n),
        'value2': np.random.random(n),
        'category': np.random.choice(['A', 'B', 'C', 'D'], n),
        'date': pd.date_range('2020-01-01', periods=n, freq='1min')
    })
    print(f"Original data size: {len(large_df):,} rows")
    # Random sampling — capped at the dataset size so the demo also works
    # for small n (the original hard-coded n=10000 and raised for fewer rows)
    sample_random = large_df.sample(n=min(10000, len(large_df)), random_state=42)
    print(f"Random sampling: {len(sample_random):,} rows")
    # Stratified sampling: equal-sized draw from every category
    sample_stratified = large_df.groupby('category', group_keys=False).apply(
        lambda x: x.sample(min(len(x), 2500), random_state=42)
    )
    print(f"Stratified sampling: {len(sample_stratified):,} rows")
    # Systematic sampling: take every 100th row
    sample_systematic = large_df.iloc[::100]
    print(f"Systematic sampling: {len(sample_systematic):,} rows")
    # 2. Data partitioning strategy
    print("\n2. Data partitioning strategies:")
    def partition_data_by_date(df, partition_freq='M'):
        """Split *df* into a dict of sub-frames keyed by date period."""
        df['partition'] = df['date'].dt.to_period(partition_freq)
        partitions = {}
        for partition_name, partition_data in df.groupby('partition'):
            partitions[str(partition_name)] = partition_data.drop('partition', axis=1)
        return partitions
    # Partition by month
    monthly_partitions = partition_data_by_date(large_df, 'M')
    print(f"Monthly partition count: {len(monthly_partitions)}")
    for partition_name, partition_df in list(monthly_partitions.items())[:3]:
        print(f" {partition_name}: {len(partition_df):,} rows")
    # 3. Incremental processing strategy
    print("\n3. Incremental processing strategies:")
    def incremental_processing_demo():
        """Update historical aggregates with a new batch and verify the result."""
        # 80/20 split instead of the original hard-coded 800000-row cut,
        # which only made sense for the default dataset size
        split = int(len(large_df) * 0.8)
        historical_data = large_df.iloc[:split].copy()
        new_data = large_df.iloc[split:].copy()
        print(f"Historical data: {len(historical_data):,} rows")
        print(f"New data: {len(new_data):,} rows")
        # Historical aggregation result (assume already calculated)
        historical_agg = historical_data.groupby('category').agg({
            'value1': 'sum',
            'value2': 'mean'
        })
        print("Historical aggregation result:")
        print(historical_agg)
        # New data aggregation
        new_agg = new_data.groupby('category').agg({
            'value1': 'sum',
            'value2': 'mean'
        })
        def incremental_update(historical_agg, new_agg, historical_count, new_count):
            """Merge new-batch aggregates into historical ones.

            Sums add directly; means are combined as a count-weighted average.
            """
            updated_agg = historical_agg.copy()
            for category in new_agg.index:
                if category in updated_agg.index:
                    # Update sum
                    updated_agg.loc[category, 'value1'] += new_agg.loc[category, 'value1']
                    # Update mean (weighted average over both batches)
                    hist_count = historical_count.get(category, 0)
                    new_count_cat = new_count.get(category, 0)
                    total_count = hist_count + new_count_cat
                    if total_count > 0:
                        updated_agg.loc[category, 'value2'] = (
                            (updated_agg.loc[category, 'value2'] * hist_count +
                             new_agg.loc[category, 'value2'] * new_count_cat) / total_count
                        )
                else:
                    # Category unseen in history: adopt the new aggregates as-is
                    updated_agg.loc[category] = new_agg.loc[category]
            return updated_agg
        # Record counts per category, needed for the weighted mean
        historical_count = historical_data['category'].value_counts().to_dict()
        new_count = new_data['category'].value_counts().to_dict()
        # Incremental update
        updated_result = incremental_update(historical_agg, new_agg, historical_count, new_count)
        # Cross-check against a full recomputation
        full_agg = large_df.groupby('category').agg({
            'value1': 'sum',
            'value2': 'mean'
        })
        print("\nIncremental update result:")
        print(updated_result)
        print("\nFull calculation result:")
        print(full_agg)
        print(f"\nResult consistency: {np.allclose(updated_result.values, full_agg.values)}")
    incremental_processing_demo()
    return sample_random, monthly_partitions
sample_data, partitions = big_data_strategies()
4.2 Performance Monitoring and Tuning
python
print("\n=== Performance Monitoring and Tuning ===")
def performance_monitoring(n=100000):
    """Profile inefficient vs. vectorized operations and monitor allocations.

    Uses cProfile for call profiling and tracemalloc for allocation tracking.

    Parameters
    ----------
    n : int, default 100_000
        Number of rows in the test DataFrame.

    Returns
    -------
    pd.DataFrame
        The generated test DataFrame.
    """
    import cProfile
    import pstats
    from functools import wraps
    from io import StringIO
    # Performance analysis decorator
    def profile_performance(func):
        """Decorator: print the top-10 cumulative-time entries for *func*."""
        @wraps(func)  # preserve func.__name__ for the report headers below
        def wrapper(*args, **kwargs):
            pr = cProfile.Profile()
            pr.enable()
            result = func(*args, **kwargs)
            pr.disable()
            s = StringIO()
            ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
            ps.print_stats(10)  # show top 10 time-consuming functions
            print(f"\n{func.__name__} Performance Analysis:")
            print(s.getvalue())
            return result
        return wrapper
    # Create test data
    np.random.seed(42)
    df = pd.DataFrame({
        'A': np.random.randint(1, 1000, n),
        'B': np.random.random(n),
        'C': np.random.choice(['X', 'Y', 'Z'], n),
        'D': pd.date_range('2020-01-01', periods=n, freq='1min')
    })
    # Define test functions
    @profile_performance
    def inefficient_operation(df):
        """Row-by-row iloc loop — the deliberately slow baseline."""
        result = []
        for i in range(len(df)):
            if df.iloc[i]['A'] > 500:
                result.append(df.iloc[i]['A'] * df.iloc[i]['B'])
            else:
                result.append(0)
        return result
    @profile_performance
    def efficient_operation(df):
        """Single vectorized pass implementing the same logic."""
        mask = df['A'] > 500
        result = np.where(mask, df['A'] * df['B'], 0)
        return result
    print("Performance comparison test:")
    # Test inefficient operation
    print("\n=== Inefficient Operation ===")
    start_time = time.time()
    result1 = inefficient_operation(df)
    inefficient_time = time.time() - start_time
    print(f"Inefficient operation time: {inefficient_time:.4f} seconds")
    # Test efficient operation
    print("\n=== Efficient Operation ===")
    start_time = time.time()
    result2 = efficient_operation(df)
    efficient_time = time.time() - start_time
    print(f"Efficient operation time: {efficient_time:.4f} seconds")
    print(f"\nPerformance improvement: {inefficient_time / efficient_time:.1f}x")
    # Memory usage monitoring
    def memory_usage_monitor(func, *args, **kwargs):
        """Run *func* under tracemalloc and report current/peak allocations."""
        import tracemalloc
        tracemalloc.start()
        result = func(*args, **kwargs)
        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        print(f"{func.__name__} Memory Usage:")
        print(f" Current memory: {current / 1024**2:.2f} MB")
        print(f" Peak memory: {peak / 1024**2:.2f} MB")
        return result
    # Memory usage test
    print("\n=== Memory Usage Monitoring ===")
    def memory_intensive_operation(df):
        """Memory intensive: materializes five full copies of *df*."""
        df_copies = [df.copy() for _ in range(5)]
        results = []
        for df_copy in df_copies:
            df_copy['new_col'] = df_copy['A'] * df_copy['B']
            results.append(df_copy.groupby('C')['new_col'].sum())
        return pd.concat(results, axis=1)
    def memory_efficient_operation(df):
        """Memory efficient: reuses a single working copy."""
        df_temp = df.copy()
        df_temp['new_col'] = df_temp['A'] * df_temp['B']
        results = []
        for i in range(5):
            result = df_temp.groupby('C')['new_col'].sum()
            results.append(result)
        return pd.concat(results, axis=1)
    memory_usage_monitor(memory_intensive_operation, df)
    memory_usage_monitor(memory_efficient_operation, df)
    return df
monitored_df = performance_monitoring()
Chapter Summary
This chapter comprehensively covered Pandas performance optimization techniques:
Core Content Review
- Memory Optimization: Data type optimization, chunk processing, memory monitoring
- Computational Optimization: Vectorization, index optimization, storage format selection
- Parallel Processing: Multiprocessing, Dask parallel computing
- Big Data Strategies: Sampling, partitioning, incremental processing
- Performance Monitoring: Profiling, memory monitoring, tuning techniques
Key Optimization Principles
- Prioritize vectorized operations over loops
- Choose appropriate data types to reduce memory usage
- Build proper indexes to improve query performance
- Use chunk processing for large files to avoid memory overflow
- Use parallel computing to fully utilize multi-core CPUs
Performance Optimization Checklist
Memory Optimization
- [ ] Use appropriate data types (int8, int16, category, etc.)
- [ ] Use sparse arrays for sparse data
- [ ] Read large files in chunks
- [ ] Delete variables no longer needed
Computational Optimization
- [ ] Use vectorized operations instead of apply
- [ ] Build appropriate indexes
- [ ] Choose efficient storage formats (Parquet, HDF5)
- [ ] Avoid unnecessary data copying
Parallel Optimization
- [ ] Use multiprocessing for CPU-intensive tasks
- [ ] Consider using Dask for very large datasets
- [ ] Set appropriate parallelism levels
- [ ] Be aware of parallel overhead
Practical Recommendations
- Development Phase: Use small datasets for rapid prototyping
- Testing Phase: Use sampled data to verify algorithm correctness
- Production Phase: Apply all optimization techniques for full data
- Monitoring Phase: Continuously monitor performance metrics, tune as needed
Common Performance Pitfalls
- Overusing apply functions
- Frequent data type conversions
- Unnecessary data copying
- Lack of proper indexes
- Ignoring memory usage monitoring
Mastering these performance optimization techniques helps you:
- Process larger scale datasets
- Significantly improve data processing speed
- Optimize memory usage efficiency
- Build scalable data processing systems
Practice Exercises
- Optimize memory usage of a DataFrame with 10 million rows
- Implement a high-performance data aggregation system
- Design a chunk processing workflow for large files
- Develop a parallel data processing framework
- Build a performance monitoring and tuning tool
In the next chapter, we will compile Pandas learning resources to guide your further learning.