Skip to content

Pandas 数据清洗

数据清洗是数据分析过程中最重要的步骤之一。在实际工作中,原始数据往往包含缺失值、重复值、异常值等问题,需要进行清洗和预处理才能用于分析。本章将详细介绍如何使用 Pandas 进行数据清洗。

1. 数据清洗概述

1.1 什么是数据清洗

数据清洗是指识别和纠正数据中的错误、不一致性和不完整性的过程。主要包括:

  • 缺失值处理:识别和处理空值
  • 重复值处理:发现和删除重复记录
  • 异常值处理:识别和处理离群值
  • 数据类型转换:确保数据类型正确
  • 数据格式标准化:统一数据格式

1.2 数据质量问题的常见类型

python
import pandas as pd
import numpy as np

# 创建包含各种数据质量问题的示例数据
data = {
    'name': ['张三', '李四', '王五', '张三', '赵六', None, ''],
    'age': [25, 30, np.nan, 25, 35, 28, -5],
    'salary': [5000, 6000, 7000, 5000, 8000, 9000, 999999],
    'department': ['IT', 'HR', 'it', 'IT', 'Finance', 'hr', 'IT'],
    'join_date': ['2020-01-15', '2019-03-20', '2021/05/10', '2020-01-15', 
                  '2018-12-01', '无效日期', '2022-02-28']
}

df = pd.DataFrame(data)
print("原始数据:")
print(df)
print("\n数据信息:")
print(df.info())

2. 缺失值处理

2.1 识别缺失值

python
# 检查缺失值
print("缺失值统计:")
print(df.isnull().sum())

# 查看缺失值的分布
print("\n缺失值百分比:")
print(df.isnull().mean() * 100)

# 查看包含缺失值的行
print("\n包含缺失值的行:")
print(df[df.isnull().any(axis=1)])

# 可视化缺失值模式
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(10, 6))
sns.heatmap(df.isnull(), cbar=True, yticklabels=False, cmap='viridis')
plt.title('缺失值模式')
plt.show()

2.2 处理缺失值的策略

删除缺失值

python
# 删除包含任何缺失值的行
df_drop_any = df.dropna()
print("删除任何缺失值后的数据:")
print(df_drop_any)

# 删除所有值都为缺失的行
df_drop_all = df.dropna(how='all')

# 删除特定列的缺失值
df_drop_subset = df.dropna(subset=['age'])

# 删除包含缺失值的列
df_drop_columns = df.dropna(axis=1)

填充缺失值

python
# 用固定值填充
df_filled = df.copy()
df_filled['age'].fillna(df['age'].mean(), inplace=True)  # 用均值填充
df_filled['name'].fillna('未知', inplace=True)  # 用固定值填充

# 前向填充和后向填充
df_ffill = df.fillna(method='ffill')  # 前向填充
df_bfill = df.fillna(method='bfill')  # 后向填充

# 用插值方法填充数值型数据
df_interpolate = df.copy()
df_interpolate['age'] = df_interpolate['age'].interpolate()

# 用众数填充分类数据
from scipy import stats
mode_department = stats.mode(df['department'].dropna())[0][0]
df_filled['department'].fillna(mode_department, inplace=True)

2.3 高级缺失值处理

python
# 基于条件填充
def fill_age_by_department(row):
    if pd.isna(row['age']):
        if row['department'] == 'IT':
            return 28  # IT部门平均年龄
        elif row['department'] == 'HR':
            return 32  # HR部门平均年龄
        else:
            return 30  # 其他部门平均年龄
    return row['age']

df_conditional = df.copy()
df_conditional['age'] = df_conditional.apply(fill_age_by_department, axis=1)

# 使用机器学习方法填充缺失值
from sklearn.impute import KNNImputer

# 准备数值型数据
numeric_cols = ['age', 'salary']
imputer = KNNImputer(n_neighbors=3)
df_knn = df.copy()
df_knn[numeric_cols] = imputer.fit_transform(df[numeric_cols])

3. 重复值处理

3.1 识别重复值

python
# 检查完全重复的行
print("重复行数量:", df.duplicated().sum())
print("\n重复的行:")
print(df[df.duplicated()])

