Skip to content

3.3 数据清洗

"Garbage in, garbage out.""垃圾进,垃圾出。"— George Fuechsel, IBM Programmer (IBM程序员)

垃圾进,垃圾出(Garbage In, Garbage Out)


本节目标

  • 科学处理缺失值(理解 MCAR/MAR/MNAR)
  • 掌握 MICE 多重插补理论与实践
  • 识别和处理异常值
  • 删除重复值
  • 统一数据类型和格式
  • 建立可重复的清洗流程
  • 理解 Heckman 样本选择模型

为什么数据清洗如此重要?

真实案例:Excel 错误导致的政策失误

2010 年,Reinhart-Rogoff 债务与增长研究

  • 哈佛经济学家发表论文:政府债务 > 90% GDP → 经济增长显著下降
  • 多国政府据此实施紧缩政策
  • 2013 年发现:Excel 公式错误(遗漏了 5 个国家的数据)
  • 后果:结论被推翻,政策建议失效

教训:数据清洗错误会导致错误的结论和政策!


缺失数据的统计理论

缺失机制的数学定义

为完整数据矩阵,, 为缺失指示矩阵:

缺失机制由条件分布 刻画,其中 为缺失机制参数。

1. MCAR (Missing Completely At Random)

缺失与所有变量(观测的和未观测的)都无关。

示例: 问卷调查中,受访者因随机中断(电话掉线)而导致的缺失。

2. MAR (Missing At Random)

给定观测数据后,缺失与未观测数据无关。

示例: 高收入者更可能拒绝回答收入问题,但给定教育、职业等观测变量后,缺失与真实收入无关。

3. MNAR (Missing Not At Random)

缺失与未观测数据相关,即使给定观测数据。

示例: 吸毒者更可能拒绝回答吸毒史,缺失与答案本身相关。

Little's MCAR Test

检验 MCAR 假设的统计检验:

其中:

  • 为缺失模式总数
  • 为第 个缺失模式的样本量
  • 为第 个缺失模式的均值向量
  • 为总体均值向量
  • 为合并协方差矩阵

在 MCAR 假设下, 近似服从卡方分布。


缺失值处理

缺失值的三种类型

类型英文定义处理策略
完全随机缺失MCAR缺失与任何变量无关删除 or 简单填充
随机缺失MAR缺失与观测变量相关条件填充 or 插值
非随机缺失MNAR缺失与未观测变量相关️ 需要建模 or 敏感性分析

检测缺失值

python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# 1. 基本检测
df.isnull().sum()        # 各列缺失数
df.isnull().mean() * 100 # 缺失率
df.isnull().any(axis=1).sum()  # 含缺失的行数

# 2. 可视化缺失模式
import missingno as msno

# 缺失值矩阵图
msno.matrix(df, figsize=(10, 6))
plt.title('缺失值模式')

# 缺失值相关性热图
msno.heatmap(df, figsize=(10, 6))

# 缺失值树状图(显示哪些变量一起缺失)
msno.dendrogram(df)

Little's MCAR Test 实现

python
from scipy import stats
from scipy.linalg import inv

def littles_mcar_test(data):
    """
    Little's MCAR Test

    H0: Data is MCAR
    H1: Data is not MCAR

    Parameters:
    -----------
    data : pd.DataFrame
        包含缺失值的数据框

    Returns:
    --------
    chi2_stat : float
        卡方统计量
    p_value : float
        p值
    df : int
        自由度
    """
    # 移除全为缺失的列
    data = data.dropna(axis=1, how='all')

    # 识别缺失模式
    missing_patterns = data.isnull().astype(int)
    unique_patterns = missing_patterns.drop_duplicates()

    # 只保留数值列
    numeric_data = data.select_dtypes(include=[np.number])

    # 计算总体均值和协方差(完整案例)
    complete_cases = numeric_data.dropna()
    if len(complete_cases) < 2:
        return np.nan, np.nan, np.nan

    grand_mean = complete_cases.mean().values
    grand_cov = complete_cases.cov().values

    # 计算卡方统计量
    chi2_stat = 0
    total_df = 0

    for idx, pattern in unique_patterns.iterrows():
        # 获取该模式的数据
        mask = (missing_patterns == pattern).all(axis=1)
        pattern_data = numeric_data[mask]

        if len(pattern_data) == 0:
            continue

        # 观测变量
        observed_vars = ~pattern[numeric_data.columns].values

        if observed_vars.sum() == 0:
            continue

        # 该模式的均值
        pattern_mean = pattern_data.iloc[:, observed_vars].mean().values

        # 该模式的协方差矩阵
        sub_cov = grand_cov[np.ix_(observed_vars, observed_vars)]

        if np.linalg.matrix_rank(sub_cov) < len(pattern_mean):
            continue

        # 总体均值(仅观测变量)
        sub_mean = grand_mean[observed_vars]

        # 计算 Mahalanobis 距离
        diff = pattern_mean - sub_mean
        n_j = len(pattern_data)

        try:
            chi2_j = n_j * diff @ inv(sub_cov) @ diff
            chi2_stat += chi2_j
            total_df += len(pattern_mean)
        except np.linalg.LinAlgError:
            continue

    # 自由度
    n_patterns = len(unique_patterns)
    n_vars = observed_vars.sum()
    df = (n_patterns - 1) * n_vars

    # p值
    p_value = 1 - stats.chi2.cdf(chi2_stat, df)

    return chi2_stat, p_value, df

# 使用示例
chi2, p_value, df = littles_mcar_test(df)
print(f"Little's MCAR Test:")
print(f"  Chi-square = {chi2:.4f}")
print(f"  df = {df}")
print(f"  p-value = {p_value:.4f}")

if p_value > 0.05:
    print("  结论: 不能拒绝 MCAR 假设(数据可能是 MCAR)")
else:
    print("  结论: 拒绝 MCAR 假设(数据不是 MCAR)")

缺失值处理策略

策略 1:删除(Deletion)

完全删除(Listwise Deletion):

python
# 删除任何列有缺失的行
df_clean = df.dropna()

# 删除特定列有缺失的行
df_clean = df.dropna(subset=['income', 'education'])

# 删除全为缺失的行
df_clean = df.dropna(how='all')

# 删除缺失超过阈值的行(至少有 5 个非缺失值才保留)
df_clean = df.dropna(thresh=5)

删除缺失率高的列:

python
# 删除缺失率 > 50% 的列
missing_ratio = df.isnull().mean()
cols_to_drop = missing_ratio[missing_ratio > 0.5].index
df_clean = df.drop(columns=cols_to_drop)

print(f"删除列: {list(cols_to_drop)}")

何时使用删除:

  • 缺失率 < 5%
  • MCAR(完全随机缺失)
  • 样本量足够大
  • 缺失率高会导致信息损失
  • MAR/MNAR 会导致选择偏误

策略 2:简单填充(Simple Imputation)

python
# 1. 用均值填充
df['age'].fillna(df['age'].mean(), inplace=True)

# 2. 用中位数填充(对异常值稳健)
df['income'].fillna(df['income'].median(), inplace=True)

# 3. 用众数填充(分类变量)
df['region'].fillna(df['region'].mode()[0], inplace=True)

# 4. 用常数填充
df['score'].fillna(0, inplace=True)

# 5. 前向填充(时间序列)
df['sales'].fillna(method='ffill', inplace=True)

# 6. 后向填充
df['sales'].fillna(method='bfill', inplace=True)

# 7. 线性插值(时间序列)
df['temperature'] = df['temperature'].interpolate(method='linear')

批量填充:

python
# 数值列用中位数,分类列用众数
from sklearn.impute import SimpleImputer

