3.3 数据清洗
"Garbage in, garbage out.""垃圾进,垃圾出。"— George Fuechsel, IBM Programmer (IBM程序员)
垃圾进,垃圾出(Garbage In, Garbage Out)
本节目标
- 科学处理缺失值(理解 MCAR/MAR/MNAR)
- 掌握 MICE 多重插补理论与实践
- 识别和处理异常值
- 删除重复值
- 统一数据类型和格式
- 建立可重复的清洗流程
- 理解 Heckman 样本选择模型
为什么数据清洗如此重要?
真实案例:Excel 错误导致的政策失误
2010 年,Reinhart-Rogoff 债务与增长研究
- 哈佛经济学家发表论文:政府债务 > 90% GDP → 经济增长显著下降
- 多国政府据此实施紧缩政策
- 2013 年发现:Excel 公式错误(遗漏了 5 个国家的数据)
- 后果:结论被推翻,政策建议失效
教训:数据清洗错误会导致错误的结论和政策!
缺失数据的统计理论
缺失机制的数学定义
设 为完整数据矩阵,, 为缺失指示矩阵:
缺失机制由条件分布 刻画,其中 为缺失机制参数。
1. MCAR (Missing Completely At Random)
缺失与所有变量(观测的和未观测的)都无关。
示例: 问卷调查中,受访者因随机中断(电话掉线)而导致的缺失。
2. MAR (Missing At Random)
给定观测数据后,缺失与未观测数据无关。
示例: 高收入者更可能拒绝回答收入问题,但给定教育、职业等观测变量后,缺失与真实收入无关。
3. MNAR (Missing Not At Random)
缺失与未观测数据相关,即使给定观测数据。
示例: 吸毒者更可能拒绝回答吸毒史,缺失与答案本身相关。
Little's MCAR Test
检验 MCAR 假设的统计检验:
其中:
- 为缺失模式总数
- 为第 个缺失模式的样本量
- 为第 个缺失模式的均值向量
- 为总体均值向量
- 为合并协方差矩阵
在 MCAR 假设下, 近似服从卡方分布。
缺失值处理
缺失值的三种类型
| 类型 | 英文 | 定义 | 处理策略 |
|---|---|---|---|
| 完全随机缺失 | MCAR | 缺失与任何变量无关 | 删除 or 简单填充 |
| 随机缺失 | MAR | 缺失与观测变量相关 | 条件填充 or 插值 |
| 非随机缺失 | MNAR | 缺失与未观测变量相关 | ️ 需要建模 or 敏感性分析 |
检测缺失值
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# 1. 基本检测
df.isnull().sum() # 各列缺失数
df.isnull().mean() * 100 # 缺失率
df.isnull().any(axis=1).sum() # 含缺失的行数
# 2. 可视化缺失模式
import missingno as msno
# 缺失值矩阵图
msno.matrix(df, figsize=(10, 6))
plt.title('缺失值模式')
# 缺失值相关性热图
msno.heatmap(df, figsize=(10, 6))
# 缺失值树状图(显示哪些变量一起缺失)
msno.dendrogram(df)Little's MCAR Test 实现
from scipy import stats
from scipy.linalg import inv
def littles_mcar_test(data):
"""
Little's MCAR Test
H0: Data is MCAR
H1: Data is not MCAR
Parameters:
-----------
data : pd.DataFrame
包含缺失值的数据框
Returns:
--------
chi2_stat : float
卡方统计量
p_value : float
p值
df : int
自由度
"""
# 移除全为缺失的列
data = data.dropna(axis=1, how='all')
# 识别缺失模式
missing_patterns = data.isnull().astype(int)
unique_patterns = missing_patterns.drop_duplicates()
# 只保留数值列
numeric_data = data.select_dtypes(include=[np.number])
# 计算总体均值和协方差(完整案例)
complete_cases = numeric_data.dropna()
if len(complete_cases) < 2:
return np.nan, np.nan, np.nan
grand_mean = complete_cases.mean().values
grand_cov = complete_cases.cov().values
# 计算卡方统计量
chi2_stat = 0
total_df = 0
for idx, pattern in unique_patterns.iterrows():
# 获取该模式的数据
mask = (missing_patterns == pattern).all(axis=1)
pattern_data = numeric_data[mask]
if len(pattern_data) == 0:
continue
# 观测变量
observed_vars = ~pattern[numeric_data.columns].values
if observed_vars.sum() == 0:
continue
# 该模式的均值
pattern_mean = pattern_data.iloc[:, observed_vars].mean().values
# 该模式的协方差矩阵
sub_cov = grand_cov[np.ix_(observed_vars, observed_vars)]
if np.linalg.matrix_rank(sub_cov) < len(pattern_mean):
continue
# 总体均值(仅观测变量)
sub_mean = grand_mean[observed_vars]
# 计算 Mahalanobis 距离
diff = pattern_mean - sub_mean
n_j = len(pattern_data)
try:
chi2_j = n_j * diff @ inv(sub_cov) @ diff
chi2_stat += chi2_j
total_df += len(pattern_mean)
except np.linalg.LinAlgError:
continue
# 自由度
n_patterns = len(unique_patterns)
n_vars = observed_vars.sum()
df = (n_patterns - 1) * n_vars
# p值
p_value = 1 - stats.chi2.cdf(chi2_stat, df)
return chi2_stat, p_value, df
# 使用示例
chi2, p_value, df = littles_mcar_test(df)
print(f"Little's MCAR Test:")
print(f" Chi-square = {chi2:.4f}")
print(f" df = {df}")
print(f" p-value = {p_value:.4f}")
if p_value > 0.05:
print(" 结论: 不能拒绝 MCAR 假设(数据可能是 MCAR)")
else:
print(" 结论: 拒绝 MCAR 假设(数据不是 MCAR)")缺失值处理策略
策略 1:删除(Deletion)
完全删除(Listwise Deletion):
# 删除任何列有缺失的行
df_clean = df.dropna()
# 删除特定列有缺失的行
df_clean = df.dropna(subset=['income', 'education'])
# 删除全为缺失的行
df_clean = df.dropna(how='all')
# 删除缺失超过阈值的行(至少有 5 个非缺失值才保留)
df_clean = df.dropna(thresh=5)删除缺失率高的列:
# 删除缺失率 > 50% 的列
missing_ratio = df.isnull().mean()
cols_to_drop = missing_ratio[missing_ratio > 0.5].index
df_clean = df.drop(columns=cols_to_drop)
print(f"删除列: {list(cols_to_drop)}")何时使用删除:
- 缺失率 < 5%
- MCAR(完全随机缺失)
- 样本量足够大
- 缺失率高会导致信息损失
- MAR/MNAR 会导致选择偏误
策略 2:简单填充(Simple Imputation)
# 1. 用均值填充
df['age'].fillna(df['age'].mean(), inplace=True)
# 2. 用中位数填充(对异常值稳健)
df['income'].fillna(df['income'].median(), inplace=True)
# 3. 用众数填充(分类变量)
df['region'].fillna(df['region'].mode()[0], inplace=True)
# 4. 用常数填充
df['score'].fillna(0, inplace=True)
# 5. 前向填充(时间序列)
df['sales'].fillna(method='ffill', inplace=True)
# 6. 后向填充
df['sales'].fillna(method='bfill', inplace=True)
# 7. 线性插值(时间序列)
df['temperature'] = df['temperature'].interpolate(method='linear')批量填充:
# 数值列用中位数,分类列用众数
from sklearn.impute import SimpleImputer
# 数值列
num_cols = df.select_dtypes(include=[np.number]).columns
imputer_num = SimpleImputer(strategy='median')
df[num_cols] = imputer_num.fit_transform(df[num_cols])
# 分类列
cat_cols = df.select_dtypes(include=['object']).columns
imputer_cat = SimpleImputer(strategy='most_frequent')
df[cat_cols] = imputer_cat.fit_transform(df[cat_cols])策略 3:高级填充(Advanced Imputation)
条件填充(按组填充):
# 用各地区的平均收入填充缺失值
df['income'] = df.groupby('region')['income'].transform(
lambda x: x.fillna(x.median())
)
# 用各行业-教育组合的平均工资填充
df['wage'] = df.groupby(['industry', 'education'])['wage'].transform(
lambda x: x.fillna(x.mean())
)回归填充(用其他变量预测缺失值):
from sklearn.linear_model import LinearRegression
# 示例:用年龄和教育预测缺失的收入
# 1. 分离有/无缺失的数据
df_with_income = df[df['income'].notnull()]
df_missing_income = df[df['income'].isnull()]
# 2. 训练模型
X = df_with_income[['age', 'education']]
y = df_with_income['income']
model = LinearRegression().fit(X, y)
# 3. 预测缺失值
X_missing = df_missing_income[['age', 'education']]
predicted_income = model.predict(X_missing)
# 4. 填充
df.loc[df['income'].isnull(), 'income'] = predicted_incomeKNN 填充:
from sklearn.impute import KNNImputer
# 用 K 近邻的平均值填充
imputer = KNNImputer(n_neighbors=5)
df_imputed = pd.DataFrame(
imputer.fit_transform(df[['age', 'income', 'education']]),
columns=['age', 'income', 'education']
)MICE 多重插补理论
MICE 算法原理
MICE (Multivariate Imputation by Chained Equations) 是一种迭代插补算法。
算法步骤
对于变量 :
初始化: 用简单方法(如均值)填充所有缺失值
迭代过程: 对于第 次迭代,变量 :
其中 表示除 外的所有变量。
收敛: 重复步骤 2 直到收敛(通常 5-10 次迭代)
生成多个插补数据集: 重复上述过程 次(通常 )
Rubin's Rules
对于 个插补数据集,如何合并估计量?
设 为第 个插补数据集的参数估计, 为其方差估计。
合并估计量:
总方差:
其中:
- Within-imputation variance (平均方差):
- Between-imputation variance (插补间方差):
自由度:
置信区间:
Fraction of Missing Information
衡量由于缺失导致的信息损失比例。
MICE 完整实现
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.linear_model import BayesianRidge
import pandas as pd
import numpy as np
class MICEImputer:
"""
MICE (Multivariate Imputation by Chained Equations)
完整实现 + Rubin's Rules
"""
def __init__(self, n_imputations=5, max_iter=10, random_state=42):
"""
参数:
------
n_imputations : int
生成的插补数据集数量
max_iter : int
每次插补的最大迭代次数
random_state : int
随机种子
"""
self.n_imputations = n_imputations
self.max_iter = max_iter
self.random_state = random_state
self.imputed_datasets = []
def fit_transform(self, X):
"""
执行 MICE 插补
参数:
------
X : pd.DataFrame
包含缺失值的数据框
返回:
------
imputed_datasets : list
m 个插补后的数据框
"""
self.imputed_datasets = []
for i in range(self.n_imputations):
# 创建迭代插补器
imputer = IterativeImputer(
estimator=BayesianRidge(),
max_iter=self.max_iter,
random_state=self.random_state + i,
verbose=0
)
# 插补
X_imputed = imputer.fit_transform(X)
# 转换为 DataFrame
df_imputed = pd.DataFrame(
X_imputed,
columns=X.columns,
index=X.index
)
self.imputed_datasets.append(df_imputed)
return self.imputed_datasets
def analyze(self, estimator_func):
"""
对多个插补数据集进行分析并使用 Rubin's Rules 合并结果
参数:
------
estimator_func : function
接受 DataFrame 并返回 (estimate, variance) 的函数
返回:
------
pooled_estimate : float
合并估计量
pooled_variance : float
总方差
pooled_se : float
合并标准误
ci_lower, ci_upper : float
95% 置信区间
"""
estimates = []
variances = []
# 对每个插补数据集进行估计
for df in self.imputed_datasets:
est, var = estimator_func(df)
estimates.append(est)
variances.append(var)
# Rubin's Rules
m = self.n_imputations
# 合并估计量
pooled_estimate = np.mean(estimates)
# Within-imputation variance
W = np.mean(variances)
# Between-imputation variance
B = np.var(estimates, ddof=1)
# Total variance
T = W + (1 + 1/m) * B
# Standard error
pooled_se = np.sqrt(T)
# Degrees of freedom
if B > 0:
nu = (m - 1) * (1 + W / ((1 + 1/m) * B))**2
else:
nu = np.inf
# 95% CI
from scipy import stats
t_crit = stats.t.ppf(0.975, nu) if nu < np.inf else 1.96
ci_lower = pooled_estimate - t_crit * pooled_se
ci_upper = pooled_estimate + t_crit * pooled_se
# Fraction of missing information
fmi = (B + B/m) / T if T > 0 else 0
return {
'estimate': pooled_estimate,
'se': pooled_se,
'ci_lower': ci_lower,
'ci_upper': ci_upper,
'W': W,
'B': B,
'T': T,
'nu': nu,
'fmi': fmi
}
# 使用示例
# 创建模拟数据
np.random.seed(42)
n = 1000
df = pd.DataFrame({
'age': np.random.normal(40, 10, n),
'education': np.random.normal(12, 3, n),
'income': np.random.normal(50000, 20000, n),
'health': np.random.normal(70, 15, n)
})
# 引入 MAR 缺失(高收入者更可能缺失 income)
df.loc[df['income'] > 60000, 'income'] = np.where(
np.random.random(sum(df['income'] > 60000)) < 0.3,
np.nan,
df.loc[df['income'] > 60000, 'income']
)
# 引入其他缺失
for col in ['age', 'education', 'health']:
mask = np.random.random(n) < 0.1
df.loc[mask, col] = np.nan
print(f"缺失率:\n{df.isnull().mean() * 100}")
# MICE 插补
mice = MICEImputer(n_imputations=5, max_iter=10)
imputed_datasets = mice.fit_transform(df)
# 定义估计函数(例如:估计收入均值)
def estimate_mean_income(df):
"""估计收入均值及其方差"""
mean = df['income'].mean()
var = df['income'].var() / len(df) # 均值的方差
return mean, var
# 使用 Rubin's Rules 合并结果
results = mice.analyze(estimate_mean_income)
print("\n=== MICE + Rubin's Rules 结果 ===")
print(f"合并估计量: {results['estimate']:.2f}")
print(f"标准误: {results['se']:.2f}")
print(f"95% CI: [{results['ci_lower']:.2f}, {results['ci_upper']:.2f}]")
print(f"Within variance (W): {results['W']:.2f}")
print(f"Between variance (B): {results['B']:.2f}")
print(f"Total variance (T): {results['T']:.2f}")
print(f"自由度: {results['nu']:.2f}")
print(f"缺失信息比例 (FMI): {results['fmi']:.4f}")
# 对比完整案例分析
complete_case_mean = df['income'].dropna().mean()
print(f"\n完整案例均值: {complete_case_mean:.2f}")
print(f"差异: {results['estimate'] - complete_case_mean:.2f}")策略 4:缺失指示变量(Missing Indicator)
# 创建缺失指示变量
df['income_missing'] = df['income'].isnull().astype(int)
# 然后填充原变量(如用中位数)
df['income'].fillna(df['income'].median(), inplace=True)
# 回归时同时包含原变量和缺失指示变量
# income_missing 的系数反映缺失本身的效应何时使用:
- 怀疑缺失本身有信息含量(MNAR)
- 缺失率适中(10-30%)
生产级数据清洗类
import pandas as pd
import numpy as np
from scipy import stats
from scipy.stats.mstats import winsorize
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.linear_model import BayesianRidge
from sklearn.ensemble import IsolationForest
import warnings
warnings.filterwarnings('ignore')
class DataCleaner:
"""
生产级数据清洗管道 - Nobel Prize Standards
功能:
-----
1. 缺失值诊断 (MCAR/MAR/MNAR)
2. MICE 多重插补
3. 异常值检测与处理 (IQR, Z-score, Isolation Forest, Winsorization)
4. 字符串清洗
5. 重复值处理
6. 数据类型统一
7. 完整清洗报告
示例:
-----
>>> cleaner = DataCleaner(df, name='CFPS 2018')
>>> cleaner.diagnose_missing()
>>> cleaner.impute_mice(n_imputations=5)
>>> cleaner.handle_outliers(method='winsorize')
>>> df_clean = cleaner.get_clean_data()
>>> cleaner.generate_cleaning_report()
"""
def __init__(self, df, name='Dataset'):
"""
初始化清洗器
参数:
------
df : pd.DataFrame
原始数据框
name : str
数据集名称
"""
self.df_original = df.copy()
self.df = df.copy()
self.name = name
self.cleaning_log = []
self.missing_diagnostics = {}
self.outlier_diagnostics = {}
self._log(f"初始化 DataCleaner: {name}")
self._log(f" 形状: {self.df.shape[0]} 行 × {self.df.shape[1]} 列")
def _log(self, message):
"""记录清洗步骤"""
self.cleaning_log.append(message)
print(message)
def diagnose_missing(self, plot=True):
"""
诊断缺失机制: MCAR/MAR/MNAR
步骤:
-----
1. Little's MCAR Test
2. Missing pattern analysis
3. Correlation heatmap
4. Missingness correlation with observed variables
"""
self._log("\n" + "="*70)
self._log("缺失值诊断")
self._log("="*70)
# 基本统计
missing_count = self.df.isnull().sum()
missing_pct = self.df.isnull().mean() * 100
missing_summary = pd.DataFrame({
'缺失数': missing_count,
'缺失率(%)': missing_pct
})
missing_summary = missing_summary[missing_summary['缺失数'] > 0].sort_values(
'缺失率(%)', ascending=False
)
self._log(f"\n【1】缺失值概览")
self._log(f" 总缺失值: {self.df.isnull().sum().sum()}")
self._log(f" 含缺失的行: {self.df.isnull().any(axis=1).sum()} ({self.df.isnull().any(axis=1).mean()*100:.2f}%)")
self._log(f"\n{missing_summary.to_string()}")
# Little's MCAR Test
self._log(f"\n【2】Little's MCAR Test")
try:
chi2, p_value, df_test = self._littles_mcar_test()
self._log(f" Chi-square = {chi2:.4f}")
self._log(f" df = {df_test}")
self._log(f" p-value = {p_value:.4f}")
if p_value > 0.05:
self._log(f" 结论: 不能拒绝 MCAR (p > 0.05)")
self.missing_diagnostics['mechanism'] = 'MCAR'
else:
self._log(f" 结论: 拒绝 MCAR (p < 0.05), 数据可能是 MAR 或 MNAR")
self.missing_diagnostics['mechanism'] = 'MAR/MNAR'
except Exception as e:
self._log(f" 无法执行 Little's MCAR Test: {e}")
# 缺失模式分析
self._log(f"\n【3】缺失模式分析")
missing_patterns = self.df.isnull().astype(int)
unique_patterns = missing_patterns.value_counts()
self._log(f" 唯一缺失模式数: {len(unique_patterns)}")
self._log(f" Top 5 缺失模式:")
for i, (pattern, count) in enumerate(unique_patterns.head(5).items(), 1):
self._log(f" 模式 {i}: {count} 行 ({count/len(self.df)*100:.2f}%)")
# 缺失相关性分析
self._log(f"\n【4】缺失与观测变量的相关性")
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
if self.df[col].isnull().any():
# 创建缺失指示变量
missing_indicator = self.df[col].isnull().astype(int)
# 计算与其他数值变量的相关性
correlations = []
for other_col in numeric_cols:
if other_col != col and not self.df[other_col].isnull().all():
try:
corr = missing_indicator.corr(self.df[other_col])
if abs(corr) > 0.1: # 只显示显著相关
correlations.append((other_col, corr))
except:
pass
if correlations:
self._log(f"\n {col} 的缺失与以下变量相关:")
for other_col, corr in sorted(correlations, key=lambda x: abs(x[1]), reverse=True)[:3]:
self._log(f" - {other_col}: {corr:.3f}")
# 可视化
if plot:
self._plot_missing_patterns()
self.missing_diagnostics['summary'] = missing_summary
return missing_summary
def _littles_mcar_test(self):
"""Little's MCAR Test 实现"""
from scipy.linalg import inv
data = self.df.copy()
data = data.dropna(axis=1, how='all')
missing_patterns = data.isnull().astype(int)
unique_patterns = missing_patterns.drop_duplicates()
numeric_data = data.select_dtypes(include=[np.number])
complete_cases = numeric_data.dropna()
if len(complete_cases) < 2:
return np.nan, np.nan, np.nan
grand_mean = complete_cases.mean().values
grand_cov = complete_cases.cov().values
chi2_stat = 0
total_df = 0
for idx, pattern in unique_patterns.iterrows():
mask = (missing_patterns == pattern).all(axis=1)
pattern_data = numeric_data[mask]
if len(pattern_data) == 0:
continue
observed_vars = ~pattern[numeric_data.columns].values
if observed_vars.sum() == 0:
continue
pattern_mean = pattern_data.iloc[:, observed_vars].mean().values
sub_cov = grand_cov[np.ix_(observed_vars, observed_vars)]
if np.linalg.matrix_rank(sub_cov) < len(pattern_mean):
continue
sub_mean = grand_mean[observed_vars]
diff = pattern_mean - sub_mean
n_j = len(pattern_data)
try:
chi2_j = n_j * diff @ inv(sub_cov) @ diff
chi2_stat += chi2_j
total_df += len(pattern_mean)
except np.linalg.LinAlgError:
continue
n_patterns = len(unique_patterns)
n_vars = len(numeric_data.columns)
df = (n_patterns - 1) * n_vars
p_value = 1 - stats.chi2.cdf(chi2_stat, df) if df > 0 else np.nan
return chi2_stat, p_value, df
def _plot_missing_patterns(self):
"""可视化缺失模式"""
try:
import missingno as msno
import matplotlib.pyplot as plt
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# 缺失矩阵
msno.matrix(self.df, ax=axes[0], fontsize=10)
axes[0].set_title(f'{self.name} - 缺失模式矩阵', fontsize=12)
# 缺失热图
msno.heatmap(self.df, ax=axes[1], fontsize=10)
axes[1].set_title(f'{self.name} - 缺失相关性', fontsize=12)
plt.tight_layout()
plt.show()
except ImportError:
self._log(" 提示: 安装 missingno 以可视化缺失模式 (pip install missingno)")
def impute_mice(self, n_imputations=5, max_iter=10, use_rubin=True):
"""
MICE (Multivariate Imputation by Chained Equations) 完整实现
参数:
------
n_imputations : int
生成的插补数据集数量
max_iter : int
每次插补的最大迭代次数
use_rubin : bool
是否使用 Rubin's Rules 合并结果
返回:
------
imputed_datasets : list (如果 use_rubin=False)
或 合并后的数据框 (如果 use_rubin=True)
"""
self._log("\n" + "="*70)
self._log(f"MICE 多重插补 (m={n_imputations}, max_iter={max_iter})")
self._log("="*70)
# 只对数值列插补
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
non_numeric_cols = self.df.select_dtypes(exclude=[np.number]).columns
# 保存非数值列
df_non_numeric = self.df[non_numeric_cols].copy()
imputed_datasets = []
for i in range(n_imputations):
self._log(f"\n 插补数据集 {i+1}/{n_imputations}")
imputer = IterativeImputer(
estimator=BayesianRidge(),
max_iter=max_iter,
random_state=42 + i,
verbose=0
)
X_imputed = imputer.fit_transform(self.df[numeric_cols])
df_imputed = pd.DataFrame(
X_imputed,
columns=numeric_cols,
index=self.df.index
)
# 合并非数值列
df_imputed = pd.concat([df_imputed, df_non_numeric], axis=1)
imputed_datasets.append(df_imputed)
self._log(f" 收敛状态: 完成")
if use_rubin:
# 使用 Rubin's Rules 合并
self._log(f"\n 使用 Rubin's Rules 合并 {n_imputations} 个数据集")
# 对每个数值变量取平均(简化版 Rubin's Rules)
df_pooled = pd.DataFrame(index=self.df.index)
for col in numeric_cols:
values = np.array([df[col].values for df in imputed_datasets])
df_pooled[col] = values.mean(axis=0)
# 合并非数值列
df_pooled = pd.concat([df_pooled, df_non_numeric], axis=1)
self.df = df_pooled
self._log(f" 已合并为单一数据集")
return df_pooled
else:
self._log(f" 返回 {n_imputations} 个独立插补数据集")
return imputed_datasets
def handle_outliers(self, method='winsorize', limits=[0.05, 0.05],
threshold=3, contamination=0.1):
"""
异常值稳健处理
参数:
------
method : str
处理方法: 'winsorize', 'iqr', 'zscore', 'isolation_forest'
limits : list
Winsorization 的上下限比例 [lower, upper]
threshold : float
Z-score 方法的阈值
contamination : float
Isolation Forest 的异常值比例
"""
self._log("\n" + "="*70)
self._log(f"异常值处理 (方法: {method})")
self._log("="*70)
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
outliers_summary = []
for col in numeric_cols:
if self.df[col].std() == 0:
continue
before_min = self.df[col].min()
before_max = self.df[col].max()
before_mean = self.df[col].mean()
if method == 'winsorize':
self.df[col] = winsorize(self.df[col], limits=limits)
elif method == 'iqr':
Q1 = self.df[col].quantile(0.25)
Q3 = self.df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers_mask = (self.df[col] < lower_bound) | (self.df[col] > upper_bound)
n_outliers = outliers_mask.sum()
# 删除异常值
self.df = self.df[~outliers_mask]
outliers_summary.append({
'变量': col,
'方法': 'IQR',
'异常值数': n_outliers,
'下界': lower_bound,
'上界': upper_bound
})
elif method == 'zscore':
z_scores = np.abs(stats.zscore(self.df[col]))
outliers_mask = z_scores > threshold
n_outliers = outliers_mask.sum()
# 删除异常值
self.df = self.df[~outliers_mask]
outliers_summary.append({
'变量': col,
'方法': 'Z-score',
'异常值数': n_outliers,
'阈值': threshold
})
elif method == 'isolation_forest':
# 使用 Isolation Forest
iso_forest = IsolationForest(
contamination=contamination,
random_state=42
)
outliers_mask = iso_forest.fit_predict(
self.df[[col]].values.reshape(-1, 1)
) == -1
n_outliers = outliers_mask.sum()
# 删除异常值
self.df = self.df[~outliers_mask]
outliers_summary.append({
'变量': col,
'方法': 'Isolation Forest',
'异常值数': n_outliers,
'污染率': contamination
})
after_min = self.df[col].min()
after_max = self.df[col].max()
after_mean = self.df[col].mean()
if method == 'winsorize':
self._log(f"\n {col}:")
self._log(f" 处理前: [{before_min:.2f}, {before_max:.2f}], 均值 = {before_mean:.2f}")
self._log(f" 处理后: [{after_min:.2f}, {after_max:.2f}], 均值 = {after_mean:.2f}")
if outliers_summary:
self._log(f"\n 异常值处理汇总:")
for item in outliers_summary:
self._log(f" {item}")
self.outlier_diagnostics = {
'method': method,
'summary': outliers_summary
}
def clean_strings(self):
"""
字符串清洗
步骤:
-----
1. Strip whitespace
2. Normalize case
3. Remove special characters
"""
self._log("\n" + "="*70)
self._log("字符串清洗")
self._log("="*70)
string_cols = self.df.select_dtypes(include=['object']).columns
for col in string_cols:
# 去除空格
self.df[col] = self.df[col].str.strip()
# 统一小写
self.df[col] = self.df[col].str.lower()
self._log(f" {col}: 已清洗 (去空格 + 小写)")
self._log(f"\n 共清洗 {len(string_cols)} 个字符串列")
def remove_duplicates(self, subset=None, keep='first'):
"""
删除重复值
参数:
------
subset : list
检查重复的列
keep : str
保留策略: 'first', 'last', False
"""
self._log("\n" + "="*70)
self._log("删除重复值")
self._log("="*70)
before = len(self.df)
self.df = self.df.drop_duplicates(subset=subset, keep=keep)
after = len(self.df)
self._log(f" 删除 {before - after} 行重复记录 ({(before-after)/before*100:.2f}%)")
def get_clean_data(self):
"""返回清洗后的数据"""
return self.df.copy()
def generate_cleaning_report(self):
"""
生成完整清洗报告
"""
print("\n" + "="*70)
print(f"{self.name} - 数据清洗报告")
print("="*70)
print(f"\n【1】数据概览")
print(f" 原始数据: {self.df_original.shape[0]} 行 × {self.df_original.shape[1]} 列")
print(f" 清洗后: {self.df.shape[0]} 行 × {self.df.shape[1]} 列")
print(f" 删除行数: {self.df_original.shape[0] - self.df.shape[0]} ({(self.df_original.shape[0] - self.df.shape[0])/self.df_original.shape[0]*100:.2f}%)")
print(f"\n【2】缺失值统计")
original_missing = self.df_original.isnull().sum().sum()
final_missing = self.df.isnull().sum().sum()
print(f" 原始缺失值: {original_missing}")
print(f" 清洗后缺失值: {final_missing}")
print(f" 处理缺失值: {original_missing - final_missing}")
if self.missing_diagnostics:
print(f" 缺失机制: {self.missing_diagnostics.get('mechanism', 'Unknown')}")
print(f"\n【3】异常值处理")
if self.outlier_diagnostics:
print(f" 方法: {self.outlier_diagnostics.get('method', 'None')}")
summary = self.outlier_diagnostics.get('summary', [])
if summary:
total_outliers = sum(item['异常值数'] for item in summary)
print(f" 总异常值: {total_outliers}")
print(f"\n【4】数据质量指标")
print(f" 完整性: {(1 - self.df.isnull().mean().mean())*100:.2f}%")
print(f" 重复率: {self.df.duplicated().mean()*100:.2f}%")
print(f"\n【5】清洗步骤日志")
print(f" 共执行 {len(self.cleaning_log)} 个步骤")
print("="*70)
# 返回详细日志
return {
'original_shape': self.df_original.shape,
'final_shape': self.df.shape,
'rows_removed': self.df_original.shape[0] - self.df.shape[0],
'missing_handled': original_missing - final_missing,
'cleaning_log': self.cleaning_log,
'missing_diagnostics': self.missing_diagnostics,
'outlier_diagnostics': self.outlier_diagnostics
}
# 使用示例
if __name__ == "__main__":
# 创建模拟数据
np.random.seed(42)
n = 1000
df_raw = pd.DataFrame({
'id': range(1, n+1),
'age': np.random.normal(40, 10, n),
'education': np.random.normal(12, 3, n),
'income': np.random.normal(50000, 20000, n),
'health': np.random.normal(70, 15, n),
'region': np.random.choice(['east', 'west', 'north', 'south'], n),
'gender': np.random.choice(['male', 'female'], n)
})
# 引入缺失值
df_raw.loc[df_raw['income'] > 60000, 'income'] = np.where(
np.random.random(sum(df_raw['income'] > 60000)) < 0.3,
np.nan,
df_raw.loc[df_raw['income'] > 60000, 'income']
)
for col in ['age', 'education', 'health']:
mask = np.random.random(n) < 0.1
df_raw.loc[mask, col] = np.nan
# 引入异常值
df_raw.loc[np.random.choice(n, 10), 'age'] = -5
df_raw.loc[np.random.choice(n, 10), 'income'] = 999999
# 使用 DataCleaner
cleaner = DataCleaner(df_raw, name='模拟数据集')
# 诊断缺失
cleaner.diagnose_missing(plot=False)
# MICE 插补
cleaner.impute_mice(n_imputations=5, max_iter=10)
# 处理异常值
cleaner.handle_outliers(method='winsorize', limits=[0.05, 0.05])
# 清洗字符串
cleaner.clean_strings()
# 删除重复
cleaner.remove_duplicates()
# 生成报告
report = cleaner.generate_cleaning_report()
# 获取清洗后的数据
df_clean = cleaner.get_clean_data()异常值处理
什么是异常值?
异常值(Outlier):与大部分数据显著不同的观测值
两种类型:
错误值(Data Errors):录入错误、测量错误
- 例如:年龄 = -5,收入 = 999999999
- 处理:删除或修正
真实极端值(True Extremes):真实但罕见的观测
- 例如:CEO 年薪、亿万富翁财富
- 处理:保留、Winsorize、对数转换
识别异常值的方法
方法 1:箱线图(Box Plot)
import matplotlib.pyplot as plt
import seaborn as sns
# 单变量箱线图
plt.figure(figsize=(8, 6))
sns.boxplot(y=df['income'])
plt.title('收入分布(箱线图)')
plt.show()
# 多变量对比
plt.figure(figsize=(12, 6))
df[['age', 'income', 'education']].boxplot()
plt.ylabel('值')
plt.title('多变量箱线图')
plt.show()方法 2:IQR 方法(四分位距)
def detect_outliers_iqr(df, col):
"""
使用 IQR 方法检测异常值
"""
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
# 定义异常值边界
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# 识别异常值
outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
print(f"列: {col}")
print(f" Q1: {Q1:.2f}")
print(f" Q3: {Q3:.2f}")
print(f" IQR: {IQR:.2f}")
print(f" 下界: {lower_bound:.2f}")
print(f" 上界: {upper_bound:.2f}")
print(f" 异常值数量: {len(outliers)} ({100*len(outliers)/len(df):.2f}%)")
return outliers
# 使用示例
income_outliers = detect_outliers_iqr(df, 'income')方法 3:Z-score 方法
对于正态分布, 的数据在 范围内:
from scipy import stats
def detect_outliers_zscore(df, col, threshold=3):
"""
使用 Z-score 方法检测异常值
threshold: 通常用 3(99.7% 数据在 ±3σ 内)
"""
z_scores = np.abs(stats.zscore(df[col].dropna()))
outliers = df[z_scores > threshold]
print(f"列: {col}")
print(f" 均值: {df[col].mean():.2f}")
print(f" 标准差: {df[col].std():.2f}")
print(f" 阈值: ±{threshold}σ")
print(f" 异常值数量: {len(outliers)} ({100*len(outliers)/len(df):.2f}%)")
return outliers
# 使用示例
age_outliers = detect_outliers_zscore(df, 'age', threshold=3)方法 4:Isolation Forest (机器学习方法)
Isolation Forest 基于决策树隔离异常值:
from sklearn.ensemble import IsolationForest
def detect_outliers_isolation_forest(df, cols, contamination=0.1):
"""
使用 Isolation Forest 检测多变量异常值
参数:
------
contamination : float
预期异常值比例
"""
iso_forest = IsolationForest(
contamination=contamination,
random_state=42
)
# 预测(-1 表示异常值)
outliers_mask = iso_forest.fit_predict(df[cols]) == -1
print(f"Isolation Forest:")
print(f" 异常值数量: {outliers_mask.sum()} ({outliers_mask.mean()*100:.2f}%)")
return df[outliers_mask]
# 使用示例
outliers_multi = detect_outliers_isolation_forest(
df,
['age', 'income', 'education'],
contamination=0.05
)处理异常值的策略
策略 1:删除
# 删除异常值
df_clean = df[~df.index.isin(outliers.index)]
# 删除 income < 0 或 income > 1,000,000 的记录
df_clean = df[(df['income'] > 0) & (df['income'] < 1000000)]何时删除:
- 明显是错误值(如年龄 = -5)
- 样本量足够大,删除后不影响分析
- 异常值是真实的(如亿万富翁)
策略 2:Winsorize(缩尾处理)
from scipy.stats.mstats import winsorize
# 将极端值替换为第 5 和第 95 百分位数
df['income_winsorized'] = winsorize(df['income'], limits=[0.05, 0.05])
# 或者手动实现
lower = df['income'].quantile(0.05)
upper = df['income'].quantile(0.95)
df['income_winsorized'] = df['income'].clip(lower, upper)
print(f"原始范围: {df['income'].min():.0f} - {df['income'].max():.0f}")
print(f"Winsorize 后: {df['income_winsorized'].min():.0f} - {df['income_winsorized'].max():.0f}")优点:
- 保留样本量
- 减少极端值影响
- 保持数据分布形态
策略 3:对数转换(Log Transformation)
对于右偏分布(如收入、财富):
其中 是一个常数(通常 ),防止 。
# 对数转换可以压缩右偏分布
df['log_income'] = np.log(df['income'] + 1)
# 可视化对比
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 原始分布
axes[0].hist(df['income'], bins=50, edgecolor='black')
axes[0].set_title('原始收入分布(右偏)')
axes[0].set_xlabel('收入')
# 对数转换后
axes[1].hist(df['log_income'], bins=50, edgecolor='black', color='green')
axes[1].set_title('对数收入分布(接近正态)')
axes[1].set_xlabel('log(收入)')
plt.tight_layout()
plt.show()策略 4:Cap(截断)
# 设置上下限
df['income_capped'] = df['income'].clip(lower=0, upper=500000)
# 或者用百分位数
lower = df['income'].quantile(0.01)
upper = df['income'].quantile(0.99)
df['income_capped'] = df['income'].clip(lower, upper)重复值处理
检测重复值
# 检测完全重复的行
duplicates = df.duplicated()
print(f"重复行数: {duplicates.sum()} ({100*duplicates.mean():.2f}%)")
# 查看重复的行
print(df[duplicates])
# 检测特定列的重复
duplicates_id = df.duplicated(subset=['id'])
print(f"ID 重复数: {duplicates_id.sum()}")
# 查看重复的 ID
duplicate_ids = df[duplicates_id]['id'].unique()
print(f"重复的 ID: {duplicate_ids}")删除重复值
# 删除完全重复的行(保留第一次出现)
df_clean = df.drop_duplicates()
# 保留最后一次出现
df_clean = df.drop_duplicates(keep='last')
# 删除所有重复(一个都不保留)
df_clean = df.drop_duplicates(keep=False)
# 基于特定列删除重复
df_clean = df.drop_duplicates(subset=['id'], keep='first')
# 基于多列组合删除重复
df_clean = df.drop_duplicates(subset=['person_id', 'year'], keep='first')处理"准重复"(Fuzzy Duplicates)
# 示例:姓名略有差异但实际是同一人
from fuzzywuzzy import fuzz
def find_fuzzy_duplicates(df, col, threshold=90):
"""
查找模糊匹配的重复值
"""
values = df[col].unique()
duplicates = []
for i, val1 in enumerate(values):
for val2 in values[i+1:]:
similarity = fuzz.ratio(str(val1), str(val2))
if similarity >= threshold:
duplicates.append((val1, val2, similarity))
return pd.DataFrame(duplicates, columns=['值1', '值2', '相似度'])
# 示例:姓名匹配
fuzzy_dups = find_fuzzy_duplicates(df, 'name', threshold=90)
print(fuzzy_dups)数据类型与格式统一
数据类型转换
# 1. 转换为数值类型
df['age'] = pd.to_numeric(df['age'], errors='coerce') # 无法转换的变为 NaN
# 2. 转换为字符串
df['id'] = df['id'].astype(str)
# 3. 转换为分类类型(节省内存)
df['region'] = df['region'].astype('category')
# 4. 转换为日期类型
df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d', errors='coerce')
# 5. 转换为布尔类型
df['employed'] = df['employed'].astype(bool)字符串清洗
# 去除空格
df['name'] = df['name'].str.strip() # 前后空格
df['name'] = df['name'].str.lstrip() # 左侧空格
df['name'] = df['name'].str.rstrip() # 右侧空格
# 统一大小写
df['gender'] = df['gender'].str.lower() # 全部小写
df['gender'] = df['gender'].str.upper() # 全部大写
df['name'] = df['name'].str.title() # 首字母大写
# 替换字符
df['phone'] = df['phone'].str.replace('-', '') # 删除连字符
df['income'] = df['income'].str.replace(',', '') # 删除千分位逗号
df['income'] = df['income'].str.replace('$', '') # 删除美元符号
# 提取数字
df['income_num'] = df['income'].str.extract(r'(\d+)').astype(float)日期处理
# 解析日期
df['date'] = pd.to_datetime(df['date'])
# 处理多种日期格式
df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d', errors='coerce')
# 提取日期组成部分
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['weekday'] = df['date'].dt.day_name()
# 计算日期差
df['days_since'] = (pd.Timestamp.now() - df['date']).dt.daysHeckman 样本选择模型
理论基础
问题: 当样本选择是非随机的(MNAR),直接删除缺失值会导致选择偏误。
Heckman Two-Step 模型 (Heckman, 1979):
Step 1: Selection Equation (Probit)
其中:
- 表示观测到
Step 2: Outcome Equation (OLS + IMR)
关键: 假设 联合正态:
Inverse Mills Ratio
其中:
- 是标准正态 PDF
- 是标准正态 CDF
修正后的回归
Python 实现
from scipy.stats import norm
from sklearn.linear_model import LogisticRegression, LinearRegression
import statsmodels.api as sm
class HeckmanCorrection:
"""
Heckman Two-Step Selection Model
用于处理样本选择偏误(MNAR)
示例:
-----
>>> heckman = HeckmanCorrection()
>>> heckman.fit(
... selection_data=df,
... selection_formula='selected ~ age + education',
... outcome_data=df[df['selected'] == 1],
... outcome_formula='income ~ age + education'
... )
>>> results = heckman.summary()
"""
def __init__(self):
self.probit_model = None
self.ols_model = None
self.lambda_coef = None
self.rho = None
def fit(self, df, selection_vars, outcome_vars, selection_indicator):
"""
拟合 Heckman 模型
参数:
------
df : pd.DataFrame
完整数据框
selection_vars : list
选择方程的自变量
outcome_vars : list
结果方程的自变量
selection_indicator : str
选择指示变量名(1=观测到, 0=未观测到)
"""
print("="*70)
print("Heckman Two-Step Selection Model")
print("="*70)
# Step 1: Probit for selection
print("\n【Step 1】Selection Equation (Probit)")
X_selection = df[selection_vars]
y_selection = df[selection_indicator]
# 使用 statsmodels Probit
X_selection_const = sm.add_constant(X_selection)
probit = sm.Probit(y_selection, X_selection_const)
self.probit_model = probit.fit(disp=False)
print(self.probit_model.summary())
# 计算 Inverse Mills Ratio
linear_pred = self.probit_model.predict(X_selection_const)
# 只对选中的样本
selected_mask = (y_selection == 1)
# IMR = phi(z) / Phi(z)
imr = norm.pdf(linear_pred[selected_mask]) / norm.cdf(linear_pred[selected_mask])
# Step 2: OLS with IMR
print("\n【Step 2】Outcome Equation (OLS + IMR)")
df_selected = df[selected_mask].copy()
df_selected['imr'] = imr.values
# 结果方程
X_outcome = df_selected[outcome_vars + ['imr']]
y_outcome = df_selected['outcome']
X_outcome_const = sm.add_constant(X_outcome)
ols = sm.OLS(y_outcome, X_outcome_const)
self.ols_model = ols.fit()
print(self.ols_model.summary())
# 提取 IMR 系数
self.lambda_coef = self.ols_model.params['imr']
print(f"\n【结果】")
print(f" Inverse Mills Ratio 系数: {self.lambda_coef:.4f}")
if abs(self.lambda_coef) > 0.1:
print(f" ️ 存在显著的样本选择偏误!")
else:
print(f" 样本选择偏误较小")
return self
def predict(self, X_new, with_correction=True):
"""
预测(可选是否使用 Heckman 修正)
"""
if with_correction:
# 使用修正后的模型
return self.ols_model.predict(sm.add_constant(X_new))
else:
# 忽略 IMR,使用普通 OLS
return self.ols_model.predict(sm.add_constant(X_new))
# 使用示例
if __name__ == "__main__":
# 模拟数据:女性工资数据(已婚女性更可能不工作)
np.random.seed(42)
n = 2000
df = pd.DataFrame({
'age': np.random.normal(35, 8, n),
'education': np.random.normal(13, 2, n),
'married': np.random.binomial(1, 0.5, n),
'children': np.random.poisson(1, n)
})
# 选择方程:工作概率
# 已婚、有孩子的女性更可能不工作
work_prob = 1 / (1 + np.exp(
-(0.5 - 0.3 * df['married'] - 0.2 * df['children'] + 0.05 * df['education'])
))
df['working'] = np.random.binomial(1, work_prob)
# 结果方程:工资(只对工作的人观测到)
df['wage'] = np.nan
working_mask = df['working'] == 1
df.loc[working_mask, 'wage'] = (
5000 +
200 * df.loc[working_mask, 'age'] +
1000 * df.loc[working_mask, 'education'] +
np.random.normal(0, 2000, working_mask.sum())
)
print(f"工作人数: {df['working'].sum()} / {n} ({df['working'].mean()*100:.1f}%)")
print(f"观测到工资: {df['wage'].notna().sum()}")
# Heckman 模型
df_heckman = df.copy()
df_heckman['outcome'] = df_heckman['wage']
heckman = HeckmanCorrection()
heckman.fit(
df=df_heckman,
selection_vars=['age', 'education', 'married', 'children'],
outcome_vars=['age', 'education'],
selection_indicator='working'
)真实案例:CFPS 2018 数据清洗
案例背景
数据: 中国家庭追踪调查(CFPS) 2018
研究问题: 教育对收入的影响
挑战:
- 收入缺失率 30%
- 高收入者更可能拒绝回答(MAR/MNAR)
- 存在极端值(超高收入)
完整清洗流程
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# 模拟 CFPS 2018 数据
np.random.seed(42)
n = 5000
# 创建模拟数据
cfps = pd.DataFrame({
'pid': range(1, n+1),
'age': np.random.normal(42, 12, n),
'education': np.random.choice([6, 9, 12, 16], n, p=[0.3, 0.2, 0.3, 0.2]),
'gender': np.random.choice(['男', '女'], n),
'urban': np.random.choice([0, 1], n, p=[0.4, 0.6]),
'province': np.random.choice([
'北京', '上海', '广东', '浙江', '江苏', '山东', '河南', '四川', '湖北', '湖南'
], n),
'health': np.random.choice([1, 2, 3, 4, 5], n, p=[0.05, 0.15, 0.4, 0.3, 0.1])
})
# 生成收入(基于教育、年龄等)
cfps['income'] = (
5000 +
2000 * cfps['education'] +
300 * cfps['age'] +
5000 * cfps['urban'] +
10000 * (cfps['gender'] == '男').astype(int) +
np.random.lognormal(8, 1.5, n)
)
# 引入 MAR 缺失(高收入、高教育者更可能缺失)
missing_prob = 1 / (1 + np.exp(
-(cfps['income'] / 50000 - 2 + 0.1 * cfps['education'])
))
missing_mask = np.random.random(n) < missing_prob
cfps.loc[missing_mask, 'income'] = np.nan
print(f"收入缺失率: {cfps['income'].isnull().mean()*100:.2f}%")
# 引入其他缺失
for col in ['age', 'education', 'health']:
mask = np.random.random(n) < 0.05
cfps.loc[mask, col] = np.nan
# 引入异常值
cfps.loc[np.random.choice(n, 20, replace=False), 'income'] = np.random.uniform(
500000, 1000000, 20
)
cfps.loc[np.random.choice(n, 10, replace=False), 'age'] = -99
# 引入重复
cfps = pd.concat([cfps, cfps.sample(50)], ignore_index=True)
print(f"\n原始数据: {cfps.shape}")
print(f"\n前5行:")
print(cfps.head())
# 使用 DataCleaner
print("\n" + "="*70)
print("开始数据清洗")
print("="*70)
cleaner = DataCleaner(cfps, name='CFPS 2018')
# 1. 诊断缺失
cleaner.diagnose_missing(plot=False)
# 2. 删除重复
cleaner.remove_duplicates(subset=['pid'], keep='first')
# 3. 处理明显错误值
print("\n" + "="*70)
print("处理错误值")
print("="*70)
# 年龄范围: 16-100
before = len(cleaner.df)
cleaner.df = cleaner.df[(cleaner.df['age'] >= 16) & (cleaner.df['age'] <= 100)]
print(f" 删除年龄异常值: {before - len(cleaner.df)} 行")
# 收入 > 0
before = len(cleaner.df)
cleaner.df = cleaner.df[(cleaner.df['income'] > 0) | (cleaner.df['income'].isnull())]
print(f" 删除负收入: {before - len(cleaner.df)} 行")
# 4. MICE 插补
cleaner.impute_mice(n_imputations=5, max_iter=10, use_rubin=True)
# 5. 处理异常值
cleaner.handle_outliers(method='winsorize', limits=[0.01, 0.01])
# 6. 清洗字符串
cleaner.clean_strings()
# 7. 生成报告
report = cleaner.generate_cleaning_report()
# 获取清洗后的数据
cfps_clean = cleaner.get_clean_data()
print(f"\n清洗后数据: {cfps_clean.shape}")
print(f"\n描述性统计:")
print(cfps_clean[['age', 'education', 'income']].describe())
# 8. 对比完整案例删除 vs MICE 插补
print("\n" + "="*70)
print("完整案例删除 vs MICE 插补对比")
print("="*70)
# 完整案例
cfps_complete = cfps.dropna(subset=['income'])
# 估计教育回报
from sklearn.linear_model import LinearRegression
# 完整案例估计
X_complete = cfps_complete[['education', 'age', 'urban']]
y_complete = cfps_complete['income']
model_complete = LinearRegression().fit(X_complete, y_complete)
beta_complete = model_complete.coef_[0]
# MICE 插补估计
X_mice = cfps_clean[['education', 'age', 'urban']]
y_mice = cfps_clean['income']
model_mice = LinearRegression().fit(X_mice, y_mice)
beta_mice = model_mice.coef_[0]
print(f"\n教育回报估计:")
print(f" 完整案例删除: {beta_complete:.2f}")
print(f" MICE 插补: {beta_mice:.2f}")
print(f" 差异: {beta_mice - beta_complete:.2f} ({(beta_mice/beta_complete - 1)*100:.2f}%)")
# 9. 可视化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 原始 vs 清洗后的收入分布
axes[0, 0].hist(cfps['income'].dropna(), bins=50, alpha=0.5, label='原始', edgecolor='black')
axes[0, 0].hist(cfps_clean['income'], bins=50, alpha=0.5, label='清洗后', edgecolor='black')
axes[0, 0].set_title('收入分布对比')
axes[0, 0].set_xlabel('收入')
axes[0, 0].legend()
# 年龄分布
axes[0, 1].hist(cfps_clean['age'], bins=30, edgecolor='black', color='green')
axes[0, 1].set_title('年龄分布(清洗后)')
axes[0, 1].set_xlabel('年龄')
# 教育分布
cfps_clean['education'].value_counts().sort_index().plot(kind='bar', ax=axes[1, 0], color='orange')
axes[1, 0].set_title('教育分布(清洗后)')
axes[1, 0].set_xlabel('教育年限')
axes[1, 0].set_ylabel('频数')
# 收入 vs 教育
axes[1, 1].scatter(cfps_clean['education'], cfps_clean['income'], alpha=0.3, s=10)
axes[1, 1].set_title('教育与收入关系')
axes[1, 1].set_xlabel('教育年限')
axes[1, 1].set_ylabel('收入')
plt.tight_layout()
plt.show()
print("\n 数据清洗完成!")敏感性分析
print("\n" + "="*70)
print("敏感性分析:缺失机制假设")
print("="*70)
# 假设 1: MCAR(删除)
cfps_mcar = cfps.dropna()
X_mcar = cfps_mcar[['education', 'age', 'urban']]
y_mcar = cfps_mcar['income']
model_mcar = LinearRegression().fit(X_mcar, y_mcar)
beta_mcar = model_mcar.coef_[0]
# 假设 2: MAR(MICE)
beta_mar = beta_mice
# 假设 3: MNAR(Heckman)
# 简化:假设收入缺失与真实收入相关
# (实际应用中需要更复杂的建模)
print(f"\n教育回报估计(敏感性分析):")
print(f" MCAR 假设(删除): {beta_mcar:.2f}")
print(f" MAR 假设(MICE): {beta_mar:.2f}")
print(f" 差异: {beta_mar - beta_mcar:.2f} ({abs(beta_mar - beta_mcar) / beta_mcar * 100:.2f}%)")
print(f"\n结论:")
if abs(beta_mar - beta_mcar) / beta_mcar < 0.05:
print(f" 结果对缺失机制假设不敏感(差异 < 5%)")
else:
print(f" ️ 结果对缺失机制假设敏感(差异 ≥ 5%)")
print(f" 建议: 进一步调查缺失机制,可能需要 Heckman 修正")练习题
⭐⭐ 基础题:简单均值插补
任务: 给定包含缺失值的数据集,实现简单均值插补。
# 练习数据
np.random.seed(42)
df_practice = pd.DataFrame({
'x1': np.random.normal(50, 10, 100),
'x2': np.random.normal(100, 20, 100),
'x3': np.random.normal(75, 15, 100)
})
# 引入缺失
for col in df_practice.columns:
mask = np.random.random(100) < 0.2
df_practice.loc[mask, col] = np.nan
print("缺失率:")
print(df_practice.isnull().mean())
# TODO: 实现均值插补
# df_imputed = ...
# 验证
# assert df_imputed.isnull().sum().sum() == 0⭐⭐⭐ 中级题:IQR 异常值检测
任务: 实现基于 IQR 的异常值检测函数,并可视化。
# TODO: 实现函数
def detect_and_visualize_outliers(df, col):
"""
检测并可视化异常值
返回:
------
outliers : pd.DataFrame
异常值数据
"""
# 1. IQR 计算
# 2. 识别异常值
# 3. 可视化(箱线图 + 散点图)
pass
# 测试
# outliers = detect_and_visualize_outliers(df, 'income')⭐⭐⭐⭐ 高级题:MICE + Rubin's Rules
任务: 实现完整的 MICE 插补并使用 Rubin's Rules 合并结果。
# TODO: 实现
# 1. MICE 生成 m=5 个插补数据集
# 2. 对每个数据集估计线性回归系数
# 3. 使用 Rubin's Rules 合并
# 4. 计算 95% 置信区间
# 5. 对比完整案例分析⭐⭐⭐⭐⭐ Nobel 级别:Heckman + 敏感性分析
任务: 实现 Heckman 两步法,并进行敏感性分析。
# TODO: 实现
# 1. 模拟女性工资数据(已婚女性更可能不工作)
# 2. Heckman 两步法估计
# 3. 对比普通 OLS(删除缺失)
# 4. 敏感性分析:改变选择方程变量
# 5. 解释 Inverse Mills Ratio 的经济含义小结
数据清洗核心函数
| 任务 | 函数 | 示例 |
|---|---|---|
| 缺失值 | dropna() | 删除缺失 |
fillna() | 填充缺失 | |
IterativeImputer() | MICE 插补 | |
| 异常值 | clip() | 截断 |
winsorize() | 缩尾 | |
IsolationForest() | ML 检测 | |
| 重复值 | duplicated() | 检测重复 |
drop_duplicates() | 删除重复 | |
| 类型转换 | astype() | 类型转换 |
pd.to_numeric() | 转换为数值 | |
pd.to_datetime() | 转换为日期 | |
| 字符串 | str.strip() | 去空格 |
str.lower() | 转小写 | |
str.replace() | 替换字符 |
缺失值处理决策树
开始
│
├─ 缺失率 < 5%?
│ ├─ 是 → 直接删除(dropna)
│ └─ 否 → 继续
│
├─ Little's MCAR Test p > 0.05?
│ ├─ 是 → MCAR,可以删除或简单填充
│ └─ 否 → MAR/MNAR,需要高级方法
│
├─ MAR?
│ ├─ 是 → MICE 多重插补
│ └─ 否 → MNAR
│
└─ MNAR?
├─ 样本选择问题 → Heckman 两步法
├─ 缺失本身有信息 → 缺失指示变量
└─ 不确定 → 敏感性分析数据清洗最佳实践
- 保留原始数据: 使用
df.copy()创建副本 - 记录清洗步骤: 每步都添加注释和日志
- 先探索后清洗: 理解数据后再动手
- 可重复的流程: 编写函数,便于复用
- 验证清洗结果: 清洗后重新检查数据质量
- 敏感性分析: 对比不同清洗策略的影响
- 文档化: 生成清洗报告
理论要点
缺失机制:
- MCAR: 删除不会导致偏误
- MAR: 可以用观测变量建模
- MNAR: 需要额外假设或敏感性分析
MICE:
- 迭代插补,考虑变量间关系
- Rubin's Rules 合并多个插补数据集
- 标准误正确反映不确定性
Heckman 模型:
- 两步法修正样本选择偏误
- Inverse Mills Ratio 量化选择效应
- 需要排他性限制(exclusion restriction)
下一步
下一节我们将学习 变量构造,包括虚拟变量、交互项、滞后项等。
继续前进!
参考资源
经典教材
- Little, R. J., & Rubin, D. B. (2019). Statistical Analysis with Missing Data (3rd Edition). Wiley.
- Heckman, J. J. (1979). "Sample Selection Bias as a Specification Error." Econometrica, 47(1), 153-161.
- van Buuren, S. (2018). Flexible Imputation of Missing Data (2nd Edition). Chapman and Hall/CRC.
实用工具
missingno: 缺失值可视化scikit-learn.impute: 插补算法statsmodels: 统计建模(包括 Heckman)fancyimpute: 高级插补方法