Pandas 数据排序与聚合
数据排序和聚合是数据分析中的核心操作。排序帮助我们理解数据的分布和模式,而聚合则让我们能够从大量数据中提取有意义的统计信息。本章将详细介绍 Pandas 中各种排序和聚合技术。
1. 数据排序基础
1.1 单列排序
python
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 创建示例数据集
np.random.seed(42)
n_employees = 1000
# 生成员工数据
employee_data = {
'employee_id': range(1, n_employees + 1),
'name': [f'员工{i:04d}' for i in range(1, n_employees + 1)],
'department': np.random.choice(['IT', 'HR', 'Finance', 'Marketing', 'Operations'], n_employees),
'position': np.random.choice(['初级', '中级', '高级', '专家', '经理'], n_employees),
'salary': np.random.normal(8000, 2000, n_employees).astype(int),
'age': np.random.randint(22, 60, n_employees),
'experience_years': np.random.randint(0, 20, n_employees),
'performance_score': np.random.uniform(60, 100, n_employees),
'join_date': pd.date_range('2015-01-01', periods=n_employees, freq='D')[:n_employees]
}
# 确保薪资为正数
employee_data['salary'] = np.maximum(employee_data['salary'], 3000)
df = pd.DataFrame(employee_data)
print("员工数据集:")
print(df.head())
print(f"\n数据形状: {df.shape}")
# 1. 基本排序 - sort_values()
print("\n=== 单列排序 ===")
# 按薪资升序排列
print("按薪资升序排列(前5名):")
salary_asc = df.sort_values('salary')
print(salary_asc[['name', 'department', 'salary']].head())
# 按薪资降序排列
print("\n按薪资降序排列(前5名):")
salary_desc = df.sort_values('salary', ascending=False)
print(salary_desc[['name', 'department', 'salary']].head())
# 按绩效分数排序
print("\n按绩效分数降序排列(前5名):")
performance_desc = df.sort_values('performance_score', ascending=False)
print(performance_desc[['name', 'department', 'performance_score']].head())

1.2 多列排序
python
print("\n=== 多列排序 ===")
# 按部门和薪资排序
print("按部门(升序)和薪资(降序)排序:")
multi_sort = df.sort_values(['department', 'salary'], ascending=[True, False])
print(multi_sort[['name', 'department', 'salary']].head(10))
# 复杂多列排序
print("\n按部门、职位、绩效分数排序:")
complex_sort = df.sort_values(
['department', 'position', 'performance_score'],
ascending=[True, True, False]
)
print(complex_sort[['name', 'department', 'position', 'performance_score']].head(10))
# 处理缺失值的排序
df_with_na = df.copy()
df_with_na.loc[np.random.choice(df.index, 50), 'salary'] = np.nan
print("\n处理缺失值的排序:")
# 缺失值排在最后
na_last = df_with_na.sort_values('salary', na_position='last')
print("缺失值排在最后:")
print(na_last[['name', 'salary']].tail())
# 缺失值排在最前
na_first = df_with_na.sort_values('salary', na_position='first')
print("\n缺失值排在最前:")
print(na_first[['name', 'salary']].head())

1.3 索引排序
python
print("\n=== 索引排序 ===")
# 设置多级索引
df_indexed = df.set_index(['department', 'position'])
# 按索引排序
print("按索引排序:")
index_sorted = df_indexed.sort_index()
print(index_sorted.head(10))
# 按特定级别排序
print("\n按第二级索引(职位)排序:")
level_sorted = df_indexed.sort_index(level=1)
print(level_sorted.head(10))
# 降序索引排序
print("\n按索引降序排序:")
index_desc = df_indexed.sort_index(ascending=False)
print(index_desc.head(10))

2. 高级排序技术
2.1 自定义排序
python
print("\n=== 自定义排序 ===")
# 定义职位等级
position_order = ['初级', '中级', '高级', '专家', '经理']
# 使用 Categorical 进行自定义排序
df['position_cat'] = pd.Categorical(df['position'], categories=position_order, ordered=True)
print("按职位等级排序:")
position_sorted = df.sort_values('position_cat')
print(position_sorted[['name', 'position', 'salary']].head(10))
# 使用 key 参数进行自定义排序(Pandas 1.1+)
def custom_sort_key(series):
    """Sort key for `sort_values(key=...)`: translate position names to numeric rank.

    Lower numbers sort first, so '初级' (rank 1) comes before '经理' (rank 5).
    Unknown positions map to NaN, matching `Series.map` semantics.
    """
    levels = ['初级', '中级', '高级', '专家', '经理']
    rank_of = {name: rank for rank, name in enumerate(levels, start=1)}
    return series.map(rank_of)
