Pandas 数据清洗
数据清洗是数据分析过程中最重要的步骤之一。在实际工作中,原始数据往往包含缺失值、重复值、异常值等问题,需要进行清洗和预处理才能用于分析。本章将详细介绍如何使用 Pandas 进行数据清洗。
1. 数据清洗概述
1.1 什么是数据清洗
数据清洗是指识别和纠正数据中的错误、不一致性和不完整性的过程。主要包括:
- 缺失值处理:识别和处理空值
- 重复值处理:发现和删除重复记录
- 异常值处理:识别和处理离群值
- 数据类型转换:确保数据类型正确
- 数据格式标准化:统一数据格式
1.2 数据质量问题的常见类型
python
import pandas as pd
import numpy as np
# 创建包含各种数据质量问题的示例数据
data = {
'name': ['张三', '李四', '王五', '张三', '赵六', None, ''],
'age': [25, 30, np.nan, 25, 35, 28, -5],
'salary': [5000, 6000, 7000, 5000, 8000, 9000, 999999],
'department': ['IT', 'HR', 'it', 'IT', 'Finance', 'hr', 'IT'],
'join_date': ['2020-01-15', '2019-03-20', '2021/05/10', '2020-01-15',
'2018-12-01', '无效日期', '2022-02-28']
}
df = pd.DataFrame(data)
print("原始数据:")
print(df)
print("\n数据信息:")
print(df.info())2. 缺失值处理
2.1 识别缺失值
python
# 检查缺失值
print("缺失值统计:")
print(df.isnull().sum())
# 查看缺失值的分布
print("\n缺失值百分比:")
print(df.isnull().mean() * 100)
# 查看包含缺失值的行
print("\n包含缺失值的行:")
print(df[df.isnull().any(axis=1)])
# 可视化缺失值模式
import matplotlib.pyplot as plt
import seaborn as sns
plt.figure(figsize=(10, 6))
sns.heatmap(df.isnull(), cbar=True, yticklabels=False, cmap='viridis')
plt.title('缺失值模式')
plt.show()2.2 处理缺失值的策略
删除缺失值
python
# 删除包含任何缺失值的行
df_drop_any = df.dropna()
print("删除任何缺失值后的数据:")
print(df_drop_any)
# 删除所有值都为缺失的行
df_drop_all = df.dropna(how='all')
# 删除特定列的缺失值
df_drop_subset = df.dropna(subset=['age'])
# 删除包含缺失值的列
df_drop_columns = df.dropna(axis=1)填充缺失值
python
# 用固定值填充
df_filled = df.copy()
df_filled['age'].fillna(df['age'].mean(), inplace=True) # 用均值填充
df_filled['name'].fillna('未知', inplace=True) # 用固定值填充
# 前向填充和后向填充
df_ffill = df.fillna(method='ffill') # 前向填充
df_bfill = df.fillna(method='bfill') # 后向填充
# 用插值方法填充数值型数据
df_interpolate = df.copy()
df_interpolate['age'] = df_interpolate['age'].interpolate()
# 用众数填充分类数据
from scipy import stats
mode_department = stats.mode(df['department'].dropna())[0][0]
df_filled['department'].fillna(mode_department, inplace=True)2.3 高级缺失值处理
python
# 基于条件填充
def fill_age_by_department(row):
if pd.isna(row['age']):
if row['department'] == 'IT':
return 28 # IT部门平均年龄
elif row['department'] == 'HR':
return 32 # HR部门平均年龄
else:
return 30 # 其他部门平均年龄
return row['age']
df_conditional = df.copy()
df_conditional['age'] = df_conditional.apply(fill_age_by_department, axis=1)
# 使用机器学习方法填充缺失值
from sklearn.impute import KNNImputer
# 准备数值型数据
numeric_cols = ['age', 'salary']
imputer = KNNImputer(n_neighbors=3)
df_knn = df.copy()
df_knn[numeric_cols] = imputer.fit_transform(df[numeric_cols])3. 重复值处理
3.1 识别重复值
python
# 检查完全重复的行
print("重复行数量:", df.duplicated().sum())
print("\n重复的行:")
print(df[df.duplicated()])
# 基于特定列检查重复
print("\n基于姓名的重复:")
print(df[df.duplicated(subset=['name'])])
# 查看所有重复项(包括第一次出现)
print("\n所有重复项:")
print(df[df.duplicated(subset=['name'], keep=False)])3.2 处理重复值
python
# 删除完全重复的行
df_no_duplicates = df.drop_duplicates()
# 基于特定列删除重复,保留第一个
df_unique_names = df.drop_duplicates(subset=['name'], keep='first')
# 保留最后一个重复项
df_keep_last = df.drop_duplicates(subset=['name'], keep='last')
# 删除所有重复项
df_remove_all_duplicates = df.drop_duplicates(subset=['name'], keep=False)
print("处理重复值后的数据:")
print(df_unique_names)4. 异常值处理
4.1 识别异常值
python
# 使用统计方法识别异常值
def detect_outliers_iqr(data):
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
return (data < lower_bound) | (data > upper_bound)
# 检查年龄异常值
age_outliers = detect_outliers_iqr(df['age'].dropna())
print("年龄异常值:")
print(df.loc[age_outliers, 'age'])
# 使用Z-score方法
from scipy import stats
z_scores = np.abs(stats.zscore(df['salary'].dropna()))
threshold = 3
salary_outliers = z_scores > threshold
print("\n薪资异常值(Z-score > 3):")
print(df.loc[df['salary'].dropna().index[salary_outliers], 'salary'])4.2 处理异常值
python
# 删除异常值
df_no_outliers = df.copy()
age_mask = ~detect_outliers_iqr(df_no_outliers['age'].dropna())
df_no_outliers = df_no_outliers.loc[df_no_outliers['age'].dropna().index[age_mask]]
# 用边界值替换异常值
def cap_outliers(data, method='iqr'):
if method == 'iqr':
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
return data.clip(lower=lower_bound, upper=upper_bound)
df_capped = df.copy()
df_capped['salary'] = cap_outliers(df_capped['salary'])
# 用中位数替换异常值
df_median_replace = df.copy()
outlier_mask = detect_outliers_iqr(df_median_replace['age'].dropna())
median_age = df_median_replace['age'].median()
df_median_replace.loc[outlier_mask, 'age'] = median_age5. 数据类型转换
5.1 检查和转换数据类型
python
# 检查当前数据类型
print("当前数据类型:")
print(df.dtypes)
# 转换数据类型
df_converted = df.copy()
# 转换数值型
df_converted['age'] = pd.to_numeric(df_converted['age'], errors='coerce')
df_converted['salary'] = pd.to_numeric(df_converted['salary'], errors='coerce')
# 转换日期型
df_converted['join_date'] = pd.to_datetime(df_converted['join_date'],
errors='coerce',
infer_datetime_format=True)
# 转换分类型
df_converted['department'] = df_converted['department'].astype('category')
print("\n转换后的数据类型:")
print(df_converted.dtypes)5.2 处理字符串数据
python
# 字符串清洗
df_string_clean = df.copy()
# 去除空白字符
df_string_clean['name'] = df_string_clean['name'].str.strip()
df_string_clean['department'] = df_string_clean['department'].str.strip()
# 统一大小写
df_string_clean['department'] = df_string_clean['department'].str.upper()
# 替换空字符串为NaN
df_string_clean['name'] = df_string_clean['name'].replace('', np.nan)
# 标准化文本
df_string_clean['department'] = df_string_clean['department'].replace({
'IT': 'Information Technology',
'HR': 'Human Resources'
})
print("字符串清洗后的数据:")
print(df_string_clean)6. 数据验证和质量检查
6.1 数据完整性检查
python
def data_quality_report(df):
"""
生成数据质量报告
"""
report = {}
# 基本信息
report['总行数'] = len(df)
report['总列数'] = len(df.columns)
# 缺失值统计
missing_stats = df.isnull().sum()
report['缺失值统计'] = missing_stats[missing_stats > 0].to_dict()
# 重复值统计
report['重复行数'] = df.duplicated().sum()
# 数据类型统计
report['数据类型'] = df.dtypes.value_counts().to_dict()
# 数值型列的统计信息
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
report['数值型统计'] = df[numeric_cols].describe().to_dict()
return report
# 生成数据质量报告
quality_report = data_quality_report(df)
for key, value in quality_report.items():
print(f"{key}: {value}")
print("-" * 50)6.2 数据约束检查
python
def validate_data(df):
"""
数据验证函数
"""
issues = []
# 检查年龄范围
if 'age' in df.columns:
invalid_age = df[(df['age'] < 0) | (df['age'] > 120)]
if not invalid_age.empty:
issues.append(f"发现 {len(invalid_age)} 个无效年龄值")
# 检查薪资范围
if 'salary' in df.columns:
invalid_salary = df[(df['salary'] < 0) | (df['salary'] > 1000000)]
if not invalid_salary.empty:
issues.append(f"发现 {len(invalid_salary)} 个可疑薪资值")
# 检查必填字段
required_fields = ['name', 'department']
for field in required_fields:
if field in df.columns:
missing_count = df[field].isnull().sum()
if missing_count > 0:
issues.append(f"必填字段 {field} 有 {missing_count} 个缺失值")
return issues
# 执行数据验证
validation_issues = validate_data(df)
if validation_issues:
print("数据验证发现的问题:")
for issue in validation_issues:
print(f"- {issue}")
else:
print("数据验证通过")7. 综合数据清洗流程
7.1 完整的数据清洗管道
python
class DataCleaner:
def __init__(self):
self.cleaning_log = []
def log_action(self, action):
self.cleaning_log.append(action)
print(f"执行: {action}")
def clean_data(self, df):
"""
完整的数据清洗流程
"""
df_clean = df.copy()
original_shape = df_clean.shape
# 1. 处理字符串数据
self.log_action("清洗字符串数据")
string_cols = df_clean.select_dtypes(include=['object']).columns
for col in string_cols:
if col in df_clean.columns:
df_clean[col] = df_clean[col].astype(str).str.strip()
df_clean[col] = df_clean[col].replace(['', 'nan', 'None'], np.nan)
# 2. 数据类型转换
self.log_action("转换数据类型")
if 'age' in df_clean.columns:
df_clean['age'] = pd.to_numeric(df_clean['age'], errors='coerce')
if 'salary' in df_clean.columns:
df_clean['salary'] = pd.to_numeric(df_clean['salary'], errors='coerce')
if 'join_date' in df_clean.columns:
df_clean['join_date'] = pd.to_datetime(df_clean['join_date'], errors='coerce')
# 3. 处理重复值
self.log_action("删除重复值")
duplicates_before = df_clean.duplicated().sum()
df_clean = df_clean.drop_duplicates()
duplicates_removed = duplicates_before - df_clean.duplicated().sum()
# 4. 处理异常值
self.log_action("处理异常值")
numeric_cols = df_clean.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
df_clean[col] = cap_outliers(df_clean[col])
# 5. 处理缺失值
self.log_action("处理缺失值")
# 数值型用中位数填充
for col in numeric_cols:
if df_clean[col].isnull().any():
median_val = df_clean[col].median()
df_clean[col].fillna(median_val, inplace=True)
# 分类型用众数填充
categorical_cols = df_clean.select_dtypes(include=['object', 'category']).columns
for col in categorical_cols:
if df_clean[col].isnull().any():
mode_val = df_clean[col].mode().iloc[0] if not df_clean[col].mode().empty else '未知'
df_clean[col].fillna(mode_val, inplace=True)
# 6. 最终验证
self.log_action("执行最终验证")
final_shape = df_clean.shape
# 生成清洗报告
report = {
'原始数据形状': original_shape,
'清洗后数据形状': final_shape,
'删除的重复行数': duplicates_removed,
'剩余缺失值': df_clean.isnull().sum().sum(),
'清洗步骤': self.cleaning_log
}
return df_clean, report
# 使用数据清洗器
cleaner = DataCleaner()
df_cleaned, cleaning_report = cleaner.clean_data(df)
print("\n清洗报告:")
for key, value in cleaning_report.items():
print(f"{key}: {value}")
print("\n清洗后的数据:")
print(df_cleaned)
print("\n清洗后数据信息:")
print(df_cleaned.info())8. 实际应用案例
8.1 销售数据清洗
python
# 模拟销售数据
sales_data = {
'order_id': ['ORD001', 'ORD002', 'ORD003', 'ORD001', 'ORD004', None],
'customer_name': ['张三', '李四', '', '张三', '王五', '赵六'],
'product': ['笔记本电脑', 'LAPTOP', '笔记本电脑', '笔记本电脑', '手机', '平板'],
'quantity': [1, 2, -1, 1, 3, 0],
'price': [5000, 6000, 5000, 5000, 2000, 3000],
'order_date': ['2023-01-15', '2023/02/20', '2023-03-10', '2023-01-15',
'2023-04-05', 'invalid']
}
sales_df = pd.DataFrame(sales_data)
print("原始销售数据:")
print(sales_df)
# 应用清洗流程
sales_cleaner = DataCleaner()
sales_cleaned, sales_report = sales_cleaner.clean_data(sales_df)
# 额外的业务规则清洗
sales_cleaned = sales_cleaned[sales_cleaned['quantity'] > 0] # 删除无效数量
sales_cleaned['product'] = sales_cleaned['product'].str.upper() # 统一产品名称
print("\n清洗后的销售数据:")
print(sales_cleaned)8.2 用户行为数据清洗
python
# 模拟用户行为数据
user_behavior = {
'user_id': [1001, 1002, 1003, 1001, 1004, 1005],
'session_duration': [300, 1500, -50, 300, 7200, 0],
'page_views': [5, 15, 0, 5, 50, 1],
'bounce_rate': [0.2, 0.1, 1.5, 0.2, 0.05, 0.8],
'device_type': ['mobile', 'DESKTOP', 'mobile', 'mobile', 'tablet', ''],
'last_visit': ['2023-01-15 10:30:00', '2023-02-20 14:45:00',
'2023-03-10 09:15:00', '2023-01-15 10:30:00',
'2023-04-05 16:20:00', 'N/A']
}
behavior_df = pd.DataFrame(user_behavior)
# 自定义清洗规则
def clean_user_behavior(df):
df_clean = df.copy()
# 处理会话时长异常值(负值和过大值)
df_clean = df_clean[df_clean['session_duration'] >= 0]
df_clean = df_clean[df_clean['session_duration'] <= 3600] # 最大1小时
# 处理跳出率范围
df_clean['bounce_rate'] = df_clean['bounce_rate'].clip(0, 1)
# 标准化设备类型
df_clean['device_type'] = df_clean['device_type'].str.lower()
df_clean['device_type'] = df_clean['device_type'].replace('', 'unknown')
# 处理时间格式
df_clean['last_visit'] = pd.to_datetime(df_clean['last_visit'], errors='coerce')
return df_clean
behavior_cleaned = clean_user_behavior(behavior_df)
print("清洗后的用户行为数据:")
print(behavior_cleaned)9. 性能优化技巧
9.1 大数据集的清洗策略
python
# 分块处理大文件
def clean_large_dataset(file_path, chunk_size=10000):
"""
分块处理大数据集
"""
cleaned_chunks = []
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# 应用清洗逻辑
chunk_cleaned = clean_data_chunk(chunk)
cleaned_chunks.append(chunk_cleaned)
# 合并所有清洗后的块
return pd.concat(cleaned_chunks, ignore_index=True)
def clean_data_chunk(chunk):
"""
清洗单个数据块
"""
# 基本清洗操作
chunk = chunk.dropna(how='all') # 删除全空行
chunk = chunk.drop_duplicates() # 删除重复行
# 数据类型优化
for col in chunk.select_dtypes(include=['object']).columns:
if chunk[col].nunique() / len(chunk) < 0.5: # 如果唯一值比例小于50%
chunk[col] = chunk[col].astype('category')
return chunk9.2 内存优化
python
# 优化数据类型以节省内存
def optimize_dtypes(df):
"""
优化DataFrame的数据类型
"""
df_optimized = df.copy()
# 优化整数类型
for col in df_optimized.select_dtypes(include=['int64']).columns:
col_min = df_optimized[col].min()
col_max = df_optimized[col].max()
if col_min >= 0:
if col_max < 255:
df_optimized[col] = df_optimized[col].astype('uint8')
elif col_max < 65535:
df_optimized[col] = df_optimized[col].astype('uint16')
elif col_max < 4294967295:
df_optimized[col] = df_optimized[col].astype('uint32')
else:
if col_min > -128 and col_max < 127:
df_optimized[col] = df_optimized[col].astype('int8')
elif col_min > -32768 and col_max < 32767:
df_optimized[col] = df_optimized[col].astype('int16')
elif col_min > -2147483648 and col_max < 2147483647:
df_optimized[col] = df_optimized[col].astype('int32')
# 优化浮点类型
for col in df_optimized.select_dtypes(include=['float64']).columns:
df_optimized[col] = pd.to_numeric(df_optimized[col], downcast='float')
return df_optimized
# 应用优化
df_optimized = optimize_dtypes(df_cleaned)
print(f"原始内存使用: {df_cleaned.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"优化后内存使用: {df_optimized.memory_usage(deep=True).sum() / 1024**2:.2f} MB")10. 最佳实践和注意事项
10.1 数据清洗最佳实践
- 保留原始数据:始终保留原始数据的备份
- 记录清洗步骤:详细记录每个清洗步骤和决策理由
- 验证清洗结果:清洗后要验证数据的完整性和正确性
- 渐进式清洗:分步骤进行清洗,每步都要检查结果
- 业务理解:清洗决策要基于对业务的理解
10.2 常见陷阱和注意事项
python
# 常见错误示例和正确做法
# 错误:直接删除所有缺失值
# df_wrong = df.dropna() # 可能删除太多有用数据
# 正确:根据业务逻辑选择性处理
df_correct = df.copy()
# 只删除关键字段的缺失值
df_correct = df_correct.dropna(subset=['name', 'department'])
# 其他字段用合适的方法填充
df_correct['age'].fillna(df_correct['age'].median(), inplace=True)
# 错误:盲目删除异常值
# df_wrong = df[df['salary'] < df['salary'].quantile(0.95)] # 可能删除合理的高薪
# 正确:结合业务逻辑判断
# 高管薪资可能确实很高,需要单独考虑
high_salary_threshold = 50000
df_correct = df[df['salary'] <= high_salary_threshold]本章小结
数据清洗是数据分析的基础工作,本章介绍了:
- 缺失值处理:识别、删除和填充缺失值的各种方法
- 重复值处理:发现和处理重复数据
- 异常值处理:使用统计方法识别和处理异常值
- 数据类型转换:确保数据类型的正确性
- 数据验证:建立数据质量检查机制
- 综合清洗流程:构建完整的数据清洗管道
- 性能优化:处理大数据集的策略和内存优化技巧
掌握这些数据清洗技能,能够帮助你处理现实中的脏数据,为后续的数据分析奠定坚实基础。在实际应用中,要根据具体的业务场景和数据特点,选择合适的清洗策略。
练习题
- 创建一个包含各种数据质量问题的DataFrame,并实现完整的清洗流程
- 编写一个函数来自动检测和报告数据质量问题
- 实现一个可配置的数据清洗管道,支持不同的清洗策略
- 处理一个真实的CSV文件,记录清洗过程和结果
- 比较不同缺失值填充方法对后续分析结果的影响
下一章我们将学习 Pandas 的常用函数,这些函数将帮助我们更高效地处理和分析数据。