Skip to content

Module 8 小结和复习

异常处理与调试 —— 让代码更健壮


知识点总结

1. 错误类型

语法错误 vs 运行时异常

  • 语法错误(Syntax Error):代码写错,无法运行
  • 运行时异常(Exception):程序运行时发生的错误

常见运行时异常

异常类型原因示例
NameError变量未定义print(undefined_var)
TypeError类型错误"5" + 5
ValueError值错误int("abc")
IndexError索引超出范围list[999]
KeyError字典键不存在dict['missing_key']
FileNotFoundError文件不存在open('missing.txt')
ZeroDivisionError除以零10 / 0
AttributeError属性不存在list.push()

2. try-except 异常处理

基本语法

python
try:
    # 可能出错的代码
    risky_operation()
except ExceptionType:
    # 出错时的处理
    handle_error()

完整语法

python
try:
    # 尝试执行
    result = operation()
except ValueError:
    # 处理特定异常
    print("值错误")
except FileNotFoundError:
    # 处理另一种异常
    print("文件不存在")
except Exception as e:
    # 捕获所有其他异常
    print(f"其他错误: {e}")
else:
    # 没有异常时执行
    print(f"成功: {result}")
finally:
    # 无论如何都执行(清理资源)
    cleanup()

多异常捕获

python
# 方法1: 分别捕获
try:
    operation()
except ValueError:
    handle_value_error()
except TypeError:
    handle_type_error()

# 方法2: 一起捕获
try:
    operation()
except (ValueError, TypeError) as e:
    handle_error(e)

3. 调试技巧

调试层次

  1. Print 调试:最简单,快速定位
  2. 断言(Assert):验证假设
  3. 日志(Logging):持久化调试信息
  4. 调试器(pdb):交互式调试

Print 调试

python
def calculate_tax(income, rate):
    print(f"Debug: income={income}, rate={rate}")
    tax = income * rate
    print(f"Debug: tax={tax}")
    return tax

断言

python
def calculate_mean(data):
    assert len(data) > 0, "数据不能为空"
    assert all(isinstance(x, (int, float)) for x in data), "必须是数字"
    return sum(data) / len(data)

日志

python
import logging

logging.basicConfig(level=logging.DEBUG,
                   format='%(asctime)s - %(levelname)s - %(message)s')

logging.debug("详细信息")
logging.info("一般信息")
logging.warning("警告")
logging.error("错误")
logging.critical("严重错误")

️ 常见错误

1. 捕获太宽泛的异常

python
#  错误:捕获所有异常
try:
    result = operation()
except:
    print("出错了")

#  正确:捕获具体异常
try:
    result = operation()
except ValueError:
    print("值错误")
except FileNotFoundError:
    print("文件不存在")

2. 忽略异常信息

python
#  错误:不保存异常信息
try:
    operation()
except Exception:
    print("出错了")

#  正确:保存异常信息
try:
    operation()
except Exception as e:
    print(f"出错了: {e}")
    logging.error(f"详细错误: {e}", exc_info=True)

3. 过度使用 try-except

python
#  错误:用异常代替正常逻辑
try:
    value = dict['key']
except KeyError:
    value = None

#  正确:使用条件判断
value = dict.get('key', None)

最佳实践

1. 捕获具体的异常

python
#  好习惯
try:
    df = pd.read_csv('data.csv')
except FileNotFoundError:
    print("文件不存在")
except pd.errors.EmptyDataError:
    print("文件为空")
except Exception as e:
    print(f"其他错误: {e}")

2. 使用 else 子句

python
try:
    df = pd.read_csv('data.csv')
except FileNotFoundError:
    print("文件不存在")
else:
    # 只在成功时执行
    print(f"成功加载 {len(df)} 行")

3. finally 清理资源

python
# 方法1: 使用 finally
file = None
try:
    file = open('data.txt', 'r')
    content = file.read()
except FileNotFoundError:
    print("文件不存在")
finally:
    if file:
        file.close()

# 方法2: 使用 with(更好)
with open('data.txt', 'r') as file:
    content = file.read()

4. 记录完整的错误信息

python
import logging
import traceback

try:
    operation()
except Exception as e:
    logging.error(f"错误: {e}")
    logging.error(traceback.format_exc())

编程练习

练习 1:安全的数据读取函数(基础)

难度:⭐⭐ 时间:15 分钟