print("\n使用自定义键函数排序:")
custom_sorted = df.sort_values('position', key=custom_sort_key)
print(custom_sorted[['name', 'position', 'salary']].head(10))
# 复杂自定义排序
def salary_performance_score(df_subset):
    """Composite score: 60% min-max-normalized salary + 40% normalized performance.

    Both columns are rescaled to [0, 1] over the rows of *df_subset* before
    the weighted sum, so the result is comparable across the subset only.
    """
    def _min_max(column):
        # Rescale to [0, 1]; NaN if the column is constant (max == min).
        low, high = column.min(), column.max()
        return (column - low) / (high - low)

    return _min_max(df_subset['salary']) * 0.6 + _min_max(df_subset['performance_score']) * 0.4
df['composite_score'] = salary_performance_score(df)
print("\n按综合评分排序(前10名):")
composite_sorted = df.sort_values('composite_score', ascending=False)
print(composite_sorted[['name', 'salary', 'performance_score', 'composite_score']].head(10))

2.2 分组内排序
python
print("\n=== 分组内排序 ===")
# 每个部门内按薪资排序
print("各部门内薪资排名:")
df['dept_salary_rank'] = df.groupby('department')['salary'].rank(ascending=False)
dept_ranking = df.sort_values(['department', 'dept_salary_rank'])
print(dept_ranking[['name', 'department', 'salary', 'dept_salary_rank']].head(15))
# 使用 nlargest 获取每个部门薪资最高的员工
print("\n各部门薪资最高的3名员工:")
top_earners = df.groupby('department').apply(
lambda x: x.nlargest(3, 'salary')[['name', 'salary']]
).reset_index(level=1, drop=True)
print(top_earners)
# 各部门绩效最佳员工
print("\n各部门绩效最佳员工:")
top_performers = df.groupby('department').apply(
lambda x: x.nlargest(1, 'performance_score')[['name', 'performance_score']]
).reset_index(level=1, drop=True)
print(top_performers)

3. 数据聚合基础
3.1 基本聚合函数
python
print("\n=== 基本聚合操作 ===")
# 基本统计聚合
print("整体统计信息:")
basic_stats = df[['salary', 'age', 'experience_years', 'performance_score']].agg([
'count', 'mean', 'median', 'std', 'min', 'max'
])
print(basic_stats.round(2))
# 单列多种聚合
print("\n薪资统计:")
salary_stats = df['salary'].agg({
'总数': 'count',
'平均值': 'mean',
'中位数': 'median',
'标准差': 'std',
'最小值': 'min',
'最大值': 'max',
'总和': 'sum'
})
print(salary_stats.round(2))
# 多列不同聚合
print("\n多列自定义聚合:")
custom_agg = df.agg({
'salary': ['mean', 'median', 'std'],
'age': ['mean', 'min', 'max'],
'performance_score': ['mean', 'count'],
'experience_years': 'mean'
})
print(custom_agg.round(2))

3.2 分组聚合 (GroupBy)
python
print("\n=== 分组聚合 ===")
# 按部门聚合
print("按部门聚合:")
dept_agg = df.groupby('department').agg({
'salary': ['mean', 'median', 'count'],
'age': 'mean',
'performance_score': ['mean', 'std'],
'experience_years': 'mean'
}).round(2)
print(dept_agg)
# 按职位聚合
print("\n按职位聚合:")
position_agg = df.groupby('position').agg({
'salary': ['mean', 'count'],
'performance_score': 'mean',
'age': 'mean'
}).round(2)
print(position_agg)
# 多级分组聚合
print("\n按部门和职位聚合:")
multi_group_agg = df.groupby(['department', 'position']).agg({
'salary': 'mean',
'performance_score': 'mean',
'employee_id': 'count' # 计算人数
}).round(2)
multi_group_agg.columns = ['平均薪资', '平均绩效', '人数']
print(multi_group_agg.head(10))

