Pandas Performance Optimization

Overview

When processing large-scale data, Pandas performance optimization becomes crucial. This chapter explores various optimization techniques in depth, including memory optimization, computational optimization, data type optimization, parallel processing, and more, helping you build efficient data processing workflows.

1. Memory Optimization

1.1 Data Type Optimization

python
import pandas as pd
import numpy as np
import time
import psutil
import os

print("=== Data Type Optimization ===")

def memory_usage_analysis():
    """Memory usage analysis"""
    
    # Create sample data
    n = 1000000
    np.random.seed(42)
    
    # Original data (not optimized)
    df_original = pd.DataFrame({
        'int_col': np.random.randint(0, 100, n),
        'float_col': np.random.random(n),
        'category_col': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
        'bool_col': np.random.choice([True, False], n),
        'date_col': pd.date_range('2020-01-01', periods=n, freq='1min')
    })
    
    print("Original data memory usage:")
    df_original.info(memory_usage='deep')  # info() prints directly; wrapping it in print() would also emit 'None'
    original_memory = df_original.memory_usage(deep=True).sum() / 1024**2
    print(f"Total memory usage: {original_memory:.2f} MB\n")
    
    # Optimized data
    df_optimized = df_original.copy()
    
    # 1. Integer type optimization
    df_optimized['int_col'] = pd.to_numeric(df_optimized['int_col'], downcast='integer')
    
    # 2. Float type optimization
    df_optimized['float_col'] = pd.to_numeric(df_optimized['float_col'], downcast='float')
    
    # 3. Category data optimization
    df_optimized['category_col'] = df_optimized['category_col'].astype('category')
    
    # 4. Boolean type is already optimal
    
    print("Optimized data memory usage:")
    df_optimized.info(memory_usage='deep')  # info() prints directly
    optimized_memory = df_optimized.memory_usage(deep=True).sum() / 1024**2
    print(f"Total memory usage: {optimized_memory:.2f} MB")
    print(f"Memory saved: {((original_memory - optimized_memory) / original_memory * 100):.1f}%\n")
    
    return df_original, df_optimized

df_orig, df_opt = memory_usage_analysis()
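
The same dtype savings can be applied while loading, so the oversized default types never materialize. A minimal sketch (assuming a hypothetical data.csv with the columns above):

python
# Hedged sketch: hand read_csv the optimized dtypes up front.
df_loaded = pd.read_csv(
    'data.csv',                     # hypothetical file
    dtype={
        'int_col': 'int8',          # assumes values fit in [-128, 127]
        'float_col': 'float32',
        'category_col': 'category',
        'bool_col': 'bool',
    },
    parse_dates=['date_col'],
)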

1.2 Processing Large Files in Chunks

python
print("=== Chunk Processing Large Files ===")

def chunked_processing_demo():
    """Chunk processing demonstration"""
    
    # Create large CSV file for demonstration
    large_file = 'large_dataset.csv'
    
    # Generate large dataset
    print("Generating large dataset...")
    chunk_size = 50000
    total_rows = 500000
    
    # Write to large file in batches
    for i in range(0, total_rows, chunk_size):
        chunk_data = pd.DataFrame({
            'id': range(i, min(i + chunk_size, total_rows)),
            'value1': np.random.randint(1, 1000, min(chunk_size, total_rows - i)),
            'value2': np.random.random(min(chunk_size, total_rows - i)),
            'category': np.random.choice(['A', 'B', 'C', 'D'], min(chunk_size, total_rows - i)),
            'date': pd.date_range('2020-01-01', periods=min(chunk_size, total_rows - i), freq='1h')  # lowercase 'h' ('H' is deprecated)
        })
        
        # First write includes header, subsequent appends do not
        chunk_data.to_csv(large_file, mode='w' if i == 0 else 'a', 
                          header=(i == 0), index=False)
    
    print(f"Generated file size: {os.path.getsize(large_file) / 1024**2:.2f} MB")
    
    # Method 1: Load all at once (memory intensive)
    print("\nMethod 1: Load all at once")
    start_time = time.time()
    try:
        df_full = pd.read_csv(large_file)
        full_load_time = time.time() - start_time
        print(f"Load time: {full_load_time:.2f} seconds")
        print(f"Memory usage: {df_full.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
        
        # Perform aggregation
        result_full = df_full.groupby('category')['value1'].sum()
        print(f"Aggregation result: {result_full.to_dict()}")
        
    except MemoryError:
        print("Insufficient memory, cannot load all at once")
    
    # Method 2: Chunk loading (memory friendly)
    print("\nMethod 2: Chunk loading")
    start_time = time.time()
    
    chunk_results = []
    chunk_reader = pd.read_csv(large_file, chunksize=10000)
    
    for i, chunk in enumerate(chunk_reader):
        # Process each chunk
        chunk_result = chunk.groupby('category')['value1'].sum()
        chunk_results.append(chunk_result)
        
        if i % 10 == 0:
            print(f"Processed {(i+1)*10000} rows")
    
    # Merge all chunk results
    final_result = pd.concat(chunk_results).groupby(level=0).sum()
    chunk_load_time = time.time() - start_time
    
    print(f"Chunk processing time: {chunk_load_time:.2f} seconds")
    print(f"Aggregation result: {final_result.to_dict()}")
    
    # Clean up temporary file
    if os.path.exists(large_file):
        os.remove(large_file)
    
    return final_result

chunk_result = chunked_processing_demo()
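
The chunked pattern also works for row-level transformations, not just aggregations. A minimal sketch (with hypothetical file names): filter each chunk and append the matches to an output file, so neither the input nor the result ever has to fit in memory at once:

python
# Hedged sketch with hypothetical file and column names.
with pd.read_csv('large_input.csv', chunksize=100_000) as reader:
    for i, chunk in enumerate(reader):
        matches = chunk[chunk['value1'] > 500]
        # First chunk writes the header; later chunks append without it
        matches.to_csv('filtered_output.csv', mode='w' if i == 0 else 'a',
                       header=(i == 0), index=False)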

1.3 Memory Monitoring and Analysis

python
print("\n=== Memory Monitoring and Analysis ===")

def memory_monitoring():
    """Memory monitoring tools"""
    
    def get_memory_usage():
        """Get current memory usage"""
        process = psutil.Process(os.getpid())
        memory_info = process.memory_info()
        return memory_info.rss / 1024**2  # MB
    
    def memory_profiler_demo():
        """Memory analysis demonstration"""
        print(f"Initial memory usage: {get_memory_usage():.2f} MB")
        
        # Create large DataFrame
        n = 100000
        df = pd.DataFrame({
            'A': np.random.randint(0, 1000, n),
            'B': np.random.random(n),
            'C': ['category_' + str(i % 100) for i in range(n)]
        })
        
        print(f"Memory usage after creating DataFrame: {get_memory_usage():.2f} MB")
        
        # Data type optimization
        df['A'] = pd.to_numeric(df['A'], downcast='integer')
        df['B'] = pd.to_numeric(df['B'], downcast='float')
        df['C'] = df['C'].astype('category')
        
        print(f"Memory usage after optimization: {get_memory_usage():.2f} MB")
        
        # Delete DataFrame to release memory
        del df
        
        print(f"Memory usage after deleting DataFrame: {get_memory_usage():.2f} MB")
        
        return True
    
    # Memory usage analysis tool
    def analyze_dataframe_memory(df, name="DataFrame"):
        """Analyze DataFrame memory usage"""
        print(f"\n{name} Memory Analysis:")
        print("-" * 40)
        
        memory_usage = df.memory_usage(deep=True)
        total_memory = memory_usage.sum() / 1024**2
        
        print(f"Total memory usage: {total_memory:.2f} MB")
        print(f"Rows: {len(df):,}")
        print(f"Columns: {len(df.columns)}")
        print(f"Average memory per row: {total_memory * 1024 / len(df):.2f} KB")
        
        print("\nMemory usage per column:")
        for col in df.columns:
            col_memory = df[col].memory_usage(deep=True) / 1024**2
            print(f"  {col}: {col_memory:.2f} MB ({col_memory/total_memory*100:.1f}%)")
        
        print("\nData types:")
        for col in df.columns:
            print(f"  {col}: {df[col].dtype}")
    
    # Demonstrate memory analysis
    memory_profiler_demo()
    
    # Create sample data for analysis
    sample_df = pd.DataFrame({
        'integers': np.random.randint(0, 100, 10000),
        'floats': np.random.random(10000),
        'categories': np.random.choice(['A', 'B', 'C'], 10000),
        'strings': ['string_' + str(i) for i in range(10000)],
        'dates': pd.date_range('2020-01-01', periods=10000, freq='1h')
    })
    
    analyze_dataframe_memory(sample_df, "Original Data")
    
    # Optimize data types
    sample_df_opt = sample_df.copy()
    sample_df_opt['integers'] = pd.to_numeric(sample_df_opt['integers'], downcast='integer')
    sample_df_opt['floats'] = pd.to_numeric(sample_df_opt['floats'], downcast='float')
    sample_df_opt['categories'] = sample_df_opt['categories'].astype('category')
    
    analyze_dataframe_memory(sample_df_opt, "Optimized Data")
    
    return sample_df_opt

optimized_df = memory_monitoring()

2. Computational Performance Optimization

2.1 Vectorization Optimization

python
print("\n=== Computational Performance Optimization ===")

def vectorization_optimization():
    """Vectorization operation optimization"""
    
    # Create test data
    n = 1000000
    np.random.seed(42)
    
    df = pd.DataFrame({
        'A': np.random.randint(1, 100, n),
        'B': np.random.randint(1, 100, n),
        'C': np.random.random(n),
        'D': np.random.choice(['X', 'Y', 'Z'], n)
    })
    
    print(f"Test data: {len(df):,} rows")
    
    # Performance test function
    def time_operation(func, name, *args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{name}: {end_time - start_time:.4f} seconds")
        return result
    
    print("\n=== Conditional Calculation Performance Comparison ===")
    
    # Method 1: Using apply with axis=1 (slow: a Python-level loop over every row)
    def apply_method(df):
        return df.apply(lambda row: row['A'] * row['C'] if row['A'] > row['B'] else row['B'] * row['C'], axis=1)
    
    # Method 2: Using numpy.where (fast)
    def numpy_where_method(df):
        return np.where(df['A'] > df['B'], df['A'] * df['C'], df['B'] * df['C'])
    
    # Method 3: Using pandas.where (fast)
    def pandas_where_method(df):
        return (df['A'] * df['C']).where(df['A'] > df['B'], df['B'] * df['C'])
    
    # Method 4: Using boolean indexing (fast)
    def boolean_indexing_method(df):
        result = pd.Series(index=df.index, dtype=float)
        mask = df['A'] > df['B']
        result[mask] = df.loc[mask, 'A'] * df.loc[mask, 'C']
        result[~mask] = df.loc[~mask, 'B'] * df.loc[~mask, 'C']
        return result
    
    # Performance tests
    result1 = time_operation(apply_method, "Apply method", df)
    result2 = time_operation(numpy_where_method, "NumPy where", df)
    result3 = time_operation(pandas_where_method, "Pandas where", df)
    result4 = time_operation(boolean_indexing_method, "Boolean indexing", df)
    
    print("\n=== Aggregation Operation Performance Comparison ===")
    
    # Method 1: Standard groupby
    def standard_groupby(df):
        return df.groupby('D').agg({'A': 'sum', 'B': 'mean', 'C': 'std'})
    
    # Method 2: Using transform for group calculations
    def transform_method(df):
        df_copy = df.copy()
        df_copy['A_sum'] = df_copy.groupby('D')['A'].transform('sum')
        df_copy['B_mean'] = df_copy.groupby('D')['B'].transform('mean')
        df_copy['C_std'] = df_copy.groupby('D')['C'].transform('std')
        return df_copy[['D', 'A_sum', 'B_mean', 'C_std']].drop_duplicates()
    
    # Method 3: Using pivot_table
    def pivot_table_method(df):
        return pd.pivot_table(df, values=['A', 'B', 'C'], index='D', 
                             aggfunc={'A': 'sum', 'B': 'mean', 'C': 'std'})
    
    agg_result1 = time_operation(standard_groupby, "Standard GroupBy", df)
    agg_result2 = time_operation(transform_method, "Transform method", df)
    agg_result3 = time_operation(pivot_table_method, "Pivot Table", df)
    
    print("\nGroupBy result:")
    print(agg_result1)
    
    return df

perf_df = vectorization_optimization()
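
When a calculation has more than two branches, np.select generalizes the np.where pattern benchmarked above while staying fully vectorized. A short sketch using the perf_df returned above (the thresholds are arbitrary):

python
# np.select checks the conditions in order and takes the first match.
conditions = [perf_df['A'] > 90, perf_df['A'] > 50]
choices = [perf_df['A'] * perf_df['C'], perf_df['B'] * perf_df['C']]
multi_branch = np.select(conditions, choices, default=0.0)
print(multi_branch[:5])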

2.2 Index Optimization

python
print("\n=== Index Optimization ===")

def index_optimization():
    """Index optimization demonstration"""
    
    # Create large dataset
    n = 500000
    np.random.seed(42)
    
    df = pd.DataFrame({
        'id': range(n),
        'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
        'value': np.random.randint(1, 1000, n),
        'date': pd.date_range('2020-01-01', periods=n, freq='1min')
    })
    
    print(f"Dataset size: {len(df):,} rows")
    
    # Performance test function
    def time_query(func, name, *args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{name}: {end_time - start_time:.4f} seconds (Result: {len(result)} rows)")
        return result
    
    print("\n=== Query Performance Comparison ===")
    
    # Query without index
    def query_without_index(df, category):
        return df[df['category'] == category]
    
    # Query after setting index (sorted for fast label lookups)
    df_indexed = df.set_index('category').sort_index()
    
    def query_with_index(df, category):
        return df.loc[category]
    
    # MultiIndex query (slice-based .loc requires a lexsorted index)
    df_multi_indexed = df.set_index(['category', 'date']).sort_index()
    
    def query_multi_index(df, category, start_date, end_date):
        return df.loc[(category, slice(start_date, end_date)), :]
    
    # Performance tests
    target_category = 'A'
    start_date = '2020-01-01'
    end_date = '2020-01-02'
    
    print("1. Single column query performance:")
    result1 = time_query(query_without_index, "Query without index", df, target_category)
    result2 = time_query(query_with_index, "Query with index", df_indexed, target_category)
    
    print("\n2. Range query performance:")
    
    def range_query_without_index(df, start_date, end_date):
        return df[(df['date'] >= start_date) & (df['date'] <= end_date)]
    
    df_date_indexed = df.set_index('date')
    
    def range_query_with_index(df, start_date, end_date):
        return df.loc[start_date:end_date]
    
    result3 = time_query(range_query_without_index, "Date range query (no index)", df, start_date, end_date)
    result4 = time_query(range_query_with_index, "Date range query (with index)", df_date_indexed, start_date, end_date)
    
    print("\n3. Composite query performance:")
    result5 = time_query(query_multi_index, "MultiIndex query", df_multi_indexed, target_category, start_date, end_date)
    
    # Index memory usage analysis
    print("\n=== Index Memory Usage Analysis ===")
    
    def analyze_index_memory(df, name):
        total_memory = df.memory_usage(deep=True).sum() / 1024**2
        index_memory = df.index.memory_usage(deep=True) / 1024**2
        print(f"{name}:")
        print(f"  Total memory: {total_memory:.2f} MB")
        print(f"  Index memory: {index_memory:.2f} MB ({index_memory/total_memory*100:.1f}%)")
    
    analyze_index_memory(df, "Original DataFrame")
    analyze_index_memory(df_indexed, "Single Index DataFrame")
    analyze_index_memory(df_multi_indexed, "MultiIndex DataFrame")
    
    return df_indexed

indexed_df = index_optimization()

2.3 Data Type and Storage Optimization

python
print("\n=== Data Type and Storage Optimization ===")

def dtype_storage_optimization():
    """Data type and storage optimization"""
    
    # Create test data
    n = 100000
    np.random.seed(42)
    
    # Original data (non-optimized data types)
    df_original = pd.DataFrame({
        'small_int': np.random.randint(0, 10, n),  # Can use int8
        'medium_int': np.random.randint(0, 1000, n),  # Can use int16
        'large_int': np.random.randint(0, 100000, n),  # Needs int32
        'float_data': np.random.random(n),  # Can use float32
        'category_data': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),  # Can use category
        'boolean_data': np.random.choice([True, False], n),  # Already optimal
        'sparse_data': np.random.choice([0, 1, 2, np.nan], n, p=[0.7, 0.1, 0.1, 0.1])  # Can use sparse
    })
    
    print("Original data types and memory usage:")
    print(df_original.dtypes)
    print(f"Total memory usage: {df_original.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n")
    
    # Optimize data types
    df_optimized = df_original.copy()
    
    # 1. Integer type optimization
    df_optimized['small_int'] = df_optimized['small_int'].astype('int8')
    df_optimized['medium_int'] = df_optimized['medium_int'].astype('int16')
    df_optimized['large_int'] = df_optimized['large_int'].astype('int32')
    
    # 2. Float type optimization
    df_optimized['float_data'] = df_optimized['float_data'].astype('float32')
    
    # 3. Category data optimization
    df_optimized['category_data'] = df_optimized['category_data'].astype('category')
    
    # 4. Sparse data optimization
    df_optimized['sparse_data'] = pd.arrays.SparseArray(df_optimized['sparse_data'])
    
    print("Optimized data types and memory usage:")
    print(df_optimized.dtypes)
    optimized_memory = df_optimized.memory_usage(deep=True).sum() / 1024**2
    original_memory = df_original.memory_usage(deep=True).sum() / 1024**2
    print(f"Total memory usage: {optimized_memory:.2f} MB")
    print(f"Memory saved: {((original_memory - optimized_memory) / original_memory * 100):.1f}%\n")
    
    # Storage format optimization
    print("=== Storage Format Optimization ===")
    
    # Test performance of different storage formats.
    # Note: pyarrow cannot serialize sparse pandas columns, so densify a
    # copy first to keep the three formats comparable.
    df_store = df_optimized.copy()
    df_store['sparse_data'] = df_store['sparse_data'].sparse.to_dense()
    test_file_base = 'test_data'
    
    # CSV format
    start_time = time.time()
    df_store.to_csv(f'{test_file_base}.csv', index=False)
    csv_write_time = time.time() - start_time
    csv_size = os.path.getsize(f'{test_file_base}.csv') / 1024**2
    
    start_time = time.time()
    df_csv = pd.read_csv(f'{test_file_base}.csv')
    csv_read_time = time.time() - start_time
    
    # Parquet format (requires pyarrow or fastparquet)
    start_time = time.time()
    df_store.to_parquet(f'{test_file_base}.parquet', index=False)
    parquet_write_time = time.time() - start_time
    parquet_size = os.path.getsize(f'{test_file_base}.parquet') / 1024**2
    
    start_time = time.time()
    df_parquet = pd.read_parquet(f'{test_file_base}.parquet')
    parquet_read_time = time.time() - start_time
    
    # Pickle format
    start_time = time.time()
    df_store.to_pickle(f'{test_file_base}.pkl')
    pickle_write_time = time.time() - start_time
    pickle_size = os.path.getsize(f'{test_file_base}.pkl') / 1024**2
    
    start_time = time.time()
    df_pickle = pd.read_pickle(f'{test_file_base}.pkl')
    pickle_read_time = time.time() - start_time
    
    # Results comparison
    print("Storage format performance comparison:")
    print(f"{'Format':<10} {'Write Time':<12} {'Read Time':<12} {'File Size':<12} {'Compression':<10}")
    print("-" * 60)
    print(f"{'CSV':<10} {csv_write_time:<12.4f} {csv_read_time:<12.4f} {csv_size:<12.2f} {'1.00x':<10}")
    print(f"{'Parquet':<10} {parquet_write_time:<12.4f} {parquet_read_time:<12.4f} {parquet_size:<12.2f} {csv_size/parquet_size:<10.2f}")
    print(f"{'Pickle':<10} {pickle_write_time:<12.4f} {pickle_read_time:<12.4f} {pickle_size:<12.2f} {csv_size/pickle_size:<10.2f}")
    
    # Clean up temporary files
    for ext in ['.csv', '.parquet', '.pkl']:
        file_path = f'{test_file_base}{ext}'
        if os.path.exists(file_path):
            os.remove(file_path)
    
    return df_optimized

optimized_storage_df = dtype_storage_optimization()
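
Parquet additionally supports per-file compression codecs, which usually shrink files further for a small CPU cost. A hedged follow-up sketch (codec availability depends on your pyarrow build, and the sparse column must be densified first because pyarrow cannot serialize sparse pandas data):

python
dense = optimized_storage_df.copy()
dense['sparse_data'] = dense['sparse_data'].sparse.to_dense()
dense.to_parquet('test_data_zstd.parquet', compression='zstd', index=False)
print(f"zstd Parquet size: {os.path.getsize('test_data_zstd.parquet') / 1024**2:.2f} MB")
os.remove('test_data_zstd.parquet')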

3. Parallel Processing and Multithreading

3.1 Multiprocessing

python
print("\n=== Parallel Processing Optimization ===")

def parallel_processing_demo():
    """Parallel processing demonstration"""
    
    from multiprocessing import Pool, cpu_count
    from functools import partial
    
    # Create large dataset
    n = 1000000
    np.random.seed(42)
    
    df = pd.DataFrame({
        'A': np.random.randint(1, 100, n),
        'B': np.random.randint(1, 100, n),
        'C': np.random.random(n),
        'group': np.random.randint(0, 100, n)
    })
    
    print(f"Dataset size: {len(df):,} rows")
    print(f"CPU cores: {cpu_count()}")
    
    # Define computationally intensive function
    def complex_calculation(group_data):
        """Complex calculation function"""
        group_id, data = group_data
        
        # Simulate complex calculations
        result = {
            'group': group_id,
            'sum_A': data['A'].sum(),
            'mean_B': data['B'].mean(),
            'std_C': data['C'].std(),
            'correlation_AB': data['A'].corr(data['B']),
            'percentile_95_A': data['A'].quantile(0.95)
        }
        
        # Add some computationally intensive operations
        for _ in range(1000):
            temp = data['A'].values ** 2 + data['B'].values ** 0.5
        
        return result
    
    # Prepare grouped data
    grouped_data = [(name, group) for name, group in df.groupby('group')]
    
    print(f"Number of groups: {len(grouped_data)}")
    
    # Method 1: Serial processing
    print("\n1. Serial processing:")
    start_time = time.time()
    serial_results = [complex_calculation(group_data) for group_data in grouped_data]
    serial_time = time.time() - start_time
    print(f"Serial processing time: {serial_time:.2f} seconds")
    
    # Method 2: Parallel processing
    print("\n2. Parallel processing:")
    start_time = time.time()
    
    # Note: pool.map of a locally defined function relies on the 'fork'
    # start method (the Linux default); on Windows/macOS, define the worker
    # at module level so it can be pickled.
    with Pool(processes=cpu_count()) as pool:
        parallel_results = pool.map(complex_calculation, grouped_data)
    
    parallel_time = time.time() - start_time
    print(f"Parallel processing time: {parallel_time:.2f} seconds")
    print(f"Speedup: {serial_time / parallel_time:.2f}x")
    
    # Verify result consistency
    serial_df = pd.DataFrame(serial_results)
    parallel_df = pd.DataFrame(parallel_results)
    
    print(f"\nResult consistency check: {serial_df.equals(parallel_df)}")
    
    return parallel_df

parallel_result = parallel_processing_demo()

3.2 Dask Parallel Computing

python
print("\n=== Dask Parallel Computing ===")

def dask_parallel_demo():
    """Dask parallel computing demonstration"""
    
    try:
        import dask.dataframe as dd
        
        # Create large dataset
        n = 1000000
        np.random.seed(42)
        
        # Create Pandas DataFrame
        df_pandas = pd.DataFrame({
            'A': np.random.randint(1, 1000, n),
            'B': np.random.random(n),
            'C': np.random.choice(['X', 'Y', 'Z'], n),
            'D': pd.date_range('2020-01-01', periods=n, freq='1min')
        })
        
        print(f"Dataset size: {len(df_pandas):,} rows")
        
        # Convert to Dask DataFrame
        df_dask = dd.from_pandas(df_pandas, npartitions=8)
        
        print(f"Dask partitions: {df_dask.npartitions}")
        
        # Performance comparison tests
        print("\n=== Performance Comparison Tests ===")
        
        # 1. Aggregation operation comparison
        print("1. Aggregation operations:")
        
        # Pandas aggregation
        start_time = time.time()
        pandas_agg = df_pandas.groupby('C').agg({
            'A': ['sum', 'mean', 'std'],
            'B': ['min', 'max', 'median']
        })
        pandas_agg_time = time.time() - start_time
        
        # Dask aggregation ('median' omitted: Dask groupby has no exact median)
        start_time = time.time()
        dask_agg = df_dask.groupby('C').agg({
            'A': ['sum', 'mean', 'std'],
            'B': ['min', 'max']
        }).compute()
        dask_agg_time = time.time() - start_time
        
        print(f"Pandas aggregation time: {pandas_agg_time:.2f} seconds")
        print(f"Dask aggregation time: {dask_agg_time:.2f} seconds")
        
        # 2. Complex calculation comparison
        print("\n2. Complex calculations:")
        
        # Pandas complex calculation
        start_time = time.time()
        pandas_complex = df_pandas.assign(
            A_squared=df_pandas['A'] ** 2,
            B_log=np.log(df_pandas['B'] + 1),
            A_B_ratio=df_pandas['A'] / (df_pandas['B'] + 0.001)
        )
        pandas_complex_time = time.time() - start_time
        
        # Dask complex calculation
        start_time = time.time()
        dask_complex = df_dask.assign(
            A_squared=df_dask['A'] ** 2,
            B_log=np.log(df_dask['B'] + 1),  # NumPy ufuncs dispatch to Dask Series
            A_B_ratio=df_dask['A'] / (df_dask['B'] + 0.001)
        ).compute()
        dask_complex_time = time.time() - start_time
        
        print(f"Pandas complex calculation time: {pandas_complex_time:.2f} seconds")
        print(f"Dask complex calculation time: {dask_complex_time:.2f} seconds")
        
        # 3. Memory usage comparison
        print("\n3. Memory usage analysis:")
        pandas_memory = df_pandas.memory_usage(deep=True).sum() / 1024**2
        print(f"Pandas memory usage: {pandas_memory:.2f} MB")
        print(f"Dask uses lazy evaluation for more efficient memory usage")
        
        return dask_agg
        
    except ImportError:
        print("Dask not installed, skipping Dask demonstration")
        print("Install command: pip install dask[complete]")
        return None

dask_result = dask_parallel_demo()
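
In practice Dask is most useful when the data starts on disk: dd.read_csv accepts a glob pattern and builds one lazy partition per file, so nothing is read until .compute() is called. A minimal sketch with hypothetical file and column names:

python
import dask.dataframe as dd  # requires: pip install dask[complete]

# Hypothetical glob: each matching CSV becomes at least one lazy partition.
ddf = dd.read_csv('data/part-*.csv')
result = ddf.groupby('category')['value1'].sum().compute()  # work happens here
print(result)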

4. Big Data Processing Strategies

4.1 Memory Mapping and Lazy Loading

python
print("\n=== Big Data Processing Strategies ===")

def big_data_strategies():
    """Big data processing strategies"""
    
    # 1. Data sampling strategy
    print("1. Data sampling strategies:")
    
    # Create large dataset
    n = 1000000
    np.random.seed(42)
    
    large_df = pd.DataFrame({
        'id': range(n),
        'value1': np.random.randint(1, 1000, n),
        'value2': np.random.random(n),
        'category': np.random.choice(['A', 'B', 'C', 'D'], n),
        'date': pd.date_range('2020-01-01', periods=n, freq='1min')
    })
    
    print(f"Original data size: {len(large_df):,} rows")
    
    # Random sampling
    sample_random = large_df.sample(n=10000, random_state=42)
    print(f"Random sampling: {len(sample_random):,} rows")
    
    # Stratified sampling
    sample_stratified = large_df.groupby('category', group_keys=False).apply(
        lambda x: x.sample(min(len(x), 2500), random_state=42)
    )
    print(f"Stratified sampling: {len(sample_stratified):,} rows")
    
    # Systematic sampling
    sample_systematic = large_df.iloc[::100]  # Take every 100th row
    print(f"Systematic sampling: {len(sample_systematic):,} rows")
    
    # 2. Data partitioning strategy
    print("\n2. Data partitioning strategies:")
    
    def partition_data_by_date(df, partition_freq='M'):
        """Partition data by date"""
        df = df.copy()  # avoid mutating the caller's DataFrame
        df['partition'] = df['date'].dt.to_period(partition_freq)
        partitions = {}
        
        for partition_name, partition_data in df.groupby('partition'):
            partitions[str(partition_name)] = partition_data.drop('partition', axis=1)
        
        return partitions
    
    # Partition by month
    monthly_partitions = partition_data_by_date(large_df, 'M')
    print(f"Monthly partition count: {len(monthly_partitions)}")
    
    for partition_name, partition_df in list(monthly_partitions.items())[:3]:
        print(f"  {partition_name}: {len(partition_df):,} rows")
    
    # 3. Incremental processing strategy
    print("\n3. Incremental processing strategies:")
    
    def incremental_processing_demo():
        """Incremental processing demonstration"""
        
        # Simulate historical data
        historical_data = large_df.iloc[:800000].copy()
        
        # Simulate new data
        new_data = large_df.iloc[800000:].copy()
        
        print(f"Historical data: {len(historical_data):,} rows")
        print(f"New data: {len(new_data):,} rows")
        
        # Historical data aggregation result (assume already calculated)
        historical_agg = historical_data.groupby('category').agg({
            'value1': 'sum',
            'value2': 'mean'
        })
        
        print("Historical aggregation result:")
        print(historical_agg)
        
        # New data aggregation
        new_agg = new_data.groupby('category').agg({
            'value1': 'sum',
            'value2': 'mean'
        })
        
        # Incrementally update aggregation result
        def incremental_update(historical_agg, new_agg, historical_count, new_count):
            """Incrementally update aggregation result"""
            updated_agg = historical_agg.copy()
            
            for category in new_agg.index:
                if category in updated_agg.index:
                    # Update sum
                    updated_agg.loc[category, 'value1'] += new_agg.loc[category, 'value1']
                    
                    # Update mean (weighted average)
                    hist_count = historical_count.get(category, 0)
                    new_count_cat = new_count.get(category, 0)
                    total_count = hist_count + new_count_cat
                    
                    if total_count > 0:
                        updated_agg.loc[category, 'value2'] = (
                            (updated_agg.loc[category, 'value2'] * hist_count + 
                             new_agg.loc[category, 'value2'] * new_count_cat) / total_count
                        )
                else:
                    # New category
                    updated_agg.loc[category] = new_agg.loc[category]
            
            return updated_agg
        
        # Calculate record counts
        historical_count = historical_data['category'].value_counts().to_dict()
        new_count = new_data['category'].value_counts().to_dict()
        
        # Incremental update
        updated_result = incremental_update(historical_agg, new_agg, historical_count, new_count)
        
        # Verify result
        full_agg = large_df.groupby('category').agg({
            'value1': 'sum',
            'value2': 'mean'
        })
        
        print("\nIncremental update result:")
        print(updated_result)
        
        print("\nFull calculation result:")
        print(full_agg)
        
        print(f"\nResult consistency: {np.allclose(updated_result.values, full_agg.values)}")
    
    incremental_processing_demo()
    
    return sample_random, monthly_partitions

sample_data, partitions = big_data_strategies()

4.2 Performance Monitoring and Tuning

python
print("\n=== Performance Monitoring and Tuning ===")

def performance_monitoring():
    """Performance monitoring and tuning"""
    
    import cProfile
    import pstats
    from io import StringIO
    
    # Performance analysis decorator
    def profile_performance(func):
        """Performance analysis decorator"""
        def wrapper(*args, **kwargs):
            pr = cProfile.Profile()
            pr.enable()
            
            result = func(*args, **kwargs)
            
            pr.disable()
            s = StringIO()
            ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
            ps.print_stats(10)  # Show top 10 time-consuming functions
            
            print(f"\n{func.__name__} Performance Analysis:")
            print(s.getvalue())
            
            return result
        return wrapper
    
    # Create test data
    n = 100000
    np.random.seed(42)
    
    df = pd.DataFrame({
        'A': np.random.randint(1, 1000, n),
        'B': np.random.random(n),
        'C': np.random.choice(['X', 'Y', 'Z'], n),
        'D': pd.date_range('2020-01-01', periods=n, freq='1min')
    })
    
    # Define test functions
    @profile_performance
    def inefficient_operation(df):
        """Inefficient operation example"""
        result = []
        for i in range(len(df)):
            if df.iloc[i]['A'] > 500:
                result.append(df.iloc[i]['A'] * df.iloc[i]['B'])
            else:
                result.append(0)
        return result
    
    @profile_performance
    def efficient_operation(df):
        """Efficient operation example"""
        mask = df['A'] > 500
        result = np.where(mask, df['A'] * df['B'], 0)
        return result
    
    print("Performance comparison test:")
    
    # Test inefficient operation
    print("\n=== Inefficient Operation ===")
    start_time = time.time()
    result1 = inefficient_operation(df)
    inefficient_time = time.time() - start_time
    print(f"Inefficient operation time: {inefficient_time:.4f} seconds")
    
    # Test efficient operation
    print("\n=== Efficient Operation ===")
    start_time = time.time()
    result2 = efficient_operation(df)
    efficient_time = time.time() - start_time
    print(f"Efficient operation time: {efficient_time:.4f} seconds")
    
    print(f"\nPerformance improvement: {inefficient_time / efficient_time:.1f}x")
    
    # Memory usage monitoring
    def memory_usage_monitor(func, *args, **kwargs):
        """Memory usage monitoring"""
        import tracemalloc
        
        tracemalloc.start()
        
        # Execute function
        result = func(*args, **kwargs)
        
        # Get memory usage
        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        
        print(f"{func.__name__} Memory Usage:")
        print(f"  Current memory: {current / 1024**2:.2f} MB")
        print(f"  Peak memory: {peak / 1024**2:.2f} MB")
        
        return result
    
    # Memory usage test
    print("\n=== Memory Usage Monitoring ===")
    
    def memory_intensive_operation(df):
        """Memory intensive operation"""
        # Create multiple copies
        df_copies = [df.copy() for _ in range(5)]
        
        # Execute calculations
        results = []
        for df_copy in df_copies:
            df_copy['new_col'] = df_copy['A'] * df_copy['B']
            results.append(df_copy.groupby('C')['new_col'].sum())
        
        return pd.concat(results, axis=1)
    
    def memory_efficient_operation(df):
        """Memory efficient operation"""
        # Avoid creating multiple copies
        df_temp = df.copy()
        df_temp['new_col'] = df_temp['A'] * df_temp['B']
        
        results = []
        for i in range(5):
            result = df_temp.groupby('C')['new_col'].sum()
            results.append(result)
        
        return pd.concat(results, axis=1)
    
    memory_usage_monitor(memory_intensive_operation, df)
    memory_usage_monitor(memory_efficient_operation, df)
    
    return df

monitored_df = performance_monitoring()

Chapter Summary

This chapter comprehensively covered Pandas performance optimization techniques:

Core Content Review

  1. Memory Optimization: Data type optimization, chunk processing, memory monitoring
  2. Computational Optimization: Vectorization, index optimization, storage format selection
  3. Parallel Processing: Multiprocessing, Dask parallel computing
  4. Big Data Strategies: Sampling, partitioning, incremental processing
  5. Performance Monitoring: Profiling, memory monitoring, tuning techniques

Key Optimization Principles

  • Prioritize vectorized operations over loops
  • Choose appropriate data types to reduce memory usage
  • Build proper indexes to improve query performance
  • Use chunk processing for large files to avoid memory overflow
  • Use parallel computing to fully utilize multi-core CPUs

Performance Optimization Checklist

Memory Optimization

  • [ ] Use appropriate data types (int8, int16, category, etc.; see the helper sketch after this list)
  • [ ] Use sparse arrays for sparse data
  • [ ] Read large files in chunks
  • [ ] Delete variables no longer needed
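
A reusable helper can bundle the first two items of this checklist. This is a sketch, not a library function; it assumes numeric columns can be safely downcast and that low-cardinality object columns should become categories:

python
def optimize_dtypes(df, category_threshold=0.5):
    """Sketch: downcast numerics, convert low-cardinality strings to category."""
    out = df.copy()
    for col in out.columns:
        if pd.api.types.is_integer_dtype(out[col]):
            out[col] = pd.to_numeric(out[col], downcast='integer')
        elif pd.api.types.is_float_dtype(out[col]):
            out[col] = pd.to_numeric(out[col], downcast='float')
        elif out[col].dtype == object:
            # Only convert when distinct values are relatively few
            if out[col].nunique() / max(len(out[col]), 1) < category_threshold:
                out[col] = out[col].astype('category')
    return out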

Computational Optimization

  • [ ] Use vectorized operations instead of apply
  • [ ] Build appropriate indexes
  • [ ] Choose efficient storage formats (Parquet, HDF5; see the sketch after this list)
  • [ ] Avoid unnecessary data copying
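
For the storage-format item, HDF5 works much like the Parquet example in section 2.3. A hedged round-trip sketch (requires the optional PyTables dependency: pip install tables):

python
import pandas as pd

df = pd.DataFrame({'a': range(1000)})  # stand-in for any DataFrame
df.to_hdf('data.h5', key='df', mode='w', complevel=9, complib='blosc')
df_back = pd.read_hdf('data.h5', key='df')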

Parallel Optimization

  • [ ] Use multiprocessing for CPU-intensive tasks (see the sketch after this list)
  • [ ] Consider using Dask for very large datasets
  • [ ] Set appropriate parallelism levels
  • [ ] Be aware of parallel overhead
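
For the multiprocessing item, remember the portability caveat from section 3.1: under the default 'spawn' start method on Windows and macOS, workers must be picklable, so define them at module level. A minimal sketch:

python
from multiprocessing import Pool, cpu_count

import numpy as np
import pandas as pd

def worker(part):
    """Module-level worker: picklable under the 'spawn' start method."""
    return part['value'].sum()

if __name__ == '__main__':
    df = pd.DataFrame({'value': np.arange(1_000_000)})
    n = cpu_count()
    parts = [df.iloc[i::n] for i in range(n)]  # one interleaved slice per core
    with Pool(n) as pool:
        total = sum(pool.map(worker, parts))
    print(total)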

Practical Recommendations

  1. Development Phase: Use small datasets for rapid prototyping
  2. Testing Phase: Use sampled data to verify algorithm correctness
  3. Production Phase: Apply all optimization techniques for full data
  4. Monitoring Phase: Continuously monitor performance metrics, tune as needed

Common Performance Pitfalls

  • Overusing apply functions
  • Frequent data type conversions
  • Unnecessary data copying
  • Lack of proper indexes
  • Ignoring memory usage monitoring

Mastering these performance optimization techniques helps you:

  • Process larger scale datasets
  • Significantly improve data processing speed
  • Optimize memory usage efficiency
  • Build scalable data processing systems

Practice Exercises

  1. Optimize memory usage of a DataFrame with 10 million rows
  2. Implement a high-performance data aggregation system
  3. Design a chunk processing workflow for large files
  4. Develop a parallel data processing framework
  5. Build a performance monitoring and tuning tool

In the next chapter, we will compile Pandas learning resources to guide your further learning.

Content is for learning and research only.