Pandas CSV与Excel处理
CSV 和 Excel 文件是数据分析中最常见的数据格式。本章将详细介绍如何使用 Pandas 读取、处理和保存这些文件格式的数据。
📚 文件格式概述
CSV 文件特点
- 逗号分隔值:Comma-Separated Values
- 纯文本格式:可用任何文本编辑器打开
- 跨平台兼容:几乎所有系统都支持
- 文件小巧:相比 Excel 文件更节省空间
- 处理速度快:读写效率高
Excel 文件特点
- 多工作表:一个文件可包含多个工作表
- 格式丰富:支持字体、颜色、公式等
- 数据类型保持:自动识别数字、日期等类型
- 商业标准:广泛用于商业环境
- 文件较大:包含格式信息,文件相对较大
📖 CSV 文件处理
读取 CSV 文件
基本读取
python
import pandas as pd
import numpy as np
# 创建示例 CSV 数据(模拟文件内容)
csv_content = """姓名,年龄,城市,薪资
张三,25,北京,8000
李四,30,上海,12000
王五,35,广州,15000
赵六,28,深圳,9500
钱七,32,杭州,11000"""
# 将内容写入文件(实际使用中,您已经有现成的 CSV 文件)
with open('employees.csv', 'w', encoding='utf-8') as f:
f.write(csv_content)
# 读取 CSV 文件
df = pd.read_csv('employees.csv')
print("基本读取结果:")
print(df)
print(f"\n数据类型:\n{df.dtypes}")指定参数读取
python
# 指定编码
df_encoding = pd.read_csv('employees.csv', encoding='utf-8')
# 指定分隔符(如果不是逗号)
# df_tab = pd.read_csv('data.txt', sep='\t') # 制表符分隔
# df_semicolon = pd.read_csv('data.csv', sep=';') # 分号分隔
# 指定索引列
df_index = pd.read_csv('employees.csv', index_col='姓名')
print("指定索引列:")
print(df_index)
# 选择特定列
df_selected = pd.read_csv('employees.csv', usecols=['姓名', '薪资'])
print("\n选择特定列:")
print(df_selected)
# 跳过行
df_skip = pd.read_csv('employees.csv', skiprows=1) # 跳过第一行
print("\n跳过第一行:")
print(df_skip)处理缺失值和特殊情况
python
# 创建包含缺失值的 CSV
csv_with_nan = """姓名,年龄,城市,薪资
张三,25,北京,8000
李四,,上海,12000
王五,35,,15000
赵六,28,深圳,
钱七,32,杭州,11000"""
with open('employees_nan.csv', 'w', encoding='utf-8') as f:
f.write(csv_with_nan)
# 读取时处理缺失值
df_nan = pd.read_csv('employees_nan.csv')
print("包含缺失值的数据:")
print(df_nan)
print(f"\n缺失值统计:\n{df_nan.isnull().sum()}")
# 指定缺失值标识
df_na_values = pd.read_csv('employees_nan.csv', na_values=['', 'NULL', 'N/A'])
print("\n指定缺失值标识后:")
print(df_na_values)数据类型指定
python
# 指定数据类型
dtype_dict = {
'姓名': 'string',
'年龄': 'Int64', # 可空整数类型
'城市': 'category',
'薪资': 'float64'
}
df_typed = pd.read_csv('employees.csv', dtype=dtype_dict)
print("指定数据类型后:")
print(df_typed.dtypes)
print(df_typed.info())写入 CSV 文件
基本写入
python
# 创建示例数据
data = {
'产品名称': ['笔记本电脑', '台式机', '平板电脑', '智能手机'],
'价格': [5999, 3999, 2999, 1999],
'库存': [50, 30, 80, 120],
'分类': ['电脑', '电脑', '电脑', '手机']
}
products_df = pd.DataFrame(data)
# 保存为 CSV
products_df.to_csv('products.csv', index=False, encoding='utf-8')
print("数据已保存到 products.csv")
# 验证保存结果
saved_df = pd.read_csv('products.csv')
print("\n保存的数据:")
print(saved_df)高级写入选项
python
# 保留索引
products_df.to_csv('products_with_index.csv', encoding='utf-8')
# 指定分隔符
products_df.to_csv('products_tab.txt', sep='\t', index=False)
# 选择特定列
products_df[['产品名称', '价格']].to_csv('products_simple.csv', index=False)
# 追加模式(注意:需要先创建文件)
new_product = pd.DataFrame({
'产品名称': ['智能手表'],
'价格': [1299],
'库存': [60],
'分类': ['电子产品']
})
# 追加到现有文件
new_product.to_csv('products.csv', mode='a', header=False, index=False)
print("\n追加数据后:")
print(pd.read_csv('products.csv'))📊 Excel 文件处理
安装依赖
python
# 需要安装额外的库来处理 Excel 文件
# pip install openpyxl # 用于 .xlsx 文件
# pip install xlrd # 用于 .xls 文件
# pip install xlsxwriter # 用于写入 Excel 文件读取 Excel 文件
基本读取
python
# 首先创建一个 Excel 文件用于演示
with pd.ExcelWriter('company_data.xlsx', engine='openpyxl') as writer:
# 员工信息表
employees = pd.DataFrame({
'员工ID': ['E001', 'E002', 'E003', 'E004', 'E005'],
'姓名': ['张三', '李四', '王五', '赵六', '钱七'],
'部门': ['技术部', '销售部', '技术部', '人事部', '财务部'],
'薪资': [8000, 12000, 15000, 9500, 11000],
'入职日期': pd.date_range('2020-01-01', periods=5, freq='3M')
})
# 部门信息表
departments = pd.DataFrame({
'部门名称': ['技术部', '销售部', '人事部', '财务部'],
'部门经理': ['张经理', '李经理', '王经理', '赵经理'],
'员工数量': [15, 8, 5, 6]
})
employees.to_excel(writer, sheet_name='员工信息', index=False)
departments.to_excel(writer, sheet_name='部门信息', index=False)
print("Excel 文件已创建")
# 读取 Excel 文件(默认读取第一个工作表)
df_excel = pd.read_excel('company_data.xlsx')
print("\n读取第一个工作表:")
print(df_excel)读取特定工作表
python
# 读取指定工作表
employees_sheet = pd.read_excel('company_data.xlsx', sheet_name='员工信息')
departments_sheet = pd.read_excel('company_data.xlsx', sheet_name='部门信息')
print("员工信息表:")
print(employees_sheet)
print("\n部门信息表:")
print(departments_sheet)
# 读取多个工作表
all_sheets = pd.read_excel('company_data.xlsx', sheet_name=None)
print(f"\n工作表名称: {list(all_sheets.keys())}")
# 读取指定的多个工作表
selected_sheets = pd.read_excel('company_data.xlsx',
sheet_name=['员工信息', '部门信息'])
print(f"\n选择的工作表: {list(selected_sheets.keys())}")高级读取选项
python
# 指定读取范围
df_range = pd.read_excel('company_data.xlsx',
sheet_name='员工信息',
usecols='A:D', # 只读取 A 到 D 列
nrows=3) # 只读取前 3 行数据
print("指定范围读取:")
print(df_range)
# 跳过行和指定标题行
df_skip = pd.read_excel('company_data.xlsx',
sheet_name='员工信息',
skiprows=1, # 跳过第一行
header=0) # 第二行作为标题
print("\n跳过行后:")
print(df_skip)写入 Excel 文件
基本写入
python
# 创建销售数据
sales_data = pd.DataFrame({
'月份': pd.date_range('2024-01-01', periods=12, freq='M'),
'销售额': [120000, 135000, 142000, 158000, 163000, 171000,
185000, 192000, 178000, 165000, 155000, 148000],
'成本': [80000, 85000, 90000, 95000, 98000, 102000,
108000, 115000, 110000, 105000, 98000, 92000]
})
sales_data['利润'] = sales_data['销售额'] - sales_data['成本']
sales_data['利润率'] = (sales_data['利润'] / sales_data['销售额'] * 100).round(2)
# 保存到 Excel
sales_data.to_excel('sales_report.xlsx', sheet_name='月度销售', index=False)
print("销售报告已保存")多工作表写入
python
# 创建多个数据表
q1_data = sales_data.iloc[:3].copy()
q2_data = sales_data.iloc[3:6].copy()
q3_data = sales_data.iloc[6:9].copy()
q4_data = sales_data.iloc[9:12].copy()
# 写入多个工作表
with pd.ExcelWriter('quarterly_report.xlsx', engine='openpyxl') as writer:
q1_data.to_excel(writer, sheet_name='Q1', index=False)
q2_data.to_excel(writer, sheet_name='Q2', index=False)
q3_data.to_excel(writer, sheet_name='Q3', index=False)
q4_data.to_excel(writer, sheet_name='Q4', index=False)
# 年度汇总
annual_summary = pd.DataFrame({
'季度': ['Q1', 'Q2', 'Q3', 'Q4'],
'销售额': [q1_data['销售额'].sum(), q2_data['销售额'].sum(),
q3_data['销售额'].sum(), q4_data['销售额'].sum()],
'利润': [q1_data['利润'].sum(), q2_data['利润'].sum(),
q3_data['利润'].sum(), q4_data['利润'].sum()]
})
annual_summary.to_excel(writer, sheet_name='年度汇总', index=False)
print("季度报告已保存")格式化 Excel 输出
python
# 使用 xlsxwriter 引擎进行格式化
with pd.ExcelWriter('formatted_report.xlsx', engine='xlsxwriter') as writer:
sales_data.to_excel(writer, sheet_name='销售数据', index=False)
# 获取工作簿和工作表对象
workbook = writer.book
worksheet = writer.sheets['销售数据']
# 定义格式
header_format = workbook.add_format({
'bold': True,
'text_wrap': True,
'valign': 'top',
'fg_color': '#D7E4BC',
'border': 1
})
currency_format = workbook.add_format({'num_format': '¥#,##0'})
percent_format = workbook.add_format({'num_format': '0.00%'})
date_format = workbook.add_format({'num_format': 'yyyy-mm-dd'})
# 应用格式
for col_num, value in enumerate(sales_data.columns.values):
worksheet.write(0, col_num, value, header_format)
# 设置列宽
worksheet.set_column('A:A', 12) # 月份列
worksheet.set_column('B:D', 15) # 数值列
# 格式化数据列
worksheet.set_column('B:C', 15, currency_format) # 销售额和成本
worksheet.set_column('D:D', 15, currency_format) # 利润
print("格式化报告已保存")🔧 高级文件处理技巧
大文件处理
分块读取
python
# 创建大文件示例
large_data = pd.DataFrame({
'ID': range(1, 10001),
'数值1': np.random.randn(10000),
'数值2': np.random.randn(10000),
'分类': np.random.choice(['A', 'B', 'C'], 10000)
})
large_data.to_csv('large_file.csv', index=False)
# 分块读取大文件
chunk_size = 1000
results = []
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
# 对每个块进行处理
processed_chunk = chunk[chunk['数值1'] > 0] # 示例处理:筛选正数
results.append(processed_chunk)
# 合并结果
final_result = pd.concat(results, ignore_index=True)
print(f"原始数据: {len(large_data)} 行")
print(f"处理后数据: {len(final_result)} 行")内存优化读取
python
# 优化数据类型以节省内存
dtype_optimization = {
'ID': 'int32',
'数值1': 'float32',
'数值2': 'float32',
'分类': 'category'
}
# 比较内存使用
df_default = pd.read_csv('large_file.csv')
df_optimized = pd.read_csv('large_file.csv', dtype=dtype_optimization)
print(f"默认类型内存使用: {df_default.memory_usage(deep=True).sum()} bytes")
print(f"优化类型内存使用: {df_optimized.memory_usage(deep=True).sum()} bytes")
print(f"内存节省: {(1 - df_optimized.memory_usage(deep=True).sum()/df_default.memory_usage(deep=True).sum())*100:.1f}%")文件格式转换
CSV 与 Excel 互转
python
# CSV 转 Excel
csv_df = pd.read_csv('employees.csv')
csv_df.to_excel('employees_from_csv.xlsx', index=False)
# Excel 转 CSV
excel_df = pd.read_excel('company_data.xlsx', sheet_name='员工信息')
excel_df.to_csv('employees_from_excel.csv', index=False)
print("文件格式转换完成")批量处理多个文件
python
import os
import glob
# 创建多个 CSV 文件用于演示
for i in range(3):
sample_data = pd.DataFrame({
'日期': pd.date_range(f'2024-0{i+1}-01', periods=5, freq='D'),
'销量': np.random.randint(50, 200, 5),
'收入': np.random.randint(1000, 5000, 5)
})
sample_data.to_csv(f'sales_month_{i+1}.csv', index=False)
# 批量读取并合并
csv_files = glob.glob('sales_month_*.csv')
all_data = []
for file in csv_files:
df = pd.read_csv(file)
df['来源文件'] = file
all_data.append(df)
combined_data = pd.concat(all_data, ignore_index=True)
print("合并后的数据:")
print(combined_data.head(10))
# 保存合并结果
combined_data.to_excel('combined_sales.xlsx', index=False)
print("\n合并数据已保存到 Excel")🎨 实际应用示例
示例1:财务报表处理
python
# 创建财务数据
financial_data = {
'科目': ['营业收入', '营业成本', '销售费用', '管理费用', '财务费用', '营业利润'],
'本月': [1000000, 600000, 80000, 120000, 15000, 185000],
'上月': [950000, 570000, 75000, 115000, 12000, 178000],
'去年同期': [900000, 540000, 70000, 110000, 10000, 170000]
}
financial_df = pd.DataFrame(financial_data)
# 计算增长率
financial_df['环比增长率'] = ((financial_df['本月'] - financial_df['上月']) / financial_df['上月'] * 100).round(2)
financial_df['同比增长率'] = ((financial_df['本月'] - financial_df['去年同期']) / financial_df['去年同期'] * 100).round(2)
print("财务报表:")
print(financial_df)
# 保存到 Excel 并格式化
with pd.ExcelWriter('financial_report.xlsx', engine='xlsxwriter') as writer:
financial_df.to_excel(writer, sheet_name='财务报表', index=False)
workbook = writer.book
worksheet = writer.sheets['财务报表']
# 格式定义
header_format = workbook.add_format({
'bold': True, 'bg_color': '#4F81BD', 'font_color': 'white'
})
currency_format = workbook.add_format({'num_format': '¥#,##0'})
percent_format = workbook.add_format({'num_format': '0.00%'})
# 应用格式
for col_num, value in enumerate(financial_df.columns.values):
worksheet.write(0, col_num, value, header_format)
# 格式化数值列
worksheet.set_column('B:D', 15, currency_format)
worksheet.set_column('E:F', 12, percent_format)
print("\n财务报表已保存")示例2:销售数据分析
python
# 创建销售明细数据
sales_detail = pd.DataFrame({
'订单号': [f'ORD{str(i).zfill(4)}' for i in range(1, 101)],
'客户': np.random.choice(['客户A', '客户B', '客户C', '客户D', '客户E'], 100),
'产品': np.random.choice(['产品1', '产品2', '产品3', '产品4'], 100),
'数量': np.random.randint(1, 20, 100),
'单价': np.random.choice([100, 150, 200, 250, 300], 100),
'日期': pd.date_range('2024-01-01', periods=100, freq='D')
})
sales_detail['金额'] = sales_detail['数量'] * sales_detail['单价']
# 数据分析
customer_summary = sales_detail.groupby('客户').agg({
'订单号': 'count',
'数量': 'sum',
'金额': 'sum'
}).rename(columns={'订单号': '订单数量'})
product_summary = sales_detail.groupby('产品').agg({
'订单号': 'count',
'数量': 'sum',
'金额': 'sum'
}).rename(columns={'订单号': '订单数量'})
# 月度汇总
sales_detail['月份'] = sales_detail['日期'].dt.to_period('M')
monthly_summary = sales_detail.groupby('月份').agg({
'订单号': 'count',
'金额': 'sum'
}).rename(columns={'订单号': '订单数量'})
# 保存分析结果
with pd.ExcelWriter('sales_analysis.xlsx') as writer:
sales_detail.to_excel(writer, sheet_name='销售明细', index=False)
customer_summary.to_excel(writer, sheet_name='客户分析')
product_summary.to_excel(writer, sheet_name='产品分析')
monthly_summary.to_excel(writer, sheet_name='月度汇总')
print("销售分析报告已生成")
print("\n客户分析:")
print(customer_summary.sort_values('金额', ascending=False))示例3:数据清洗和标准化
python
# 创建需要清洗的数据
dirty_data = pd.DataFrame({
'姓名': ['张三', ' 李四 ', 'WANG WU', '赵六', '钱七'],
'电话': ['138-0000-0000', '139 0000 0000', '(140)0000-0000', '141-0000-0000', '142.0000.0000'],
'邮箱': ['zhang@email.com', 'LI@EMAIL.COM', 'wang@Email.Com', 'zhao@email.com', 'qian@email.com'],
'年龄': ['25', '30岁', '35', 'unknown', '28'],
'薪资': ['8000', '12,000', '15000元', 'N/A', '9500']
})
print("原始数据:")
print(dirty_data)
# 数据清洗
cleaned_data = dirty_data.copy()
# 清洗姓名:去除空格,统一大小写
cleaned_data['姓名'] = cleaned_data['姓名'].str.strip().str.title()
# 清洗电话:统一格式
cleaned_data['电话'] = cleaned_data['电话'].str.replace(r'[^\d]', '', regex=True)
cleaned_data['电话'] = cleaned_data['电话'].str.replace(r'(\d{3})(\d{4})(\d{4})', r'\1-\2-\3', regex=True)
# 清洗邮箱:统一小写
cleaned_data['邮箱'] = cleaned_data['邮箱'].str.lower()
# 清洗年龄:提取数字
cleaned_data['年龄'] = cleaned_data['年龄'].str.extract(r'(\d+)').astype(float)
# 清洗薪资:提取数字,处理千分位
cleaned_data['薪资'] = cleaned_data['薪资'].str.replace(r'[^\d,]', '', regex=True)
cleaned_data['薪资'] = cleaned_data['薪资'].str.replace(',', '')
cleaned_data['薪资'] = pd.to_numeric(cleaned_data['薪资'], errors='coerce')
print("\n清洗后数据:")
print(cleaned_data)
# 保存清洗结果
with pd.ExcelWriter('data_cleaning_result.xlsx') as writer:
dirty_data.to_excel(writer, sheet_name='原始数据', index=False)
cleaned_data.to_excel(writer, sheet_name='清洗后数据', index=False)
# 数据质量报告
quality_report = pd.DataFrame({
'字段': cleaned_data.columns,
'缺失值数量': [cleaned_data[col].isnull().sum() for col in cleaned_data.columns],
'缺失值比例': [f"{cleaned_data[col].isnull().sum()/len(cleaned_data)*100:.1f}%" for col in cleaned_data.columns]
})
quality_report.to_excel(writer, sheet_name='数据质量报告', index=False)
print("\n数据清洗报告已保存")🚀 性能优化技巧
读取性能优化
python
import time
# 创建测试数据
test_data = pd.DataFrame({
'col1': range(100000),
'col2': np.random.randn(100000),
'col3': np.random.choice(['A', 'B', 'C'], 100000)
})
test_data.to_csv('performance_test.csv', index=False)
# 方法1:默认读取
start_time = time.time()
df1 = pd.read_csv('performance_test.csv')
default_time = time.time() - start_time
# 方法2:指定数据类型
start_time = time.time()
df2 = pd.read_csv('performance_test.csv', dtype={
'col1': 'int32',
'col2': 'float32',
'col3': 'category'
})
typed_time = time.time() - start_time
# 方法3:使用 pyarrow 引擎(如果安装了)
try:
start_time = time.time()
df3 = pd.read_csv('performance_test.csv', engine='pyarrow')
pyarrow_time = time.time() - start_time
except:
pyarrow_time = None
print(f"默认读取时间: {default_time:.4f} 秒")
print(f"指定类型读取时间: {typed_time:.4f} 秒")
if pyarrow_time:
print(f"PyArrow 引擎读取时间: {pyarrow_time:.4f} 秒")
print(f"\n内存使用对比:")
print(f"默认类型: {df1.memory_usage(deep=True).sum()} bytes")
print(f"优化类型: {df2.memory_usage(deep=True).sum()} bytes")写入性能优化
python
# 比较不同写入方法的性能
large_df = pd.DataFrame({
'A': range(50000),
'B': np.random.randn(50000),
'C': np.random.choice(['X', 'Y', 'Z'], 50000)
})
# 方法1:标准 CSV 写入
start_time = time.time()
large_df.to_csv('standard_write.csv', index=False)
standard_time = time.time() - start_time
# 方法2:压缩写入
start_time = time.time()
large_df.to_csv('compressed_write.csv.gz', index=False, compression='gzip')
compressed_time = time.time() - start_time
# 方法3:Parquet 格式(如果需要频繁读取)
start_time = time.time()
large_df.to_parquet('data.parquet', index=False)
parquet_time = time.time() - start_time
print(f"标准 CSV 写入时间: {standard_time:.4f} 秒")
print(f"压缩 CSV 写入时间: {compressed_time:.4f} 秒")
print(f"Parquet 写入时间: {parquet_time:.4f} 秒")
# 文件大小对比
import os
print(f"\n文件大小对比:")
print(f"标准 CSV: {os.path.getsize('standard_write.csv')} bytes")
print(f"压缩 CSV: {os.path.getsize('compressed_write.csv.gz')} bytes")
print(f"Parquet: {os.path.getsize('data.parquet')} bytes")🔍 错误处理和调试
常见错误处理
python
# 处理编码错误
def safe_read_csv(filename, encodings=['utf-8', 'gbk', 'gb2312', 'latin1']):
"""安全读取 CSV 文件,自动尝试不同编码"""
for encoding in encodings:
try:
df = pd.read_csv(filename, encoding=encoding)
print(f"成功使用 {encoding} 编码读取文件")
return df
except UnicodeDecodeError:
print(f"{encoding} 编码失败,尝试下一个...")
continue
raise ValueError("所有编码都失败了")
# 处理文件不存在错误
def safe_read_file(filename):
"""安全读取文件"""
try:
if filename.endswith('.csv'):
return pd.read_csv(filename)
elif filename.endswith(('.xlsx', '.xls')):
return pd.read_excel(filename)
else:
raise ValueError("不支持的文件格式")
except FileNotFoundError:
print(f"文件 {filename} 不存在")
return None
except Exception as e:
print(f"读取文件时发生错误: {e}")
return None
# 测试错误处理
result = safe_read_file('nonexistent.csv')
if result is not None:
print(result.head())
else:
print("文件读取失败")📝 本章小结
通过本章学习,您应该已经掌握:
✅ CSV 文件处理:读取、写入和各种参数设置
✅ Excel 文件处理:多工作表操作和格式化
✅ 大文件处理:分块读取和内存优化
✅ 文件格式转换:CSV 与 Excel 互转
✅ 批量处理:处理多个文件
✅ 数据清洗:处理脏数据和标准化
✅ 性能优化:提高读写效率
✅ 错误处理:处理常见的文件操作错误
关键要点
- 选择合适的文件格式:CSV 适合简单数据,Excel 适合复杂报表
- 优化数据类型:可以显著节省内存和提高性能
- 处理大文件:使用分块读取避免内存溢出
- 数据清洗的重要性:确保数据质量是分析的基础
- 错误处理:编写健壮的代码处理各种异常情况
最佳实践
- 读取文件前先了解数据结构
- 指定合适的数据类型以优化性能
- 处理缺失值和异常数据
- 使用压缩格式节省存储空间
- 为重要操作添加错误处理
下一步
现在您已经掌握了文件处理技能,接下来将学习 Pandas 的数据清洗技术。
下一章:Pandas 数据清洗