3.3 时间序列聚合
python
print("\n=== 时间序列聚合 ===")
# 按入职年份聚合
df['join_year'] = df['join_date'].dt.year
print("按入职年份聚合:")
yearly_agg = df.groupby('join_year').agg({
'employee_id': 'count',
'salary': 'mean',
'age': 'mean'
}).round(2)
yearly_agg.columns = ['入职人数', '平均薪资', '平均年龄']
print(yearly_agg)
# 按入职月份聚合
df['join_month'] = df['join_date'].dt.month
print("\n按入职月份聚合:")
monthly_agg = df.groupby('join_month').agg({
'employee_id': 'count',
'salary': 'mean'
}).round(2)
monthly_agg.columns = ['入职人数', '平均薪资']
print(monthly_agg)
# 使用 resample 进行时间重采样聚合
df_time_indexed = df.set_index('join_date')
print("\n按季度重采样聚合:")
quarterly_agg = df_time_indexed.resample('Q').agg({
'employee_id': 'count',
'salary': 'mean',
'performance_score': 'mean'
}).round(2)
print(quarterly_agg.head())

4. 高级聚合技术
4.1 自定义聚合函数
python
print("\n=== 自定义聚合函数 ===")
# 定义自定义聚合函数
def salary_range(series):
    """Spread between the largest and smallest value in *series*."""
    lowest, highest = series.min(), series.max()
    return highest - lowest
def top_quartile_mean(series):
    """Mean of the values in the top quartile (at or above the 75th percentile).

    Bug fix: the previous body returned ``series.quantile(0.75)`` — the
    75th-percentile *cutoff value* itself — which contradicted the documented
    intent ("计算前25%的平均值", i.e. average of the top 25% of values) and the
    '前25%平均' label it is registered under in the `.agg()` call below.
    """
    cutoff = series.quantile(0.75)
    return series[series >= cutoff].mean()
def coefficient_of_variation(series):
    """Standard deviation divided by the mean.

    Returns 0 when the mean is zero so the ratio never divides by zero.
    """
    mean_value = series.mean()
    if mean_value == 0:
        return 0
    return series.std() / mean_value
# 应用自定义聚合函数
print("使用自定义聚合函数:")
custom_agg_result = df.groupby('department')['salary'].agg([
('平均薪资', 'mean'),
('薪资范围', salary_range),
('前25%平均', top_quartile_mean),
('变异系数', coefficient_of_variation)
]).round(2)
print(custom_agg_result)
# 复杂自定义聚合
def department_summary(group):
    """Collapse one department's rows into a single summary Series.

    Intended for `groupby('department').apply(...)`; *group* is the
    department's slice of the employee DataFrame.
    """
    salaries = group['salary']
    scores = group['performance_score']
    summary = {
        '员工数量': len(group),
        '平均薪资': salaries.mean(),
        '薪资中位数': salaries.median(),
        '平均绩效': scores.mean(),
        # Share of employees scoring above 85 (mean of a boolean mask).
        '高绩效员工比例': (scores > 85).mean(),
        '平均工作经验': group['experience_years'].mean(),
        '薪资标准差': salaries.std(),
    }
    return pd.Series(summary)
print("\n部门综合摘要:")
dept_summary = df.groupby('department').apply(department_summary).round(2)
print(dept_summary)

4.2 条件聚合
python
print("\n=== 条件聚合 ===")
# 使用 where 进行条件聚合
print("高绩效员工(绩效>85)的薪资统计:")
high_performance_salary = df.groupby('department')['salary'].agg([
('高绩效员工数', lambda x: (df.loc[x.index, 'performance_score'] > 85).sum()),
('高绩效平均薪资', lambda x: x[df.loc[x.index, 'performance_score'] > 85].mean()),
('高绩效薪资中位数', lambda x: x[df.loc[x.index, 'performance_score'] > 85].median())
]).round(2)
print(high_performance_salary)
# 使用 query 结合聚合
print("\n工作经验5年以上员工的部门统计:")
experienced_stats = df.query('experience_years >= 5').groupby('department').agg({
'salary': ['mean', 'count'],
'performance_score': 'mean',
'age': 'mean'
}).round(2)
print(experienced_stats)
# 分位数聚合
print("\n各部门薪资分位数:")
salary_quantiles = df.groupby('department')['salary'].quantile([0.25, 0.5, 0.75]).unstack()
salary_quantiles.columns = ['Q1', '中位数', 'Q3']
print(salary_quantiles.round(0))

