Skip to content

Pandas 数据排序与聚合

数据排序和聚合是数据分析中的核心操作。排序帮助我们理解数据的分布和模式,而聚合则让我们能够从大量数据中提取有意义的统计信息。本章将详细介绍 Pandas 中各种排序和聚合技术。

1. 数据排序基础

1.1 单列排序

python
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns

# Configure matplotlib to render CJK labels (SimHei) and keep the minus sign readable.
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

# Build a reproducible synthetic employee dataset.
np.random.seed(42)
n_employees = 1000

# Generate the employee records column by column.
employee_data = {
    'employee_id': range(1, n_employees + 1),
    'name': [f'员工{i:04d}' for i in range(1, n_employees + 1)],
    'department': np.random.choice(['IT', 'HR', 'Finance', 'Marketing', 'Operations'], n_employees),
    'position': np.random.choice(['初级', '中级', '高级', '专家', '经理'], n_employees),
    'salary': np.random.normal(8000, 2000, n_employees).astype(int),
    'age': np.random.randint(22, 60, n_employees),
    'experience_years': np.random.randint(0, 20, n_employees),
    'performance_score': np.random.uniform(60, 100, n_employees),
    'join_date': pd.date_range('2015-01-01', periods=n_employees, freq='D')[:n_employees]
}

# Clamp salaries at 3000 so the normal draw never yields an implausible low value.
employee_data['salary'] = np.maximum(employee_data['salary'], 3000)

df = pd.DataFrame(employee_data)

print("员工数据集:")
print(df.head())
print(f"\n数据形状: {df.shape}")

# 1. Basic sorting with sort_values()
print("\n=== 单列排序 ===")

# Ascending sort by salary (lowest paid first).
print("按薪资升序排列(前5名):")
salary_asc = df.sort_values('salary')
print(salary_asc[['name', 'department', 'salary']].head())

# Descending sort by salary (highest paid first).
print("\n按薪资降序排列(前5名):")
salary_desc = df.sort_values('salary', ascending=False)
print(salary_desc[['name', 'department', 'salary']].head())

# Descending sort by performance score.
print("\n按绩效分数降序排列(前5名):")
performance_desc = df.sort_values('performance_score', ascending=False)
print(performance_desc[['name', 'department', 'performance_score']].head())

1.2 多列排序

python
print("\n=== 多列排序 ===")

# Sort by department (ascending), then by salary (descending) within each department.
print("按部门(升序)和薪资(降序)排序:")
multi_sort = df.sort_values(['department', 'salary'], ascending=[True, False])
print(multi_sort[['name', 'department', 'salary']].head(10))

# Three-key sort with mixed directions.
print("\n按部门、职位、绩效分数排序:")
complex_sort = df.sort_values(
    ['department', 'position', 'performance_score'], 
    ascending=[True, True, False]
)
print(complex_sort[['name', 'department', 'position', 'performance_score']].head(10))

# Sorting in the presence of missing values: inject 50 NaN salaries into a copy.
df_with_na = df.copy()
df_with_na.loc[np.random.choice(df.index, 50), 'salary'] = np.nan

print("\n处理缺失值的排序:")
# Missing values placed last (the default behaviour).
na_last = df_with_na.sort_values('salary', na_position='last')
print("缺失值排在最后:")
print(na_last[['name', 'salary']].tail())

# Missing values placed first.
na_first = df_with_na.sort_values('salary', na_position='first')
print("\n缺失值排在最前:")
print(na_first[['name', 'salary']].head())

1.3 索引排序

python
print("\n=== 索引排序 ===")

# Build a two-level (department, position) index.
df_indexed = df.set_index(['department', 'position'])

# Sort by the full multi-level index.
print("按索引排序:")
index_sorted = df_indexed.sort_index()
print(index_sorted.head(10))

# Sort by a single index level (level 1 = position).
print("\n按第二级索引(职位)排序:")
level_sorted = df_indexed.sort_index(level=1)
print(level_sorted.head(10))

# Descending index sort.
print("\n按索引降序排序:")
index_desc = df_indexed.sort_index(ascending=False)
print(index_desc.head(10))

2. 高级排序技术

2.1 自定义排序

python
print("\n=== 自定义排序 ===")

# Desired ordering of the position labels, lowest rank first.
position_order = ['初级', '中级', '高级', '专家', '经理']

# An ordered Categorical makes sort_values respect the custom label order.
df['position_cat'] = pd.Categorical(df['position'], categories=position_order, ordered=True)