# 数值列
num_cols = df.select_dtypes(include=[np.number]).columns
imputer_num = SimpleImputer(strategy='median')
df[num_cols] = imputer_num.fit_transform(df[num_cols])

# 分类列
cat_cols = df.select_dtypes(include=['object']).columns
imputer_cat = SimpleImputer(strategy='most_frequent')
df[cat_cols] = imputer_cat.fit_transform(df[cat_cols])

策略 3:高级填充(Advanced Imputation)

条件填充(按组填充):

python
# 用各地区的平均收入填充缺失值
df['income'] = df.groupby('region')['income'].transform(
    lambda x: x.fillna(x.median())
)

# 用各行业-教育组合的平均工资填充
df['wage'] = df.groupby(['industry', 'education'])['wage'].transform(
    lambda x: x.fillna(x.mean())
)

回归填充(用其他变量预测缺失值):

python
from sklearn.linear_model import LinearRegression

# 示例:用年龄和教育预测缺失的收入
# 1. 分离有/无缺失的数据
df_with_income = df[df['income'].notnull()]
df_missing_income = df[df['income'].isnull()]

# 2. 训练模型
X = df_with_income[['age', 'education']]
y = df_with_income['income']
model = LinearRegression().fit(X, y)

# 3. 预测缺失值
X_missing = df_missing_income[['age', 'education']]
predicted_income = model.predict(X_missing)

# 4. 填充
df.loc[df['income'].isnull(), 'income'] = predicted_income

KNN 填充:

python
from sklearn.impute import KNNImputer

# 用 K 近邻的平均值填充
imputer = KNNImputer(n_neighbors=5)
df_imputed = pd.DataFrame(
    imputer.fit_transform(df[['age', 'income', 'education']]),
    columns=['age', 'income', 'education']
)

MICE 多重插补理论

MICE 算法原理

MICE (Multivariate Imputation by Chained Equations) 是一种迭代插补算法。

算法步骤

对于变量 :

  1. 初始化: 用简单方法(如均值)填充所有缺失值

  2. 迭代过程: 对于第 次迭代,变量 :

其中 表示除 外的所有变量。

  1. 收敛: 重复步骤 2 直到收敛(通常 5-10 次迭代)

  2. 生成多个插补数据集: 重复上述过程 次(通常 )

Rubin's Rules

对于 个插补数据集,如何合并估计量?

为第 个插补数据集的参数估计, 为其方差估计。

合并估计量:

总方差:

其中:

  • Within-imputation variance (平均方差):
  • Between-imputation variance (插补间方差):

自由度:

置信区间:

Fraction of Missing Information

衡量由于缺失导致的信息损失比例。

MICE 完整实现

python
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.linear_model import BayesianRidge
import pandas as pd
import numpy as np

class MICEImputer:
    """
    MICE (Multivariate Imputation by Chained Equations)
    完整实现 + Rubin's Rules
    """

    def __init__(self, n_imputations=5, max_iter=10, random_state=42):
        """
        参数:
        ------
        n_imputations : int
            生成的插补数据集数量
        max_iter : int
            每次插补的最大迭代次数
        random_state : int
            随机种子
        """
        self.n_imputations = n_imputations
        self.max_iter = max_iter
        self.random_state = random_state
        self.imputed_datasets = []

    def fit_transform(self, X):
        """
        执行 MICE 插补

        参数:
        ------
        X : pd.DataFrame
            包含缺失值的数据框

        返回:
        ------
        imputed_datasets : list
            m 个插补后的数据框
        """
        self.imputed_datasets = []

        for i in range(self.n_imputations):
            # 创建迭代插补器
            imputer = IterativeImputer(
                estimator=BayesianRidge(),
                max_iter=self.max_iter,
                random_state=self.random_state + i,
                verbose=0
            )

            # 插补
            X_imputed = imputer.fit_transform(X)

            # 转换为 DataFrame
            df_imputed = pd.DataFrame(
                X_imputed,
                columns=X.columns,
                index=X.index
            )

            self.imputed_datasets.append(df_imputed)

        return self.imputed_datasets

    def analyze(self, estimator_func):
        """
        对多个插补数据集进行分析并使用 Rubin's Rules 合并结果

        参数:
        ------
        estimator_func : function
            接受 DataFrame 并返回 (estimate, variance) 的函数

        返回:
        ------
        pooled_estimate : float
            合并估计量
        pooled_variance : float
            总方差
        pooled_se : float
            合并标准误
        ci_lower, ci_upper : float
            95% 置信区间
        """
        estimates = []
        variances = []

        # 对每个插补数据集进行估计
        for df in self.imputed_datasets:
            est, var = estimator_func(df)
            estimates.append(est)
            variances.append(var)

        # Rubin's Rules
        m = self.n_imputations

        # 合并估计量
        pooled_estimate = np.mean(estimates)

        # Within-imputation variance
        W = np.mean(variances)

        # Between-imputation variance
        B = np.var(estimates, ddof=1)

        # Total variance
        T = W + (1 + 1/m) * B

        # Standard error
        pooled_se = np.sqrt(T)

        # Degrees of freedom
        if B > 0:
            nu = (m - 1) * (1 + W / ((1 + 1/m) * B))**2
        else:
            nu = np.inf

        # 95% CI
        from scipy import stats
        t_crit = stats.t.ppf(0.975, nu) if nu < np.inf else 1.96
        ci_lower = pooled_estimate - t_crit * pooled_se
        ci_upper = pooled_estimate + t_crit * pooled_se

        # Fraction of missing information
        fmi = (B + B/m) / T if T > 0 else 0

        return {
            'estimate': pooled_estimate,
            'se': pooled_se,
            'ci_lower': ci_lower,
            'ci_upper': ci_upper,
            'W': W,
            'B': B,
            'T': T,
            'nu': nu,
            'fmi': fmi
        }

# 使用示例
# 创建模拟数据
np.random.seed(42)
n = 1000
df = pd.DataFrame({
    'age': np.random.normal(40, 10, n),
    'education': np.random.normal(12, 3, n),
    'income': np.random.normal(50000, 20000, n),
    'health': np.random.normal(70, 15, n)
})

# 引入 MAR 缺失(高收入者更可能缺失 income)
df.loc[df['income'] > 60000, 'income'] = np.where(
    np.random.random(sum(df['income'] > 60000)) < 0.3,
    np.nan,
    df.loc[df['income'] > 60000, 'income']
)

# 引入其他缺失
for col in ['age', 'education', 'health']:
    mask = np.random.random(n) < 0.1
    df.loc[mask, col] = np.nan

print(f"缺失率:\n{df.isnull().mean() * 100}")

# MICE 插补
mice = MICEImputer(n_imputations=5, max_iter=10)
imputed_datasets = mice.fit_transform(df)

# 定义估计函数(例如:估计收入均值)
def estimate_mean_income(df):
    """估计收入均值及其方差"""
    mean = df['income'].mean()
    var = df['income'].var() / len(df)  # 均值的方差
    return mean, var

# 使用 Rubin's Rules 合并结果
results = mice.analyze(estimate_mean_income)

print("\n=== MICE + Rubin's Rules 结果 ===")
print(f"合并估计量: {results['estimate']:.2f}")
print(f"标准误: {results['se']:.2f}")
print(f"95% CI: [{results['ci_lower']:.2f}, {results['ci_upper']:.2f}]")
print(f"Within variance (W): {results['W']:.2f}")
print(f"Between variance (B): {results['B']:.2f}")
print(f"Total variance (T): {results['T']:.2f}")
print(f"自由度: {results['nu']:.2f}")
print(f"缺失信息比例 (FMI): {results['fmi']:.4f}")