4.3 滚动窗口聚合
python
print("\n=== 滚动窗口聚合 ===")
# 创建时间序列数据
daily_data = df.groupby('join_date').agg({
'employee_id': 'count',
'salary': 'mean'
}).reset_index()
daily_data.columns = ['date', 'new_hires', 'avg_salary']
daily_data = daily_data.set_index('date').sort_index()
print("每日入职数据(前10天):")
print(daily_data.head(10))
# 7天滚动平均
print("\n7天滚动平均:")
daily_data['new_hires_7d_avg'] = daily_data['new_hires'].rolling(window=7).mean()
daily_data['avg_salary_7d_avg'] = daily_data['avg_salary'].rolling(window=7).mean()
print(daily_data[['new_hires', 'new_hires_7d_avg', 'avg_salary', 'avg_salary_7d_avg']].head(10).round(2))
# 30天滚动统计
print("\n30天滚动统计:")
rolling_stats = daily_data['new_hires'].rolling(window=30).agg([
'mean', 'std', 'min', 'max', 'sum'
]).round(2)
print(rolling_stats.head(35))
# 扩展窗口聚合
print("\n累积统计:")
daily_data['cumulative_hires'] = daily_data['new_hires'].expanding().sum()
daily_data['cumulative_avg_salary'] = daily_data['avg_salary'].expanding().mean()
print(daily_data[['new_hires', 'cumulative_hires', 'avg_salary', 'cumulative_avg_salary']].head(10).round(2))

5. 透视表和交叉表
5.1 透视表 (Pivot Table)
python
print("\n=== 透视表 ===")
# 基本透视表
print("部门-职位薪资透视表:")
pivot_salary = pd.pivot_table(
df,
values='salary',
index='department',
columns='position',
aggfunc='mean'
).round(0)
print(pivot_salary)
# 多值透视表
print("\n多指标透视表:")
multi_pivot = pd.pivot_table(
df,
values=['salary', 'performance_score'],
index='department',
columns='position',
aggfunc={'salary': 'mean', 'performance_score': 'mean'}
).round(2)
print(multi_pivot)
# 带边际统计的透视表
print("\n带总计的透视表:")
pivot_with_margins = pd.pivot_table(
df,
values='salary',
index='department',
columns='position',
aggfunc='mean',
margins=True,
margins_name='总计'
).round(0)
print(pivot_with_margins)
# 多级索引透视表
df['age_group'] = pd.cut(df['age'], bins=[20, 30, 40, 50, 60], labels=['20-30', '31-40', '41-50', '51-60'])
print("\n多级索引透视表:")
multi_index_pivot = pd.pivot_table(
df,
values='salary',
index=['department', 'age_group'],
columns='position',
aggfunc='mean'
).round(0)
print(multi_index_pivot.head(10))

5.2 交叉表 (Cross Tabulation)
python
print("\n=== 交叉表 ===")
# 基本交叉表
print("部门-职位人数交叉表:")
crosstab_basic = pd.crosstab(df['department'], df['position'])
print(crosstab_basic)
# 带比例的交叉表
print("\n部门-职位比例交叉表:")
crosstab_prop = pd.crosstab(df['department'], df['position'], normalize='index').round(3)
print(crosstab_prop)
# 带边际统计的交叉表
print("\n带总计的交叉表:")
crosstab_margins = pd.crosstab(
df['department'],
df['position'],
margins=True,
margins_name='总计'
)
print(crosstab_margins)
# 多维交叉表
print("\n多维交叉表(部门-职位-年龄组):")
multi_crosstab = pd.crosstab(
[df['department'], df['age_group']],
df['position']
)
print(multi_crosstab.head(10))
# 带值的交叉表
print("\n带平均薪资的交叉表:")
crosstab_values = pd.crosstab(
df['department'],
df['position'],
values=df['salary'],
aggfunc='mean'
).round(0)
print(crosstab_values)