print("按职位等级排序:")
position_sorted = df.sort_values('position_cat')
print(position_sorted[['name', 'position', 'salary']].head(10))

# Custom sorting via the key= parameter of sort_values (Pandas 1.1+).
def custom_sort_key(series):
    """Map position labels to their ordinal rank (初级=1 ... 经理=5) for sorting."""
    ranks = {label: rank for rank, label in enumerate(['初级', '中级', '高级', '专家', '经理'], start=1)}
    return series.map(ranks)

print("\n使用自定义键函数排序:")
custom_sorted = df.sort_values('position', key=custom_sort_key)
print(custom_sorted[['name', 'position', 'salary']].head(10))

# Composite score combining two columns for ranking.
def salary_performance_score(df_subset):
    """Weighted score: 60% min-max-normalized salary + 40% normalized performance."""
    normalized_salary = (df_subset['salary'] - df_subset['salary'].min()) / (df_subset['salary'].max() - df_subset['salary'].min())
    normalized_performance = (df_subset['performance_score'] - df_subset['performance_score'].min()) / (df_subset['performance_score'].max() - df_subset['performance_score'].min())
    return normalized_salary * 0.6 + normalized_performance * 0.4

df['composite_score'] = salary_performance_score(df)
print("\n按综合评分排序(前10名):")
composite_sorted = df.sort_values('composite_score', ascending=False)
print(composite_sorted[['name', 'salary', 'performance_score', 'composite_score']].head(10))

2.2 分组内排序

python
print("\n=== 分组内排序 ===")

# Salary rank within each department (1 = highest paid in that department).
print("各部门内薪资排名:")
df['dept_salary_rank'] = df.groupby('department')['salary'].rank(ascending=False)
dept_ranking = df.sort_values(['department', 'dept_salary_rank'])
print(dept_ranking[['name', 'department', 'salary', 'dept_salary_rank']].head(15))

# nlargest per group: top 3 earners in each department.
print("\n各部门薪资最高的3名员工:")
top_earners = df.groupby('department').apply(
    lambda x: x.nlargest(3, 'salary')[['name', 'salary']]
).reset_index(level=1, drop=True)
print(top_earners)

# Best performer per department.
print("\n各部门绩效最佳员工:")
top_performers = df.groupby('department').apply(
    lambda x: x.nlargest(1, 'performance_score')[['name', 'performance_score']]
).reset_index(level=1, drop=True)
print(top_performers)

3. 数据聚合基础

3.1 基本聚合函数

python
print("\n=== 基本聚合操作 ===")

# Whole-frame descriptive statistics for the numeric columns.
print("整体统计信息:")
basic_stats = df[['salary', 'age', 'experience_years', 'performance_score']].agg([
    'count', 'mean', 'median', 'std', 'min', 'max'
])
print(basic_stats.round(2))

# Several named aggregations over a single column; dict keys become index labels.
print("\n薪资统计:")
salary_stats = df['salary'].agg({
    '总数': 'count',
    '平均值': 'mean',
    '中位数': 'median',
    '标准差': 'std',
    '最小值': 'min',
    '最大值': 'max',
    '总和': 'sum'
})
print(salary_stats.round(2))

# Different aggregations per column.
print("\n多列自定义聚合:")
custom_agg = df.agg({
    'salary': ['mean', 'median', 'std'],
    'age': ['mean', 'min', 'max'],
    'performance_score': ['mean', 'count'],
    'experience_years': 'mean'
})
print(custom_agg.round(2))

3.2 分组聚合 (GroupBy)

python
print("\n=== 分组聚合 ===")

# Aggregate by department.
print("按部门聚合:")
dept_agg = df.groupby('department').agg({
    'salary': ['mean', 'median', 'count'],
    'age': 'mean',
    'performance_score': ['mean', 'std'],
    'experience_years': 'mean'
}).round(2)
print(dept_agg)

# Aggregate by position.
print("\n按职位聚合:")
position_agg = df.groupby('position').agg({
    'salary': ['mean', 'count'],
    'performance_score': 'mean',
    'age': 'mean'
}).round(2)
print(position_agg)

# Two-level grouping: department x position.
print("\n按部门和职位聚合:")
multi_group_agg = df.groupby(['department', 'position']).agg({
    'salary': 'mean',
    'performance_score': 'mean',
    'employee_id': 'count'  # head-count per group
}).round(2)
multi_group_agg.columns = ['平均薪资', '平均绩效', '人数']
print(multi_group_agg.head(10))

3.3 时间序列聚合

