Module 8 小结和复习
异常处理与调试 —— 让代码更健壮
知识点总结
1. 错误类型
语法错误 vs 运行时异常:
- 语法错误(Syntax Error):代码写错,无法运行
- 运行时异常(Exception):程序运行时发生的错误
常见运行时异常:
| 异常类型 | 原因 | 示例 |
|---|---|---|
NameError | 变量未定义 | print(undefined_var) |
TypeError | 类型错误 | "5" + 5 |
ValueError | 值错误 | int("abc") |
IndexError | 索引超出范围 | list[999] |
KeyError | 字典键不存在 | dict['missing_key'] |
FileNotFoundError | 文件不存在 | open('missing.txt') |
ZeroDivisionError | 除以零 | 10 / 0 |
AttributeError | 属性不存在 | list.push() |
2. try-except 异常处理
基本语法:
python
try:
# 可能出错的代码
risky_operation()
except ExceptionType:
# 出错时的处理
handle_error()完整语法:
python
try:
# 尝试执行
result = operation()
except ValueError:
# 处理特定异常
print("值错误")
except FileNotFoundError:
# 处理另一种异常
print("文件不存在")
except Exception as e:
# 捕获所有其他异常
print(f"其他错误: {e}")
else:
# 没有异常时执行
print(f"成功: {result}")
finally:
# 无论如何都执行(清理资源)
cleanup()多异常捕获:
python
# 方法1: 分别捕获
try:
operation()
except ValueError:
handle_value_error()
except TypeError:
handle_type_error()
# 方法2: 一起捕获
try:
operation()
except (ValueError, TypeError) as e:
handle_error(e)3. 调试技巧
调试层次:
- Print 调试:最简单,快速定位
- 断言(Assert):验证假设
- 日志(Logging):持久化调试信息
- 调试器(pdb):交互式调试
Print 调试:
python
def calculate_tax(income, rate):
print(f"Debug: income={income}, rate={rate}")
tax = income * rate
print(f"Debug: tax={tax}")
return tax断言:
python
def calculate_mean(data):
assert len(data) > 0, "数据不能为空"
assert all(isinstance(x, (int, float)) for x in data), "必须是数字"
return sum(data) / len(data)日志:
python
import logging
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s')
logging.debug("详细信息")
logging.info("一般信息")
logging.warning("警告")
logging.error("错误")
logging.critical("严重错误")️ 常见错误
1. 捕获太宽泛的异常
python
# 错误:捕获所有异常
try:
result = operation()
except:
print("出错了")
# 正确:捕获具体异常
try:
result = operation()
except ValueError:
print("值错误")
except FileNotFoundError:
print("文件不存在")2. 忽略异常信息
python
# 错误:不保存异常信息
try:
operation()
except Exception:
print("出错了")
# 正确:保存异常信息
try:
operation()
except Exception as e:
print(f"出错了: {e}")
logging.error(f"详细错误: {e}", exc_info=True)3. 过度使用 try-except
python
# 错误:用异常代替正常逻辑
try:
value = dict['key']
except KeyError:
value = None
# 正确:使用条件判断
value = dict.get('key', None)最佳实践
1. 捕获具体的异常
python
# 好习惯
try:
df = pd.read_csv('data.csv')
except FileNotFoundError:
print("文件不存在")
except pd.errors.EmptyDataError:
print("文件为空")
except Exception as e:
print(f"其他错误: {e}")2. 使用 else 子句
python
try:
df = pd.read_csv('data.csv')
except FileNotFoundError:
print("文件不存在")
else:
# 只在成功时执行
print(f"成功加载 {len(df)} 行")3. finally 清理资源
python
# 方法1: 使用 finally
file = None
try:
file = open('data.txt', 'r')
content = file.read()
except FileNotFoundError:
print("文件不存在")
finally:
if file:
file.close()
# 方法2: 使用 with(更好)
with open('data.txt', 'r') as file:
content = file.read()4. 记录完整的错误信息
python
import logging
import traceback
try:
operation()
except Exception as e:
logging.error(f"错误: {e}")
logging.error(traceback.format_exc())编程练习
练习 1:安全的数据读取函数(基础)
难度:⭐⭐ 时间:15 分钟
python
"""
任务:创建一个安全的 CSV 读取函数
要求:
1. 检查文件是否存在
2. 处理文件为空的情况
3. 验证必要的列是否存在
4. 返回 DataFrame 或 None
"""
import pandas as pd
from pathlib import Path
def safe_read_csv(filename, required_columns=None):
"""安全地读取 CSV 文件"""
# 你的代码
pass
# 测试
df = safe_read_csv('survey.csv', required_columns=['id', 'age', 'income'])
if df is not None:
print(f"成功: {len(df)} 行")参考答案
python
import pandas as pd
from pathlib import Path
def safe_read_csv(filename, required_columns=None):
"""安全地读取 CSV 文件
参数:
filename: CSV 文件路径
required_columns: 必须存在的列名列表
返回:
DataFrame 或 None
"""
try:
# 检查文件是否存在
file_path = Path(filename)
if not file_path.exists():
print(f" 文件不存在: {filename}")
return None
# 读取 CSV
df = pd.read_csv(file_path)
# 检查是否为空
if len(df) == 0:
print(f"️ 文件为空: {filename}")
return None
# 验证必要列
if required_columns:
missing_columns = set(required_columns) - set(df.columns)
if missing_columns:
print(f" 缺少列: {missing_columns}")
print(f" 可用列: {list(df.columns)}")
return None
print(f" 成功加载: {len(df)} 行 × {len(df.columns)} 列")
return df
except pd.errors.EmptyDataError:
print(f" 文件损坏或为空: {filename}")
return None
except pd.errors.ParserError as e:
print(f" 解析错误: {e}")
return None
except Exception as e:
print(f" 未知错误: {e}")
return None
# 测试
if __name__ == "__main__":
# 创建测试文件
test_data = pd.DataFrame({
'id': [1, 2, 3],
'age': [25, 30, 35],
'income': [50000, 75000, 85000]
})
test_data.to_csv('test_survey.csv', index=False)
# 测试成功情况
print("测试1: 正常读取")
df = safe_read_csv('test_survey.csv', required_columns=['id', 'age', 'income'])
# 测试文件不存在
print("\n测试2: 文件不存在")
df = safe_read_csv('missing.csv')
# 测试缺少列
print("\n测试3: 缺少必要列")
df = safe_read_csv('test_survey.csv', required_columns=['id', 'age', 'education'])练习 2:批量数据清洗(基础)
难度:⭐⭐ 时间:20 分钟
python
"""
任务:清洗问卷数据,处理各种异常值
数据可能的问题:
- age 可能是字符串或非法值
- income 可能是字符串或负数
- 字段可能缺失
"""
responses = [
{'id': 1, 'age': '25', 'income': '50000'},
{'id': 2, 'age': 'N/A', 'income': '75000'},
{'id': 3, 'age': '35', 'income': 'unknown'},
{'id': 4, 'age': '40', 'income': '85000'},
{'id': 5, 'age': '150', 'income': '-1000'},
]
def clean_responses(responses):
"""清洗响应数据"""
# 你的代码
pass参考答案
python
def clean_responses(responses):
"""清洗响应数据
参数:
responses: 响应列表(字典列表)
返回:
(valid_data, errors): 有效数据列表和错误列表
"""
valid_data = []
errors = []
for resp in responses:
try:
resp_id = resp.get('id', 'unknown')
# 验证和转换年龄
age_str = resp.get('age')
if age_str is None:
raise ValueError("缺少 age 字段")
age = int(age_str)
if not (0 < age < 120):
raise ValueError(f"年龄超出范围: {age}")
# 验证和转换收入
income_str = resp.get('income')
if income_str is None:
raise ValueError("缺少 income 字段")
income = float(income_str)
if income < 0:
raise ValueError(f"收入为负数: {income}")
# 创建清洗后的数据
clean_resp = {
'id': resp_id,
'age': age,
'income': income
}
valid_data.append(clean_resp)
except (ValueError, TypeError) as e:
errors.append({
'id': resp.get('id', 'unknown'),
'error': str(e),
'original_data': resp
})
return valid_data, errors
# 测试
responses = [
{'id': 1, 'age': '25', 'income': '50000'},
{'id': 2, 'age': 'N/A', 'income': '75000'},
{'id': 3, 'age': '35', 'income': 'unknown'},
{'id': 4, 'age': '40', 'income': '85000'},
{'id': 5, 'age': '150', 'income': '-1000'},
]
valid_data, errors = clean_responses(responses)
print(f" 有效数据: {len(valid_data)} 条")
for data in valid_data:
print(f" ID{data['id']}: {data['age']}岁, ${data['income']:,.0f}")
print(f"\n 错误数据: {len(errors)} 条")
for error in errors:
print(f" ID{error['id']}: {error['error']}")练习 3:带重试的 API 请求(中等)
难度:⭐⭐⭐ 时间:25 分钟
python
"""
任务:创建一个带重试机制的 API 请求函数
要求:
1. 支持最多 N 次重试
2. 处理超时错误
3. 处理 HTTP 错误
4. 使用指数退避(每次等待时间翻倍)
5. 记录每次重试
"""
import requests
import time
def fetch_with_retry(url, max_retries=3, timeout=5):
"""带重试的 API 请求"""
# 你的代码
pass参考答案
python
import requests
import time
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
def fetch_with_retry(url, max_retries=3, timeout=5):
"""带重试机制的 API 请求
参数:
url: API 地址
max_retries: 最大重试次数
timeout: 超时时间(秒)
返回:
响应数据(字典)或 None
"""
for attempt in range(1, max_retries + 1):
try:
logging.info(f"尝试 {attempt}/{max_retries}: {url}")
# 发送请求
response = requests.get(url, timeout=timeout)
# 检查 HTTP 状态码
response.raise_for_status()
# 解析 JSON
data = response.json()
logging.info(f" 成功获取数据")
return data
except requests.exceptions.Timeout:
wait_time = 2 ** (attempt - 1) # 指数退避: 1, 2, 4, 8...
logging.warning(f"⏱️ 超时,等待 {wait_time} 秒后重试...")
if attempt < max_retries:
time.sleep(wait_time)
else:
logging.error(f" 达到最大重试次数,放弃")
return None
except requests.exceptions.HTTPError as e:
logging.error(f" HTTP 错误: {e}")
logging.error(f" 状态码: {e.response.status_code}")
return None
except requests.exceptions.RequestException as e:
logging.error(f" 请求错误: {e}")
return None
except ValueError as e:
logging.error(f" JSON 解析错误: {e}")
return None
return None
# 测试
if __name__ == "__main__":
# 测试1: 成功请求(使用公开 API)
print("测试1: 正常请求")
data = fetch_with_retry('https://api.github.com/users/github')
if data:
print(f"用户名: {data.get('login')}")
print(f"仓库数: {data.get('public_repos')}")
# 测试2: 超时(使用一个很慢的地址)
print("\n测试2: 超时重试")
data = fetch_with_retry('https://httpbin.org/delay/10', max_retries=2, timeout=1)
# 测试3: 404 错误
print("\n测试3: HTTP 错误")
data = fetch_with_retry('https://api.github.com/nonexistent', max_retries=2)练习 4:自定义异常类(中等)
难度:⭐⭐⭐ 时间:30 分钟
python
"""
任务:创建一个问卷数据验证系统,使用自定义异常
要求:
1. 定义多个自定义异常类
2. 在验证函数中抛出相应异常
3. 在主程序中统一处理
"""
# 定义自定义异常
class SurveyValidationError(Exception):
"""问卷验证基础异常"""
pass
class InvalidAgeError(SurveyValidationError):
"""年龄无效异常"""
pass
# 继续定义其他异常...参考答案
python
# 自定义异常类
class SurveyValidationError(Exception):
"""问卷验证基础异常"""
pass
class InvalidAgeError(SurveyValidationError):
"""年龄无效异常"""
def __init__(self, age, min_age=18, max_age=100):
self.age = age
self.min_age = min_age
self.max_age = max_age
super().__init__(f"年龄 {age} 超出范围 [{min_age}, {max_age}]")
class InvalidIncomeError(SurveyValidationError):
"""收入无效异常"""
def __init__(self, income):
self.income = income
super().__init__(f"收入无效: {income}")
class MissingFieldError(SurveyValidationError):
"""缺少必要字段异常"""
def __init__(self, field_name):
self.field_name = field_name
super().__init__(f"缺少必要字段: {field_name}")
# 验证函数
def validate_response(response, min_age=18, max_age=100):
"""验证单个响应
参数:
response: 响应字典
min_age: 最小年龄
max_age: 最大年龄
抛出:
MissingFieldError: 缺少字段
InvalidAgeError: 年龄无效
InvalidIncomeError: 收入无效
"""
# 检查必要字段
required_fields = ['id', 'age', 'income']
for field in required_fields:
if field not in response:
raise MissingFieldError(field)
# 验证年龄
age = response['age']
if not isinstance(age, (int, float)):
raise InvalidAgeError(age, min_age, max_age)
if not (min_age <= age <= max_age):
raise InvalidAgeError(age, min_age, max_age)
# 验证收入
income = response['income']
if not isinstance(income, (int, float)):
raise InvalidIncomeError(income)
if income < 0:
raise InvalidIncomeError(income)
# 批量验证
def validate_all_responses(responses, min_age=18, max_age=100):
"""验证所有响应
返回:
(valid_responses, errors): 有效响应和错误列表
"""
valid_responses = []
errors = []
for i, resp in enumerate(responses):
try:
validate_response(resp, min_age, max_age)
valid_responses.append(resp)
except MissingFieldError as e:
errors.append({
'index': i,
'id': resp.get('id', 'unknown'),
'error_type': 'MissingField',
'error': str(e)
})
except InvalidAgeError as e:
errors.append({
'index': i,
'id': resp.get('id', 'unknown'),
'error_type': 'InvalidAge',
'error': str(e),
'value': e.age
})
except InvalidIncomeError as e:
errors.append({
'index': i,
'id': resp.get('id', 'unknown'),
'error_type': 'InvalidIncome',
'error': str(e),
'value': e.income
})
except Exception as e:
errors.append({
'index': i,
'id': resp.get('id', 'unknown'),
'error_type': 'Unknown',
'error': str(e)
})
return valid_responses, errors
# 测试
if __name__ == "__main__":
responses = [
{'id': 1, 'age': 25, 'income': 50000}, # 有效
{'id': 2, 'age': 15, 'income': 30000}, # 年龄太小
{'id': 3, 'income': 75000}, # 缺少 age
{'id': 4, 'age': 35, 'income': -5000}, # 收入为负
{'id': 5, 'age': 150, 'income': 85000}, # 年龄太大
{'id': 6, 'age': 30, 'income': 70000}, # 有效
]
valid, errors = validate_all_responses(responses, min_age=18, max_age=100)
print(f" 有效响应: {len(valid)} 条")
for resp in valid:
print(f" ID{resp['id']}: {resp['age']}岁, ${resp['income']:,}")
print(f"\n 错误响应: {len(errors)} 条")
for error in errors:
print(f" ID{error['id']} ({error['error_type']}): {error['error']}")
# 按错误类型统计
print(f"\n错误类型统计:")
error_types = {}
for error in errors:
error_type = error['error_type']
error_types[error_type] = error_types.get(error_type, 0) + 1
for error_type, count in error_types.items():
print(f" {error_type}: {count} 个")练习 5:综合调试案例(进阶)
难度:⭐⭐⭐⭐ 时间:40 分钟
python
"""
任务:调试一个有多处错误的数据分析脚本
脚本功能:
1. 读取 CSV 文件
2. 清洗数据
3. 计算统计量
4. 保存结果
要求:
1. 找出所有错误
2. 使用 try-except 处理
3. 添加日志记录
4. 添加输入验证
"""
# 有错误的代码(找出并修复)
import pandas as pd
def analyze_survey(filename):
# 读取数据
df = pd.read_csv(filename)
# 计算平均年龄
avg_age = df['age'].mean()
# 计算收入中位数
median_income = df['imcome'].median() # 拼写错误
# 筛选高收入者
high_earners = df[df['income'] > median_income]
# 保存结果
high_earners.to_csv('high_earners.csv')
return avg_age, median_income参考答案(修复后)
python
import pandas as pd
import logging
from pathlib import Path
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('analysis.log'),
logging.StreamHandler()
]
)
def analyze_survey(filename, output_filename='high_earners.csv'):
"""分析问卷数据(带完整错误处理和日志)
参数:
filename: 输入 CSV 文件路径
output_filename: 输出文件路径
返回:
统计结果字典或 None
"""
try:
logging.info(f"开始分析: {filename}")
# 1. 检查文件是否存在
file_path = Path(filename)
if not file_path.exists():
logging.error(f"文件不存在: {filename}")
return None
# 2. 读取数据
try:
df = pd.read_csv(file_path)
logging.info(f"成功读取: {len(df)} 行")
except pd.errors.EmptyDataError:
logging.error("文件为空")
return None
except pd.errors.ParserError as e:
logging.error(f"CSV 解析错误: {e}")
return None
# 3. 验证必要列
required_columns = ['age', 'income']
missing_columns = set(required_columns) - set(df.columns)
if missing_columns:
logging.error(f"缺少列: {missing_columns}")
logging.error(f"可用列: {list(df.columns)}")
return None
# 4. 数据清洗
original_len = len(df)
df = df.dropna(subset=['age', 'income'])
df = df[(df['age'] > 0) & (df['age'] < 120)]
df = df[df['income'] >= 0]
cleaned_len = len(df)
if cleaned_len < original_len:
logging.warning(f"删除了 {original_len - cleaned_len} 行无效数据")
if cleaned_len == 0:
logging.error("清洗后没有数据")
return None
# 5. 计算统计量
try:
avg_age = float(df['age'].mean())
median_income = float(df['income'].median())
logging.info(f"平均年龄: {avg_age:.1f}")
logging.info(f"收入中位数: ${median_income:,.0f}")
except Exception as e:
logging.error(f"统计计算错误: {e}")
return None
# 6. 筛选高收入者
try:
high_earners = df[df['income'] > median_income].copy()
logging.info(f"高收入者: {len(high_earners)} 人 ({len(high_earners)/len(df)*100:.1f}%)")
except Exception as e:
logging.error(f"筛选错误: {e}")
return None
# 7. 保存结果
try:
high_earners.to_csv(output_filename, index=False, encoding='utf-8-sig')
logging.info(f"结果已保存: {output_filename}")
except Exception as e:
logging.error(f"保存失败: {e}")
return None
# 8. 返回统计结果
results = {
'total_responses': cleaned_len,
'avg_age': round(avg_age, 2),
'median_income': round(median_income, 2),
'high_earners_count': len(high_earners),
'high_earners_percentage': round(len(high_earners)/len(df)*100, 2)
}
logging.info("分析完成")
return results
except Exception as e:
logging.error(f"未知错误: {e}", exc_info=True)
return None
# 测试
if __name__ == "__main__":
# 创建测试数据
test_data = pd.DataFrame({
'id': range(1, 11),
'age': [25, 30, 35, 28, 32, 40, 27, 33, 38, 29],
'income': [50000, 75000, 85000, 60000, 70000, 90000, 55000, 80000, 95000, 65000]
})
test_data.to_csv('test_survey.csv', index=False)
# 运行分析
results = analyze_survey('test_survey.csv')
if results:
print("\n分析结果:")
print(f" 总人数: {results['total_responses']}")
print(f" 平均年龄: {results['avg_age']}")
print(f" 收入中位数: ${results['median_income']:,.0f}")
print(f" 高收入者: {results['high_earners_count']} 人 ({results['high_earners_percentage']}%)")下一步
完成本章后,你已经掌握了:
- 常见错误类型
- try-except 异常处理
- 调试技巧(print, assert, logging, pdb)
- 错误日志记录
- 自定义异常
恭喜你完成 Module 8!
在 Module 9 中,我们将学习数据科学核心库(NumPy、Pandas、Matplotlib 等)。
扩展阅读
准备好学习数据科学库了吗?