6. 分组变换和过滤
6.1 分组变换 (Transform)
python
print("\n=== 分组变换 ===")
# 计算部门内的相对薪资
df['dept_avg_salary'] = df.groupby('department')['salary'].transform('mean')
df['salary_vs_dept_avg'] = df['salary'] - df['dept_avg_salary']
df['salary_pct_of_dept_avg'] = (df['salary'] / df['dept_avg_salary'] * 100).round(1)
print("员工薪资相对于部门平均值:")
print(df[['name', 'department', 'salary', 'dept_avg_salary', 'salary_vs_dept_avg', 'salary_pct_of_dept_avg']].head(10))
# 计算部门内排名
df['dept_salary_rank'] = df.groupby('department')['salary'].transform(
lambda x: x.rank(ascending=False)
)
df['dept_performance_rank'] = df.groupby('department')['performance_score'].transform(
lambda x: x.rank(ascending=False)
)
print("\n部门内排名:")
print(df[['name', 'department', 'salary', 'dept_salary_rank', 'performance_score', 'dept_performance_rank']].head(10))
# 标准化分数
df['salary_zscore'] = df.groupby('department')['salary'].transform(
lambda x: (x - x.mean()) / x.std()
)
df['performance_zscore'] = df.groupby('department')['performance_score'].transform(
lambda x: (x - x.mean()) / x.std()
)
print("\n部门内标准化分数:")
print(df[['name', 'department', 'salary_zscore', 'performance_zscore']].head(10).round(3))

6.2 分组过滤 (Filter)
python
print("\n=== 分组过滤 ===")
# 过滤大部门(员工数>150)
large_departments = df.groupby('department').filter(lambda x: len(x) > 150)
print(f"大部门员工数: {len(large_departments)}")
print("大部门分布:")
print(large_departments['department'].value_counts())
# 过滤高薪部门(平均薪资>8000)
high_salary_depts = df.groupby('department').filter(
lambda x: x['salary'].mean() > 8000
)
print(f"\n高薪部门员工数: {len(high_salary_depts)}")
print("高薪部门分布:")
print(high_salary_depts['department'].value_counts())
# 过滤高绩效部门(平均绩效>80)
high_performance_depts = df.groupby('department').filter(
lambda x: x['performance_score'].mean() > 80
)
print(f"\n高绩效部门员工数: {len(high_performance_depts)}")
print("高绩效部门分布:")
print(high_performance_depts['department'].value_counts())
# 复合条件过滤
complex_filter = df.groupby('department').filter(
lambda x: (len(x) > 100) and (x['salary'].mean() > 7500) and (x['performance_score'].mean() > 75)
)
print(f"\n符合复合条件的部门员工数: {len(complex_filter)}")
if len(complex_filter) > 0:
print("符合条件的部门:")
print(complex_filter['department'].value_counts())

7. 聚合结果的可视化
7.1 聚合数据可视化
python
print("\n=== 聚合结果可视化 ===")
# 部门薪资统计可视化
dept_salary_stats = df.groupby('department')['salary'].agg(['mean', 'median', 'std']).round(0)
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 平均薪资柱状图
dept_salary_stats['mean'].plot(kind='bar', ax=axes[0,0], color='skyblue')
axes[0,0].set_title('各部门平均薪资')
axes[0,0].set_ylabel('薪资')
axes[0,0].tick_params(axis='x', rotation=45)
# 薪资分布箱线图
df.boxplot(column='salary', by='department', ax=axes[0,1])
axes[0,1].set_title('各部门薪资分布')
axes[0,1].set_xlabel('部门')
axes[0,1].set_ylabel('薪资')
# 职位分布堆叠柱状图
position_dist = pd.crosstab(df['department'], df['position'])
position_dist.plot(kind='bar', stacked=True, ax=axes[1,0])
axes[1,0].set_title('各部门职位分布')
axes[1,0].set_ylabel('人数')
axes[1,0].tick_params(axis='x', rotation=45)
axes[1,0].legend(title='职位', bbox_to_anchor=(1.05, 1), loc='upper left')
# 绩效vs薪资散点图
for dept in df['department'].unique():
dept_data = df[df['department'] == dept]
axes[1,1].scatter(dept_data['performance_score'], dept_data['salary'],
label=dept, alpha=0.6)
axes[1,1].set_xlabel('绩效分数')
axes[1,1].set_ylabel('薪资')
axes[1,1].set_title('绩效与薪资关系')
axes[1,1].legend()
plt.tight_layout()
plt.show()