python
print("\n=== 时间序列聚合 ===")

# Aggregate by hire year.
df['join_year'] = df['join_date'].dt.year
print("按入职年份聚合:")
yearly_agg = df.groupby('join_year').agg({
    'employee_id': 'count',
    'salary': 'mean',
    'age': 'mean'
}).round(2)
yearly_agg.columns = ['入职人数', '平均薪资', '平均年龄']
print(yearly_agg)

# Aggregate by hire month.
df['join_month'] = df['join_date'].dt.month
print("\n按入职月份聚合:")
monthly_agg = df.groupby('join_month').agg({
    'employee_id': 'count',
    'salary': 'mean'
}).round(2)
monthly_agg.columns = ['入职人数', '平均薪资']
print(monthly_agg)

# Time-based resampling to quarterly aggregates.
# NOTE(review): the 'Q' alias is deprecated in pandas >= 2.2 in favour of 'QE' — confirm target version.
df_time_indexed = df.set_index('join_date')
print("\n按季度重采样聚合:")
quarterly_agg = df_time_indexed.resample('Q').agg({
    'employee_id': 'count',
    'salary': 'mean',
    'performance_score': 'mean'
}).round(2)
print(quarterly_agg.head())

4. 高级聚合技术

4.1 自定义聚合函数

python
print("\n=== 自定义聚合函数 ===")

# Custom aggregation functions.
def salary_range(series):
    """Spread between the highest and lowest value in the series."""
    return series.max() - series.min()

def top_quartile_mean(series):
    """Mean of the values in the top quartile (at or above the 75th percentile).

    Bug fix: the previous implementation returned ``series.quantile(0.75)``
    — the 75th-percentile cutoff itself — which does not match the
    advertised '前25%平均' (average of the top 25%). We now average the
    values at or above that cutoff.
    """
    return series[series >= series.quantile(0.75)].mean()

def coefficient_of_variation(series):
    """Coefficient of variation (std / mean); returns 0 when the mean is zero."""
    mean_value = series.mean()
    if mean_value == 0:
        return 0
    return series.std() / mean_value

# Apply the custom aggregation functions per department; (label, func) tuples name the output columns.
print("使用自定义聚合函数:")
custom_agg_result = df.groupby('department')['salary'].agg([
    ('平均薪资', 'mean'),
    ('薪资范围', salary_range),
    ('前25%平均', top_quartile_mean),
    ('变异系数', coefficient_of_variation)
]).round(2)
print(custom_agg_result)

# A richer per-group summary returned as a Series of named statistics.
def department_summary(group):
    """Build a one-row summary (head-count, pay, performance) for one department group."""
    return pd.Series({
        '员工数量': len(group),
        '平均薪资': group['salary'].mean(),
        '薪资中位数': group['salary'].median(),
        '平均绩效': group['performance_score'].mean(),
        '高绩效员工比例': (group['performance_score'] > 85).mean(),
        '平均工作经验': group['experience_years'].mean(),
        '薪资标准差': group['salary'].std()
    })

print("\n部门综合摘要:")
dept_summary = df.groupby('department').apply(department_summary).round(2)
print(dept_summary)

4.2 条件聚合

python
print("\n=== 条件聚合 ===")

# Conditional aggregation via boolean masks looked up on the parent frame by index.
print("高绩效员工(绩效>85)的薪资统计:")
high_performance_salary = df.groupby('department')['salary'].agg([
    ('高绩效员工数', lambda x: (df.loc[x.index, 'performance_score'] > 85).sum()),
    ('高绩效平均薪资', lambda x: x[df.loc[x.index, 'performance_score'] > 85].mean()),
    ('高绩效薪资中位数', lambda x: x[df.loc[x.index, 'performance_score'] > 85].median())
]).round(2)
print(high_performance_salary)

# Filter first with query(), then aggregate.
print("\n工作经验5年以上员工的部门统计:")
experienced_stats = df.query('experience_years >= 5').groupby('department').agg({
    'salary': ['mean', 'count'],
    'performance_score': 'mean',
    'age': 'mean'
}).round(2)
print(experienced_stats)

# Quantile aggregation per department, unstacked into one column per quantile.
print("\n各部门薪资分位数:")
salary_quantiles = df.groupby('department')['salary'].quantile([0.25, 0.5, 0.75]).unstack()
salary_quantiles.columns = ['Q1', '中位数', 'Q3']
print(salary_quantiles.round(0))

4.3 滚动窗口聚合