# 基于特定列检查重复
print("\n基于姓名的重复:")
print(df[df.duplicated(subset=['name'])])

# 查看所有重复项(包括第一次出现)
print("\n所有重复项:")
print(df[df.duplicated(subset=['name'], keep=False)])

3.2 处理重复值

python
# 删除完全重复的行
df_no_duplicates = df.drop_duplicates()

# 基于特定列删除重复,保留第一个
df_unique_names = df.drop_duplicates(subset=['name'], keep='first')

# 保留最后一个重复项
df_keep_last = df.drop_duplicates(subset=['name'], keep='last')

# 删除所有重复项
df_remove_all_duplicates = df.drop_duplicates(subset=['name'], keep=False)

print("处理重复值后的数据:")
print(df_unique_names)

4. 异常值处理

4.1 识别异常值

python
# 使用统计方法识别异常值
def detect_outliers_iqr(data):
    Q1 = data.quantile(0.25)
    Q3 = data.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    return (data < lower_bound) | (data > upper_bound)

# 检查年龄异常值
age_outliers = detect_outliers_iqr(df['age'].dropna())
print("年龄异常值:")
print(df.loc[age_outliers, 'age'])

# 使用Z-score方法
from scipy import stats
z_scores = np.abs(stats.zscore(df['salary'].dropna()))
threshold = 3
salary_outliers = z_scores > threshold
print("\n薪资异常值(Z-score > 3):")
print(df.loc[df['salary'].dropna().index[salary_outliers], 'salary'])

4.2 处理异常值

python
# 删除异常值
df_no_outliers = df.copy()
age_mask = ~detect_outliers_iqr(df_no_outliers['age'].dropna())
df_no_outliers = df_no_outliers.loc[df_no_outliers['age'].dropna().index[age_mask]]

# 用边界值替换异常值
def cap_outliers(data, method='iqr'):
    if method == 'iqr':
        Q1 = data.quantile(0.25)
        Q3 = data.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
    
    return data.clip(lower=lower_bound, upper=upper_bound)

df_capped = df.copy()
df_capped['salary'] = cap_outliers(df_capped['salary'])

# 用中位数替换异常值
df_median_replace = df.copy()
outlier_mask = detect_outliers_iqr(df_median_replace['age'].dropna())
median_age = df_median_replace['age'].median()
df_median_replace.loc[outlier_mask, 'age'] = median_age

5. 数据类型转换

5.1 检查和转换数据类型

python
# 检查当前数据类型
print("当前数据类型:")
print(df.dtypes)

# 转换数据类型
df_converted = df.copy()

# 转换数值型
df_converted['age'] = pd.to_numeric(df_converted['age'], errors='coerce')
df_converted['salary'] = pd.to_numeric(df_converted['salary'], errors='coerce')

# 转换日期型
df_converted['join_date'] = pd.to_datetime(df_converted['join_date'], 
                                          errors='coerce', 
                                          infer_datetime_format=True)

# 转换分类型
df_converted['department'] = df_converted['department'].astype('category')

print("\n转换后的数据类型:")
print(df_converted.dtypes)

5.2 处理字符串数据

python
# 字符串清洗
df_string_clean = df.copy()

# 去除空白字符
df_string_clean['name'] = df_string_clean['name'].str.strip()
df_string_clean['department'] = df_string_clean['department'].str.strip()

# 统一大小写
df_string_clean['department'] = df_string_clean['department'].str.upper()

# 替换空字符串为NaN
df_string_clean['name'] = df_string_clean['name'].replace('', np.nan)

# 标准化文本
df_string_clean['department'] = df_string_clean['department'].replace({
    'IT': 'Information Technology',
    'HR': 'Human Resources'
})

print("字符串清洗后的数据:")
print(df_string_clean)

6. 数据验证和质量检查

6.1 数据完整性检查

python
def data_quality_report(df):
    """
    生成数据质量报告
    """
    report = {}
    
    # 基本信息
    report['总行数'] = len(df)
    report['总列数'] = len(df.columns)
    
    # 缺失值统计
    missing_stats = df.isnull().sum()
    report['缺失值统计'] = missing_stats[missing_stats > 0].to_dict()
    
    # 重复值统计
    report['重复行数'] = df.duplicated().sum()
    
    # 数据类型统计
    report['数据类型'] = df.dtypes.value_counts().to_dict()
    
    # 数值型列的统计信息
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        report['数值型统计'] = df[numeric_cols].describe().to_dict()
    
    return report

