Skip to content

Pandas CSV与Excel处理

CSV 和 Excel 文件是数据分析中最常见的数据格式。本章将详细介绍如何使用 Pandas 读取、处理和保存这些文件格式的数据。

📚 文件格式概述

CSV 文件特点

  • 逗号分隔值:Comma-Separated Values
  • 纯文本格式:可用任何文本编辑器打开
  • 跨平台兼容:几乎所有系统都支持
  • 文件小巧:相比 Excel 文件更节省空间
  • 处理速度快:读写效率高

Excel 文件特点

  • 多工作表:一个文件可包含多个工作表
  • 格式丰富:支持字体、颜色、公式等
  • 数据类型保持:自动识别数字、日期等类型
  • 商业标准:广泛用于商业环境
  • 文件较大:包含格式信息,文件相对较大

📖 CSV 文件处理

读取 CSV 文件

基本读取

python
import pandas as pd
import numpy as np

# 创建示例 CSV 数据(模拟文件内容)
csv_content = """姓名,年龄,城市,薪资
张三,25,北京,8000
李四,30,上海,12000
王五,35,广州,15000
赵六,28,深圳,9500
钱七,32,杭州,11000"""

# 将内容写入文件(实际使用中,您已经有现成的 CSV 文件)
with open('employees.csv', 'w', encoding='utf-8') as f:
    f.write(csv_content)

# 读取 CSV 文件
df = pd.read_csv('employees.csv')
print("基本读取结果:")
print(df)
print(f"\n数据类型:\n{df.dtypes}")

指定参数读取

python
# 指定编码
df_encoding = pd.read_csv('employees.csv', encoding='utf-8')

# 指定分隔符(如果不是逗号)
# df_tab = pd.read_csv('data.txt', sep='\t')  # 制表符分隔
# df_semicolon = pd.read_csv('data.csv', sep=';')  # 分号分隔

# 指定索引列
df_index = pd.read_csv('employees.csv', index_col='姓名')
print("指定索引列:")
print(df_index)

# 选择特定列
df_selected = pd.read_csv('employees.csv', usecols=['姓名', '薪资'])
print("\n选择特定列:")
print(df_selected)

# 跳过行
df_skip = pd.read_csv('employees.csv', skiprows=1)  # 跳过第一行
print("\n跳过第一行:")
print(df_skip)

处理缺失值和特殊情况

python
# 创建包含缺失值的 CSV
csv_with_nan = """姓名,年龄,城市,薪资
张三,25,北京,8000
李四,,上海,12000
王五,35,,15000
赵六,28,深圳,
钱七,32,杭州,11000"""

with open('employees_nan.csv', 'w', encoding='utf-8') as f:
    f.write(csv_with_nan)

# 读取时处理缺失值
df_nan = pd.read_csv('employees_nan.csv')
print("包含缺失值的数据:")
print(df_nan)
print(f"\n缺失值统计:\n{df_nan.isnull().sum()}")

# 指定缺失值标识
df_na_values = pd.read_csv('employees_nan.csv', na_values=['', 'NULL', 'N/A'])
print("\n指定缺失值标识后:")
print(df_na_values)

数据类型指定

python
# 指定数据类型
dtype_dict = {
    '姓名': 'string',
    '年龄': 'Int64',  # 可空整数类型
    '城市': 'category',
    '薪资': 'float64'
}

df_typed = pd.read_csv('employees.csv', dtype=dtype_dict)
print("指定数据类型后:")
print(df_typed.dtypes)
print(df_typed.info())

写入 CSV 文件

基本写入

python
# 创建示例数据
data = {
    '产品名称': ['笔记本电脑', '台式机', '平板电脑', '智能手机'],
    '价格': [5999, 3999, 2999, 1999],
    '库存': [50, 30, 80, 120],
    '分类': ['电脑', '电脑', '电脑', '手机']
}

products_df = pd.DataFrame(data)

# 保存为 CSV
products_df.to_csv('products.csv', index=False, encoding='utf-8')
print("数据已保存到 products.csv")

# 验证保存结果
saved_df = pd.read_csv('products.csv')
print("\n保存的数据:")
print(saved_df)

高级写入选项