python
"""
任务:创建一个安全的 CSV 读取函数

要求:
1. 检查文件是否存在
2. 处理文件为空的情况
3. 验证必要的列是否存在
4. 返回 DataFrame 或 None
"""

import pandas as pd
from pathlib import Path

def safe_read_csv(filename, required_columns=None):
    """安全地读取 CSV 文件"""
    # 你的代码
    pass

# 测试
df = safe_read_csv('survey.csv', required_columns=['id', 'age', 'income'])
if df is not None:
    print(f"成功: {len(df)} 行")
参考答案
python
import pandas as pd
from pathlib import Path

def safe_read_csv(filename, required_columns=None):
    """安全地读取 CSV 文件

    参数:
        filename: CSV 文件路径
        required_columns: 必须存在的列名列表

    返回:
        DataFrame 或 None
    """
    try:
        # 检查文件是否存在
        file_path = Path(filename)
        if not file_path.exists():
            print(f" 文件不存在: {filename}")
            return None

        # 读取 CSV
        df = pd.read_csv(file_path)

        # 检查是否为空
        if len(df) == 0:
            print(f"️  文件为空: {filename}")
            return None

        # 验证必要列
        if required_columns:
            missing_columns = set(required_columns) - set(df.columns)
            if missing_columns:
                print(f" 缺少列: {missing_columns}")
                print(f"   可用列: {list(df.columns)}")
                return None

        print(f" 成功加载: {len(df)} 行 × {len(df.columns)} 列")
        return df

    except pd.errors.EmptyDataError:
        print(f" 文件损坏或为空: {filename}")
        return None

    except pd.errors.ParserError as e:
        print(f" 解析错误: {e}")
        return None

    except Exception as e:
        print(f" 未知错误: {e}")
        return None


# 测试
if __name__ == "__main__":
    # 创建测试文件
    test_data = pd.DataFrame({
        'id': [1, 2, 3],
        'age': [25, 30, 35],
        'income': [50000, 75000, 85000]
    })
    test_data.to_csv('test_survey.csv', index=False)

    # 测试成功情况
    print("测试1: 正常读取")
    df = safe_read_csv('test_survey.csv', required_columns=['id', 'age', 'income'])

    # 测试文件不存在
    print("\n测试2: 文件不存在")
    df = safe_read_csv('missing.csv')

    # 测试缺少列
    print("\n测试3: 缺少必要列")
    df = safe_read_csv('test_survey.csv', required_columns=['id', 'age', 'education'])

练习 2:批量数据清洗(基础)

难度:⭐⭐ 时间:20 分钟

python
"""
任务:清洗问卷数据,处理各种异常值

数据可能的问题:
- age 可能是字符串或非法值
- income 可能是字符串或负数
- 字段可能缺失
"""

responses = [
    {'id': 1, 'age': '25', 'income': '50000'},
    {'id': 2, 'age': 'N/A', 'income': '75000'},
    {'id': 3, 'age': '35', 'income': 'unknown'},
    {'id': 4, 'age': '40', 'income': '85000'},
    {'id': 5, 'age': '150', 'income': '-1000'},
]

def clean_responses(responses):
    """清洗响应数据"""
    # 你的代码
    pass
参考答案
python
def clean_responses(responses):
    """清洗响应数据

    参数:
        responses: 响应列表(字典列表)

    返回:
        (valid_data, errors): 有效数据列表和错误列表
    """
    valid_data = []
    errors = []

    for resp in responses:
        try:
            resp_id = resp.get('id', 'unknown')

            # 验证和转换年龄
            age_str = resp.get('age')
            if age_str is None:
                raise ValueError("缺少 age 字段")

            age = int(age_str)
            if not (0 < age < 120):
                raise ValueError(f"年龄超出范围: {age}")

            # 验证和转换收入
            income_str = resp.get('income')
            if income_str is None:
                raise ValueError("缺少 income 字段")

            income = float(income_str)
            if income < 0:
                raise ValueError(f"收入为负数: {income}")

            # 创建清洗后的数据
            clean_resp = {
                'id': resp_id,
                'age': age,
                'income': income
            }
            valid_data.append(clean_resp)

        except (ValueError, TypeError) as e:
            errors.append({
                'id': resp.get('id', 'unknown'),
                'error': str(e),
                'original_data': resp
            })

    return valid_data, errors