# 生成数据质量报告
quality_report = data_quality_report(df)
for key, value in quality_report.items():
    print(f"{key}: {value}")
    print("-" * 50)

6.2 数据约束检查

python
def validate_data(df):
    """
    数据验证函数
    """
    issues = []
    
    # 检查年龄范围
    if 'age' in df.columns:
        invalid_age = df[(df['age'] < 0) | (df['age'] > 120)]
        if not invalid_age.empty:
            issues.append(f"发现 {len(invalid_age)} 个无效年龄值")
    
    # 检查薪资范围
    if 'salary' in df.columns:
        invalid_salary = df[(df['salary'] < 0) | (df['salary'] > 1000000)]
        if not invalid_salary.empty:
            issues.append(f"发现 {len(invalid_salary)} 个可疑薪资值")
    
    # 检查必填字段
    required_fields = ['name', 'department']
    for field in required_fields:
        if field in df.columns:
            missing_count = df[field].isnull().sum()
            if missing_count > 0:
                issues.append(f"必填字段 {field}{missing_count} 个缺失值")
    
    return issues

# 执行数据验证
validation_issues = validate_data(df)
if validation_issues:
    print("数据验证发现的问题:")
    for issue in validation_issues:
        print(f"- {issue}")
else:
    print("数据验证通过")

7. 综合数据清洗流程

7.1 完整的数据清洗管道

python
class DataCleaner:
    def __init__(self):
        self.cleaning_log = []
    
    def log_action(self, action):
        self.cleaning_log.append(action)
        print(f"执行: {action}")
    
    def clean_data(self, df):
        """
        完整的数据清洗流程
        """
        df_clean = df.copy()
        original_shape = df_clean.shape
        
        # 1. 处理字符串数据
        self.log_action("清洗字符串数据")
        string_cols = df_clean.select_dtypes(include=['object']).columns
        for col in string_cols:
            if col in df_clean.columns:
                df_clean[col] = df_clean[col].astype(str).str.strip()
                df_clean[col] = df_clean[col].replace(['', 'nan', 'None'], np.nan)
        
        # 2. 数据类型转换
        self.log_action("转换数据类型")
        if 'age' in df_clean.columns:
            df_clean['age'] = pd.to_numeric(df_clean['age'], errors='coerce')
        if 'salary' in df_clean.columns:
            df_clean['salary'] = pd.to_numeric(df_clean['salary'], errors='coerce')
        if 'join_date' in df_clean.columns:
            df_clean['join_date'] = pd.to_datetime(df_clean['join_date'], errors='coerce')
        
        # 3. 处理重复值
        self.log_action("删除重复值")
        duplicates_before = df_clean.duplicated().sum()
        df_clean = df_clean.drop_duplicates()
        duplicates_removed = duplicates_before - df_clean.duplicated().sum()
        
        # 4. 处理异常值
        self.log_action("处理异常值")
        numeric_cols = df_clean.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            df_clean[col] = cap_outliers(df_clean[col])
        
        # 5. 处理缺失值
        self.log_action("处理缺失值")
        # 数值型用中位数填充
        for col in numeric_cols:
            if df_clean[col].isnull().any():
                median_val = df_clean[col].median()
                df_clean[col].fillna(median_val, inplace=True)
        
        # 分类型用众数填充
        categorical_cols = df_clean.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            if df_clean[col].isnull().any():
                mode_val = df_clean[col].mode().iloc[0] if not df_clean[col].mode().empty else '未知'
                df_clean[col].fillna(mode_val, inplace=True)
        
        # 6. 最终验证
        self.log_action("执行最终验证")
        final_shape = df_clean.shape
        
        # 生成清洗报告
        report = {
            '原始数据形状': original_shape,
            '清洗后数据形状': final_shape,
            '删除的重复行数': duplicates_removed,
            '剩余缺失值': df_clean.isnull().sum().sum(),
            '清洗步骤': self.cleaning_log
        }
        
        return df_clean, report