python
# 保留索引
products_df.to_csv('products_with_index.csv', encoding='utf-8')

# 指定分隔符
products_df.to_csv('products_tab.txt', sep='\t', index=False)

# 选择特定列
products_df[['产品名称', '价格']].to_csv('products_simple.csv', index=False)

# 追加模式(注意:需要先创建文件)
new_product = pd.DataFrame({
    '产品名称': ['智能手表'],
    '价格': [1299],
    '库存': [60],
    '分类': ['电子产品']
})

# 追加到现有文件
new_product.to_csv('products.csv', mode='a', header=False, index=False)

print("\n追加数据后:")
print(pd.read_csv('products.csv'))

📊 Excel 文件处理

安装依赖

python
# 需要安装额外的库来处理 Excel 文件
# pip install openpyxl  # 用于 .xlsx 文件
# pip install xlrd      # 用于 .xls 文件
# pip install xlsxwriter # 用于写入 Excel 文件

读取 Excel 文件

基本读取

python
# 首先创建一个 Excel 文件用于演示
with pd.ExcelWriter('company_data.xlsx', engine='openpyxl') as writer:
    # 员工信息表
    employees = pd.DataFrame({
        '员工ID': ['E001', 'E002', 'E003', 'E004', 'E005'],
        '姓名': ['张三', '李四', '王五', '赵六', '钱七'],
        '部门': ['技术部', '销售部', '技术部', '人事部', '财务部'],
        '薪资': [8000, 12000, 15000, 9500, 11000],
        '入职日期': pd.date_range('2020-01-01', periods=5, freq='3M')
    })
    
    # 部门信息表
    departments = pd.DataFrame({
        '部门名称': ['技术部', '销售部', '人事部', '财务部'],
        '部门经理': ['张经理', '李经理', '王经理', '赵经理'],
        '员工数量': [15, 8, 5, 6]
    })
    
    employees.to_excel(writer, sheet_name='员工信息', index=False)
    departments.to_excel(writer, sheet_name='部门信息', index=False)

print("Excel 文件已创建")

# 读取 Excel 文件(默认读取第一个工作表)
df_excel = pd.read_excel('company_data.xlsx')
print("\n读取第一个工作表:")
print(df_excel)

读取特定工作表

python
# 读取指定工作表
employees_sheet = pd.read_excel('company_data.xlsx', sheet_name='员工信息')
departments_sheet = pd.read_excel('company_data.xlsx', sheet_name='部门信息')

print("员工信息表:")
print(employees_sheet)
print("\n部门信息表:")
print(departments_sheet)

# 读取多个工作表
all_sheets = pd.read_excel('company_data.xlsx', sheet_name=None)
print(f"\n工作表名称: {list(all_sheets.keys())}")

# 读取指定的多个工作表
selected_sheets = pd.read_excel('company_data.xlsx', 
                               sheet_name=['员工信息', '部门信息'])
print(f"\n选择的工作表: {list(selected_sheets.keys())}")

高级读取选项

python
# 指定读取范围
df_range = pd.read_excel('company_data.xlsx', 
                        sheet_name='员工信息',
                        usecols='A:D',  # 只读取 A 到 D 列
                        nrows=3)        # 只读取前 3 行数据
print("指定范围读取:")
print(df_range)

# 跳过行和指定标题行
df_skip = pd.read_excel('company_data.xlsx',
                       sheet_name='员工信息',
                       skiprows=1,     # 跳过第一行
                       header=0)       # 第二行作为标题
print("\n跳过行后:")
print(df_skip)

写入 Excel 文件

基本写入

python
# 创建销售数据
sales_data = pd.DataFrame({
    '月份': pd.date_range('2024-01-01', periods=12, freq='M'),
    '销售额': [120000, 135000, 142000, 158000, 163000, 171000,
              185000, 192000, 178000, 165000, 155000, 148000],
    '成本': [80000, 85000, 90000, 95000, 98000, 102000,
            108000, 115000, 110000, 105000, 98000, 92000]
})

sales_data['利润'] = sales_data['销售额'] - sales_data['成本']
sales_data['利润率'] = (sales_data['利润'] / sales_data['销售额'] * 100).round(2)