python
print("\n=== 滚动窗口聚合 ===")

# Build a daily time series (hires per day, mean salary) from the hire dates.
daily_data = df.groupby('join_date').agg({
    'employee_id': 'count',
    'salary': 'mean'
}).reset_index()
daily_data.columns = ['date', 'new_hires', 'avg_salary']
daily_data = daily_data.set_index('date').sort_index()

print("每日入职数据(前10天):")
print(daily_data.head(10))

# 7-day rolling means (first 6 rows are NaN until the window fills).
print("\n7天滚动平均:")
daily_data['new_hires_7d_avg'] = daily_data['new_hires'].rolling(window=7).mean()
daily_data['avg_salary_7d_avg'] = daily_data['avg_salary'].rolling(window=7).mean()
print(daily_data[['new_hires', 'new_hires_7d_avg', 'avg_salary', 'avg_salary_7d_avg']].head(10).round(2))

# 30-day rolling statistics.
print("\n30天滚动统计:")
rolling_stats = daily_data['new_hires'].rolling(window=30).agg([
    'mean', 'std', 'min', 'max', 'sum'
]).round(2)
print(rolling_stats.head(35))

# Expanding-window (cumulative) aggregation over the whole history so far.
print("\n累积统计:")
daily_data['cumulative_hires'] = daily_data['new_hires'].expanding().sum()
daily_data['cumulative_avg_salary'] = daily_data['avg_salary'].expanding().mean()
print(daily_data[['new_hires', 'cumulative_hires', 'avg_salary', 'cumulative_avg_salary']].head(10).round(2))

5. 透视表和交叉表

5.1 透视表 (Pivot Table)

python
print("\n=== 透视表 ===")

# Basic pivot table: mean salary by department x position.
print("部门-职位薪资透视表:")
pivot_salary = pd.pivot_table(
    df, 
    values='salary', 
    index='department', 
    columns='position', 
    aggfunc='mean'
).round(0)
print(pivot_salary)

# Pivot over multiple value columns, one aggfunc per column.
print("\n多指标透视表:")
multi_pivot = pd.pivot_table(
    df,
    values=['salary', 'performance_score'],
    index='department',
    columns='position',
    aggfunc={'salary': 'mean', 'performance_score': 'mean'}
).round(2)
print(multi_pivot)

# Pivot with margin (grand-total) row/column.
print("\n带总计的透视表:")
pivot_with_margins = pd.pivot_table(
    df,
    values='salary',
    index='department',
    columns='position',
    aggfunc='mean',
    margins=True,
    margins_name='总计'
).round(0)
print(pivot_with_margins)

# Pivot with a two-level row index (department, age group); age groups via pd.cut.
df['age_group'] = pd.cut(df['age'], bins=[20, 30, 40, 50, 60], labels=['20-30', '31-40', '41-50', '51-60'])
print("\n多级索引透视表:")
multi_index_pivot = pd.pivot_table(
    df,
    values='salary',
    index=['department', 'age_group'],
    columns='position',
    aggfunc='mean'
).round(0)
print(multi_index_pivot.head(10))

5.2 交叉表 (Cross Tabulation)

python
print("\n=== 交叉表 ===")

# Basic cross tabulation: head-count per department x position.
print("部门-职位人数交叉表:")
crosstab_basic = pd.crosstab(df['department'], df['position'])
print(crosstab_basic)

# Row-normalized proportions (each row sums to 1).
print("\n部门-职位比例交叉表:")
crosstab_prop = pd.crosstab(df['department'], df['position'], normalize='index').round(3)
print(crosstab_prop)

# Cross tabulation with totals.
print("\n带总计的交叉表:")
crosstab_margins = pd.crosstab(
    df['department'], 
    df['position'], 
    margins=True, 
    margins_name='总计'
)
print(crosstab_margins)

# Multi-dimensional cross tabulation (department x age group vs position).
print("\n多维交叉表(部门-职位-年龄组):")
multi_crosstab = pd.crosstab(
    [df['department'], df['age_group']], 
    df['position']
)
print(multi_crosstab.head(10))

# Cross tabulation aggregating a value column instead of counting rows.
print("\n带平均薪资的交叉表:")
crosstab_values = pd.crosstab(
    df['department'], 
    df['position'], 
    values=df['salary'], 
    aggfunc='mean'
).round(0)
print(crosstab_values)

6. 分组变换和过滤

6.1 分组变换 (Transform)

python
print("\n=== 分组变换 ===")