# 使用数据清洗器
cleaner = DataCleaner()
df_cleaned, cleaning_report = cleaner.clean_data(df)

print("\n清洗报告:")
for key, value in cleaning_report.items():
    print(f"{key}: {value}")

print("\n清洗后的数据:")
print(df_cleaned)
print("\n清洗后数据信息:")
print(df_cleaned.info())

8. 实际应用案例

8.1 销售数据清洗

python
# 模拟销售数据
sales_data = {
    'order_id': ['ORD001', 'ORD002', 'ORD003', 'ORD001', 'ORD004', None],
    'customer_name': ['张三', '李四', '', '张三', '王五', '赵六'],
    'product': ['笔记本电脑', 'LAPTOP', '笔记本电脑', '笔记本电脑', '手机', '平板'],
    'quantity': [1, 2, -1, 1, 3, 0],
    'price': [5000, 6000, 5000, 5000, 2000, 3000],
    'order_date': ['2023-01-15', '2023/02/20', '2023-03-10', '2023-01-15', 
                   '2023-04-05', 'invalid']
}

sales_df = pd.DataFrame(sales_data)
print("原始销售数据:")
print(sales_df)

# 应用清洗流程
sales_cleaner = DataCleaner()
sales_cleaned, sales_report = sales_cleaner.clean_data(sales_df)

# 额外的业务规则清洗
sales_cleaned = sales_cleaned[sales_cleaned['quantity'] > 0]  # 删除无效数量
sales_cleaned['product'] = sales_cleaned['product'].str.upper()  # 统一产品名称

print("\n清洗后的销售数据:")
print(sales_cleaned)

8.2 用户行为数据清洗

python
# 模拟用户行为数据
user_behavior = {
    'user_id': [1001, 1002, 1003, 1001, 1004, 1005],
    'session_duration': [300, 1500, -50, 300, 7200, 0],
    'page_views': [5, 15, 0, 5, 50, 1],
    'bounce_rate': [0.2, 0.1, 1.5, 0.2, 0.05, 0.8],
    'device_type': ['mobile', 'DESKTOP', 'mobile', 'mobile', 'tablet', ''],
    'last_visit': ['2023-01-15 10:30:00', '2023-02-20 14:45:00', 
                   '2023-03-10 09:15:00', '2023-01-15 10:30:00',
                   '2023-04-05 16:20:00', 'N/A']
}

behavior_df = pd.DataFrame(user_behavior)

# 自定义清洗规则
def clean_user_behavior(df):
    df_clean = df.copy()
    
    # 处理会话时长异常值(负值和过大值)
    df_clean = df_clean[df_clean['session_duration'] >= 0]
    df_clean = df_clean[df_clean['session_duration'] <= 3600]  # 最大1小时
    
    # 处理跳出率范围
    df_clean['bounce_rate'] = df_clean['bounce_rate'].clip(0, 1)
    
    # 标准化设备类型
    df_clean['device_type'] = df_clean['device_type'].str.lower()
    df_clean['device_type'] = df_clean['device_type'].replace('', 'unknown')
    
    # 处理时间格式
    df_clean['last_visit'] = pd.to_datetime(df_clean['last_visit'], errors='coerce')
    
    return df_clean

behavior_cleaned = clean_user_behavior(behavior_df)
print("清洗后的用户行为数据:")
print(behavior_cleaned)

9. 性能优化技巧

9.1 大数据集的清洗策略

python
# 分块处理大文件
def clean_large_dataset(file_path, chunk_size=10000):
    """
    分块处理大数据集
    """
    cleaned_chunks = []
    
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        # 应用清洗逻辑
        chunk_cleaned = clean_data_chunk(chunk)
        cleaned_chunks.append(chunk_cleaned)
    
    # 合并所有清洗后的块
    return pd.concat(cleaned_chunks, ignore_index=True)