# 保存到 Excel
sales_data.to_excel('sales_report.xlsx', sheet_name='月度销售', index=False)
print("销售报告已保存")

多工作表写入

python
# 创建多个数据表
q1_data = sales_data.iloc[:3].copy()
q2_data = sales_data.iloc[3:6].copy()
q3_data = sales_data.iloc[6:9].copy()
q4_data = sales_data.iloc[9:12].copy()

# 写入多个工作表
with pd.ExcelWriter('quarterly_report.xlsx', engine='openpyxl') as writer:
    q1_data.to_excel(writer, sheet_name='Q1', index=False)
    q2_data.to_excel(writer, sheet_name='Q2', index=False)
    q3_data.to_excel(writer, sheet_name='Q3', index=False)
    q4_data.to_excel(writer, sheet_name='Q4', index=False)
    
    # 年度汇总
    annual_summary = pd.DataFrame({
        '季度': ['Q1', 'Q2', 'Q3', 'Q4'],
        '销售额': [q1_data['销售额'].sum(), q2_data['销售额'].sum(),
                 q3_data['销售额'].sum(), q4_data['销售额'].sum()],
        '利润': [q1_data['利润'].sum(), q2_data['利润'].sum(),
               q3_data['利润'].sum(), q4_data['利润'].sum()]
    })
    
    annual_summary.to_excel(writer, sheet_name='年度汇总', index=False)

print("季度报告已保存")

格式化 Excel 输出

python
# 使用 xlsxwriter 引擎进行格式化
with pd.ExcelWriter('formatted_report.xlsx', engine='xlsxwriter') as writer:
    sales_data.to_excel(writer, sheet_name='销售数据', index=False)
    
    # 获取工作簿和工作表对象
    workbook = writer.book
    worksheet = writer.sheets['销售数据']
    
    # 定义格式
    header_format = workbook.add_format({
        'bold': True,
        'text_wrap': True,
        'valign': 'top',
        'fg_color': '#D7E4BC',
        'border': 1
    })
    
    currency_format = workbook.add_format({'num_format': '¥#,##0'})
    percent_format = workbook.add_format({'num_format': '0.00%'})
    date_format = workbook.add_format({'num_format': 'yyyy-mm-dd'})
    
    # 应用格式
    for col_num, value in enumerate(sales_data.columns.values):
        worksheet.write(0, col_num, value, header_format)
    
    # 设置列宽
    worksheet.set_column('A:A', 12)  # 月份列
    worksheet.set_column('B:D', 15)  # 数值列
    
    # 格式化数据列
    worksheet.set_column('B:C', 15, currency_format)  # 销售额和成本
    worksheet.set_column('D:D', 15, currency_format)  # 利润
    
print("格式化报告已保存")

🔧 高级文件处理技巧

大文件处理

分块读取

python
# 创建大文件示例
large_data = pd.DataFrame({
    'ID': range(1, 10001),
    '数值1': np.random.randn(10000),
    '数值2': np.random.randn(10000),
    '分类': np.random.choice(['A', 'B', 'C'], 10000)
})

large_data.to_csv('large_file.csv', index=False)

# 分块读取大文件
chunk_size = 1000
results = []

for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    # 对每个块进行处理
    processed_chunk = chunk[chunk['数值1'] > 0]  # 示例处理:筛选正数
    results.append(processed_chunk)

# 合并结果
final_result = pd.concat(results, ignore_index=True)
print(f"原始数据: {len(large_data)} 行")
print(f"处理后数据: {len(final_result)} 行")

内存优化读取

python
# 优化数据类型以节省内存
dtype_optimization = {
    'ID': 'int32',
    '数值1': 'float32',
    '数值2': 'float32',
    '分类': 'category'
}

# 比较内存使用
df_default = pd.read_csv('large_file.csv')
df_optimized = pd.read_csv('large_file.csv', dtype=dtype_optimization)

print(f"默认类型内存使用: {df_default.memory_usage(deep=True).sum()} bytes")
print(f"优化类型内存使用: {df_optimized.memory_usage(deep=True).sum()} bytes")
print(f"内存节省: {(1 - df_optimized.memory_usage(deep=True).sum()/df_default.memory_usage(deep=True).sum())*100:.1f}%")

文件格式转换

CSV 与 Excel 互转

