Module 7 小结和复习
文件操作 —— 数据的读取与存储
知识点总结
1. 文本文件操作
基本模式:
'r': 只读(默认)
'w': 写入(覆盖)
'a': 追加
'r+': 读写
读取方式:
python
# 方法1: 一次读取全部
with open('file.txt', 'r', encoding='utf-8') as f:
content = f.read()
# 方法2: 按行读取
with open('file.txt', 'r', encoding='utf-8') as f:
for line in f:
print(line.strip())
# 方法3: 读取所有行到列表
with open('file.txt', 'r', encoding='utf-8') as f:
lines = f.readlines()
写入方式:
python
# 覆盖写入
with open('output.txt', 'w', encoding='utf-8') as f:
f.write("内容\n")
# 追加写入
with open('output.txt', 'a', encoding='utf-8') as f:
f.write("追加内容\n")
2. CSV 文件处理
使用 csv 模块:
python
import csv
# 写入
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['Name', 'Age'])
writer.writerow(['Alice', 25])
# 读取
with open('data.csv', 'r', encoding='utf-8') as f:
reader = csv.reader(f)
for row in reader:
print(row)
使用 Pandas(推荐):
python
import pandas as pd
# 读取
df = pd.read_csv('data.csv', encoding='utf-8')
# 写入
df.to_csv('output.csv', index=False, encoding='utf-8-sig')  # utf-8-sig 避免 Excel 乱码
3. Excel 文件处理
安装依赖:
bash
pip install openpyxl  # 用于 .xlsx
读取 Excel:
python
import pandas as pd
# 读取单个工作表
df = pd.read_excel('data.xlsx', sheet_name='Sheet1')
# 读取多个工作表
excel_file = pd.ExcelFile('data.xlsx')
df1 = pd.read_excel(excel_file, sheet_name='2023')
df2 = pd.read_excel(excel_file, sheet_name='2024')
写入 Excel:
python
# 单个工作表
df.to_excel('output.xlsx', sheet_name='Results', index=False)
# 多个工作表
with pd.ExcelWriter('report.xlsx') as writer:
df1.to_excel(writer, sheet_name='2023', index=False)
df2.to_excel(writer, sheet_name='2024', index=False)
4. Stata 文件读写
读取 .dta 文件:
python
import pandas as pd
# 基本读取
df = pd.read_stata('survey.dta')
# 保留分类变量和标签
df = pd.read_stata('survey.dta', convert_categoricals=True)
写入 .dta 文件:
python
# 保存为 Stata 13/14 格式
df.to_stata('output.dta', write_index=False, version=117)
# 添加变量标签
variable_labels = {
'age': 'Respondent Age',
'income': 'Annual Income'
}
df.to_stata('output.dta', write_index=False, variable_labels=variable_labels)
版本对照:
- 117 = Stata 13/14
- 118 = Stata 15/16
- 119 = Stata 17
5. JSON 数据处理
基本操作:
python
import json
# Python → JSON 字符串
data = {'name': 'Alice', 'age': 25}
json_str = json.dumps(data, indent=2, ensure_ascii=False)
# JSON 字符串 → Python
data = json.loads(json_str)
# 写入文件
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# 读取文件
with open('data.json', 'r', encoding='utf-8') as f:
data = json.load(f)
Pandas 与 JSON 互转:
python
# DataFrame → JSON
df.to_json('data.json', orient='records', indent=2, force_ascii=False)
# JSON → DataFrame
df = pd.read_json('data.json')
6. 路径操作
使用 pathlib(推荐):
python
from pathlib import Path
# 创建路径
data_dir = Path('data')
file_path = data_dir / 'survey.csv'
# 检查存在
if file_path.exists():
print("文件存在")
# 创建目录
data_dir.mkdir(parents=True, exist_ok=True)
# 列出文件
for file in data_dir.glob('*.csv'):
print(file.name)
文件格式对比
| 格式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| CSV | 小、快、通用 | 无格式、单表 | 大数据、纯数据 |
| Excel | 多表、格式 | 慢、大 | 报告、展示 |
| Stata (.dta) | 保留标签 | 社科专用 | Stata 数据互通 |
| JSON | 嵌套结构 | 较大 | API、配置 |
⚠️ 常见错误
1. 忘记指定编码
python
# 错误:可能出现乱码
with open('file.txt', 'r') as f:
content = f.read()
# 正确:明确指定 UTF-8
with open('file.txt', 'r', encoding='utf-8') as f:
content = f.read()
2. 忘记关闭文件
python
# 错误:需要手动关闭
f = open('file.txt', 'r')
content = f.read()
f.close()
# 正确:使用 with 语句自动关闭
with open('file.txt', 'r') as f:
content = f.read()
3. CSV 写入忘记 newline=''
python
# 错误:Windows 上会出现空行
with open('data.csv', 'w') as f:
writer = csv.writer(f)
# 正确
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
4. Excel 中文乱码
python
# 错误:Excel 打开中文乱码
df.to_csv('output.csv', index=False, encoding='utf-8')
# 正确:使用 utf-8-sig
df.to_csv('output.csv', index=False, encoding='utf-8-sig')
最佳实践
1. 始终使用 with 语句
python
# 好习惯
with open('file.txt', 'r', encoding='utf-8') as f:
content = f.read()
2. 检查文件是否存在
python
from pathlib import Path
file_path = Path('data.csv')
if file_path.exists():
df = pd.read_csv(file_path)
else:
print("文件不存在")
3. 大文件分块处理
python
# CSV 大文件
for chunk in pd.read_csv('large.csv', chunksize=10000):
process(chunk)
# Stata 大文件
for chunk in pd.read_stata('large.dta', chunksize=10000):
process(chunk)
4. 保存时不包含索引
python
# 通常不需要保存索引
df.to_csv('output.csv', index=False)
df.to_excel('output.xlsx', index=False)
df.to_stata('output.dta', write_index=False)
编程练习
练习 1:文本文件日志分析(基础)
难度:⭐⭐ 时间:15 分钟
分析服务器日志文件。
python
"""
任务:
1. 读取 server.log 文件
2. 统计包含 "ERROR" 的行数
3. 提取所有错误信息
4. 将结果保存到 error_report.txt
"""
# 示例日志内容
# 2024-01-15 10:30:00 INFO Server started
# 2024-01-15 10:30:15 ERROR Connection failed
# 2024-01-15 10:30:20 INFO Request received
# 2024-01-15 10:30:25 ERROR Database timeout
参考答案
python
from datetime import datetime
def analyze_log(log_file):
    """Scan a log file for ERROR lines and write a summary report.

    Reads ``log_file`` line by line, keeps every line containing the
    literal "ERROR", writes a formatted report to ``error_report.txt``
    in the current directory, and returns ``(error_count, errors)``.
    """
    # Collect the stripped text of every ERROR line in file order.
    with open(log_file, 'r', encoding='utf-8') as f:
        errors = [line.strip() for line in f if 'ERROR' in line]
    error_count = len(errors)

    # Assemble the report as a list of lines, joined once at the end.
    divider = "=" * 50
    report = [divider, "错误日志分析报告", divider]
    report.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    report.append(f"总错误数: {error_count}\n")
    report.append("错误详情:")
    report.append("-" * 50)
    report.extend(f"{i}. {error}" for i, error in enumerate(errors, 1))
    report.append(divider)

    with open('error_report.txt', 'w', encoding='utf-8') as f:
        f.write('\n'.join(report))
    print(f" 分析完成:发现 {error_count} 个错误")
    print(f" 报告已保存到 error_report.txt")
    return error_count, errors