# 对比完整案例分析
complete_case_mean = df['income'].dropna().mean()
print(f"\n完整案例均值: {complete_case_mean:.2f}")
print(f"差异: {results['estimate'] - complete_case_mean:.2f}")

策略 4:缺失指示变量(Missing Indicator)

python
# 创建缺失指示变量
df['income_missing'] = df['income'].isnull().astype(int)

# 然后填充原变量(如用中位数)
df['income'].fillna(df['income'].median(), inplace=True)

# 回归时同时包含原变量和缺失指示变量
# income_missing 的系数反映缺失本身的效应

何时使用:

  • 怀疑缺失本身有信息含量(MNAR)
  • 缺失率适中(10-30%)

生产级数据清洗类

python
import pandas as pd
import numpy as np
from scipy import stats
from scipy.stats.mstats import winsorize
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.linear_model import BayesianRidge
from sklearn.ensemble import IsolationForest
import warnings
warnings.filterwarnings('ignore')

class DataCleaner:
    """
    生产级数据清洗管道 - Nobel Prize Standards

    功能:
    -----
    1. 缺失值诊断 (MCAR/MAR/MNAR)
    2. MICE 多重插补
    3. 异常值检测与处理 (IQR, Z-score, Isolation Forest, Winsorization)
    4. 字符串清洗
    5. 重复值处理
    6. 数据类型统一
    7. 完整清洗报告

    示例:
    -----
    >>> cleaner = DataCleaner(df, name='CFPS 2018')
    >>> cleaner.diagnose_missing()
    >>> cleaner.impute_mice(n_imputations=5)
    >>> cleaner.handle_outliers(method='winsorize')
    >>> df_clean = cleaner.get_clean_data()
    >>> cleaner.generate_cleaning_report()
    """

    def __init__(self, df, name='Dataset'):
        """
        初始化清洗器

        参数:
        ------
        df : pd.DataFrame
            原始数据框
        name : str
            数据集名称
        """
        self.df_original = df.copy()
        self.df = df.copy()
        self.name = name
        self.cleaning_log = []
        self.missing_diagnostics = {}
        self.outlier_diagnostics = {}

        self._log(f"初始化 DataCleaner: {name}")
        self._log(f"  形状: {self.df.shape[0]} 行 × {self.df.shape[1]} 列")

    def _log(self, message):
        """记录清洗步骤"""
        self.cleaning_log.append(message)
        print(message)

    def diagnose_missing(self, plot=True):
        """
        诊断缺失机制: MCAR/MAR/MNAR

        步骤:
        -----
        1. Little's MCAR Test
        2. Missing pattern analysis
        3. Correlation heatmap
        4. Missingness correlation with observed variables
        """
        self._log("\n" + "="*70)
        self._log("缺失值诊断")
        self._log("="*70)

        # 基本统计
        missing_count = self.df.isnull().sum()
        missing_pct = self.df.isnull().mean() * 100

        missing_summary = pd.DataFrame({
            '缺失数': missing_count,
            '缺失率(%)': missing_pct
        })
        missing_summary = missing_summary[missing_summary['缺失数'] > 0].sort_values(
            '缺失率(%)', ascending=False
        )

        self._log(f"\n【1】缺失值概览")
        self._log(f"  总缺失值: {self.df.isnull().sum().sum()}")
        self._log(f"  含缺失的行: {self.df.isnull().any(axis=1).sum()} ({self.df.isnull().any(axis=1).mean()*100:.2f}%)")
        self._log(f"\n{missing_summary.to_string()}")

        # Little's MCAR Test
        self._log(f"\n【2】Little's MCAR Test")
        try:
            chi2, p_value, df_test = self._littles_mcar_test()
            self._log(f"  Chi-square = {chi2:.4f}")
            self._log(f"  df = {df_test}")
            self._log(f"  p-value = {p_value:.4f}")

            if p_value > 0.05:
                self._log(f"  结论: 不能拒绝 MCAR (p > 0.05)")
                self.missing_diagnostics['mechanism'] = 'MCAR'
            else:
                self._log(f"  结论: 拒绝 MCAR (p < 0.05), 数据可能是 MAR 或 MNAR")
                self.missing_diagnostics['mechanism'] = 'MAR/MNAR'
        except Exception as e:
            self._log(f"  无法执行 Little's MCAR Test: {e}")

        # 缺失模式分析
        self._log(f"\n【3】缺失模式分析")
        missing_patterns = self.df.isnull().astype(int)
        unique_patterns = missing_patterns.value_counts()
        self._log(f"  唯一缺失模式数: {len(unique_patterns)}")
        self._log(f"  Top 5 缺失模式:")
        for i, (pattern, count) in enumerate(unique_patterns.head(5).items(), 1):
            self._log(f"    模式 {i}: {count} 行 ({count/len(self.df)*100:.2f}%)")

        # 缺失相关性分析
        self._log(f"\n【4】缺失与观测变量的相关性")
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns

        for col in numeric_cols:
            if self.df[col].isnull().any():
                # 创建缺失指示变量
                missing_indicator = self.df[col].isnull().astype(int)

                # 计算与其他数值变量的相关性
                correlations = []
                for other_col in numeric_cols:
                    if other_col != col and not self.df[other_col].isnull().all():
                        try:
                            corr = missing_indicator.corr(self.df[other_col])
                            if abs(corr) > 0.1:  # 只显示显著相关
                                correlations.append((other_col, corr))
                        except:
                            pass

                if correlations:
                    self._log(f"\n  {col} 的缺失与以下变量相关:")
                    for other_col, corr in sorted(correlations, key=lambda x: abs(x[1]), reverse=True)[:3]:
                        self._log(f"    - {other_col}: {corr:.3f}")

        # 可视化
        if plot:
            self._plot_missing_patterns()

        self.missing_diagnostics['summary'] = missing_summary

        return missing_summary

    def _littles_mcar_test(self):
        """Little's MCAR Test 实现"""
        from scipy.linalg import inv

        data = self.df.copy()
        data = data.dropna(axis=1, how='all')

        missing_patterns = data.isnull().astype(int)
        unique_patterns = missing_patterns.drop_duplicates()

        numeric_data = data.select_dtypes(include=[np.number])

        complete_cases = numeric_data.dropna()
        if len(complete_cases) < 2:
            return np.nan, np.nan, np.nan

        grand_mean = complete_cases.mean().values
        grand_cov = complete_cases.cov().values

        chi2_stat = 0
        total_df = 0

        for idx, pattern in unique_patterns.iterrows():
            mask = (missing_patterns == pattern).all(axis=1)
            pattern_data = numeric_data[mask]

            if len(pattern_data) == 0:
                continue

            observed_vars = ~pattern[numeric_data.columns].values

            if observed_vars.sum() == 0:
                continue

            pattern_mean = pattern_data.iloc[:, observed_vars].mean().values
            sub_cov = grand_cov[np.ix_(observed_vars, observed_vars)]

            if np.linalg.matrix_rank(sub_cov) < len(pattern_mean):
                continue

            sub_mean = grand_mean[observed_vars]
            diff = pattern_mean - sub_mean
            n_j = len(pattern_data)

            try:
                chi2_j = n_j * diff @ inv(sub_cov) @ diff
                chi2_stat += chi2_j
                total_df += len(pattern_mean)
            except np.linalg.LinAlgError:
                continue

        n_patterns = len(unique_patterns)
        n_vars = len(numeric_data.columns)
        df = (n_patterns - 1) * n_vars

        p_value = 1 - stats.chi2.cdf(chi2_stat, df) if df > 0 else np.nan

        return chi2_stat, p_value, df

    def _plot_missing_patterns(self):
        """可视化缺失模式"""
        try:
            import missingno as msno
            import matplotlib.pyplot as plt

            fig, axes = plt.subplots(1, 2, figsize=(14, 6))

            # 缺失矩阵
            msno.matrix(self.df, ax=axes[0], fontsize=10)
            axes[0].set_title(f'{self.name} - 缺失模式矩阵', fontsize=12)

            # 缺失热图
            msno.heatmap(self.df, ax=axes[1], fontsize=10)
            axes[1].set_title(f'{self.name} - 缺失相关性', fontsize=12)

            plt.tight_layout()
            plt.show()
        except ImportError:
            self._log("  提示: 安装 missingno 以可视化缺失模式 (pip install missingno)")

    def impute_mice(self, n_imputations=5, max_iter=10, use_rubin=True):
        """
        MICE (Multivariate Imputation by Chained Equations) 完整实现

        参数:
        ------
        n_imputations : int
            生成的插补数据集数量
        max_iter : int
            每次插补的最大迭代次数
        use_rubin : bool
            是否使用 Rubin's Rules 合并结果

        返回:
        ------
        imputed_datasets : list (如果 use_rubin=False)
            或 合并后的数据框 (如果 use_rubin=True)
        """
        self._log("\n" + "="*70)
        self._log(f"MICE 多重插补 (m={n_imputations}, max_iter={max_iter})")
        self._log("="*70)

        # 只对数值列插补
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        non_numeric_cols = self.df.select_dtypes(exclude=[np.number]).columns

        # 保存非数值列
        df_non_numeric = self.df[non_numeric_cols].copy()

        imputed_datasets = []

        for i in range(n_imputations):
            self._log(f"\n  插补数据集 {i+1}/{n_imputations}")

            imputer = IterativeImputer(
                estimator=BayesianRidge(),
                max_iter=max_iter,
                random_state=42 + i,
                verbose=0
            )

            X_imputed = imputer.fit_transform(self.df[numeric_cols])

            df_imputed = pd.DataFrame(
                X_imputed,
                columns=numeric_cols,
                index=self.df.index
            )

            # 合并非数值列
            df_imputed = pd.concat([df_imputed, df_non_numeric], axis=1)

            imputed_datasets.append(df_imputed)

            self._log(f"    收敛状态: 完成")

        if use_rubin:
            # 使用 Rubin's Rules 合并
            self._log(f"\n  使用 Rubin's Rules 合并 {n_imputations} 个数据集")

            # 对每个数值变量取平均(简化版 Rubin's Rules)
            df_pooled = pd.DataFrame(index=self.df.index)

            for col in numeric_cols:
                values = np.array([df[col].values for df in imputed_datasets])
                df_pooled[col] = values.mean(axis=0)

            # 合并非数值列
            df_pooled = pd.concat([df_pooled, df_non_numeric], axis=1)

            self.df = df_pooled
            self._log(f"  已合并为单一数据集")

            return df_pooled
        else:
            self._log(f"  返回 {n_imputations} 个独立插补数据集")
            return imputed_datasets

    def handle_outliers(self, method='winsorize', limits=[0.05, 0.05],
                       threshold=3, contamination=0.1):
        """
        异常值稳健处理

        参数:
        ------
        method : str
            处理方法: 'winsorize', 'iqr', 'zscore', 'isolation_forest'
        limits : list
            Winsorization 的上下限比例 [lower, upper]
        threshold : float
            Z-score 方法的阈值
        contamination : float
            Isolation Forest 的异常值比例
        """
        self._log("\n" + "="*70)
        self._log(f"异常值处理 (方法: {method})")
        self._log("="*70)

        numeric_cols = self.df.select_dtypes(include=[np.number]).columns

        outliers_summary = []

        for col in numeric_cols:
            if self.df[col].std() == 0:
                continue

            before_min = self.df[col].min()
            before_max = self.df[col].max()
            before_mean = self.df[col].mean()

            if method == 'winsorize':
                self.df[col] = winsorize(self.df[col], limits=limits)

            elif method == 'iqr':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR

                outliers_mask = (self.df[col] < lower_bound) | (self.df[col] > upper_bound)
                n_outliers = outliers_mask.sum()

                # 删除异常值
                self.df = self.df[~outliers_mask]

                outliers_summary.append({
                    '变量': col,
                    '方法': 'IQR',
                    '异常值数': n_outliers,
                    '下界': lower_bound,
                    '上界': upper_bound
                })

            elif method == 'zscore':
                z_scores = np.abs(stats.zscore(self.df[col]))
                outliers_mask = z_scores > threshold
                n_outliers = outliers_mask.sum()

                # 删除异常值
                self.df = self.df[~outliers_mask]

                outliers_summary.append({
                    '变量': col,
                    '方法': 'Z-score',
                    '异常值数': n_outliers,
                    '阈值': threshold
                })

            elif method == 'isolation_forest':
                # 使用 Isolation Forest
                iso_forest = IsolationForest(
                    contamination=contamination,
                    random_state=42
                )

                outliers_mask = iso_forest.fit_predict(
                    self.df[[col]].values.reshape(-1, 1)
                ) == -1

                n_outliers = outliers_mask.sum()

                # 删除异常值
                self.df = self.df[~outliers_mask]

                outliers_summary.append({
                    '变量': col,
                    '方法': 'Isolation Forest',
                    '异常值数': n_outliers,
                    '污染率': contamination
                })

            after_min = self.df[col].min()
            after_max = self.df[col].max()
            after_mean = self.df[col].mean()

            if method == 'winsorize':
                self._log(f"\n  {col}:")
                self._log(f"    处理前: [{before_min:.2f}, {before_max:.2f}], 均值 = {before_mean:.2f}")
                self._log(f"    处理后: [{after_min:.2f}, {after_max:.2f}], 均值 = {after_mean:.2f}")

        if outliers_summary:
            self._log(f"\n  异常值处理汇总:")
            for item in outliers_summary:
                self._log(f"    {item}")

        self.outlier_diagnostics = {
            'method': method,
            'summary': outliers_summary
        }

    def clean_strings(self):
        """
        字符串清洗

        步骤:
        -----
        1. Strip whitespace
        2. Normalize case
        3. Remove special characters
        """
        self._log("\n" + "="*70)
        self._log("字符串清洗")
        self._log("="*70)

        string_cols = self.df.select_dtypes(include=['object']).columns

        for col in string_cols:
            # 去除空格
            self.df[col] = self.df[col].str.strip()

            # 统一小写
            self.df[col] = self.df[col].str.lower()

            self._log(f"  {col}: 已清洗 (去空格 + 小写)")

        self._log(f"\n  共清洗 {len(string_cols)} 个字符串列")

    def remove_duplicates(self, subset=None, keep='first'):
        """
        删除重复值

        参数:
        ------
        subset : list
            检查重复的列
        keep : str
            保留策略: 'first', 'last', False
        """
        self._log("\n" + "="*70)
        self._log("删除重复值")
        self._log("="*70)

        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset, keep=keep)
        after = len(self.df)

        self._log(f"  删除 {before - after} 行重复记录 ({(before-after)/before*100:.2f}%)")

    def get_clean_data(self):
        """返回清洗后的数据"""
        return self.df.copy()

    def generate_cleaning_report(self):
        """
        生成完整清洗报告
        """
        print("\n" + "="*70)
        print(f"{self.name} - 数据清洗报告")
        print("="*70)

        print(f"\n【1】数据概览")
        print(f"  原始数据: {self.df_original.shape[0]} 行 × {self.df_original.shape[1]} 列")
        print(f"  清洗后: {self.df.shape[0]} 行 × {self.df.shape[1]} 列")
        print(f"  删除行数: {self.df_original.shape[0] - self.df.shape[0]} ({(self.df_original.shape[0] - self.df.shape[0])/self.df_original.shape[0]*100:.2f}%)")

        print(f"\n【2】缺失值统计")
        original_missing = self.df_original.isnull().sum().sum()
        final_missing = self.df.isnull().sum().sum()
        print(f"  原始缺失值: {original_missing}")
        print(f"  清洗后缺失值: {final_missing}")
        print(f"  处理缺失值: {original_missing - final_missing}")

        if self.missing_diagnostics:
            print(f"  缺失机制: {self.missing_diagnostics.get('mechanism', 'Unknown')}")

        print(f"\n【3】异常值处理")
        if self.outlier_diagnostics:
            print(f"  方法: {self.outlier_diagnostics.get('method', 'None')}")
            summary = self.outlier_diagnostics.get('summary', [])
            if summary:
                total_outliers = sum(item['异常值数'] for item in summary)
                print(f"  总异常值: {total_outliers}")

        print(f"\n【4】数据质量指标")
        print(f"  完整性: {(1 - self.df.isnull().mean().mean())*100:.2f}%")
        print(f"  重复率: {self.df.duplicated().mean()*100:.2f}%")

        print(f"\n【5】清洗步骤日志")
        print(f"  共执行 {len(self.cleaning_log)} 个步骤")

        print("="*70)

        # 返回详细日志
        return {
            'original_shape': self.df_original.shape,
            'final_shape': self.df.shape,
            'rows_removed': self.df_original.shape[0] - self.df.shape[0],
            'missing_handled': original_missing - final_missing,
            'cleaning_log': self.cleaning_log,
            'missing_diagnostics': self.missing_diagnostics,
            'outlier_diagnostics': self.outlier_diagnostics
        }