def clean_data_chunk(chunk):
    """
    清洗单个数据块
    """
    # 基本清洗操作
    chunk = chunk.dropna(how='all')  # 删除全空行
    chunk = chunk.drop_duplicates()  # 删除重复行
    
    # 数据类型优化
    for col in chunk.select_dtypes(include=['object']).columns:
        if chunk[col].nunique() / len(chunk) < 0.5:  # 如果唯一值比例小于50%
            chunk[col] = chunk[col].astype('category')
    
    return chunk

9.2 内存优化

python
# 优化数据类型以节省内存
def optimize_dtypes(df):
    """
    优化DataFrame的数据类型
    """
    df_optimized = df.copy()
    
    # 优化整数类型
    for col in df_optimized.select_dtypes(include=['int64']).columns:
        col_min = df_optimized[col].min()
        col_max = df_optimized[col].max()
        
        if col_min >= 0:
            if col_max < 255:
                df_optimized[col] = df_optimized[col].astype('uint8')
            elif col_max < 65535:
                df_optimized[col] = df_optimized[col].astype('uint16')
            elif col_max < 4294967295:
                df_optimized[col] = df_optimized[col].astype('uint32')
        else:
            if col_min > -128 and col_max < 127:
                df_optimized[col] = df_optimized[col].astype('int8')
            elif col_min > -32768 and col_max < 32767:
                df_optimized[col] = df_optimized[col].astype('int16')
            elif col_min > -2147483648 and col_max < 2147483647:
                df_optimized[col] = df_optimized[col].astype('int32')
    
    # 优化浮点类型
    for col in df_optimized.select_dtypes(include=['float64']).columns:
        df_optimized[col] = pd.to_numeric(df_optimized[col], downcast='float')
    
    return df_optimized

# 应用优化
df_optimized = optimize_dtypes(df_cleaned)
print(f"原始内存使用: {df_cleaned.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"优化后内存使用: {df_optimized.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

10. 最佳实践和注意事项

10.1 数据清洗最佳实践

  1. 保留原始数据:始终保留原始数据的备份
  2. 记录清洗步骤:详细记录每个清洗步骤和决策理由
  3. 验证清洗结果:清洗后要验证数据的完整性和正确性
  4. 渐进式清洗:分步骤进行清洗,每步都要检查结果
  5. 业务理解:清洗决策要基于对业务的理解

10.2 常见陷阱和注意事项

python
# 常见错误示例和正确做法

# 错误:直接删除所有缺失值
# df_wrong = df.dropna()  # 可能删除太多有用数据

# 正确:根据业务逻辑选择性处理
df_correct = df.copy()
# 只删除关键字段的缺失值
df_correct = df_correct.dropna(subset=['name', 'department'])
# 其他字段用合适的方法填充
df_correct['age'].fillna(df_correct['age'].median(), inplace=True)

# 错误:盲目删除异常值
# df_wrong = df[df['salary'] < df['salary'].quantile(0.95)]  # 可能删除合理的高薪

# 正确:结合业务逻辑判断
# 高管薪资可能确实很高,需要单独考虑
high_salary_threshold = 50000
df_correct = df[df['salary'] <= high_salary_threshold]

本章小结

数据清洗是数据分析的基础工作,本章介绍了:

  1. 缺失值处理:识别、删除和填充缺失值的各种方法
  2. 重复值处理:发现和处理重复数据
  3. 异常值处理:使用统计方法识别和处理异常值
  4. 数据类型转换:确保数据类型的正确性
  5. 数据验证:建立数据质量检查机制
  6. 综合清洗流程:构建完整的数据清洗管道
  7. 性能优化:处理大数据集的策略和内存优化技巧

掌握这些数据清洗技能,能够帮助你处理现实中的脏数据,为后续的数据分析奠定坚实基础。在实际应用中,要根据具体的业务场景和数据特点,选择合适的清洗策略。

练习题

  1. 创建一个包含各种数据质量问题的DataFrame,并实现完整的清洗流程
  2. 编写一个函数来自动检测和报告数据质量问题
  3. 实现一个可配置的数据清洗管道,支持不同的清洗策略
  4. 处理一个真实的CSV文件,记录清洗过程和结果
  5. 比较不同缺失值填充方法对后续分析结果的影响

下一章我们将学习 Pandas 的常用函数,这些函数将帮助我们更高效地处理和分析数据。