7.2 时间序列聚合可视化
python
# 入职趋势可视化
monthly_hires = df.groupby(df['join_date'].dt.to_period('M')).size()
fig, axes = plt.subplots(2, 1, figsize=(12, 8))
# 月度入职人数
monthly_hires.plot(ax=axes[0], marker='o')
axes[0].set_title('月度入职人数趋势')
axes[0].set_ylabel('入职人数')
axes[0].grid(True, alpha=0.3)
# 累积入职人数
monthly_hires.cumsum().plot(ax=axes[1], marker='s')
axes[1].set_title('累积入职人数')
axes[1].set_ylabel('累积人数')
axes[1].set_xlabel('时间')
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

8. 性能优化技巧
8.1 高效聚合方法
python
print("\n=== 性能优化技巧 ===")
import time
# 创建大数据集进行性能测试
large_df = pd.concat([df] * 10, ignore_index=True)
print(f"大数据集大小: {large_df.shape}")
# 方法1: 使用 agg
start_time = time.time()
result1 = large_df.groupby('department').agg({
'salary': ['mean', 'std'],
'performance_score': 'mean'
})
time1 = time.time() - start_time
# 方法2: 分别计算
start_time = time.time()
result2_mean = large_df.groupby('department')['salary'].mean()
result2_std = large_df.groupby('department')['salary'].std()
result2_perf = large_df.groupby('department')['performance_score'].mean()
time2 = time.time() - start_time
print(f"使用 agg 方法耗时: {time1:.4f} 秒")
print(f"分别计算耗时: {time2:.4f} 秒")
# 内存使用优化
print("\n内存使用优化:")
print(f"原始数据内存使用: {large_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
# 优化数据类型
optimized_df = large_df.copy()
optimized_df['department'] = optimized_df['department'].astype('category')
optimized_df['position'] = optimized_df['position'].astype('category')
optimized_df['salary'] = optimized_df['salary'].astype('int32')
optimized_df['age'] = optimized_df['age'].astype('int8')
print(f"优化后内存使用: {optimized_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"内存节省: {(1 - optimized_df.memory_usage(deep=True).sum() / large_df.memory_usage(deep=True).sum()) * 100:.1f}%")

8.2 并行处理
python
# 使用 numba 加速自定义聚合函数(需要安装 numba)
try:
from numba import jit
@jit
def fast_coefficient_of_variation(arr):
    """Coefficient of variation (std / mean) of a NumPy array, JIT-compiled by Numba.

    Returns 0 when the mean is zero to avoid a division by zero.
    NOTE(review): a bare ``@jit`` silently falls back to object mode if
    nopython compilation fails — confirm whether ``@jit(nopython=True)``
    was intended for this demo.
    """
    mean_val = arr.mean()
    if mean_val == 0:
        return 0
    return arr.std() / mean_val
print("\n使用 Numba 加速的聚合:")
start_time = time.time()
fast_cv = large_df.groupby('department')['salary'].apply(
lambda x: fast_coefficient_of_variation(x.values)
)
fast_time = time.time() - start_time
print(f"Numba 加速耗时: {fast_time:.4f} 秒")
except ImportError:
print("Numba 未安装,跳过加速演示")
# 分块处理大数据
def chunked_aggregation(df, chunk_size=1000):
    """Per-department mean salary computed chunk by chunk.

    Each chunk of *chunk_size* rows is aggregated independently; the
    per-chunk means are then averaged row-wise. NOTE: this equals the true
    group mean only when each department is evenly represented per chunk —
    it is a memory-friendly approximation, as in the original demo.
    """
    partial_means = [
        df.iloc[start:start + chunk_size].groupby('department')['salary'].mean()
        for start in range(0, len(df), chunk_size)
    ]
    # Align the per-chunk Series side by side, then average across chunks.
    return pd.concat(partial_means, axis=1).mean(axis=1)
print("\n分块处理结果:")
chunked_result = chunked_aggregation(large_df)
print(chunked_result.head())

9. 实际应用案例
9.1 销售数据分析
python
print("\n=== 销售数据分析案例 ===")
# 创建销售数据
np.random.seed(42)
sales_data = {
'date': pd.date_range('2023-01-01', periods=365, freq='D'),
'product': np.random.choice(['产品A', '产品B', '产品C', '产品D'], 365),
'region': np.random.choice(['北区', '南区', '东区', '西区'], 365),
'salesperson': np.random.choice([f'销售员{i}' for i in range(1, 21)], 365),
'quantity': np.random.randint(1, 100, 365),
'unit_price': np.random.uniform(50, 500, 365),
}
sales_df = pd.DataFrame(sales_data)
sales_df['revenue'] = sales_df['quantity'] * sales_df['unit_price']
sales_df['month'] = sales_df['date'].dt.month
sales_df['quarter'] = sales_df['date'].dt.quarter
print("销售数据样本:")
print(sales_df.head())
# 1. 月度销售分析
print("\n月度销售分析:")
monthly_sales = sales_df.groupby('month').agg({
'revenue': ['sum', 'mean', 'count'],
'quantity': 'sum'
}).round(2)
monthly_sales.columns = ['总收入', '平均收入', '交易次数', '总销量']
print(monthly_sales)
# 2. 产品-区域分析
print("\n产品-区域销售矩阵:")
product_region_matrix = pd.pivot_table(
sales_df,
values='revenue',
index='product',
columns='region',
aggfunc='sum'
).round(0)
print(product_region_matrix)
# 3. 销售员绩效排名
print("\n销售员绩效排名(前10名):")
salesperson_performance = sales_df.groupby('salesperson').agg({
'revenue': 'sum',
'quantity': 'sum',
'date': 'count'
}).round(2)
salesperson_performance.columns = ['总收入', '总销量', '交易次数']
salesperson_performance['平均单笔收入'] = (salesperson_performance['总收入'] / salesperson_performance['交易次数']).round(2)
top_performers = salesperson_performance.sort_values('总收入', ascending=False)
print(top_performers.head(10))

9.2 客户行为分析
python
print("\n=== 客户行为分析案例 ===")
# 创建客户数据
customer_data = {
'customer_id': np.repeat(range(1, 201), np.random.randint(1, 10, 200)),
'purchase_date': [],
'amount': [],
'category': []
}
# 生成购买记录
for cid in range(1, 201):
n_purchases = np.random.randint(1, 10)
start_date = pd.Timestamp('2023-01-01')
dates = pd.date_range(start_date, periods=n_purchases, freq=f'{np.random.randint(1, 30)}D')
customer_data['purchase_date'].extend(dates)
customer_data['amount'].extend(np.random.uniform(20, 500, n_purchases))
customer_data['category'].extend(np.random.choice(['电子产品', '服装', '食品', '家居'], n_purchases))
customer_df = pd.DataFrame({
'customer_id': customer_data['customer_id'],
'purchase_date': customer_data['purchase_date'],
'amount': customer_data['amount'],
'category': customer_data['category']
})
print(f"客户数据记录数: {len(customer_df)}")
print("客户数据样本:")
print(customer_df.head())
# 1. 客户价值分析
print("\n客户价值分析:")
customer_value = customer_df.groupby('customer_id').agg({
'amount': ['sum', 'mean', 'count'],
'purchase_date': ['min', 'max']
})
customer_value.columns = ['总消费', '平均消费', '购买次数', '首次购买', '最近购买']
customer_value['购买周期'] = (customer_value['最近购买'] - customer_value['首次购买']).dt.days
# 客户分层
customer_value['客户等级'] = pd.cut(
customer_value['总消费'],
bins=[0, 500, 1500, 3000, float('inf')],
labels=['铜牌', '银牌', '金牌', '钻石']
)
print("客户等级分布:")
print(customer_value['客户等级'].value_counts())
print("\n各等级客户统计:")
grade_stats = customer_value.groupby('客户等级').agg({
'总消费': ['mean', 'count'],
'购买次数': 'mean',
'购买周期': 'mean'
}).round(2)
print(grade_stats)
# 2. 品类偏好分析
print("\n客户品类偏好分析:")
category_preference = pd.crosstab(
customer_df['customer_id'],
customer_df['category'],
values=customer_df['amount'],
aggfunc='sum'
).fillna(0)
# 找出每个客户的主要品类
customer_main_category = category_preference.idxmax(axis=1)
print("客户主要品类分布:")
print(customer_main_category.value_counts())

10. 最佳实践和注意事项
10.1 排序和聚合最佳实践
python
print("\n=== 最佳实践总结 ===")
def sorting_aggregation_best_practices():
    """Print a grouped checklist of sorting and aggregation best practices.

    Output format: a blank line plus the topic name, followed by one
    indented line per tip, for each of the four topics in order.
    """
    guide = [
        ("排序最佳实践", [
            "1. 使用 sort_values() 而不是 sort()",
            "2. 对于大数据集,考虑使用 nlargest() 或 nsmallest()",
            "3. 合理使用 na_position 参数处理缺失值",
            "4. 使用 Categorical 类型进行自定义排序",
            "5. 避免在循环中重复排序",
        ]),
        ("聚合最佳实践", [
            "1. 使用 agg() 进行多种聚合操作",
            "2. 优先使用内置聚合函数而非自定义函数",
            "3. 合理使用 transform() 保持原数据结构",
            "4. 使用 filter() 进行条件筛选",
            "5. 考虑内存使用,优化数据类型",
        ]),
        ("性能优化建议", [
            "1. 在聚合前先过滤数据",
            "2. 使用分类数据类型减少内存使用",
            "3. 避免在 groupby 中使用复杂的 lambda 函数",
            "4. 考虑使用 numba 加速自定义函数",
            "5. 对于超大数据集,考虑分块处理",
        ]),
        ("常见陷阱", [
            "1. 注意聚合后索引的变化",
            "2. 小心处理缺失值对聚合结果的影响",
            "3. 避免在聚合中意外包含非数值列",
            "4. 注意时间序列聚合的时区问题",
            "5. 验证聚合结果的业务逻辑合理性",
        ]),
    ]
    for topic, tips in guide:
        print(f"\n{topic}:")
        for tip in tips:
            print(f" {tip}")
sorting_aggregation_best_practices()
# 性能对比示例
print("\n=== 性能对比示例 ===")
# 低效方式
def inefficient_aggregation(df):
    """Per-department mean salary via an explicit Python loop.

    Intentionally slow baseline for the performance comparison below:
    one boolean-mask scan of the frame per department.
    """
    rows = [
        {'department': dept,
         'avg_salary': df.loc[df['department'] == dept, 'salary'].mean()}
        for dept in df['department'].unique()
    ]
    return pd.DataFrame(rows)
# 高效方式
def efficient_aggregation(df):
    """Per-department mean salary via a single vectorized groupby."""
    per_department = df.groupby('department')['salary'].mean()
    # reset_index turns the grouped Series back into a two-column DataFrame.
    return per_department.reset_index()
# 性能测试
start_time = time.time()
inefficient_result = inefficient_aggregation(df)
inefficient_time = time.time() - start_time
start_time = time.time()
efficient_result = efficient_aggregation(df)
efficient_time = time.time() - start_time
print(f"低效方式耗时: {inefficient_time:.4f} 秒")
print(f"高效方式耗时: {efficient_time:.4f} 秒")
print(f"性能提升: {inefficient_time / efficient_time:.1f}x")

本章小结
本章全面介绍了 Pandas 中的数据排序和聚合技术:
排序技术
- 基本排序:sort_values()、sort_index() 的使用
- 高级排序:自定义排序、分组内排序、多级排序
- 排序优化:处理缺失值、使用分类数据类型
聚合技术
- 基本聚合:agg()、groupby() 的各种用法
- 高级聚合:自定义聚合函数、条件聚合、时间序列聚合
- 分组操作:transform()、filter()、apply() 的区别和应用
数据透视
- 透视表:pivot_table() 的多维分析
- 交叉表:crosstab() 的统计分析
- 数据重塑:长宽表转换和多级索引处理
性能优化
- 内存优化:数据类型优化、分块处理
- 计算优化:向量化操作、并行处理
- 最佳实践:避免常见陷阱、提高代码效率
掌握这些排序和聚合技术,能够帮助你:
- 快速理解数据的分布和模式
- 进行多维度的数据分析
- 生成各种统计报告和业务洞察
- 为数据可视化和建模做好准备
练习题
- 创建一个复杂的多级分组聚合分析
- 实现一个自定义的滚动窗口聚合函数
- 设计一个客户细分分析系统
- 优化一个大数据集的聚合性能
- 构建一个动态的数据透视表分析工具
下一章我们将学习 Pandas 的数据可视化功能,探索如何将数据分析结果转化为直观的图表。