# 使用示例
if __name__ == "__main__":
    # 创建模拟数据
    np.random.seed(42)
    n = 1000

    df_raw = pd.DataFrame({
        'id': range(1, n+1),
        'age': np.random.normal(40, 10, n),
        'education': np.random.normal(12, 3, n),
        'income': np.random.normal(50000, 20000, n),
        'health': np.random.normal(70, 15, n),
        'region': np.random.choice(['east', 'west', 'north', 'south'], n),
        'gender': np.random.choice(['male', 'female'], n)
    })

    # 引入缺失值
    df_raw.loc[df_raw['income'] > 60000, 'income'] = np.where(
        np.random.random(sum(df_raw['income'] > 60000)) < 0.3,
        np.nan,
        df_raw.loc[df_raw['income'] > 60000, 'income']
    )

    for col in ['age', 'education', 'health']:
        mask = np.random.random(n) < 0.1
        df_raw.loc[mask, col] = np.nan

    # 引入异常值
    df_raw.loc[np.random.choice(n, 10), 'age'] = -5
    df_raw.loc[np.random.choice(n, 10), 'income'] = 999999

    # 使用 DataCleaner
    cleaner = DataCleaner(df_raw, name='模拟数据集')

    # 诊断缺失
    cleaner.diagnose_missing(plot=False)

    # MICE 插补
    cleaner.impute_mice(n_imputations=5, max_iter=10)

    # 处理异常值
    cleaner.handle_outliers(method='winsorize', limits=[0.05, 0.05])

    # 清洗字符串
    cleaner.clean_strings()

    # 删除重复
    cleaner.remove_duplicates()

    # 生成报告
    report = cleaner.generate_cleaning_report()

    # 获取清洗后的数据
    df_clean = cleaner.get_clean_data()