python
# CSV 转 Excel
csv_df = pd.read_csv('employees.csv')
csv_df.to_excel('employees_from_csv.xlsx', index=False)

# Excel 转 CSV
excel_df = pd.read_excel('company_data.xlsx', sheet_name='员工信息')
excel_df.to_csv('employees_from_excel.csv', index=False)

print("文件格式转换完成")

批量处理多个文件

python
import os
import glob

# 创建多个 CSV 文件用于演示
for i in range(3):
    sample_data = pd.DataFrame({
        '日期': pd.date_range(f'2024-0{i+1}-01', periods=5, freq='D'),
        '销量': np.random.randint(50, 200, 5),
        '收入': np.random.randint(1000, 5000, 5)
    })
    sample_data.to_csv(f'sales_month_{i+1}.csv', index=False)

# 批量读取并合并
csv_files = glob.glob('sales_month_*.csv')
all_data = []

for file in csv_files:
    df = pd.read_csv(file)
    df['来源文件'] = file
    all_data.append(df)

combined_data = pd.concat(all_data, ignore_index=True)
print("合并后的数据:")
print(combined_data.head(10))

# 保存合并结果
combined_data.to_excel('combined_sales.xlsx', index=False)
print("\n合并数据已保存到 Excel")

🎨 实际应用示例

示例1:财务报表处理

python
# 创建财务数据
financial_data = {
    '科目': ['营业收入', '营业成本', '销售费用', '管理费用', '财务费用', '营业利润'],
    '本月': [1000000, 600000, 80000, 120000, 15000, 185000],
    '上月': [950000, 570000, 75000, 115000, 12000, 178000],
    '去年同期': [900000, 540000, 70000, 110000, 10000, 170000]
}

financial_df = pd.DataFrame(financial_data)

# 计算增长率
financial_df['环比增长率'] = ((financial_df['本月'] - financial_df['上月']) / financial_df['上月'] * 100).round(2)
financial_df['同比增长率'] = ((financial_df['本月'] - financial_df['去年同期']) / financial_df['去年同期'] * 100).round(2)

print("财务报表:")
print(financial_df)

# 保存到 Excel 并格式化
with pd.ExcelWriter('financial_report.xlsx', engine='xlsxwriter') as writer:
    financial_df.to_excel(writer, sheet_name='财务报表', index=False)
    
    workbook = writer.book
    worksheet = writer.sheets['财务报表']
    
    # 格式定义
    header_format = workbook.add_format({
        'bold': True, 'bg_color': '#4F81BD', 'font_color': 'white'
    })
    
    currency_format = workbook.add_format({'num_format': '¥#,##0'})
    percent_format = workbook.add_format({'num_format': '0.00%'})
    
    # 应用格式
    for col_num, value in enumerate(financial_df.columns.values):
        worksheet.write(0, col_num, value, header_format)
    
    # 格式化数值列
    worksheet.set_column('B:D', 15, currency_format)
    worksheet.set_column('E:F', 12, percent_format)

print("\n财务报表已保存")

示例2:销售数据分析

python
# 创建销售明细数据
sales_detail = pd.DataFrame({
    '订单号': [f'ORD{str(i).zfill(4)}' for i in range(1, 101)],
    '客户': np.random.choice(['客户A', '客户B', '客户C', '客户D', '客户E'], 100),
    '产品': np.random.choice(['产品1', '产品2', '产品3', '产品4'], 100),
    '数量': np.random.randint(1, 20, 100),
    '单价': np.random.choice([100, 150, 200, 250, 300], 100),
    '日期': pd.date_range('2024-01-01', periods=100, freq='D')
})

sales_detail['金额'] = sales_detail['数量'] * sales_detail['单价']

# 数据分析
customer_summary = sales_detail.groupby('客户').agg({
    '订单号': 'count',
    '数量': 'sum',
    '金额': 'sum'
}).rename(columns={'订单号': '订单数量'})

product_summary = sales_detail.groupby('产品').agg({
    '订单号': 'count',
    '数量': 'sum',
    '金额': 'sum'
}).rename(columns={'订单号': '订单数量'})

