3.4 变量构造
"Feature engineering is the key to machine learning success.""特征工程是机器学习成功的关键。"— Pedro Domingos, Professor at University of Washington (华盛顿大学教授)
从原始变量到分析就绪
本节目标
- 掌握虚拟变量(Dummy Variables)的创建与理论
- 深入理解交互项(Interaction Terms)与边际效应
- 学习动态面板中的滞后算子(Lag Operators)
- 掌握 Mincer 工资方程的完整变量构造
- 理解 DID 变量与处理效应分解
- 构建生产级 VariableConstructor 类
理论基础
变量构造在因果推断中的地位
Angrist & Pischke (2009) 强调:
"Good econometrics is about good research design, and good research design requires careful variable construction."
三大作用:
控制混淆因素(Confounding Control)
- :时间固定效应(虚拟变量)
- :个体固定效应(组均值去中心化)
捕捉异质性效应(Heterogeneous Effects)
- 交互项 捕捉处理效应如何随 变化
构建工具变量(Instrumental Variables)
虚拟变量(Dummy Variables)
理论:分类变量的数值化
数学表示:
设分类变量 有 个类别 ,则创建 个虚拟变量:
回归模型:
系数解释:
- :参照组()的期望值
- :类别 相对于参照组的差异
多重共线性陷阱(Dummy Variable Trap)
完全共线性:
如果包含所有 个虚拟变量,则:
这与截距项(恒为 1)完全共线,导致设计矩阵 不可逆:
解决方案:删除一个类别作为参照组(Reference Category)
方法 1:pd.get_dummies()
import pandas as pd
import numpy as np
# 示例数据
df = pd.DataFrame({
'id': [1, 2, 3, 4, 5],
'education': ['高中', '本科', '硕士', '高中', '博士']
})
# 创建虚拟变量
dummies = pd.get_dummies(df['education'], prefix='edu', drop_first=True)
df = pd.concat([df, dummies], axis=1)
print(df)
# id education edu_博士 edu_本科 edu_硕士
# 0 1 高中 0 0 0
# 1 2 本科 0 1 0
# 2 3 硕士 0 0 1
# 3 4 高中 0 0 0
# 4 5 博士 1 0 0参数说明:
prefix='edu':变量名前缀drop_first=True:删除第一个类别(按字母顺序)作为参照组- 参照组:高中(所有虚拟变量都为 0)
方法 2:手动构造(更灵活)
# 创建单个虚拟变量
df['college'] = (df['education'] == '本科').astype(int)
df['female'] = (df['gender'] == '女').astype(int)
df['high_income'] = (df['income'] > df['income'].median()).astype(int)
# 多个条件(交叉分类)
df['young_female'] = ((df['age'] < 30) & (df['gender'] == '女')).astype(int)
df['urban_college'] = ((df['urban'] == 1) & (df['education'] == '本科')).astype(int)
# 指定参照组
def create_education_dummies(education_series, ref='高中'):
"""创建教育虚拟变量,指定参照组"""
categories = education_series.unique()
dummies = {}
for cat in categories:
if cat != ref:
dummies[f'edu_{cat}'] = (education_series == cat).astype(int)
return pd.DataFrame(dummies)
edu_dummies = create_education_dummies(df['education'], ref='高中')
df = pd.concat([df, edu_dummies], axis=1)回归与解释
import statsmodels.api as sm
# 模拟工资数据
np.random.seed(42)
n = 1000
df = pd.DataFrame({
'education': np.random.choice(['高中', '本科', '硕士'], n),
'experience': np.random.uniform(0, 30, n),
'age': np.random.uniform(22, 65, n)
})
# 生成工资(真实模型)
df['wage'] = (30000 +
5000 * (df['education'] == '本科') +
10000 * (df['education'] == '硕士') +
1000 * df['experience'] +
np.random.normal(0, 5000, n))
# 创建虚拟变量
edu_dummies = pd.get_dummies(df['education'], prefix='edu', drop_first=True)
df = pd.concat([df, edu_dummies], axis=1)
# OLS 回归
X = sm.add_constant(df[['experience', 'edu_本科', 'edu_硕士']])
y = df['wage']
model = sm.OLS(y, X).fit(cov_type='HC3')
print(model.summary().tables[1])系数解释:
- 截距(30127):高中学历、零经验者的预期工资
- 本科系数(5084):本科学历相比高中的工资溢价(Premium)
- 硕士系数(10156):硕士学历相比高中的工资溢价
显著性检验:
假设检验 (本科无溢价)
若 ,则拒绝 ,本科溢价显著。
交互项(Interaction Terms)
理论:边际效应的异质性
研究问题:教育对收入的影响是否因性别而异?
无交互模型(平行斜率):
边际效应:
问题:假设 对 的影响不随 变化
交互模型(非平行斜率):
边际效应:
解释: 对 的影响取决于 的取值
数学推导:Angrist & Pischke (2009) 框架
基本模型(教育 × 性别交互):
分组回归:
男性():
教育回报率:
女性():
教育回报率:
交互项系数解释:
若 ,则女性教育回报率低于男性(性别歧视证据)。
创建交互项
# 1. 连续变量 × 虚拟变量
df['educ_x_female'] = df['education'] * df['female']
# 2. 连续变量 × 连续变量
df['educ_x_experience'] = df['education'] * df['experience']
# 3. 虚拟变量 × 虚拟变量
df['female_x_married'] = df['female'] * df['married']
# 4. 多重交互(三阶)
df['educ_x_exp_x_female'] = df['education'] * df['experience'] * df['female']中心化(Centering):减少多重共线性
问题: 与 高度相关
当 和 都为正值时,相关系数接近 1。
解决方案:中心化
中心化后的模型:
优势:
- 解释:当 处于均值时, 对 的影响
- 解释:当 和 都处于均值时, 的期望值
# 中心化
df['educ_centered'] = df['education'] - df['education'].mean()
df['exp_centered'] = df['experience'] - df['experience'].mean()
# 创建中心化交互项
df['educ_x_exp_centered'] = df['educ_centered'] * df['exp_centered']
# 回归
X = sm.add_constant(df[['educ_centered', 'exp_centered', 'educ_x_exp_centered']])
y = df['log_wage']
model = sm.OLS(y, X).fit()交互项的统计检验
1. Wald 检验
检验 (无交互效应)
# Wald 检验
from scipy import stats
beta3 = model.params['educ_x_exp_centered']
se_beta3 = model.bse['educ_x_exp_centered']
wald_stat = (beta3 / se_beta3) ** 2
p_value = 1 - stats.chi2.cdf(wald_stat, df=1)
print(f"Wald 统计量: {wald_stat:.4f}, p-value: {p_value:.4f}")2. F 检验(多个交互项)
检验
其中:
- :受限模型残差平方和(无交互项)
- :非受限模型残差平方和(有交互项)
- :交互项个数
# F 检验
from statsmodels.iolib.table import SimpleTable
# 受限模型
X_restricted = sm.add_constant(df[['educ_centered', 'exp_centered']])
model_r = sm.OLS(y, X_restricted).fit()
# 非受限模型
X_unrestricted = sm.add_constant(df[['educ_centered', 'exp_centered', 'educ_x_exp_centered']])
model_u = sm.OLS(y, X_unrestricted).fit()
# F 统计量
rss_r = model_r.ssr
rss_u = model_u.ssr
q = 1
n = len(df)
k = X_unrestricted.shape[1] - 1
f_stat = ((rss_r - rss_u) / q) / (rss_u / (n - k - 1))
p_value = 1 - stats.f.cdf(f_stat, q, n - k - 1)
print(f"F 统计量: {f_stat:.4f}, p-value: {p_value:.4f}")可视化交互效应
import matplotlib.pyplot as plt
# 模拟数据
np.random.seed(42)
educ = np.linspace(8, 20, 100)
# 男性和女性的预测工资
wage_male = 8 + 0.08 * educ
wage_female = 8.2 + 0.06 * educ
# 绘图
plt.figure(figsize=(10, 6))
plt.plot(educ, wage_male, label='男性', linewidth=2, color='blue')
plt.plot(educ, wage_female, label='女性', linewidth=2, color='red')
plt.xlabel('教育年限', fontsize=12)
plt.ylabel('对数工资', fontsize=12)
plt.title('教育-收入关系的性别差异(交互效应)', fontsize=14)
plt.legend(fontsize=12)
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()滞后算子(Lag Operators)
理论:动态面板数据模型
滞后算子定义:
性质:
- 线性:
- 乘法:
- 多项式:
差分算子:
应用:
- 单位根检验:(ADF 检验)
- 误差修正模型:
动态面板模型
Arellano-Bond 模型:
问题: 与 相关(内生性)
解决方案:一阶差分 + GMM
使用 作为工具变量。
创建滞后项
# 示例:面板数据
df = pd.DataFrame({
'person_id': [1, 1, 1, 1, 2, 2, 2, 2],
'year': [2017, 2018, 2019, 2020, 2017, 2018, 2019, 2020],
'income': [50000, 52000, 55000, 58000, 60000, 62000, 65000, 68000]
})
# 按个体和时间排序
df = df.sort_values(['person_id', 'year'])
# 创建 1 期滞后
df['income_lag1'] = df.groupby('person_id')['income'].shift(1)
# 创建 2 期滞后
df['income_lag2'] = df.groupby('person_id')['income'].shift(2)
# 创建差分
df['income_diff'] = df.groupby('person_id')['income'].diff()
# 创建对数差分(增长率)
df['log_income'] = np.log(df['income'])
df['income_growth'] = df.groupby('person_id')['log_income'].diff()
print(df)
# person_id year income income_lag1 income_lag2 income_diff log_income income_growth
# 0 1 2017 50000 NaN NaN NaN 10.819778 NaN
# 1 1 2018 52000 50000.0 NaN 2000.0 10.859073 0.039295
# 2 1 2019 55000 52000.0 50000.0 3000.0 10.914785 0.055713
# 3 1 2020 58000 55000.0 52000.0 3000.0 10.968557 0.053771
# 4 2 2017 60000 NaN NaN NaN 11.002100 NaN
# 5 2 2018 62000 60000.0 NaN 2000.0 11.034699 0.032599
# 6 2 2019 65000 62000.0 60000.0 3000.0 11.082414 0.047715
# 7 2 2020 68000 65000.0 62000.0 3000.0 11.127384 0.044970前导项(Lead Variables)
应用:事件研究(Event Study)
检验政策实施前是否有预期效应(Anticipation Effects)
其中 表示事件发生前 期()或后 期()。
# 创建 1 期前导(未来值)
df['income_lead1'] = df.groupby('person_id')['income'].shift(-1)
df['income_lead2'] = df.groupby('person_id')['income'].shift(-2)
# 事件研究:政策实施前后的效应
df['treat_post'] = (df['year'] >= 2019) & (df['treated'] == 1)
df['treat_pre1'] = (df['year'] == 2018) & (df['treated'] == 1)
df['treat_pre2'] = (df['year'] == 2017) & (df['treated'] == 1)分组统计变量(Group Statistics)
理论:控制群体固定效应
Within-Group Variation:
其中 是组固定效应(Group Fixed Effects)。
组均值去中心化(Within Transformation):
去中心化后的模型:
被消除,避免遗漏变量偏误(Omitted Variable Bias)。
方法 1:groupby + transform
# 计算各地区的平均收入
df['region_mean_income'] = df.groupby('region')['income'].transform('mean')
# 计算个体与地区均值的差异(去中心化)
df['income_deviation'] = df['income'] - df['region_mean_income']
# 其他统计量
df['region_median_income'] = df.groupby('region')['income'].transform('median')
df['region_std_income'] = df.groupby('region')['income'].transform('std')
df['region_count'] = df.groupby('region')['income'].transform('count')
df['region_min_income'] = df.groupby('region')['income'].transform('min')
df['region_max_income'] = df.groupby('region')['income'].transform('max')
# 百分位数
df['region_p25_income'] = df.groupby('region')['income'].transform(lambda x: x.quantile(0.25))
df['region_p75_income'] = df.groupby('region')['income'].transform(lambda x: x.quantile(0.75))方法 2:排名与百分位
# 个体在地区内的收入排名(百分位)
df['income_pct_rank'] = df.groupby('region')['income'].transform(
lambda x: x.rank(pct=True)
)
# 解释:0.75 表示该个体收入高于地区内 75% 的人
# 分位数组
df['income_quartile'] = df.groupby('region')['income'].transform(
lambda x: pd.qcut(x, q=4, labels=['Q1', 'Q2', 'Q3', 'Q4'])
)方法 3:滚动窗口统计
# 时间序列:过去 3 年的平均收入
df = df.sort_values(['person_id', 'year'])
df['income_rolling_mean_3y'] = df.groupby('person_id')['income'].transform(
lambda x: x.rolling(window=3, min_periods=1).mean()
)
# 过去 3 年的标准差(收入波动性)
df['income_rolling_std_3y'] = df.groupby('person_id')['income'].transform(
lambda x: x.rolling(window=3, min_periods=1).std()
)
# 过去 3 年的增长率
df['income_growth_3y'] = df.groupby('person_id')['income'].pct_change(periods=3)️ 生产级 VariableConstructor 类
import pandas as pd
import numpy as np
from typing import List, Union, Tuple, Optional
import statsmodels.api as sm
from sklearn.preprocessing import StandardScaler
class VariableConstructor:
"""
生产级变量构造器 - Nobel Prize Standards
功能:
-----
1. 虚拟变量创建 (Dummy Variables)
2. 交互项构造 (Interaction Terms)
3. 滞后算子 (Lag Operators)
4. 分组统计变量 (Group Statistics)
5. Mincer 方程变量集
6. DID 变量构造
7. 处理效应分解
作者:StatsPai Team
参考:Angrist & Pischke (2009), Wooldridge (2010)
"""
def __init__(self, df: pd.DataFrame, id_col: Optional[str] = None,
time_col: Optional[str] = None):
"""
初始化变量构造器
参数:
------
df : pd.DataFrame
原始数据
id_col : str, optional
面板数据的个体标识列
time_col : str, optional
面板数据的时间标识列
"""
self.df = df.copy()
self.id_col = id_col
self.time_col = time_col
self.constructed_vars = []
# 验证面板数据结构
if id_col and time_col:
self._validate_panel_structure()
def _validate_panel_structure(self):
"""验证面板数据结构"""
if self.id_col not in self.df.columns:
raise ValueError(f"id_col '{self.id_col}' 不在数据中")
if self.time_col not in self.df.columns:
raise ValueError(f"time_col '{self.time_col}' 不在数据中")
# 检查是否平衡面板
counts = self.df.groupby(self.id_col)[self.time_col].count()
if counts.nunique() == 1:
print(f" 平衡面板:每个个体有 {counts.iloc[0]} 期观测")
else:
print(f"️ 非平衡面板:观测期数范围 {counts.min()} - {counts.max()}")
def create_dummies(self, var: str, prefix: Optional[str] = None,
drop_first: bool = True, ref_category: Optional[str] = None) -> pd.DataFrame:
"""
创建虚拟变量
参数:
------
var : str
分类变量名
prefix : str, optional
变量名前缀,默认使用 var
drop_first : bool, default=True
是否删除第一个类别(避免多重共线性)
ref_category : str, optional
指定参照组(优先级高于 drop_first)
返回:
------
pd.DataFrame
添加虚拟变量后的数据
示例:
------
>>> constructor.create_dummies('education', prefix='edu', ref_category='高中')
"""
if var not in self.df.columns:
raise ValueError(f"变量 '{var}' 不在数据中")
prefix = prefix or var
if ref_category:
# 指定参照组
categories = self.df[var].unique()
for cat in categories:
if cat != ref_category:
dummy_name = f'{prefix}_{cat}'
self.df[dummy_name] = (self.df[var] == cat).astype(int)
self.constructed_vars.append(dummy_name)
print(f" 创建虚拟变量:参照组 = {ref_category}")
else:
# 使用 pd.get_dummies
dummies = pd.get_dummies(self.df[var], prefix=prefix, drop_first=drop_first)
for col in dummies.columns:
self.df[col] = dummies[col]
self.constructed_vars.append(col)
print(f" 创建虚拟变量:共 {len(dummies.columns)} 个")
return self.df
def create_interactions(self, var1: str, var2: str, center: bool = False,
name: Optional[str] = None) -> pd.DataFrame:
"""
创建交互项
理论:
-----
对于模型:y = β₀ + β₁x₁ + β₂x₂ + β₃(x₁×x₂) + ε
边际效应:
∂y/∂x₁ = β₁ + β₃x₂
参数:
------
var1 : str
第一个变量
var2 : str
第二个变量
center : bool, default=False
是否中心化(减少多重共线性)
name : str, optional
交互项名称,默认为 'var1_x_var2'
返回:
------
pd.DataFrame
添加交互项后的数据
示例:
------
>>> constructor.create_interactions('education', 'female', center=True)
"""
if var1 not in self.df.columns or var2 not in self.df.columns:
raise ValueError(f"变量 '{var1}' 或 '{var2}' 不在数据中")
name = name or f'{var1}_x_{var2}'
if center:
# 中心化
var1_centered = self.df[var1] - self.df[var1].mean()
var2_centered = self.df[var2] - self.df[var2].mean()
self.df[name] = var1_centered * var2_centered
print(f" 创建中心化交互项:{name}")
else:
self.df[name] = self.df[var1] * self.df[var2]
print(f" 创建交互项:{name}")
self.constructed_vars.append(name)
return self.df
def create_lags(self, var: str, lags: Union[int, List[int]] = 1,
fill_method: str = 'drop') -> pd.DataFrame:
"""
创建滞后项(面板数据)
要求:必须设置 id_col 和 time_col
数学:
-----
L(Y_it, k) = Y_{i,t-k}
参数:
------
var : str
变量名
lags : int or list of int
滞后期数,如 1 或 [1, 2, 3]
fill_method : {'drop', 'zero', 'ffill'}
缺失值处理方式
返回:
------
pd.DataFrame
添加滞后项后的数据
示例:
------
>>> constructor.create_lags('income', lags=[1, 2])
"""
if not self.id_col or not self.time_col:
raise ValueError("创建滞后项需要设置 id_col 和 time_col")
if var not in self.df.columns:
raise ValueError(f"变量 '{var}' 不在数据中")
# 确保排序
self.df = self.df.sort_values([self.id_col, self.time_col])
# 转换为列表
if isinstance(lags, int):
lags = [lags]
for lag in lags:
lag_name = f'{var}_lag{lag}'
self.df[lag_name] = self.df.groupby(self.id_col)[var].shift(lag)
# 处理缺失值
if fill_method == 'drop':
pass # 保留 NaN,后续分析时删除
elif fill_method == 'zero':
self.df[lag_name].fillna(0, inplace=True)
elif fill_method == 'ffill':
self.df[lag_name] = self.df.groupby(self.id_col)[lag_name].fillna(method='ffill')
self.constructed_vars.append(lag_name)
print(f" 创建滞后项:{lag_name}")
return self.df
def create_leads(self, var: str, leads: Union[int, List[int]] = 1) -> pd.DataFrame:
"""
创建前导项(面板数据)
应用:事件研究(Event Study)
参数:
------
var : str
变量名
leads : int or list of int
前导期数
返回:
------
pd.DataFrame
添加前导项后的数据
"""
if not self.id_col or not self.time_col:
raise ValueError("创建前导项需要设置 id_col 和 time_col")
if var not in self.df.columns:
raise ValueError(f"变量 '{var}' 不在数据中")
self.df = self.df.sort_values([self.id_col, self.time_col])
if isinstance(leads, int):
leads = [leads]
for lead in leads:
lead_name = f'{var}_lead{lead}'
self.df[lead_name] = self.df.groupby(self.id_col)[var].shift(-lead)
self.constructed_vars.append(lead_name)
print(f" 创建前导项:{lead_name}")
return self.df
def create_group_stats(self, var: str, groupby: Union[str, List[str]],
stats: List[str] = ['mean', 'std']) -> pd.DataFrame:
"""
创建分组统计变量
示例:
-----
- 各省平均工资
- 个体工资与省均值的偏差
参数:
------
var : str
变量名
groupby : str or list of str
分组变量
stats : list of str
统计量,可选:'mean', 'std', 'min', 'max', 'median', 'count'
返回:
------
pd.DataFrame
添加分组统计变量后的数据
示例:
------
>>> constructor.create_group_stats('wage', 'region', stats=['mean', 'std'])
"""
if var not in self.df.columns:
raise ValueError(f"变量 '{var}' 不在数据中")
if isinstance(groupby, str):
groupby = [groupby]
for gb in groupby:
if gb not in self.df.columns:
raise ValueError(f"分组变量 '{gb}' 不在数据中")
group_name = '_'.join(groupby)
for stat in stats:
stat_name = f'{group_name}_{stat}_{var}'
if stat == 'mean':
self.df[stat_name] = self.df.groupby(groupby)[var].transform('mean')
elif stat == 'std':
self.df[stat_name] = self.df.groupby(groupby)[var].transform('std')
elif stat == 'min':
self.df[stat_name] = self.df.groupby(groupby)[var].transform('min')
elif stat == 'max':
self.df[stat_name] = self.df.groupby(groupby)[var].transform('max')
elif stat == 'median':
self.df[stat_name] = self.df.groupby(groupby)[var].transform('median')
elif stat == 'count':
self.df[stat_name] = self.df.groupby(groupby)[var].transform('count')
else:
raise ValueError(f"未知统计量:{stat}")
self.constructed_vars.append(stat_name)
print(f" 创建分组统计变量:{stat_name}")
# 创建偏差变量
if 'mean' in stats:
deviation_name = f'{var}_dev_{group_name}'
mean_name = f'{group_name}_mean_{var}'
self.df[deviation_name] = self.df[var] - self.df[mean_name]
self.constructed_vars.append(deviation_name)
print(f" 创建偏差变量:{deviation_name}")
return self.df
def mincer_equation_vars(self, wage_col: str, education_col: str,
age_col: str, start_age: int = 6) -> pd.DataFrame:
"""
构造完整 Mincer 工资方程变量
模型:
-----
ln(wage) = β₀ + β₁·education + β₂·experience + β₃·experience² + ε
需要变量:
--------
- education: 教育年限
- age: 年龄
- experience = age - education - start_age
- experience_sq = experience²
- log_wage = ln(wage)
参数:
------
wage_col : str
工资变量名
education_col : str
教育年限变量名
age_col : str
年龄变量名
start_age : int, default=6
开始上学年龄
返回:
------
pd.DataFrame
添加 Mincer 变量后的数据
示例:
------
>>> constructor.mincer_equation_vars('wage', 'education', 'age')
"""
required_cols = [wage_col, education_col, age_col]
for col in required_cols:
if col not in self.df.columns:
raise ValueError(f"变量 '{col}' 不在数据中")
# 创建 experience
self.df['experience'] = self.df[age_col] - self.df[education_col] - start_age
self.df['experience'] = self.df['experience'].clip(lower=0) # 确保非负
# 创建 experience²
self.df['experience_sq'] = self.df['experience'] ** 2
# 创建 log(wage)
self.df['log_wage'] = np.log(self.df[wage_col])
mincer_vars = ['experience', 'experience_sq', 'log_wage']
self.constructed_vars.extend(mincer_vars)
print(" 创建 Mincer 方程变量:")
print(f" - experience = {age_col} - {education_col} - {start_age}")
print(f" - experience_sq = experience²")
print(f" - log_wage = ln({wage_col})")
return self.df
def did_variables(self, treat_col: str, post_period: Union[int, str],
time_col: Optional[str] = None) -> pd.DataFrame:
"""
构造 DID 变量
变量:
-----
- treat: 处理组指示变量(已存在)
- post: 政策后指示变量
- treat_post: 交互项(DID 估计量)
参数:
------
treat_col : str
处理组变量名
post_period : int or str
政策实施时间点
time_col : str, optional
时间变量名,默认使用 self.time_col
返回:
------
pd.DataFrame
添加 DID 变量后的数据
示例:
------
>>> constructor.did_variables('treated', post_period=2010)
"""
if treat_col not in self.df.columns:
raise ValueError(f"处理组变量 '{treat_col}' 不在数据中")
time_col = time_col or self.time_col
if not time_col:
raise ValueError("必须指定 time_col")
if time_col not in self.df.columns:
raise ValueError(f"时间变量 '{time_col}' 不在数据中")
# 创建 post 变量
self.df['post'] = (self.df[time_col] >= post_period).astype(int)
# 创建 DID 交互项
self.df['treat_post'] = self.df[treat_col] * self.df['post']
did_vars = ['post', 'treat_post']
self.constructed_vars.extend(did_vars)
print(" 创建 DID 变量:")
print(f" - post = ({time_col} >= {post_period})")
print(f" - treat_post = {treat_col} × post")
print(f"\nDID 回归方程:")
print(f" Y = β₀ + β₁·{treat_col} + β₂·post + β₃·treat_post + ε")
print(f" β₃ = DID 估计量(处理效应)")
return self.df
def decompose_treatment_effect(self, outcome: str, treatment: str,
covariates: Optional[List[str]] = None) -> dict:
"""
处理效应分解
理论:
-----
ATE = E[Y(1) - Y(0)]
= [E[Y|D=1] - E[Y|D=0]]
- {E[Y(0)|D=1] - E[Y(0)|D=0]}
= Observed Difference - Selection Bias
参数:
------
outcome : str
结果变量
treatment : str
处理变量(0/1)
covariates : list of str, optional
控制变量
返回:
------
dict
包含分解结果的字典
示例:
------
>>> results = constructor.decompose_treatment_effect('wage', 'college_degree')
"""
if outcome not in self.df.columns:
raise ValueError(f"结果变量 '{outcome}' 不在数据中")
if treatment not in self.df.columns:
raise ValueError(f"处理变量 '{treatment}' 不在数据中")
# 观测差异
treated = self.df[self.df[treatment] == 1][outcome]
control = self.df[self.df[treatment] == 0][outcome]
observed_diff = treated.mean() - control.mean()
# 回归调整(控制协变量)
if covariates:
X = sm.add_constant(self.df[[treatment] + covariates])
else:
X = sm.add_constant(self.df[[treatment]])
y = self.df[outcome]
model = sm.OLS(y, X).fit(cov_type='HC3')
ate = model.params[treatment]
se = model.bse[treatment]
# 选择偏误
selection_bias = observed_diff - ate
results = {
'observed_difference': observed_diff,
'ate': ate,
'se': se,
'selection_bias': selection_bias,
't_stat': ate / se,
'p_value': model.pvalues[treatment],
'ci_lower': ate - 1.96 * se,
'ci_upper': ate + 1.96 * se
}
print(" 处理效应分解:")
print(f" 观测差异:{observed_diff:.4f}")
print(f" ATE:{ate:.4f} (SE: {se:.4f})")
print(f" 选择偏误:{selection_bias:.4f}")
print(f" 95% CI:[{results['ci_lower']:.4f}, {results['ci_upper']:.4f}]")
print(f" t-stat:{results['t_stat']:.4f}, p-value:{results['p_value']:.4f}")
return results
def get_constructed_vars(self) -> List[str]:
"""返回所有构造的变量名"""
return self.constructed_vars
def summary(self):
"""打印变量构造摘要"""
print("\n" + "="*60)
print(" 变量构造摘要")
print("="*60)
print(f"原始变量数:{len(self.df.columns) - len(self.constructed_vars)}")
print(f"构造变量数:{len(self.constructed_vars)}")
print(f"总变量数:{len(self.df.columns)}")
if self.id_col and self.time_col:
n_id = self.df[self.id_col].nunique()
n_time = self.df[self.time_col].nunique()
print(f"\n面板结构:{n_id} 个体 × {n_time} 期 = {len(self.df)} 观测")
print(f"\n构造的变量:")
for var in self.constructed_vars:
print(f" - {var}")
print("="*60)完整案例:NLSY97 Mincer 工资方程
案例背景
数据:National Longitudinal Survey of Youth 1997 (NLSY97)
研究问题:
- 教育年限对工资的回报率是多少?
- 教育回报率是否因性别而异?
- 工作经验对工资的影响是否呈现倒U型?
- 地区和行业差异如何影响工资?
数据模拟(替代真实 NLSY97)
import pandas as pd
import numpy as np
import statsmodels.api as sm
import matplotlib.pyplot as plt
import seaborn as sns
# 设置随机种子
np.random.seed(2024)
# 模拟 5000 个个体
n = 5000
# 生成基础特征
df = pd.DataFrame({
'person_id': range(1, n+1),
'age': np.random.uniform(25, 55, n),
'education': np.random.choice([12, 14, 16, 18, 20], n, p=[0.3, 0.2, 0.3, 0.15, 0.05]),
'female': np.random.binomial(1, 0.5, n),
'race': np.random.choice(['White', 'Black', 'Hispanic', 'Other'], n, p=[0.6, 0.15, 0.2, 0.05]),
'urban': np.random.binomial(1, 0.7, n),
'married': np.random.binomial(1, 0.55, n),
'region': np.random.choice(['Northeast', 'South', 'Midwest', 'West'], n, p=[0.18, 0.37, 0.21, 0.24]),
'industry': np.random.choice(['Manufacturing', 'Service', 'Tech', 'Finance', 'Other'], n,
p=[0.15, 0.35, 0.2, 0.15, 0.15])
})
# 计算工作经验
df['experience'] = df['age'] - df['education'] - 6
df['experience'] = df['experience'].clip(lower=0)
# 生成工资(基于真实经济学关系)
log_wage = (
9.5 + # 基础对数工资
0.08 * df['education'] + # 教育回报率(8%)
0.05 * df['experience'] + # 经验回报率
-0.0008 * df['experience']**2 + # 经验的边际递减
-0.15 * df['female'] + # 性别工资差距
-0.02 * df['education'] * df['female'] + # 教育回报率的性别差异
0.10 * (df['race'] == 'White') + # 种族差异
-0.08 * (df['race'] == 'Black') +
-0.05 * (df['race'] == 'Hispanic') +
0.05 * df['urban'] + # 城市溢价
0.08 * df['married'] + # 婚姻溢价
0.12 * (df['region'] == 'West') + # 地区差异
0.08 * (df['region'] == 'Northeast') +
-0.05 * (df['region'] == 'South') +
0.15 * (df['industry'] == 'Tech') + # 行业差异
0.12 * (df['industry'] == 'Finance') +
-0.08 * (df['industry'] == 'Service') +
np.random.normal(0, 0.3, n) # 误差项
)
df['wage'] = np.exp(log_wage)
print(" 数据生成完成")
print(f"样本量:{len(df)}")
print(f"\n描述统计:")
print(df[['age', 'education', 'experience', 'wage']].describe())使用 VariableConstructor 构造变量
# 初始化构造器
constructor = VariableConstructor(df)
# 1. 创建 Mincer 方程基础变量
constructor.mincer_equation_vars('wage', 'education', 'age', start_age=6)
# 2. 创建虚拟变量
constructor.create_dummies('race', prefix='race', ref_category='White')
constructor.create_dummies('region', prefix='region', ref_category='Northeast')
constructor.create_dummies('industry', prefix='industry', ref_category='Other')
# 3. 创建交互项
constructor.create_interactions('education', 'female', center=True, name='educ_x_female')
constructor.create_interactions('experience', 'female', center=False, name='exp_x_female')
constructor.create_interactions('education', 'experience', center=True, name='educ_x_exp')
# 4. 创建分组统计变量
constructor.create_group_stats('wage', 'region', stats=['mean', 'std'])
constructor.create_group_stats('wage', 'industry', stats=['mean', 'median'])
# 5. 查看构造的变量
print("\n构造的变量:")
for var in constructor.get_constructed_vars():
print(f" - {var}")
# 获取构造后的数据
df_constructed = constructor.df估计 Mincer 工资方程
# 模型 1:基础 Mincer 方程
X1 = sm.add_constant(df_constructed[['education', 'experience', 'experience_sq']])
y = df_constructed['log_wage']
model1 = sm.OLS(y, X1).fit(cov_type='HC3')
print("\n" + "="*80)
print("模型 1:基础 Mincer 工资方程")
print("="*80)
print(model1.summary().tables[1])
# 模型 2:加入人口统计学特征
X2 = sm.add_constant(df_constructed[[
'education', 'experience', 'experience_sq',
'female', 'race_Black', 'race_Hispanic', 'race_Other',
'urban', 'married'
]])
model2 = sm.OLS(y, X2).fit(cov_type='HC3')
print("\n" + "="*80)
print("模型 2:加入人口统计学特征")
print("="*80)
print(model2.summary().tables[1])
# 模型 3:加入交互项
X3 = sm.add_constant(df_constructed[[
'education', 'experience', 'experience_sq',
'female', 'race_Black', 'race_Hispanic', 'race_Other',
'urban', 'married',
'educ_x_female', 'exp_x_female'
]])
model3 = sm.OLS(y, X3).fit(cov_type='HC3')
print("\n" + "="*80)
print("模型 3:加入交互项(性别异质性)")
print("="*80)
print(model3.summary().tables[1])
# 模型 4:完整模型(加入地区和行业)
X4 = sm.add_constant(df_constructed[[
'education', 'experience', 'experience_sq',
'female', 'race_Black', 'race_Hispanic', 'race_Other',
'urban', 'married',
'educ_x_female', 'exp_x_female',
'region_South', 'region_Midwest', 'region_West',
'industry_Manufacturing', 'industry_Service', 'industry_Tech', 'industry_Finance'
]])
model4 = sm.OLS(y, X4).fit(cov_type='HC3')
print("\n" + "="*80)
print("模型 4:完整模型(地区 + 行业)")
print("="*80)
print(model4.summary().tables[1])结果解释
print("\n" + "="*80)
print(" 结果解释")
print("="*80)
# 教育回报率
beta_educ = model4.params['education']
print(f"\n1. 教育回报率")
print(f" - 每增加 1 年教育,工资增长 {beta_educ*100:.2f}%")
print(f" - 95% 置信区间:[{(beta_educ - 1.96*model4.bse['education'])*100:.2f}%, "
f"{(beta_educ + 1.96*model4.bse['education'])*100:.2f}%]")
# 性别工资差距
beta_female = model4.params['female']
print(f"\n2. 性别工资差距")
print(f" - 女性工资比男性低 {abs(beta_female)*100:.2f}%")
print(f" - p-value: {model4.pvalues['female']:.4f}")
# 教育回报率的性别差异
beta_educ_female = model4.params['educ_x_female']
print(f"\n3. 教育回报率的性别差异")
print(f" - 男性教育回报率:{beta_educ*100:.2f}%")
print(f" - 女性教育回报率:{(beta_educ + beta_educ_female)*100:.2f}%")
print(f" - 差异:{beta_educ_female*100:.2f}% (女性较低)")
# 经验的倒U型效应
beta_exp = model4.params['experience']
beta_exp_sq = model4.params['experience_sq']
optimal_exp = -beta_exp / (2 * beta_exp_sq)
print(f"\n4. 工作经验的倒U型效应")
print(f" - 经验系数:{beta_exp:.6f}")
print(f" - 经验平方系数:{beta_exp_sq:.6f}")
print(f" - 工资最大化的经验年数:{optimal_exp:.1f} 年")
# 地区差异
print(f"\n5. 地区工资差异(相对于 Northeast)")
for region in ['South', 'Midwest', 'West']:
var = f'region_{region}'
if var in model4.params.index:
print(f" - {region}: {model4.params[var]*100:.2f}% ({model4.pvalues[var]:.4f})")
# 行业差异
print(f"\n6. 行业工资差异(相对于 Other)")
for industry in ['Manufacturing', 'Service', 'Tech', 'Finance']:
var = f'industry_{industry}'
if var in model4.params.index:
print(f" - {industry}: {model4.params[var]*100:.2f}% ({model4.pvalues[var]:.4f})")
# 模型拟合优度
print(f"\n7. 模型拟合优度")
print(f" - R²: {model4.rsquared:.4f}")
print(f" - Adjusted R²: {model4.rsquared_adj:.4f}")
print(f" - F-statistic: {model4.fvalue:.2f} (p-value: {model4.f_pvalue:.4e})")可视化分析
# 1. 教育-工资关系(分性别)
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
for gender, label in [(0, '男性'), (1, '女性')]:
subset = df_constructed[df_constructed['female'] == gender]
avg_wage = subset.groupby('education')['wage'].mean()
plt.plot(avg_wage.index, avg_wage.values, marker='o', label=label, linewidth=2)
plt.xlabel('教育年限', fontsize=12)
plt.ylabel('平均工资(美元)', fontsize=12)
plt.title('教育-工资关系(分性别)', fontsize=14)
plt.legend(fontsize=12)
plt.grid(alpha=0.3)
# 2. 经验-工资关系(倒U型)
plt.subplot(1, 2, 2)
exp_range = np.linspace(0, 40, 100)
log_wage_pred = (model4.params['const'] +
model4.params['education'] * df_constructed['education'].mean() +
model4.params['experience'] * exp_range +
model4.params['experience_sq'] * exp_range**2)
wage_pred = np.exp(log_wage_pred)
plt.plot(exp_range, wage_pred, linewidth=2, color='green')
plt.axvline(optimal_exp, color='red', linestyle='--', label=f'最优经验 = {optimal_exp:.1f} 年')
plt.xlabel('工作经验(年)', fontsize=12)
plt.ylabel('预测工资(美元)', fontsize=12)
plt.title('经验-工资关系(倒U型)', fontsize=14)
plt.legend(fontsize=12)
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()
# 3. 地区和行业工资分布
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 地区
df_constructed.boxplot(column='wage', by='region', ax=axes[0])
axes[0].set_title('各地区工资分布', fontsize=14)
axes[0].set_xlabel('地区', fontsize=12)
axes[0].set_ylabel('工资(美元)', fontsize=12)
plt.sca(axes[0])
plt.xticks(rotation=45)
# 行业
df_constructed.boxplot(column='wage', by='industry', ax=axes[1])
axes[1].set_title('各行业工资分布', fontsize=14)
axes[1].set_xlabel('行业', fontsize=12)
axes[1].set_ylabel('工资(美元)', fontsize=12)
plt.sca(axes[1])
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
# 4. 残差诊断
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# 残差 vs 拟合值
axes[0, 0].scatter(model4.fittedvalues, model4.resid, alpha=0.5)
axes[0, 0].axhline(y=0, color='r', linestyle='--')
axes[0, 0].set_xlabel('拟合值', fontsize=12)
axes[0, 0].set_ylabel('残差', fontsize=12)
axes[0, 0].set_title('残差 vs 拟合值', fontsize=14)
axes[0, 0].grid(alpha=0.3)
# QQ 图
from scipy import stats as sp_stats
sp_stats.probplot(model4.resid, dist="norm", plot=axes[0, 1])
axes[0, 1].set_title('QQ 图(正态性检验)', fontsize=14)
axes[0, 1].grid(alpha=0.3)
# 残差直方图
axes[1, 0].hist(model4.resid, bins=50, edgecolor='black', alpha=0.7)
axes[1, 0].set_xlabel('残差', fontsize=12)
axes[1, 0].set_ylabel('频数', fontsize=12)
axes[1, 0].set_title('残差分布', fontsize=14)
axes[1, 0].grid(alpha=0.3)
# 标准化残差
from statsmodels.stats.outliers_influence import OLSInfluence
influence = OLSInfluence(model4)
standardized_resid = influence.resid_studentized_internal
axes[1, 1].scatter(range(len(standardized_resid)), standardized_resid, alpha=0.5)
axes[1, 1].axhline(y=3, color='r', linestyle='--', label='±3σ')
axes[1, 1].axhline(y=-3, color='r', linestyle='--')
axes[1, 1].set_xlabel('观测序号', fontsize=12)
axes[1, 1].set_ylabel('标准化残差', fontsize=12)
axes[1, 1].set_title('标准化残差(异常值检测)', fontsize=14)
axes[1, 1].legend(fontsize=10)
axes[1, 1].grid(alpha=0.3)
plt.tight_layout()
plt.show()异质性分析:教育回报率的地区差异
# 分地区估计教育回报率
print("\n" + "="*80)
print(" 异质性分析:教育回报率的地区差异")
print("="*80)
results_by_region = {}
for region in df_constructed['region'].unique():
subset = df_constructed[df_constructed['region'] == region]
X_subset = sm.add_constant(subset[['education', 'experience', 'experience_sq', 'female']])
y_subset = subset['log_wage']
model_subset = sm.OLS(y_subset, X_subset).fit(cov_type='HC3')
educ_return = model_subset.params['education']
educ_se = model_subset.bse['education']
results_by_region[region] = {
'return': educ_return,
'se': educ_se,
'ci_lower': educ_return - 1.96 * educ_se,
'ci_upper': educ_return + 1.96 * educ_se
}
print(f"\n{region}:")
print(f" 教育回报率: {educ_return*100:.2f}% (SE: {educ_se*100:.2f}%)")
print(f" 95% CI: [{results_by_region[region]['ci_lower']*100:.2f}%, "
f"{results_by_region[region]['ci_upper']*100:.2f}%]")
# 可视化地区差异
fig, ax = plt.subplots(figsize=(10, 6))
regions = list(results_by_region.keys())
returns = [results_by_region[r]['return'] * 100 for r in regions]
errors = [1.96 * results_by_region[r]['se'] * 100 for r in regions]
ax.barh(regions, returns, xerr=errors, capsize=5, color='skyblue', edgecolor='black')
ax.set_xlabel('教育回报率(%)', fontsize=12)
ax.set_ylabel('地区', fontsize=12)
ax.set_title('教育回报率的地区差异(95% CI)', fontsize=14)
ax.grid(alpha=0.3, axis='x')
plt.tight_layout()
plt.show()其他常用变量构造
1. 多项式项(非线性关系)
# 研究非线性关系:收入 = f(经验, 经验²)
df['experience_sq'] = df['experience'] ** 2
df['experience_cube'] = df['experience'] ** 3
# 识别最优点
# ∂wage/∂experience = β₁ + 2β₂·experience = 0
# experience* = -β₁ / (2β₂)2. 比率变量
# 计算比率
df['income_per_capita'] = df['household_income'] / df['household_size']
df['debt_to_income'] = df['debt'] / df['income']
df['saving_rate'] = df['saving'] / df['income']
# 对数比率
df['log_debt_to_income'] = np.log(df['debt'] / df['income'])3. 对数差分(增长率)
# 对数差分 ≈ 增长率
df['log_income'] = np.log(df['income'])
df['income_growth'] = df.groupby('person_id')['log_income'].diff()
# 解释:income_growth = 0.05 → 收入增长约 5%
# 验证:
# ln(Y_t) - ln(Y_{t-1}) = ln(Y_t / Y_{t-1}) ≈ (Y_t - Y_{t-1}) / Y_{t-1}4. 标准化变量(Z-score)
from scipy import stats
# Z-score 标准化(总体)
df['income_std'] = stats.zscore(df['income'])
# 组内标准化(Within-group Z-score)
df['income_std_within'] = df.groupby('region')['income'].transform(
lambda x: stats.zscore(x)
)
# 解释:income_std = 2 → 该个体收入比均值高 2 个标准差练习题
⭐⭐ 练习 1:创建教育水平虚拟变量
题目:给定教育年限变量,创建教育水平虚拟变量(高中、本科、研究生),参照组为高中。
# 数据
df = pd.DataFrame({
'id': range(1, 11),
'education_years': [12, 16, 18, 12, 14, 16, 20, 12, 16, 14]
})
# 任务:创建虚拟变量
# 高中:12 年
# 本科:14-16 年
# 研究生:18-20 年提示:使用 pd.cut() 分组,再用 pd.get_dummies()
⭐⭐⭐ 练习 2:构造交互项并计算边际效应
题目:估计模型
- 创建交互项
- 运行回归
- 计算男性和女性的教育回报率
- 检验回报率差异是否显著(Wald 检验)
⭐⭐⭐⭐ 练习 3:动态面板模型
题目:使用 Arellano-Bond 方法估计动态面板模型
- 创建滞后项
- 一阶差分消除
- 使用 作为工具变量
- GMM 估计
⭐⭐⭐⭐⭐ 练习 4:异质性处理效应分解
题目:分析大学学位对工资的处理效应,并分解为:
- ATE(Average Treatment Effect):平均处理效应
- ATT(Average Treatment on the Treated):处理组平均处理效应
- ATE vs ATT 差异:选择偏误的证据
使用倾向得分匹配(Propensity Score Matching)进行敏感性分析。
小结
核心函数总结
| 任务 | 方法 | 数学表示 | 应用 |
|---|---|---|---|
| 虚拟变量 | pd.get_dummies() | 分类变量数值化 | |
| 交互项 | df['A'] * df['B'] | 效应异质性 | |
| 滞后项 | df.shift() | 动态模型 | |
| 分组统计 | groupby().transform() | 控制群体效应 | |
| 多项式 | df['x'] ** 2 | 非线性关系 |
最佳实践
- 理解参照组:虚拟变量回归中明确参照组
- 避免多重共线性:使用
drop_first=True或指定参照组 - 中心化交互项:提高解释性,减少共线性
- 面板数据排序:创建滞后项前先按
id和time排序 - 变量命名规范:如
income_lag1,edu_x_female,region_mean_wage - 文档化:记录每个变量的构造逻辑和理论依据
- 验证:检查构造变量的分布和缺失值
理论要点
- 虚拟变量陷阱: 导致完全共线性
- 交互项解释: 表示 对 的边际效应如何随 变化
- 滞后算子性质:,
- Within Transformation: 消除组固定效应
- Mincer 方程:
下一步
下一节:数据转换(标准化、对数转换、Box-Cox 变换、分箱)
参考文献
Angrist, J. D., & Pischke, J. S. (2009). Mostly Harmless Econometrics: An Empiricist's Companion. Princeton University Press.
- 第 3 章:Making Regression Make Sense
- 交互项与异质性效应
Wooldridge, J. M. (2010). Econometric Analysis of Cross Section and Panel Data (2nd ed.). MIT Press.
- 第 10 章:Basic Linear Unobserved Effects Panel Data Models
- 动态面板模型
Mincer, J. (1974). Schooling, Experience, and Earnings. NBER.
- Mincer 工资方程的原始文献
Arellano, M., & Bond, S. (1991). Some Tests of Specification for Panel Data: Monte Carlo Evidence and an Application to Employment Equations. The Review of Economic Studies, 58(2), 277-297.
- 动态面板 GMM 估计
Cameron, A. C., & Trivedi, P. K. (2005). Microeconometrics: Methods and Applications. Cambridge University Press.
- 第 21 章:Panel Data Models
- 第 25 章:Treatment Evaluation
版本:StatsPai Module 3.4 - Nobel Prize Standards 最后更新:2024-01-15 作者:StatsPai Team 许可:CC BY-NC-SA 4.0