# Demo fixture: build a small log (3 ERROR lines among INFO lines) and
# write it to server.log so analyze_log has input to read.
log_lines = [
    "2024-01-15 10:30:00 INFO Server started",
    "2024-01-15 10:30:15 ERROR Connection failed to database",
    "2024-01-15 10:30:20 INFO Request received from client",
    "2024-01-15 10:30:25 ERROR Database timeout after 30s",
    "2024-01-15 10:30:30 INFO Processing data",
    "2024-01-15 10:30:35 ERROR Failed to write to disk",
    "2024-01-15 10:30:40 INFO Task completed",
]
sample_log = "\n".join(log_lines)
with open('server.log', 'w', encoding='utf-8') as f:
    f.write(sample_log)
error_count, errors = analyze_log('server.log')
练习 2:CSV 数据清洗与转换(基础)
难度:⭐⭐ 时间:20 分钟
python
"""
任务:
给定 students.csv 文件(包含:name, age, major, gpa)
1. 读取 CSV 文件
2. 筛选 GPA >= 3.5 的学生
3. 按专业分组统计平均 GPA
4. 保存结果到 high_performers.csv
"""
参考答案
python
import pandas as pd
def clean_student_data(input_file, output_file, min_gpa=3.5):
    """Filter students by GPA and summarise the result by major.

    Reads ``input_file`` (CSV with name/age/major/gpa columns), keeps the
    rows with ``gpa >= min_gpa``, writes them to ``output_file``, writes
    per-major GPA statistics to ``major_statistics.csv``, and returns
    ``(high_performers, major_stats)``.
    """
    df = pd.read_csv(input_file)
    print(f"原始数据: {len(df)} 条记录")

    # Copy so later mutations cannot trigger chained-assignment warnings.
    high_performers = df.loc[df['gpa'] >= min_gpa].copy()
    print(f"GPA >= {min_gpa}: {len(high_performers)} 人")

    # Per-major count / mean / min / max of GPA, rounded for display.
    major_stats = (
        high_performers
        .groupby('major')
        .agg({'gpa': ['count', 'mean', 'min', 'max']})
        .round(2)
    )
    print(f"\n按专业统计:")
    print(major_stats)

    # utf-8-sig keeps Excel from mangling non-ASCII text.
    high_performers.to_csv(output_file, index=False, encoding='utf-8-sig')
    print(f"\n 结果已保存到 {output_file}")
    major_stats.to_csv('major_statistics.csv', encoding='utf-8-sig')
    print(f" 统计结果已保存到 major_statistics.csv")
    return high_performers, major_stats