# 月度汇总
sales_detail['月份'] = sales_detail['日期'].dt.to_period('M')
monthly_summary = sales_detail.groupby('月份').agg({
    '订单号': 'count',
    '金额': 'sum'
}).rename(columns={'订单号': '订单数量'})

# 保存分析结果
with pd.ExcelWriter('sales_analysis.xlsx') as writer:
    sales_detail.to_excel(writer, sheet_name='销售明细', index=False)
    customer_summary.to_excel(writer, sheet_name='客户分析')
    product_summary.to_excel(writer, sheet_name='产品分析')
    monthly_summary.to_excel(writer, sheet_name='月度汇总')

print("销售分析报告已生成")
print("\n客户分析:")
print(customer_summary.sort_values('金额', ascending=False))

示例3:数据清洗和标准化

python
# 创建需要清洗的数据
dirty_data = pd.DataFrame({
    '姓名': ['张三', ' 李四 ', 'WANG WU', '赵六', '钱七'],
    '电话': ['138-0000-0000', '139 0000 0000', '(140)0000-0000', '141-0000-0000', '142.0000.0000'],
    '邮箱': ['zhang@email.com', 'LI@EMAIL.COM', 'wang@Email.Com', 'zhao@email.com', 'qian@email.com'],
    '年龄': ['25', '30岁', '35', 'unknown', '28'],
    '薪资': ['8000', '12,000', '15000元', 'N/A', '9500']
})

print("原始数据:")
print(dirty_data)

# 数据清洗
cleaned_data = dirty_data.copy()

# 清洗姓名:去除空格,统一大小写
cleaned_data['姓名'] = cleaned_data['姓名'].str.strip().str.title()

# 清洗电话:统一格式
cleaned_data['电话'] = cleaned_data['电话'].str.replace(r'[^\d]', '', regex=True)
cleaned_data['电话'] = cleaned_data['电话'].str.replace(r'(\d{3})(\d{4})(\d{4})', r'\1-\2-\3', regex=True)

# 清洗邮箱:统一小写
cleaned_data['邮箱'] = cleaned_data['邮箱'].str.lower()

# 清洗年龄:提取数字
cleaned_data['年龄'] = cleaned_data['年龄'].str.extract(r'(\d+)').astype(float)

# 清洗薪资:提取数字,处理千分位
cleaned_data['薪资'] = cleaned_data['薪资'].str.replace(r'[^\d,]', '', regex=True)
cleaned_data['薪资'] = cleaned_data['薪资'].str.replace(',', '')
cleaned_data['薪资'] = pd.to_numeric(cleaned_data['薪资'], errors='coerce')

print("\n清洗后数据:")
print(cleaned_data)

# 保存清洗结果
with pd.ExcelWriter('data_cleaning_result.xlsx') as writer:
    dirty_data.to_excel(writer, sheet_name='原始数据', index=False)
    cleaned_data.to_excel(writer, sheet_name='清洗后数据', index=False)
    
    # 数据质量报告
    quality_report = pd.DataFrame({
        '字段': cleaned_data.columns,
        '缺失值数量': [cleaned_data[col].isnull().sum() for col in cleaned_data.columns],
        '缺失值比例': [f"{cleaned_data[col].isnull().sum()/len(cleaned_data)*100:.1f}%" for col in cleaned_data.columns]
    })
    
    quality_report.to_excel(writer, sheet_name='数据质量报告', index=False)

print("\n数据清洗报告已保存")

🚀 性能优化技巧

读取性能优化

python
import time

# 创建测试数据
test_data = pd.DataFrame({
    'col1': range(100000),
    'col2': np.random.randn(100000),
    'col3': np.random.choice(['A', 'B', 'C'], 100000)
})

test_data.to_csv('performance_test.csv', index=False)

# 方法1:默认读取
start_time = time.time()
df1 = pd.read_csv('performance_test.csv')
default_time = time.time() - start_time

# 方法2:指定数据类型
start_time = time.time()
df2 = pd.read_csv('performance_test.csv', dtype={
    'col1': 'int32',
    'col2': 'float32',
    'col3': 'category'
})
typed_time = time.time() - start_time

# 方法3:使用 pyarrow 引擎(如果安装了)
try:
    start_time = time.time()
    df3 = pd.read_csv('performance_test.csv', engine='pyarrow')
    pyarrow_time = time.time() - start_time