异常值处理

什么是异常值?

异常值(Outlier):与大部分数据显著不同的观测值

两种类型:

  1. 错误值(Data Errors):录入错误、测量错误

    • 例如:年龄 = -5,收入 = 999999999
    • 处理:删除或修正
  2. 真实极端值(True Extremes):真实但罕见的观测

    • 例如:CEO 年薪、亿万富翁财富
    • 处理:保留、Winsorize、对数转换

识别异常值的方法

方法 1:箱线图(Box Plot)

python
import matplotlib.pyplot as plt
import seaborn as sns

# 单变量箱线图
plt.figure(figsize=(8, 6))
sns.boxplot(y=df['income'])
plt.title('收入分布(箱线图)')
plt.show()

# 多变量对比
plt.figure(figsize=(12, 6))
df[['age', 'income', 'education']].boxplot()
plt.ylabel('值')
plt.title('多变量箱线图')
plt.show()

方法 2:IQR 方法(四分位距)

python
def detect_outliers_iqr(df, col):
    """
    使用 IQR 方法检测异常值
    """
    Q1 = df[col].quantile(0.25)
    Q3 = df[col].quantile(0.75)
    IQR = Q3 - Q1

    # 定义异常值边界
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    # 识别异常值
    outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)]

    print(f"列: {col}")
    print(f"  Q1: {Q1:.2f}")
    print(f"  Q3: {Q3:.2f}")
    print(f"  IQR: {IQR:.2f}")
    print(f"  下界: {lower_bound:.2f}")
    print(f"  上界: {upper_bound:.2f}")
    print(f"  异常值数量: {len(outliers)} ({100*len(outliers)/len(df):.2f}%)")

    return outliers

# 使用示例
income_outliers = detect_outliers_iqr(df, 'income')

方法 3:Z-score 方法

对于正态分布, 的数据在 范围内:

python
from scipy import stats

def detect_outliers_zscore(df, col, threshold=3):
    """
    使用 Z-score 方法检测异常值
    threshold: 通常用 3(99.7% 数据在 ±3σ 内)
    """
    z_scores = np.abs(stats.zscore(df[col].dropna()))
    outliers = df[z_scores > threshold]

    print(f"列: {col}")
    print(f"  均值: {df[col].mean():.2f}")
    print(f"  标准差: {df[col].std():.2f}")
    print(f"  阈值: ±{threshold}σ")
    print(f"  异常值数量: {len(outliers)} ({100*len(outliers)/len(df):.2f}%)")

    return outliers

# 使用示例
age_outliers = detect_outliers_zscore(df, 'age', threshold=3)

方法 4:Isolation Forest (机器学习方法)

Isolation Forest 基于决策树隔离异常值:

python
from sklearn.ensemble import IsolationForest

def detect_outliers_isolation_forest(df, cols, contamination=0.1):
    """
    使用 Isolation Forest 检测多变量异常值

    参数:
    ------
    contamination : float
        预期异常值比例
    """
    iso_forest = IsolationForest(
        contamination=contamination,
        random_state=42
    )

    # 预测(-1 表示异常值)
    outliers_mask = iso_forest.fit_predict(df[cols]) == -1

    print(f"Isolation Forest:")
    print(f"  异常值数量: {outliers_mask.sum()} ({outliers_mask.mean()*100:.2f}%)")

    return df[outliers_mask]

# 使用示例
outliers_multi = detect_outliers_isolation_forest(
    df,
    ['age', 'income', 'education'],
    contamination=0.05
)

处理异常值的策略

策略 1:删除

python
# 删除异常值
df_clean = df[~df.index.isin(outliers.index)]

# 删除 income < 0 或 income > 1,000,000 的记录
df_clean = df[(df['income'] > 0) & (df['income'] < 1000000)]

何时删除:

  • 明显是错误值(如年龄 = -5)
  • 样本量足够大,删除后不影响分析
  • 异常值是真实的(如亿万富翁)

策略 2:Winsorize(缩尾处理)

python
from scipy.stats.mstats import winsorize