# Demo fixture: six students across three majors, saved to students.csv.
_names = ['Alice', 'Bob', 'Carol', 'David', 'Eve', 'Frank']
_ages = [20, 21, 22, 20, 21, 22]
_majors = ['Economics', 'Sociology', 'Economics', 'Political Science', 'Sociology', 'Economics']
_gpas = [3.8, 3.4, 3.9, 3.2, 3.7, 3.6]
sample_data = pd.DataFrame({'name': _names, 'age': _ages, 'major': _majors, 'gpa': _gpas})
sample_data.to_csv('students.csv', index=False)
# Run the cleaning pipeline on the fixture.
high_performers, stats = clean_student_data('students.csv', 'high_performers.csv', min_gpa=3.5)
练习 3:多格式数据转换(中等)
难度:⭐⭐⭐ 时间:30 分钟
python
"""
任务:创建一个多格式数据转换工具
功能:
1. 读取任意格式(CSV, Excel, Stata, JSON)
2. 转换为其他格式
3. 支持数据清洗(删除缺失值、异常值)
"""
参考答案
python
import pandas as pd
from pathlib import Path
import json
class DataConverter:
    """Read, optionally clean, and re-save tabular data across formats.

    Supports CSV, Excel, Stata and JSON, dispatching on the file
    extension. ``read_file`` and ``clean_data`` return ``self`` so calls
    can be chained fluently.
    """

    def __init__(self):
        self.df = None              # current DataFrame, None until read_file()
        self.original_shape = None  # (rows, cols) as first loaded

    def read_file(self, file_path):
        """Load *file_path* into ``self.df``; format chosen by extension.

        Raises FileNotFoundError if the file is missing and ValueError
        for an unsupported extension. Returns ``self``.
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"文件不存在: {path}")
        ext = path.suffix.lower()
        if ext == '.csv':
            frame = pd.read_csv(path)
        elif ext in ('.xlsx', '.xls'):
            frame = pd.read_excel(path)
        elif ext == '.dta':
            frame = pd.read_stata(path)
        elif ext == '.json':
            frame = pd.read_json(path)
        else:
            raise ValueError(f"不支持的文件格式: {ext}")
        self.df = frame
        self.original_shape = frame.shape
        print(f" 读取成功: {path.name}")
        print(f" 数据维度: {frame.shape[0]} 行 × {frame.shape[1]} 列")
        return self

    def clean_data(self, drop_na=True, remove_outliers=False, outlier_columns=None):
        """Drop NA rows and optionally IQR-filter outliers; returns self.

        Outliers are rows outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] in any of
        the numeric *outlier_columns*.
        """
        if self.df is None:
            raise ValueError("请先读取数据")
        if drop_na:
            n_before = len(self.df)
            self.df = self.df.dropna()
            dropped = n_before - len(self.df)
            if dropped > 0:
                print(f" 删除 {dropped} 行缺失数据")
        if remove_outliers and outlier_columns:
            for col in outlier_columns:
                # Silently skip missing or non-numeric columns.
                if col not in self.df.columns:
                    continue
                if not pd.api.types.is_numeric_dtype(self.df[col]):
                    continue
                q1 = self.df[col].quantile(0.25)
                q3 = self.df[col].quantile(0.75)
                iqr = q3 - q1
                lo, hi = q1 - 1.5 * iqr, q3 + 1.5 * iqr
                n_before = len(self.df)
                self.df = self.df[(self.df[col] >= lo) & (self.df[col] <= hi)]
                dropped = n_before - len(self.df)
                if dropped > 0:
                    print(f" 列 '{col}': 删除 {dropped} 个异常值")
        return self

    def save_as(self, output_path, **kwargs):
        """Write ``self.df`` to *output_path*; format chosen by extension.

        Failures (including unsupported extensions) are reported on
        stdout rather than re-raised.
        """
        if self.df is None:
            raise ValueError("没有数据可保存")
        path = Path(output_path)
        ext = path.suffix.lower()
        try:
            if ext == '.csv':
                self.df.to_csv(path, index=False, encoding='utf-8-sig', **kwargs)
            elif ext in ('.xlsx', '.xls'):
                self.df.to_excel(path, index=False, **kwargs)
            elif ext == '.dta':
                # version=117 keeps the file readable by Stata 13/14.
                self.df.to_stata(path, write_index=False, version=117, **kwargs)
            elif ext == '.json':
                self.df.to_json(path, orient='records', indent=2, force_ascii=False, **kwargs)
            else:
                raise ValueError(f"不支持的输出格式: {ext}")
            print(f" 保存成功: {path.name}")
            print(f" 数据维度: {self.df.shape[0]} 行 × {self.df.shape[1]} 列")
        except Exception as e:
            # Best-effort save: report the failure instead of crashing.
            print(f" 保存失败: {e}")

    def get_summary(self):
        """Print a short overview: original vs current shape, dtypes, head."""
        if self.df is None:
            print("没有数据")
            return
        bar = "=" * 60
        print("\n" + bar)
        print("数据摘要")
        print(bar)
        print(f"原始维度: {self.original_shape[0]} 行 × {self.original_shape[1]} 列")
        print(f"当前维度: {self.df.shape[0]} 行 × {self.df.shape[1]} 列")
        print(f"\n列信息:")
        print(self.df.dtypes)
        print(f"\n前5行:")
        print(self.df.head())
        print(bar)
# Demo driver for DataConverter.
if __name__ == "__main__":
    # Fixture: ten rows with one missing age, one missing income and an
    # extreme age (200) to exercise the outlier filter.
    test_data = pd.DataFrame({
        'id': range(1, 11),
        'age': [25, 30, None, 28, 32, 200, 27, 33, 38, 29],
        'income': [50000, 75000, 85000, None, 70000, 90000, 55000, 80000, 95000, 65000],
    })
    test_data.to_csv('test_input.csv', index=False)

    print("示例1: CSV → Excel(带清洗)")
    converter = DataConverter()
    converter.read_file('test_input.csv') \
        .clean_data(drop_na=True, remove_outliers=True, outlier_columns=['age']) \
        .save_as('test_output.xlsx')

    print("\n示例2: CSV → Stata")
    converter2 = DataConverter()
    converter2.read_file('test_input.csv').clean_data(drop_na=True).save_as('test_output.dta')

    print("\n示例3: CSV → JSON")
    converter3 = DataConverter()
    converter3.read_file('test_input.csv').clean_data(drop_na=True).save_as('test_output.json')

    # Show a summary of the first converter's cleaned data.
converter.get_summary()
练习 4:问卷数据管理系统(中等)
难度:⭐⭐⭐ 时间:35 分钟
创建一个完整的问卷数据管理系统。
参考答案
python
import pandas as pd
import json
from pathlib import Path
from datetime import datetime
class SurveyManager:
    """Manage a survey's raw data, cleaning, reports and exports.

    Creates a ``base_dir/{raw,clean,reports}`` directory tree and offers
    an import -> validate_and_clean -> generate_report -> export pipeline.
    ``import_data`` and ``validate_and_clean`` return ``self`` for
    chaining; ``generate_report`` returns the report dict.
    """

    def __init__(self, survey_name, base_dir='surveys'):
        self.survey_name = survey_name
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(exist_ok=True)
        # One sub-directory per pipeline stage.
        self.raw_dir = self.base_dir / 'raw'
        self.clean_dir = self.base_dir / 'clean'
        self.reports_dir = self.base_dir / 'reports'
        for directory in [self.raw_dir, self.clean_dir, self.reports_dir]:
            directory.mkdir(exist_ok=True)
        self.df = None
        self.metadata = {
            'survey_name': survey_name,
            'created_at': datetime.now().isoformat(),
            'total_responses': 0,
            'valid_responses': 0
        }

    def import_data(self, file_path, file_format='csv'):
        """Load responses from *file_path* ('csv', 'excel' or 'stata').

        Records the response count in metadata, archives a raw copy, and
        returns ``self``. Raises ValueError on an unknown format.
        """
        file_path = Path(file_path)
        if file_format == 'csv':
            self.df = pd.read_csv(file_path)
        elif file_format == 'excel':
            self.df = pd.read_excel(file_path)
        elif file_format == 'stata':
            self.df = pd.read_stata(file_path)
        else:
            raise ValueError(f"不支持的格式: {file_format}")
        self.metadata['total_responses'] = len(self.df)
        print(f" 导入 {len(self.df)} 条响应")
        self._save_raw()
        return self

    def validate_and_clean(self, age_range=(18, 100), income_min=0):
        """Drop rows with missing or out-of-range age/income; returns self."""
        if self.df is None:
            raise ValueError("请先导入数据")
        before = len(self.df)
        # Missing values in the key columns invalidate the response.
        self.df = self.df.dropna(subset=['age', 'income'])
        self.df = self.df[
            (self.df['age'] >= age_range[0]) &
            (self.df['age'] <= age_range[1])
        ]
        self.df = self.df[self.df['income'] >= income_min]
        after = len(self.df)
        removed = before - after
        self.metadata['valid_responses'] = after
        self.metadata['removed_responses'] = removed
        print(f" 清洗完成: 删除 {removed} 条无效数据,保留 {after} 条")
        self._save_clean()
        return self

    def generate_report(self):
        """Compute age/income summary stats; write JSON and text reports.

        Returns the report dict (NOT ``self`` — do not chain further
        manager calls after this one), or None when there is no data.
        """
        if self.df is None or len(self.df) == 0:
            print("没有数据可分析")
            return
        report = {
            'metadata': self.metadata,
            'summary': {
                'age': {
                    'mean': float(self.df['age'].mean()),
                    'median': float(self.df['age'].median()),
                    'min': int(self.df['age'].min()),
                    'max': int(self.df['age'].max())
                },
                'income': {
                    'mean': float(self.df['income'].mean()),
                    'median': float(self.df['income'].median()),
                    'min': float(self.df['income'].min()),
                    'max': float(self.df['income'].max())
                }
            }
        }
        report_file = self.reports_dir / f'{self.survey_name}_report.json'
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        self._generate_text_report(report)
        print(f" 报告已生成")
        return report

    def export(self, format='csv'):
        """Export current data to clean_dir under a timestamped file name.

        format: 'csv', 'excel' or 'stata'. Raises ValueError otherwise.
        """
        if self.df is None:
            raise ValueError("没有数据可导出")
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"{self.survey_name}_{timestamp}"
        # BUG FIX: previously wrote to a literal "(unknown).*" path,
        # ignoring the timestamped name built above.
        if format == 'csv':
            output_path = self.clean_dir / f"{filename}.csv"
            self.df.to_csv(output_path, index=False, encoding='utf-8-sig')
        elif format == 'excel':
            output_path = self.clean_dir / f"{filename}.xlsx"
            self.df.to_excel(output_path, index=False)
        elif format == 'stata':
            output_path = self.clean_dir / f"{filename}.dta"
            self.df.to_stata(output_path, write_index=False, version=117)
        else:
            raise ValueError(f"不支持的格式: {format}")
        print(f" 导出成功: {output_path}")

    def _save_raw(self):
        """Archive the as-imported data under raw/."""
        raw_file = self.raw_dir / f'{self.survey_name}_raw.csv'
        self.df.to_csv(raw_file, index=False, encoding='utf-8-sig')

    def _save_clean(self):
        """Persist the cleaned data under clean/."""
        clean_file = self.clean_dir / f'{self.survey_name}_clean.csv'
        self.df.to_csv(clean_file, index=False, encoding='utf-8-sig')

    def _generate_text_report(self, report):
        """Render *report* as a human-readable text file under reports/."""
        lines = []
        lines.append("=" * 60)
        lines.append(f"问卷分析报告: {self.survey_name}")
        lines.append("=" * 60)
        lines.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        lines.append(f"\n数据概况:")
        lines.append(f" 总响应数: {self.metadata['total_responses']}")
        lines.append(f" 有效响应数: {self.metadata['valid_responses']}")
        lines.append(f" 删除数: {self.metadata.get('removed_responses', 0)}")
        lines.append(f"\n年龄统计:")
        age_stats = report['summary']['age']
        lines.append(f" 平均值: {age_stats['mean']:.1f}")
        lines.append(f" 中位数: {age_stats['median']:.1f}")
        lines.append(f" 范围: {age_stats['min']} - {age_stats['max']}")
        lines.append(f"\n收入统计:")
        income_stats = report['summary']['income']
        lines.append(f" 平均值: ${income_stats['mean']:,.0f}")
        lines.append(f" 中位数: ${income_stats['median']:,.0f}")
        lines.append(f" 范围: ${income_stats['min']:,.0f} - ${income_stats['max']:,.0f}")
        lines.append("=" * 60)
        report_file = self.reports_dir / f'{self.survey_name}_report.txt'
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines))
# Demo driver for SurveyManager.
if __name__ == "__main__":
    # Fixture: 20 responses including an under-age row (15), an impossible
    # age (150), a negative income and missing values — all cleaned out.
    test_data = pd.DataFrame({
        'id': range(1, 21),
        'age': [25, 30, 22, 28, 32, 15, 27, 33, 38, 29,
                35, 40, 26, 31, 24, None, 36, 29, 34, 150],
        'income': [50000, 75000, 45000, 60000, 70000, 30000, 55000, 80000, 95000, 65000,
                   85000, 90000, 52000, 72000, 48000, None, 88000, 67000, 78000, -1000]
    })
    test_data.to_csv('survey_raw.csv', index=False)

    manager = SurveyManager('income_survey_2024')
    # BUG FIX: generate_report() returns the report dict, not the manager,
    # so it must not be chained into export(); run the last steps separately.
    manager.import_data('survey_raw.csv', file_format='csv') \
           .validate_and_clean(age_range=(18, 65), income_min=0)
    manager.generate_report()
    manager.export(format='excel')
print("\n 处理完成!检查 surveys/ 目录查看结果")
练习 5:批量文件处理(进阶)
难度:⭐⭐⭐⭐ 时间:40 分钟
批量处理多个数据文件并合并。
python
"""
任务:
1. 读取 data/ 文件夹中所有 CSV 文件
2. 每个文件代表一年的调查数据
3. 合并所有数据,添加年份列
4. 清洗异常值
5. 生成年度对比报告
6. 保存为多格式(CSV, Excel, Stata)
"""
参考答案(精简版)
python
import pandas as pd
from pathlib import Path
import re
class BatchProcessor:
    """Batch-load yearly survey CSVs, then merge, clean and export them.

    Each input file is tagged with the 4-digit year parsed from its name
    and with its source file name. ``process_all_files``, ``clean_data``
    and ``generate_yearly_report`` return ``self`` for chaining.
    """

    def __init__(self, input_dir, output_dir='output'):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        self.all_data = []  # one DataFrame per input file

    def process_all_files(self, pattern='*.csv'):
        """Read every file matching *pattern*, tag and concatenate them."""
        paths = list(self.input_dir.glob(pattern))
        print(f"发现 {len(paths)} 个文件")
        for path in paths:
            # Year comes from the file name, e.g. survey_2023.csv -> 2023.
            match = re.search(r'(\d{4})', path.stem)
            frame = pd.read_csv(path)
            frame['year'] = int(match.group(1)) if match else None
            frame['source_file'] = path.name
            self.all_data.append(frame)
            print(f" {path.name}: {len(frame)} 行")
        self.combined_df = pd.concat(self.all_data, ignore_index=True)
        print(f"\n合并完成: {len(self.combined_df)} 行")
        return self

    def clean_data(self):
        """Drop NA rows and keep plausible age/income values; returns self."""
        before = len(self.combined_df)
        cleaned = self.combined_df.dropna()
        plausible = (
            (cleaned['age'] >= 18)
            & (cleaned['age'] <= 100)
            & (cleaned['income'] > 0)
        )
        self.combined_df = cleaned[plausible]
        after = len(self.combined_df)
        print(f"清洗: 删除 {before - after} 行,保留 {after} 行")
        return self

    def generate_yearly_report(self):
        """Aggregate per-year stats, print them and save a CSV report."""
        yearly_stats = (
            self.combined_df
            .groupby('year')
            .agg({'age': ['count', 'mean'], 'income': ['mean', 'median']})
            .round(2)
        )
        print("\n年度统计:")
        print(yearly_stats)
        yearly_stats.to_csv(self.output_dir / 'yearly_report.csv')
        return self

    def export_all_formats(self, base_name='combined'):
        """Write the merged data as CSV, Excel and Stata side by side."""
        self.combined_df.to_csv(
            self.output_dir / f'{base_name}.csv',
            index=False, encoding='utf-8-sig'
        )
        self.combined_df.to_excel(
            self.output_dir / f'{base_name}.xlsx',
            index=False
        )
        # version=117 keeps the file readable by Stata 13/14.
        self.combined_df.to_stata(
            self.output_dir / f'{base_name}.dta',
            write_index=False, version=117
        )
        print(f" 导出完成: {base_name}.{{csv,xlsx,dta}}")
# 使用示例
# processor = BatchProcessor('data/')
# (processor
# .process_all_files(pattern='survey_*.csv')
# .clean_data()
# .generate_yearly_report()
# .export_all_formats('panel_data'))
下一步
完成本章后,你已经掌握了:
- 文本文件读写
- CSV/Excel 处理
- Stata 文件读写
- JSON 数据处理
- 路径操作和文件管理
恭喜你完成 Module 7!
在 Module 8 中,我们将学习异常处理和调试。
扩展阅读
准备好学习错误处理了吗?