except:
    pyarrow_time = None

print(f"默认读取时间: {default_time:.4f} 秒")
print(f"指定类型读取时间: {typed_time:.4f} 秒")
if pyarrow_time:
    print(f"PyArrow 引擎读取时间: {pyarrow_time:.4f} 秒")

print(f"\n内存使用对比:")
print(f"默认类型: {df1.memory_usage(deep=True).sum()} bytes")
print(f"优化类型: {df2.memory_usage(deep=True).sum()} bytes")

写入性能优化

python
# 比较不同写入方法的性能
large_df = pd.DataFrame({
    'A': range(50000),
    'B': np.random.randn(50000),
    'C': np.random.choice(['X', 'Y', 'Z'], 50000)
})

# 方法1:标准 CSV 写入
start_time = time.time()
large_df.to_csv('standard_write.csv', index=False)
standard_time = time.time() - start_time

# 方法2:压缩写入
start_time = time.time()
large_df.to_csv('compressed_write.csv.gz', index=False, compression='gzip')
compressed_time = time.time() - start_time

# 方法3:Parquet 格式(如果需要频繁读取)
start_time = time.time()
large_df.to_parquet('data.parquet', index=False)
parquet_time = time.time() - start_time

print(f"标准 CSV 写入时间: {standard_time:.4f} 秒")
print(f"压缩 CSV 写入时间: {compressed_time:.4f} 秒")
print(f"Parquet 写入时间: {parquet_time:.4f} 秒")

# 文件大小对比
import os
print(f"\n文件大小对比:")
print(f"标准 CSV: {os.path.getsize('standard_write.csv')} bytes")
print(f"压缩 CSV: {os.path.getsize('compressed_write.csv.gz')} bytes")
print(f"Parquet: {os.path.getsize('data.parquet')} bytes")

🔍 错误处理和调试

常见错误处理

python
# 处理编码错误
def safe_read_csv(filename, encodings=['utf-8', 'gbk', 'gb2312', 'latin1']):
    """安全读取 CSV 文件,自动尝试不同编码"""
    for encoding in encodings:
        try:
            df = pd.read_csv(filename, encoding=encoding)
            print(f"成功使用 {encoding} 编码读取文件")
            return df
        except UnicodeDecodeError:
            print(f"{encoding} 编码失败,尝试下一个...")
            continue
    
    raise ValueError("所有编码都失败了")

# 处理文件不存在错误
def safe_read_file(filename):
    """安全读取文件"""
    try:
        if filename.endswith('.csv'):
            return pd.read_csv(filename)
        elif filename.endswith(('.xlsx', '.xls')):
            return pd.read_excel(filename)
        else:
            raise ValueError("不支持的文件格式")
    except FileNotFoundError:
        print(f"文件 {filename} 不存在")
        return None
    except Exception as e:
        print(f"读取文件时发生错误: {e}")
        return None

# 测试错误处理
result = safe_read_file('nonexistent.csv')
if result is not None:
    print(result.head())
else:
    print("文件读取失败")

📝 本章小结

通过本章学习,您应该已经掌握:

CSV 文件处理:读取、写入和各种参数设置
Excel 文件处理:多工作表操作和格式化
大文件处理:分块读取和内存优化
文件格式转换:CSV 与 Excel 互转
批量处理:处理多个文件
数据清洗:处理脏数据和标准化
性能优化:提高读写效率
错误处理:处理常见的文件操作错误

关键要点

  1. 选择合适的文件格式:CSV 适合简单数据,Excel 适合复杂报表
  2. 优化数据类型:可以显著节省内存和提高性能
  3. 处理大文件:使用分块读取避免内存溢出
  4. 数据清洗的重要性:确保数据质量是分析的基础
  5. 错误处理:编写健壮的代码处理各种异常情况

最佳实践

  • 读取文件前先了解数据结构
  • 指定合适的数据类型以优化性能
  • 处理缺失值和异常数据
  • 使用压缩格式节省存储空间
  • 为重要操作添加错误处理

下一步

现在您已经掌握了文件处理技能,接下来将学习 Pandas 的数据清洗技术。


下一章:Pandas 数据清洗