# 测试
responses = [
    {'id': 1, 'age': '25', 'income': '50000'},
    {'id': 2, 'age': 'N/A', 'income': '75000'},
    {'id': 3, 'age': '35', 'income': 'unknown'},
    {'id': 4, 'age': '40', 'income': '85000'},
    {'id': 5, 'age': '150', 'income': '-1000'},
]

valid_data, errors = clean_responses(responses)

print(f" 有效数据: {len(valid_data)} 条")
for data in valid_data:
    print(f"   ID{data['id']}: {data['age']}岁, ${data['income']:,.0f}")

print(f"\n 错误数据: {len(errors)} 条")
for error in errors:
    print(f"   ID{error['id']}: {error['error']}")

练习 3:带重试的 API 请求(中等)

难度:⭐⭐⭐ 时间:25 分钟

python
"""
任务:创建一个带重试机制的 API 请求函数

要求:
1. 支持最多 N 次重试
2. 处理超时错误
3. 处理 HTTP 错误
4. 使用指数退避(每次等待时间翻倍)
5. 记录每次重试
"""

import requests
import time

def fetch_with_retry(url, max_retries=3, timeout=5):
    """带重试的 API 请求"""
    # 你的代码
    pass
参考答案
python
import requests
import time
import logging

logging.basicConfig(level=logging.INFO,
                   format='%(asctime)s - %(levelname)s - %(message)s')

def fetch_with_retry(url, max_retries=3, timeout=5):
    """带重试机制的 API 请求

    参数:
        url: API 地址
        max_retries: 最大重试次数
        timeout: 超时时间(秒)

    返回:
        响应数据(字典)或 None
    """
    for attempt in range(1, max_retries + 1):
        try:
            logging.info(f"尝试 {attempt}/{max_retries}: {url}")

            # 发送请求
            response = requests.get(url, timeout=timeout)

            # 检查 HTTP 状态码
            response.raise_for_status()

            # 解析 JSON
            data = response.json()

            logging.info(f" 成功获取数据")
            return data

        except requests.exceptions.Timeout:
            wait_time = 2 ** (attempt - 1)  # 指数退避: 1, 2, 4, 8...
            logging.warning(f"⏱️  超时,等待 {wait_time} 秒后重试...")

            if attempt < max_retries:
                time.sleep(wait_time)
            else:
                logging.error(f" 达到最大重试次数,放弃")
                return None

        except requests.exceptions.HTTPError as e:
            logging.error(f" HTTP 错误: {e}")
            logging.error(f"   状态码: {e.response.status_code}")
            return None

        except requests.exceptions.RequestException as e:
            logging.error(f" 请求错误: {e}")
            return None

        except ValueError as e:
            logging.error(f" JSON 解析错误: {e}")
            return None

    return None


# 测试
if __name__ == "__main__":
    # 测试1: 成功请求(使用公开 API)
    print("测试1: 正常请求")
    data = fetch_with_retry('https://api.github.com/users/github')
    if data:
        print(f"用户名: {data.get('login')}")
        print(f"仓库数: {data.get('public_repos')}")

    # 测试2: 超时(使用一个很慢的地址)
    print("\n测试2: 超时重试")
    data = fetch_with_retry('https://httpbin.org/delay/10', max_retries=2, timeout=1)

    # 测试3: 404 错误
    print("\n测试3: HTTP 错误")
    data = fetch_with_retry('https://api.github.com/nonexistent', max_retries=2)

练习 4:自定义异常类(中等)

难度:⭐⭐⭐ 时间:30 分钟

python
"""
任务:创建一个问卷数据验证系统,使用自定义异常

要求:
1. 定义多个自定义异常类
2. 在验证函数中抛出相应异常
3. 在主程序中统一处理
"""

# 定义自定义异常
class SurveyValidationError(Exception):
    """问卷验证基础异常"""
    pass

class InvalidAgeError(SurveyValidationError):
    """年龄无效异常"""
    pass

# 继续定义其他异常...
参考答案
python
# 自定义异常类
class SurveyValidationError(Exception):
    """问卷验证基础异常"""
    pass

class InvalidAgeError(SurveyValidationError):
    """年龄无效异常"""
    def __init__(self, age, min_age=18, max_age=100):
        self.age = age
        self.min_age = min_age
        self.max_age = max_age
        super().__init__(f"年龄 {age} 超出范围 [{min_age}, {max_age}]")