# transform() broadcasts the group statistic back onto every row of the group.
df['dept_avg_salary'] = df.groupby('department')['salary'].transform('mean')
df['salary_vs_dept_avg'] = df['salary'] - df['dept_avg_salary']
df['salary_pct_of_dept_avg'] = (df['salary'] / df['dept_avg_salary'] * 100).round(1)

print("员工薪资相对于部门平均值:")
print(df[['name', 'department', 'salary', 'dept_avg_salary', 'salary_vs_dept_avg', 'salary_pct_of_dept_avg']].head(10))

# Within-department ranks (1 = highest).
df['dept_salary_rank'] = df.groupby('department')['salary'].transform(
    lambda x: x.rank(ascending=False)
)
df['dept_performance_rank'] = df.groupby('department')['performance_score'].transform(
    lambda x: x.rank(ascending=False)
)

print("\n部门内排名:")
print(df[['name', 'department', 'salary', 'dept_salary_rank', 'performance_score', 'dept_performance_rank']].head(10))

# Within-department z-scores.
df['salary_zscore'] = df.groupby('department')['salary'].transform(
    lambda x: (x - x.mean()) / x.std()
)
df['performance_zscore'] = df.groupby('department')['performance_score'].transform(
    lambda x: (x - x.mean()) / x.std()
)

print("\n部门内标准化分数:")
print(df[['name', 'department', 'salary_zscore', 'performance_zscore']].head(10).round(3))

6.2 分组过滤 (Filter)

python
print("\n=== 分组过滤 ===")

# Keep only rows belonging to large departments (more than 150 employees).
large_departments = df.groupby('department').filter(lambda x: len(x) > 150)
print(f"大部门员工数: {len(large_departments)}")
print("大部门分布:")
print(large_departments['department'].value_counts())

# Keep only departments whose mean salary exceeds 8000.
high_salary_depts = df.groupby('department').filter(
    lambda x: x['salary'].mean() > 8000
)
print(f"\n高薪部门员工数: {len(high_salary_depts)}")
print("高薪部门分布:")
print(high_salary_depts['department'].value_counts())

# Keep only departments whose mean performance score exceeds 80.
high_performance_depts = df.groupby('department').filter(
    lambda x: x['performance_score'].mean() > 80
)
print(f"\n高绩效部门员工数: {len(high_performance_depts)}")
print("高绩效部门分布:")
print(high_performance_depts['department'].value_counts())

# Composite filter combining size, pay and performance thresholds.
complex_filter = df.groupby('department').filter(
    lambda x: (len(x) > 100) and (x['salary'].mean() > 7500) and (x['performance_score'].mean() > 75)
)
print(f"\n符合复合条件的部门员工数: {len(complex_filter)}")
if len(complex_filter) > 0:
    print("符合条件的部门:")
    print(complex_filter['department'].value_counts())

7. 聚合结果的可视化

7.1 聚合数据可视化

python
print("\n=== 聚合结果可视化 ===")

# Per-department salary statistics to plot.
dept_salary_stats = df.groupby('department')['salary'].agg(['mean', 'median', 'std']).round(0)

fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Bar chart of mean salary per department.
dept_salary_stats['mean'].plot(kind='bar', ax=axes[0,0], color='skyblue')
axes[0,0].set_title('各部门平均薪资')
axes[0,0].set_ylabel('薪资')
axes[0,0].tick_params(axis='x', rotation=45)

# Box plot of the salary distribution per department.
df.boxplot(column='salary', by='department', ax=axes[0,1])
axes[0,1].set_title('各部门薪资分布')
axes[0,1].set_xlabel('部门')
axes[0,1].set_ylabel('薪资')

# Stacked bar chart of position counts per department.
position_dist = pd.crosstab(df['department'], df['position'])
position_dist.plot(kind='bar', stacked=True, ax=axes[1,0])
axes[1,0].set_title('各部门职位分布')
axes[1,0].set_ylabel('人数')
axes[1,0].tick_params(axis='x', rotation=45)
axes[1,0].legend(title='职位', bbox_to_anchor=(1.05, 1), loc='upper left')

# Scatter plot of performance vs salary, one colour per department.
for dept in df['department'].unique():
    dept_data = df[df['department'] == dept]
    axes[1,1].scatter(dept_data['performance_score'], dept_data['salary'], 
                     label=dept, alpha=0.6)
axes[1,1].set_xlabel('绩效分数')
axes[1,1].set_ylabel('薪资')
axes[1,1].set_title('绩效与薪资关系')
axes[1,1].legend()