# 将极端值替换为第 5 和第 95 百分位数
df['income_winsorized'] = winsorize(df['income'], limits=[0.05, 0.05])

# 或者手动实现
lower = df['income'].quantile(0.05)
upper = df['income'].quantile(0.95)
df['income_winsorized'] = df['income'].clip(lower, upper)

print(f"原始范围: {df['income'].min():.0f} - {df['income'].max():.0f}")
print(f"Winsorize 后: {df['income_winsorized'].min():.0f} - {df['income_winsorized'].max():.0f}")

优点:

  • 保留样本量
  • 减少极端值影响
  • 保持数据分布形态

策略 3:对数转换(Log Transformation)

对于右偏分布(如收入、财富):

其中 是一个常数(通常 ),防止

python
# 对数转换可以压缩右偏分布
df['log_income'] = np.log(df['income'] + 1)

# 可视化对比
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# 原始分布
axes[0].hist(df['income'], bins=50, edgecolor='black')
axes[0].set_title('原始收入分布(右偏)')
axes[0].set_xlabel('收入')

# 对数转换后
axes[1].hist(df['log_income'], bins=50, edgecolor='black', color='green')
axes[1].set_title('对数收入分布(接近正态)')
axes[1].set_xlabel('log(收入)')

plt.tight_layout()
plt.show()

策略 4:Cap(截断)

python
# 设置上下限
df['income_capped'] = df['income'].clip(lower=0, upper=500000)

# 或者用百分位数
lower = df['income'].quantile(0.01)
upper = df['income'].quantile(0.99)
df['income_capped'] = df['income'].clip(lower, upper)

重复值处理

检测重复值

python
# 检测完全重复的行
duplicates = df.duplicated()
print(f"重复行数: {duplicates.sum()} ({100*duplicates.mean():.2f}%)")

# 查看重复的行
print(df[duplicates])

# 检测特定列的重复
duplicates_id = df.duplicated(subset=['id'])
print(f"ID 重复数: {duplicates_id.sum()}")

# 查看重复的 ID
duplicate_ids = df[duplicates_id]['id'].unique()
print(f"重复的 ID: {duplicate_ids}")

删除重复值

python
# 删除完全重复的行(保留第一次出现)
df_clean = df.drop_duplicates()

# 保留最后一次出现
df_clean = df.drop_duplicates(keep='last')

# 删除所有重复(一个都不保留)
df_clean = df.drop_duplicates(keep=False)

# 基于特定列删除重复
df_clean = df.drop_duplicates(subset=['id'], keep='first')

# 基于多列组合删除重复
df_clean = df.drop_duplicates(subset=['person_id', 'year'], keep='first')

处理"准重复"(Fuzzy Duplicates)

python
# 示例:姓名略有差异但实际是同一人
from fuzzywuzzy import fuzz

def find_fuzzy_duplicates(df, col, threshold=90):
    """
    查找模糊匹配的重复值
    """
    values = df[col].unique()
    duplicates = []

    for i, val1 in enumerate(values):
        for val2 in values[i+1:]:
            similarity = fuzz.ratio(str(val1), str(val2))
            if similarity >= threshold:
                duplicates.append((val1, val2, similarity))

    return pd.DataFrame(duplicates, columns=['值1', '值2', '相似度'])

# 示例:姓名匹配
fuzzy_dups = find_fuzzy_duplicates(df, 'name', threshold=90)
print(fuzzy_dups)

数据类型与格式统一

数据类型转换

python
# 1. 转换为数值类型
df['age'] = pd.to_numeric(df['age'], errors='coerce')  # 无法转换的变为 NaN

# 2. 转换为字符串
df['id'] = df['id'].astype(str)

# 3. 转换为分类类型(节省内存)
df['region'] = df['region'].astype('category')

# 4. 转换为日期类型
df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d', errors='coerce')

# 5. 转换为布尔类型
df['employed'] = df['employed'].astype(bool)

字符串清洗

python
# 去除空格
df['name'] = df['name'].str.strip()  # 前后空格
df['name'] = df['name'].str.lstrip()  # 左侧空格
df['name'] = df['name'].str.rstrip()  # 右侧空格

# 统一大小写
df['gender'] = df['gender'].str.lower()  # 全部小写
df['gender'] = df['gender'].str.upper()  # 全部大写
df['name'] = df['name'].str.title()     # 首字母大写

# 替换字符
df['phone'] = df['phone'].str.replace('-', '')  # 删除连字符
df['income'] = df['income'].str.replace(',', '')  # 删除千分位逗号
df['income'] = df['income'].str.replace('$', '')  # 删除美元符号

# 提取数字
df['income_num'] = df['income'].str.extract(r'(\d+)').astype(float)

日期处理

python
# 解析日期
df['date'] = pd.to_datetime(df['date'])

# 处理多种日期格式
df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d', errors='coerce')

# 提取日期组成部分
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['weekday'] = df['date'].dt.day_name()

# 计算日期差
df['days_since'] = (pd.Timestamp.now() - df['date']).dt.days

Heckman 样本选择模型

理论基础

问题: 当样本选择是非随机的(MNAR),直接删除缺失值会导致选择偏误。

Heckman Two-Step 模型 (Heckman, 1979):

Step 1: Selection Equation (Probit)

其中:

  • 表示观测到

Step 2: Outcome Equation (OLS + IMR)

关键: 假设 联合正态:

Inverse Mills Ratio

其中:

  • 是标准正态 PDF
  • 是标准正态 CDF

修正后的回归

Python 实现

python
from scipy.stats import norm
from sklearn.linear_model import LogisticRegression, LinearRegression
import statsmodels.api as sm

class HeckmanCorrection:
    """
    Heckman Two-Step Selection Model

    用于处理样本选择偏误(MNAR)

    示例:
    -----
    >>> heckman = HeckmanCorrection()
    >>> heckman.fit(
    ...     selection_data=df,
    ...     selection_formula='selected ~ age + education',
    ...     outcome_data=df[df['selected'] == 1],
    ...     outcome_formula='income ~ age + education'
    ... )
    >>> results = heckman.summary()
    """

    def __init__(self):
        self.probit_model = None
        self.ols_model = None
        self.lambda_coef = None
        self.rho = None

    def fit(self, df, selection_vars, outcome_vars, selection_indicator):
        """
        拟合 Heckman 模型

        参数:
        ------
        df : pd.DataFrame
            完整数据框
        selection_vars : list
            选择方程的自变量
        outcome_vars : list
            结果方程的自变量
        selection_indicator : str
            选择指示变量名(1=观测到, 0=未观测到)
        """
        print("="*70)
        print("Heckman Two-Step Selection Model")
        print("="*70)

        # Step 1: Probit for selection
        print("\n【Step 1】Selection Equation (Probit)")

        X_selection = df[selection_vars]
        y_selection = df[selection_indicator]

        # 使用 statsmodels Probit
        X_selection_const = sm.add_constant(X_selection)
        probit = sm.Probit(y_selection, X_selection_const)
        self.probit_model = probit.fit(disp=False)

        print(self.probit_model.summary())

        # 计算 Inverse Mills Ratio
        linear_pred = self.probit_model.predict(X_selection_const)

        # 只对选中的样本
        selected_mask = (y_selection == 1)

        # IMR = phi(z) / Phi(z)
        imr = norm.pdf(linear_pred[selected_mask]) / norm.cdf(linear_pred[selected_mask])

        # Step 2: OLS with IMR
        print("\n【Step 2】Outcome Equation (OLS + IMR)")

        df_selected = df[selected_mask].copy()
        df_selected['imr'] = imr.values

        # 结果方程
        X_outcome = df_selected[outcome_vars + ['imr']]
        y_outcome = df_selected['outcome']

        X_outcome_const = sm.add_constant(X_outcome)
        ols = sm.OLS(y_outcome, X_outcome_const)
        self.ols_model = ols.fit()

        print(self.ols_model.summary())

        # 提取 IMR 系数
        self.lambda_coef = self.ols_model.params['imr']

        print(f"\n【结果】")
        print(f"  Inverse Mills Ratio 系数: {self.lambda_coef:.4f}")

        if abs(self.lambda_coef) > 0.1:
            print(f"  ️  存在显著的样本选择偏误!")
        else:
            print(f"   样本选择偏误较小")

        return self

    def predict(self, X_new, with_correction=True):
        """
        预测(可选是否使用 Heckman 修正)
        """
        if with_correction:
            # 使用修正后的模型
            return self.ols_model.predict(sm.add_constant(X_new))
        else:
            # 忽略 IMR,使用普通 OLS
            return self.ols_model.predict(sm.add_constant(X_new))