class InvalidIncomeError(SurveyValidationError):
    """收入无效异常"""
    def __init__(self, income):
        self.income = income
        super().__init__(f"收入无效: {income}")

class MissingFieldError(SurveyValidationError):
    """缺少必要字段异常"""
    def __init__(self, field_name):
        self.field_name = field_name
        super().__init__(f"缺少必要字段: {field_name}")


# 验证函数
def validate_response(response, min_age=18, max_age=100):
    """验证单个响应

    参数:
        response: 响应字典
        min_age: 最小年龄
        max_age: 最大年龄

    抛出:
        MissingFieldError: 缺少字段
        InvalidAgeError: 年龄无效
        InvalidIncomeError: 收入无效
    """
    # 检查必要字段
    required_fields = ['id', 'age', 'income']
    for field in required_fields:
        if field not in response:
            raise MissingFieldError(field)

    # 验证年龄
    age = response['age']
    if not isinstance(age, (int, float)):
        raise InvalidAgeError(age, min_age, max_age)
    if not (min_age <= age <= max_age):
        raise InvalidAgeError(age, min_age, max_age)

    # 验证收入
    income = response['income']
    if not isinstance(income, (int, float)):
        raise InvalidIncomeError(income)
    if income < 0:
        raise InvalidIncomeError(income)


# 批量验证
def validate_all_responses(responses, min_age=18, max_age=100):
    """验证所有响应

    返回:
        (valid_responses, errors): 有效响应和错误列表
    """
    valid_responses = []
    errors = []

    for i, resp in enumerate(responses):
        try:
            validate_response(resp, min_age, max_age)
            valid_responses.append(resp)

        except MissingFieldError as e:
            errors.append({
                'index': i,
                'id': resp.get('id', 'unknown'),
                'error_type': 'MissingField',
                'error': str(e)
            })

        except InvalidAgeError as e:
            errors.append({
                'index': i,
                'id': resp.get('id', 'unknown'),
                'error_type': 'InvalidAge',
                'error': str(e),
                'value': e.age
            })

        except InvalidIncomeError as e:
            errors.append({
                'index': i,
                'id': resp.get('id', 'unknown'),
                'error_type': 'InvalidIncome',
                'error': str(e),
                'value': e.income
            })

        except Exception as e:
            errors.append({
                'index': i,
                'id': resp.get('id', 'unknown'),
                'error_type': 'Unknown',
                'error': str(e)
            })

    return valid_responses, errors


# 测试
if __name__ == "__main__":
    responses = [
        {'id': 1, 'age': 25, 'income': 50000},      # 有效
        {'id': 2, 'age': 15, 'income': 30000},      # 年龄太小
        {'id': 3, 'income': 75000},                 # 缺少 age
        {'id': 4, 'age': 35, 'income': -5000},      # 收入为负
        {'id': 5, 'age': 150, 'income': 85000},     # 年龄太大
        {'id': 6, 'age': 30, 'income': 70000},      # 有效
    ]

    valid, errors = validate_all_responses(responses, min_age=18, max_age=100)

    print(f" 有效响应: {len(valid)} 条")
    for resp in valid:
        print(f"   ID{resp['id']}: {resp['age']}岁, ${resp['income']:,}")

    print(f"\n 错误响应: {len(errors)} 条")
    for error in errors:
        print(f"   ID{error['id']} ({error['error_type']}): {error['error']}")

    # 按错误类型统计
    print(f"\n错误类型统计:")
    error_types = {}
    for error in errors:
        error_type = error['error_type']
        error_types[error_type] = error_types.get(error_type, 0) + 1

    for error_type, count in error_types.items():
        print(f"   {error_type}: {count} 个")

练习 5:综合调试案例(进阶)

难度:⭐⭐⭐⭐ 时间:40 分钟

python
"""
任务:调试一个有多处错误的数据分析脚本

脚本功能:
1. 读取 CSV 文件
2. 清洗数据
3. 计算统计量
4. 保存结果

要求:
1. 找出所有错误
2. 使用 try-except 处理
3. 添加日志记录
4. 添加输入验证
"""

# 有错误的代码(找出并修复)
import pandas as pd

def analyze_survey(filename):
    # 读取数据
    df = pd.read_csv(filename)

    # 计算平均年龄
    avg_age = df['age'].mean()

    # 计算收入中位数
    median_income = df['imcome'].median()  # 拼写错误

    # 筛选高收入者
    high_earners = df[df['income'] > median_income]

    # 保存结果
    high_earners.to_csv('high_earners.csv')

    return avg_age, median_income