plt.tight_layout()
plt.show()

7.2 时间序列聚合可视化

python
# Visualize the hiring trend aggregated by calendar month.
monthly_hires = df.groupby(df['join_date'].dt.to_period('M')).size()

fig, axes = plt.subplots(2, 1, figsize=(12, 8))

# Monthly hire counts.
monthly_hires.plot(ax=axes[0], marker='o')
axes[0].set_title('月度入职人数趋势')
axes[0].set_ylabel('入职人数')
axes[0].grid(True, alpha=0.3)

# Cumulative hire counts.
monthly_hires.cumsum().plot(ax=axes[1], marker='s')
axes[1].set_title('累积入职人数')
axes[1].set_ylabel('累积人数')
axes[1].set_xlabel('时间')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

8. 性能优化技巧

8.1 高效聚合方法

python
print("\n=== 性能优化技巧 ===")

import time

# Build a larger dataset (10x) for timing comparisons.
large_df = pd.concat([df] * 10, ignore_index=True)
print(f"大数据集大小: {large_df.shape}")

# Approach 1: a single agg() call covering all statistics.
start_time = time.time()
result1 = large_df.groupby('department').agg({
    'salary': ['mean', 'std'],
    'performance_score': 'mean'
})
time1 = time.time() - start_time

# Approach 2: separate groupby calls, one per statistic.
start_time = time.time()
result2_mean = large_df.groupby('department')['salary'].mean()
result2_std = large_df.groupby('department')['salary'].std()
result2_perf = large_df.groupby('department')['performance_score'].mean()
time2 = time.time() - start_time

print(f"使用 agg 方法耗时: {time1:.4f} 秒")
print(f"分别计算耗时: {time2:.4f} 秒")