# 使用示例
if __name__ == "__main__":
    # 模拟数据:女性工资数据(已婚女性更可能不工作)
    np.random.seed(42)
    n = 2000

    df = pd.DataFrame({
        'age': np.random.normal(35, 8, n),
        'education': np.random.normal(13, 2, n),
        'married': np.random.binomial(1, 0.5, n),
        'children': np.random.poisson(1, n)
    })

    # 选择方程:工作概率
    # 已婚、有孩子的女性更可能不工作
    work_prob = 1 / (1 + np.exp(
        -(0.5 - 0.3 * df['married'] - 0.2 * df['children'] + 0.05 * df['education'])
    ))
    df['working'] = np.random.binomial(1, work_prob)

    # 结果方程:工资(只对工作的人观测到)
    df['wage'] = np.nan
    working_mask = df['working'] == 1
    df.loc[working_mask, 'wage'] = (
        5000 +
        200 * df.loc[working_mask, 'age'] +
        1000 * df.loc[working_mask, 'education'] +
        np.random.normal(0, 2000, working_mask.sum())
    )

    print(f"工作人数: {df['working'].sum()} / {n} ({df['working'].mean()*100:.1f}%)")
    print(f"观测到工资: {df['wage'].notna().sum()}")

    # Heckman 模型
    df_heckman = df.copy()
    df_heckman['outcome'] = df_heckman['wage']

    heckman = HeckmanCorrection()
    heckman.fit(
        df=df_heckman,
        selection_vars=['age', 'education', 'married', 'children'],
        outcome_vars=['age', 'education'],
        selection_indicator='working'
    )

真实案例:CFPS 2018 数据清洗

案例背景

数据: 中国家庭追踪调查(CFPS) 2018

研究问题: 教育对收入的影响

挑战:

  • 收入缺失率 30%
  • 高收入者更可能拒绝回答(MAR/MNAR)
  • 存在极端值(超高收入)

完整清洗流程

python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# 模拟 CFPS 2018 数据
np.random.seed(42)
n = 5000

# 创建模拟数据
cfps = pd.DataFrame({
    'pid': range(1, n+1),
    'age': np.random.normal(42, 12, n),
    'education': np.random.choice([6, 9, 12, 16], n, p=[0.3, 0.2, 0.3, 0.2]),
    'gender': np.random.choice(['男', '女'], n),
    'urban': np.random.choice([0, 1], n, p=[0.4, 0.6]),
    'province': np.random.choice([
        '北京', '上海', '广东', '浙江', '江苏', '山东', '河南', '四川', '湖北', '湖南'
    ], n),
    'health': np.random.choice([1, 2, 3, 4, 5], n, p=[0.05, 0.15, 0.4, 0.3, 0.1])
})

# 生成收入(基于教育、年龄等)
cfps['income'] = (
    5000 +
    2000 * cfps['education'] +
    300 * cfps['age'] +
    5000 * cfps['urban'] +
    10000 * (cfps['gender'] == '男').astype(int) +
    np.random.lognormal(8, 1.5, n)
)

# 引入 MAR 缺失(高收入、高教育者更可能缺失)
missing_prob = 1 / (1 + np.exp(
    -(cfps['income'] / 50000 - 2 + 0.1 * cfps['education'])
))
missing_mask = np.random.random(n) < missing_prob
cfps.loc[missing_mask, 'income'] = np.nan

print(f"收入缺失率: {cfps['income'].isnull().mean()*100:.2f}%")

# 引入其他缺失
for col in ['age', 'education', 'health']:
    mask = np.random.random(n) < 0.05
    cfps.loc[mask, col] = np.nan

# 引入异常值
cfps.loc[np.random.choice(n, 20, replace=False), 'income'] = np.random.uniform(
    500000, 1000000, 20
)
cfps.loc[np.random.choice(n, 10, replace=False), 'age'] = -99

# 引入重复
cfps = pd.concat([cfps, cfps.sample(50)], ignore_index=True)

print(f"\n原始数据: {cfps.shape}")
print(f"\n前5行:")
print(cfps.head())

# 使用 DataCleaner
print("\n" + "="*70)
print("开始数据清洗")
print("="*70)

cleaner = DataCleaner(cfps, name='CFPS 2018')

# 1. 诊断缺失
cleaner.diagnose_missing(plot=False)

# 2. 删除重复
cleaner.remove_duplicates(subset=['pid'], keep='first')

# 3. 处理明显错误值
print("\n" + "="*70)
print("处理错误值")
print("="*70)

# 年龄范围: 16-100
before = len(cleaner.df)
cleaner.df = cleaner.df[(cleaner.df['age'] >= 16) & (cleaner.df['age'] <= 100)]
print(f"  删除年龄异常值: {before - len(cleaner.df)} 行")

# 收入 > 0
before = len(cleaner.df)
cleaner.df = cleaner.df[(cleaner.df['income'] > 0) | (cleaner.df['income'].isnull())]
print(f"  删除负收入: {before - len(cleaner.df)} 行")

# 4. MICE 插补
cleaner.impute_mice(n_imputations=5, max_iter=10, use_rubin=True)

# 5. 处理异常值
cleaner.handle_outliers(method='winsorize', limits=[0.01, 0.01])

# 6. 清洗字符串
cleaner.clean_strings()

# 7. 生成报告
report = cleaner.generate_cleaning_report()

# 获取清洗后的数据
cfps_clean = cleaner.get_clean_data()

print(f"\n清洗后数据: {cfps_clean.shape}")
print(f"\n描述性统计:")
print(cfps_clean[['age', 'education', 'income']].describe())

# 8. 对比完整案例删除 vs MICE 插补
print("\n" + "="*70)
print("完整案例删除 vs MICE 插补对比")
print("="*70)

# 完整案例
cfps_complete = cfps.dropna(subset=['income'])

# 估计教育回报
from sklearn.linear_model import LinearRegression

# 完整案例估计
X_complete = cfps_complete[['education', 'age', 'urban']]
y_complete = cfps_complete['income']
model_complete = LinearRegression().fit(X_complete, y_complete)
beta_complete = model_complete.coef_[0]

# MICE 插补估计
X_mice = cfps_clean[['education', 'age', 'urban']]
y_mice = cfps_clean['income']
model_mice = LinearRegression().fit(X_mice, y_mice)
beta_mice = model_mice.coef_[0]

print(f"\n教育回报估计:")
print(f"  完整案例删除: {beta_complete:.2f}")
print(f"  MICE 插补: {beta_mice:.2f}")
print(f"  差异: {beta_mice - beta_complete:.2f} ({(beta_mice/beta_complete - 1)*100:.2f}%)")