参考答案(修复后)
python
import pandas as pd
import logging
from pathlib import Path

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('analysis.log'),
        logging.StreamHandler()
    ]
)

def analyze_survey(filename, output_filename='high_earners.csv'):
    """分析问卷数据(带完整错误处理和日志)

    参数:
        filename: 输入 CSV 文件路径
        output_filename: 输出文件路径

    返回:
        统计结果字典或 None
    """
    try:
        logging.info(f"开始分析: {filename}")

        # 1. 检查文件是否存在
        file_path = Path(filename)
        if not file_path.exists():
            logging.error(f"文件不存在: {filename}")
            return None

        # 2. 读取数据
        try:
            df = pd.read_csv(file_path)
            logging.info(f"成功读取: {len(df)} 行")
        except pd.errors.EmptyDataError:
            logging.error("文件为空")
            return None
        except pd.errors.ParserError as e:
            logging.error(f"CSV 解析错误: {e}")
            return None

        # 3. 验证必要列
        required_columns = ['age', 'income']
        missing_columns = set(required_columns) - set(df.columns)
        if missing_columns:
            logging.error(f"缺少列: {missing_columns}")
            logging.error(f"可用列: {list(df.columns)}")
            return None

        # 4. 数据清洗
        original_len = len(df)
        df = df.dropna(subset=['age', 'income'])
        df = df[(df['age'] > 0) & (df['age'] < 120)]
        df = df[df['income'] >= 0]

        cleaned_len = len(df)
        if cleaned_len < original_len:
            logging.warning(f"删除了 {original_len - cleaned_len} 行无效数据")

        if cleaned_len == 0:
            logging.error("清洗后没有数据")
            return None

        # 5. 计算统计量
        try:
            avg_age = float(df['age'].mean())
            median_income = float(df['income'].median())

            logging.info(f"平均年龄: {avg_age:.1f}")
            logging.info(f"收入中位数: ${median_income:,.0f}")
        except Exception as e:
            logging.error(f"统计计算错误: {e}")
            return None

        # 6. 筛选高收入者
        try:
            high_earners = df[df['income'] > median_income].copy()
            logging.info(f"高收入者: {len(high_earners)} 人 ({len(high_earners)/len(df)*100:.1f}%)")
        except Exception as e:
            logging.error(f"筛选错误: {e}")
            return None

        # 7. 保存结果
        try:
            high_earners.to_csv(output_filename, index=False, encoding='utf-8-sig')
            logging.info(f"结果已保存: {output_filename}")
        except Exception as e:
            logging.error(f"保存失败: {e}")
            return None

        # 8. 返回统计结果
        results = {
            'total_responses': cleaned_len,
            'avg_age': round(avg_age, 2),
            'median_income': round(median_income, 2),
            'high_earners_count': len(high_earners),
            'high_earners_percentage': round(len(high_earners)/len(df)*100, 2)
        }

        logging.info("分析完成")
        return results

    except Exception as e:
        logging.error(f"未知错误: {e}", exc_info=True)
        return None


# 测试
if __name__ == "__main__":
    # 创建测试数据
    test_data = pd.DataFrame({
        'id': range(1, 11),
        'age': [25, 30, 35, 28, 32, 40, 27, 33, 38, 29],
        'income': [50000, 75000, 85000, 60000, 70000, 90000, 55000, 80000, 95000, 65000]
    })
    test_data.to_csv('test_survey.csv', index=False)

    # 运行分析
    results = analyze_survey('test_survey.csv')

    if results:
        print("\n分析结果:")
        print(f"  总人数: {results['total_responses']}")
        print(f"  平均年龄: {results['avg_age']}")
        print(f"  收入中位数: ${results['median_income']:,.0f}")
        print(f"  高收入者: {results['high_earners_count']} 人 ({results['high_earners_percentage']}%)")

下一步

完成本章后,你已经掌握了:

  • 常见错误类型
  • try-except 异常处理
  • 调试技巧(print, assert, logging, pdb)
  • 错误日志记录
  • 自定义异常

恭喜你完成 Module 8!

Module 9 中,我们将学习数据科学核心库(NumPy、Pandas、Matplotlib 等)。


扩展阅读

准备好学习数据科学库了吗?

基于 MIT 许可证发布。内容版权归作者所有。