# Memory optimization via categoricals and integer downcasting.
print("\n内存使用优化:")
print(f"原始数据内存使用: {large_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# Optimize column data types on a copy.
optimized_df = large_df.copy()
optimized_df['department'] = optimized_df['department'].astype('category')
optimized_df['position'] = optimized_df['position'].astype('category')
optimized_df['salary'] = optimized_df['salary'].astype('int32')
optimized_df['age'] = optimized_df['age'].astype('int8')

print(f"优化后内存使用: {optimized_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"内存节省: {(1 - optimized_df.memory_usage(deep=True).sum() / large_df.memory_usage(deep=True).sum()) * 100:.1f}%")

8.2 并行处理

python
# Accelerate a custom aggregation with numba's JIT (optional third-party dependency).
try:
    from numba import jit
    
    @jit
    def fast_coefficient_of_variation(arr):
        """Coefficient of variation (std / mean) over a NumPy array; 0 when the mean is zero."""
        mean_val = arr.mean()
        if mean_val == 0:
            return 0
        return arr.std() / mean_val
    
    print("\n使用 Numba 加速的聚合:")
    start_time = time.time()
    fast_cv = large_df.groupby('department')['salary'].apply(
        lambda x: fast_coefficient_of_variation(x.values)
    )
    fast_time = time.time() - start_time
    print(f"Numba 加速耗时: {fast_time:.4f} 秒")
    
except ImportError:
    print("Numba 未安装,跳过加速演示")

# Process large data in chunks to bound peak memory.
def chunked_aggregation(df, chunk_size=1000):
    """Mean salary per department, computed as the mean of per-chunk means.

    NOTE(review): this is the unweighted mean of chunk means, which equals the
    true per-department mean only when chunks contribute equal row counts per
    department — acceptable for a demonstration, not for exact statistics.
    """
    partial_means = [
        df.iloc[start:start + chunk_size].groupby('department')['salary'].mean()
        for start in range(0, len(df), chunk_size)
    ]
    # Align the per-chunk Series side by side, then average across chunks
    # (NaN entries for departments absent from a chunk are skipped).
    return pd.concat(partial_means, axis=1).mean(axis=1)

# Run the chunked aggregation on the enlarged dataset and show a sample.
print("\n分块处理结果:")
chunked_result = chunked_aggregation(large_df)
print(chunked_result.head())

9. 实际应用案例

9.1 销售数据分析

python
print("\n=== 销售数据分析案例 ===")

# Create one year of synthetic daily sales data.
np.random.seed(42)
sales_data = {
    'date': pd.date_range('2023-01-01', periods=365, freq='D'),
    'product': np.random.choice(['产品A', '产品B', '产品C', '产品D'], 365),
    'region': np.random.choice(['北区', '南区', '东区', '西区'], 365),
    'salesperson': np.random.choice([f'销售员{i}' for i in range(1, 21)], 365),
    'quantity': np.random.randint(1, 100, 365),
    'unit_price': np.random.uniform(50, 500, 365),
}

sales_df = pd.DataFrame(sales_data)
sales_df['revenue'] = sales_df['quantity'] * sales_df['unit_price']
sales_df['month'] = sales_df['date'].dt.month
sales_df['quarter'] = sales_df['date'].dt.quarter

print("销售数据样本:")
print(sales_df.head())

# 1. Monthly sales analysis.
print("\n月度销售分析:")
monthly_sales = sales_df.groupby('month').agg({
    'revenue': ['sum', 'mean', 'count'],
    'quantity': 'sum'
}).round(2)
monthly_sales.columns = ['总收入', '平均收入', '交易次数', '总销量']
print(monthly_sales)

# 2. Product x region revenue matrix.
print("\n产品-区域销售矩阵:")
product_region_matrix = pd.pivot_table(
    sales_df,
    values='revenue',
    index='product',
    columns='region',
    aggfunc='sum'
).round(0)
print(product_region_matrix)

# 3. Salesperson performance ranking.
# NOTE(review): 'top_performers' reuses (and overwrites) the variable name from section 2.2.
print("\n销售员绩效排名(前10名):")
salesperson_performance = sales_df.groupby('salesperson').agg({
    'revenue': 'sum',
    'quantity': 'sum',
    'date': 'count'
}).round(2)
salesperson_performance.columns = ['总收入', '总销量', '交易次数']
salesperson_performance['平均单笔收入'] = (salesperson_performance['总收入'] / salesperson_performance['交易次数']).round(2)
top_performers = salesperson_performance.sort_values('总收入', ascending=False)
print(top_performers.head(10))

9.2 客户行为分析

python
print("\n=== 客户行为分析案例 ===")

# Create the customer purchase records.
# BUG FIX: the original pre-computed 'customer_id' with one set of random
# per-customer counts (np.repeat) while the loop below drew *new* independent
# counts for the other columns, so the columns ended up with different lengths
# and pd.DataFrame(...) raised ValueError. All four columns are now built in
# the same loop so their lengths always agree.
customer_data = {
    'customer_id': [],
    'purchase_date': [],
    'amount': [],
    'category': []
}

# Generate each customer's purchase history (1-9 purchases per customer).
for cid in range(1, 201):
    n_purchases = np.random.randint(1, 10)
    start_date = pd.Timestamp('2023-01-01')
    # Purchases are evenly spaced by a random 1-29 day interval.
    dates = pd.date_range(start_date, periods=n_purchases, freq=f'{np.random.randint(1, 30)}D')
    customer_data['customer_id'].extend([cid] * n_purchases)
    customer_data['purchase_date'].extend(dates)
    customer_data['amount'].extend(np.random.uniform(20, 500, n_purchases))
    customer_data['category'].extend(np.random.choice(['电子产品', '服装', '食品', '家居'], n_purchases))

customer_df = pd.DataFrame({
    'customer_id': customer_data['customer_id'],
    'purchase_date': customer_data['purchase_date'],
    'amount': customer_data['amount'],
    'category': customer_data['category']
})

print(f"客户数据记录数: {len(customer_df)}")
print("客户数据样本:")
print(customer_df.head())

# 1. Customer value analysis.
print("\n客户价值分析:")
customer_value = customer_df.groupby('customer_id').agg({
    'amount': ['sum', 'mean', 'count'],
    'purchase_date': ['min', 'max']
})
customer_value.columns = ['总消费', '平均消费', '购买次数', '首次购买', '最近购买']
customer_value['购买周期'] = (customer_value['最近购买'] - customer_value['首次购买']).dt.days

# Tier customers by total spend.
customer_value['客户等级'] = pd.cut(
    customer_value['总消费'],
    bins=[0, 500, 1500, 3000, float('inf')],
    labels=['铜牌', '银牌', '金牌', '钻石']
)

print("客户等级分布:")
print(customer_value['客户等级'].value_counts())

print("\n各等级客户统计:")
grade_stats = customer_value.groupby('客户等级').agg({
    '总消费': ['mean', 'count'],
    '购买次数': 'mean',
    '购买周期': 'mean'
}).round(2)
print(grade_stats)

# 2. Category preference analysis: spend per customer per category.
print("\n客户品类偏好分析:")
category_preference = pd.crosstab(
    customer_df['customer_id'], 
    customer_df['category'], 
    values=customer_df['amount'], 
    aggfunc='sum'
).fillna(0)

# Each customer's dominant category by total spend.
customer_main_category = category_preference.idxmax(axis=1)
print("客户主要品类分布:")
print(customer_main_category.value_counts())

10. 最佳实践和注意事项

10.1 排序和聚合最佳实践

python
print("\n=== 最佳实践总结 ===")

def sorting_aggregation_best_practices():
    """Print a categorized checklist of sorting/aggregation best practices."""
    practices = {
        "排序最佳实践": [
            "1. 使用 sort_values() 而不是 sort()",
            "2. 对于大数据集,考虑使用 nlargest() 或 nsmallest()",
            "3. 合理使用 na_position 参数处理缺失值",
            "4. 使用 Categorical 类型进行自定义排序",
            "5. 避免在循环中重复排序"
        ],
        "聚合最佳实践": [
            "1. 使用 agg() 进行多种聚合操作",
            "2. 优先使用内置聚合函数而非自定义函数",
            "3. 合理使用 transform() 保持原数据结构",
            "4. 使用 filter() 进行条件筛选",
            "5. 考虑内存使用,优化数据类型"
        ],
        "性能优化建议": [
            "1. 在聚合前先过滤数据",
            "2. 使用分类数据类型减少内存使用",
            "3. 避免在 groupby 中使用复杂的 lambda 函数",
            "4. 考虑使用 numba 加速自定义函数",
            "5. 对于超大数据集,考虑分块处理"
        ],
        "常见陷阱": [
            "1. 注意聚合后索引的变化",
            "2. 小心处理缺失值对聚合结果的影响",
            "3. 避免在聚合中意外包含非数值列",
            "4. 注意时间序列聚合的时区问题",
            "5. 验证聚合结果的业务逻辑合理性"
        ]
    }
    
    for category, items in practices.items():
        print(f"\n{category}:")
        for item in items:
            print(f"  {item}")

sorting_aggregation_best_practices()

# Performance comparison example
print("\n=== 性能对比示例 ===")

# Inefficient approach: one boolean-mask scan of the whole frame per department.
def inefficient_aggregation(df):
    rows = [
        {'department': dept, 'avg_salary': df.loc[df['department'] == dept, 'salary'].mean()}
        for dept in df['department'].unique()
    ]
    return pd.DataFrame(rows)

# Efficient approach: a single vectorized groupby pass.
def efficient_aggregation(df):
    grouped_means = df.groupby('department', as_index=False)['salary'].mean()
    return grouped_means

# Time both approaches on the employee frame.
start_time = time.time()
inefficient_result = inefficient_aggregation(df)
inefficient_time = time.time() - start_time

start_time = time.time()
efficient_result = efficient_aggregation(df)
efficient_time = time.time() - start_time

print(f"低效方式耗时: {inefficient_time:.4f} 秒")
print(f"高效方式耗时: {efficient_time:.4f} 秒")
print(f"性能提升: {inefficient_time / efficient_time:.1f}x")

本章小结

本章全面介绍了 Pandas 中的数据排序和聚合技术:

排序技术

  1. 基本排序:sort_values(), sort_index() 的使用
  2. 高级排序:自定义排序、分组内排序、多级排序
  3. 排序优化:处理缺失值、使用分类数据类型

聚合技术

  1. 基本聚合:agg(), groupby() 的各种用法
  2. 高级聚合:自定义聚合函数、条件聚合、时间序列聚合
  3. 分组操作:transform(), filter(), apply() 的区别和应用

数据透视

  1. 透视表:pivot_table() 的多维分析
  2. 交叉表:crosstab() 的统计分析
  3. 数据重塑:长宽表转换和多级索引处理

性能优化

  1. 内存优化:数据类型优化、分块处理
  2. 计算优化:向量化操作、并行处理
  3. 最佳实践:避免常见陷阱、提高代码效率

掌握这些排序和聚合技术,能够帮助你:

  • 快速理解数据的分布和模式
  • 进行多维度的数据分析
  • 生成各种统计报告和业务洞察
  • 为数据可视化和建模做好准备

练习题

  1. 创建一个复杂的多级分组聚合分析
  2. 实现一个自定义的滚动窗口聚合函数
  3. 设计一个客户细分分析系统
  4. 优化一个大数据集的聚合性能
  5. 构建一个动态的数据透视表分析工具

下一章我们将学习 Pandas 的数据可视化功能,探索如何将数据分析结果转化为直观的图表。