# 9. 可视化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# 原始 vs 清洗后的收入分布
axes[0, 0].hist(cfps['income'].dropna(), bins=50, alpha=0.5, label='原始', edgecolor='black')
axes[0, 0].hist(cfps_clean['income'], bins=50, alpha=0.5, label='清洗后', edgecolor='black')
axes[0, 0].set_title('收入分布对比')
axes[0, 0].set_xlabel('收入')
axes[0, 0].legend()

# 年龄分布
axes[0, 1].hist(cfps_clean['age'], bins=30, edgecolor='black', color='green')
axes[0, 1].set_title('年龄分布(清洗后)')
axes[0, 1].set_xlabel('年龄')

# 教育分布
cfps_clean['education'].value_counts().sort_index().plot(kind='bar', ax=axes[1, 0], color='orange')
axes[1, 0].set_title('教育分布(清洗后)')
axes[1, 0].set_xlabel('教育年限')
axes[1, 0].set_ylabel('频数')

# 收入 vs 教育
axes[1, 1].scatter(cfps_clean['education'], cfps_clean['income'], alpha=0.3, s=10)
axes[1, 1].set_title('教育与收入关系')
axes[1, 1].set_xlabel('教育年限')
axes[1, 1].set_ylabel('收入')

plt.tight_layout()
plt.show()

print("\n 数据清洗完成!")

敏感性分析

python
print("\n" + "="*70)
print("敏感性分析:缺失机制假设")
print("="*70)

# 假设 1: MCAR(删除)
cfps_mcar = cfps.dropna()
X_mcar = cfps_mcar[['education', 'age', 'urban']]
y_mcar = cfps_mcar['income']
model_mcar = LinearRegression().fit(X_mcar, y_mcar)
beta_mcar = model_mcar.coef_[0]

# 假设 2: MAR(MICE)
beta_mar = beta_mice

# 假设 3: MNAR(Heckman)
# 简化:假设收入缺失与真实收入相关
# (实际应用中需要更复杂的建模)

print(f"\n教育回报估计(敏感性分析):")
print(f"  MCAR 假设(删除): {beta_mcar:.2f}")
print(f"  MAR 假设(MICE): {beta_mar:.2f}")
print(f"  差异: {beta_mar - beta_mcar:.2f} ({abs(beta_mar - beta_mcar) / beta_mcar * 100:.2f}%)")

print(f"\n结论:")
if abs(beta_mar - beta_mcar) / beta_mcar < 0.05:
    print(f"   结果对缺失机制假设不敏感(差异 < 5%)")
else:
    print(f"  ️  结果对缺失机制假设敏感(差异 ≥ 5%)")
    print(f"  建议: 进一步调查缺失机制,可能需要 Heckman 修正")

练习题

⭐⭐ 基础题:简单均值插补

任务: 给定包含缺失值的数据集,实现简单均值插补。

python
# 练习数据
np.random.seed(42)
df_practice = pd.DataFrame({
    'x1': np.random.normal(50, 10, 100),
    'x2': np.random.normal(100, 20, 100),
    'x3': np.random.normal(75, 15, 100)
})

# 引入缺失
for col in df_practice.columns:
    mask = np.random.random(100) < 0.2
    df_practice.loc[mask, col] = np.nan

print("缺失率:")
print(df_practice.isnull().mean())

# TODO: 实现均值插补
# df_imputed = ...

# 验证
# assert df_imputed.isnull().sum().sum() == 0

⭐⭐⭐ 中级题:IQR 异常值检测

任务: 实现基于 IQR 的异常值检测函数,并可视化。

python
# TODO: 实现函数
def detect_and_visualize_outliers(df, col):
    """
    检测并可视化异常值

    返回:
    ------
    outliers : pd.DataFrame
        异常值数据
    """
    # 1. IQR 计算
    # 2. 识别异常值
    # 3. 可视化(箱线图 + 散点图)
    pass

# 测试
# outliers = detect_and_visualize_outliers(df, 'income')

⭐⭐⭐⭐ 高级题:MICE + Rubin's Rules

任务: 实现完整的 MICE 插补并使用 Rubin's Rules 合并结果。

python
# TODO: 实现
# 1. MICE 生成 m=5 个插补数据集
# 2. 对每个数据集估计线性回归系数
# 3. 使用 Rubin's Rules 合并
# 4. 计算 95% 置信区间
# 5. 对比完整案例分析

⭐⭐⭐⭐⭐ Nobel 级别:Heckman + 敏感性分析

任务: 实现 Heckman 两步法,并进行敏感性分析。

python
# TODO: 实现
# 1. 模拟女性工资数据(已婚女性更可能不工作)
# 2. Heckman 两步法估计
# 3. 对比普通 OLS(删除缺失)
# 4. 敏感性分析:改变选择方程变量
# 5. 解释 Inverse Mills Ratio 的经济含义

小结

数据清洗核心函数

任务函数示例
缺失值dropna()删除缺失
fillna()填充缺失
IterativeImputer()MICE 插补
异常值clip()截断
winsorize()缩尾
IsolationForest()ML 检测
重复值duplicated()检测重复
drop_duplicates()删除重复
类型转换astype()类型转换
pd.to_numeric()转换为数值
pd.to_datetime()转换为日期
字符串str.strip()去空格
str.lower()转小写
str.replace()替换字符

缺失值处理决策树

开始

├─ 缺失率 < 5%?
│  ├─ 是 → 直接删除(dropna)
│  └─ 否 → 继续

├─ Little's MCAR Test p > 0.05?
│  ├─ 是 → MCAR,可以删除或简单填充
│  └─ 否 → MAR/MNAR,需要高级方法

├─ MAR?
│  ├─ 是 → MICE 多重插补
│  └─ 否 → MNAR

└─ MNAR?
   ├─ 样本选择问题 → Heckman 两步法
   ├─ 缺失本身有信息 → 缺失指示变量
   └─ 不确定 → 敏感性分析

数据清洗最佳实践

  1. 保留原始数据: 使用 df.copy() 创建副本
  2. 记录清洗步骤: 每步都添加注释和日志
  3. 先探索后清洗: 理解数据后再动手
  4. 可重复的流程: 编写函数,便于复用
  5. 验证清洗结果: 清洗后重新检查数据质量
  6. 敏感性分析: 对比不同清洗策略的影响
  7. 文档化: 生成清洗报告

理论要点

  1. 缺失机制:

    • MCAR: 删除不会导致偏误
    • MAR: 可以用观测变量建模
    • MNAR: 需要额外假设或敏感性分析
  2. MICE:

    • 迭代插补,考虑变量间关系
    • Rubin's Rules 合并多个插补数据集
    • 标准误正确反映不确定性
  3. Heckman 模型:

    • 两步法修正样本选择偏误
    • Inverse Mills Ratio 量化选择效应
    • 需要排他性限制(exclusion restriction)

下一步

下一节我们将学习 变量构造,包括虚拟变量、交互项、滞后项等。

继续前进!


参考资源

经典教材

  • Little, R. J., & Rubin, D. B. (2019). Statistical Analysis with Missing Data (3rd Edition). Wiley.
  • Heckman, J. J. (1979). "Sample Selection Bias as a Specification Error." Econometrica, 47(1), 153-161.
  • van Buuren, S. (2018). Flexible Imputation of Missing Data (2nd Edition). Chapman and Hall/CRC.

实用工具

  • missingno: 缺失值可视化
  • scikit-learn.impute: 插补算法
  • statsmodels: 统计建模(包括 Heckman)
  • fancyimpute: 高级插补方法

在线资源

基于 MIT 许可证发布。内容版权归作者所有。