Skip to content

3.4 变量构造

"Feature engineering is the key to machine learning success.""特征工程是机器学习成功的关键。"— Pedro Domingos, Professor at University of Washington (华盛顿大学教授)

从原始变量到分析就绪


本节目标

  • 掌握虚拟变量(Dummy Variables)的创建与理论
  • 深入理解交互项(Interaction Terms)与边际效应
  • 学习动态面板中的滞后算子(Lag Operators)
  • 掌握 Mincer 工资方程的完整变量构造
  • 理解 DID 变量与处理效应分解
  • 构建生产级 VariableConstructor 类

理论基础

变量构造在因果推断中的地位

Angrist & Pischke (2009) 强调:

"Good econometrics is about good research design, and good research design requires careful variable construction."

三大作用

  1. 控制混淆因素(Confounding Control)

    • :时间固定效应(虚拟变量)
    • :个体固定效应(组均值去中心化)
  2. 捕捉异质性效应(Heterogeneous Effects)

    • 交互项 捕捉处理效应如何随 变化
  3. 构建工具变量(Instrumental Variables)


虚拟变量(Dummy Variables)

理论:分类变量的数值化

数学表示

设分类变量 个类别 ,则创建 个虚拟变量:

回归模型

系数解释

  • :参照组()的期望值
  • :类别 相对于参照组的差异

多重共线性陷阱(Dummy Variable Trap)

完全共线性

如果包含所有 个虚拟变量,则:

这与截距项(恒为 1)完全共线,导致设计矩阵 不可逆:

解决方案:删除一个类别作为参照组(Reference Category)

方法 1:pd.get_dummies()

python
import pandas as pd
import numpy as np

# 示例数据
df = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'education': ['高中', '本科', '硕士', '高中', '博士']
})

# 创建虚拟变量
dummies = pd.get_dummies(df['education'], prefix='edu', drop_first=True)
df = pd.concat([df, dummies], axis=1)

print(df)
#    id education  edu_博士  edu_本科  edu_硕士
# 0   1      高中       0       0       0
# 1   2      本科       0       1       0
# 2   3      硕士       0       0       1
# 3   4      高中       0       0       0
# 4   5      博士       1       0       0

参数说明

  • prefix='edu':变量名前缀
  • drop_first=True:删除第一个类别(按字母顺序)作为参照组
  • 参照组:高中(所有虚拟变量都为 0)

方法 2:手动构造(更灵活)

python
# 创建单个虚拟变量
df['college'] = (df['education'] == '本科').astype(int)
df['female'] = (df['gender'] == '女').astype(int)
df['high_income'] = (df['income'] > df['income'].median()).astype(int)

# 多个条件(交叉分类)
df['young_female'] = ((df['age'] < 30) & (df['gender'] == '女')).astype(int)
df['urban_college'] = ((df['urban'] == 1) & (df['education'] == '本科')).astype(int)

# 指定参照组
def create_education_dummies(education_series, ref='高中'):
    """创建教育虚拟变量,指定参照组"""
    categories = education_series.unique()
    dummies = {}
    for cat in categories:
        if cat != ref:
            dummies[f'edu_{cat}'] = (education_series == cat).astype(int)
    return pd.DataFrame(dummies)

edu_dummies = create_education_dummies(df['education'], ref='高中')
df = pd.concat([df, edu_dummies], axis=1)

回归与解释

python
import statsmodels.api as sm

# 模拟工资数据
np.random.seed(42)
n = 1000
df = pd.DataFrame({
    'education': np.random.choice(['高中', '本科', '硕士'], n),
    'experience': np.random.uniform(0, 30, n),
    'age': np.random.uniform(22, 65, n)
})

# 生成工资(真实模型)
df['wage'] = (30000 +
              5000 * (df['education'] == '本科') +
              10000 * (df['education'] == '硕士') +
              1000 * df['experience'] +
              np.random.normal(0, 5000, n))

# 创建虚拟变量
edu_dummies = pd.get_dummies(df['education'], prefix='edu', drop_first=True)
df = pd.concat([df, edu_dummies], axis=1)

# OLS 回归
X = sm.add_constant(df[['experience', 'edu_本科', 'edu_硕士']])
y = df['wage']
model = sm.OLS(y, X).fit(cov_type='HC3')

print(model.summary().tables[1])

系数解释

  • 截距(30127):高中学历、零经验者的预期工资
  • 本科系数(5084):本科学历相比高中的工资溢价(Premium)
  • 硕士系数(10156):硕士学历相比高中的工资溢价

显著性检验

假设检验 (本科无溢价)

,则拒绝 ,本科溢价显著。


交互项(Interaction Terms)

理论:边际效应的异质性

研究问题:教育对收入的影响是否因性别而异?

无交互模型(平行斜率):

边际效应

问题:假设 的影响不随 变化

交互模型(非平行斜率):

边际效应

解释 的影响取决于 的取值

数学推导:Angrist & Pischke (2009) 框架

基本模型(教育 × 性别交互):

分组回归

  • 男性):

    教育回报率:

  • 女性):

    教育回报率:

交互项系数解释

,则女性教育回报率低于男性(性别歧视证据)。

创建交互项

python
# 1. 连续变量 × 虚拟变量
df['educ_x_female'] = df['education'] * df['female']

# 2. 连续变量 × 连续变量
df['educ_x_experience'] = df['education'] * df['experience']

# 3. 虚拟变量 × 虚拟变量
df['female_x_married'] = df['female'] * df['married']

# 4. 多重交互(三阶)
df['educ_x_exp_x_female'] = df['education'] * df['experience'] * df['female']

中心化(Centering):减少多重共线性

问题 高度相关

都为正值时,相关系数接近 1。

解决方案:中心化

中心化后的模型

优势

  • 解释:当 处于均值时, 的影响
  • 解释:当 都处于均值时, 的期望值
python
# 中心化
df['educ_centered'] = df['education'] - df['education'].mean()
df['exp_centered'] = df['experience'] - df['experience'].mean()

# 创建中心化交互项
df['educ_x_exp_centered'] = df['educ_centered'] * df['exp_centered']

# 回归
X = sm.add_constant(df[['educ_centered', 'exp_centered', 'educ_x_exp_centered']])
y = df['log_wage']
model = sm.OLS(y, X).fit()

交互项的统计检验

1. Wald 检验

检验 (无交互效应)

python
# Wald 检验
from scipy import stats

beta3 = model.params['educ_x_exp_centered']
se_beta3 = model.bse['educ_x_exp_centered']
wald_stat = (beta3 / se_beta3) ** 2
p_value = 1 - stats.chi2.cdf(wald_stat, df=1)

print(f"Wald 统计量: {wald_stat:.4f}, p-value: {p_value:.4f}")

2. F 检验(多个交互项)

检验

其中:

  • :受限模型残差平方和(无交互项)
  • :非受限模型残差平方和(有交互项)
  • :交互项个数
python
# F 检验
from statsmodels.iolib.table import SimpleTable

# 受限模型
X_restricted = sm.add_constant(df[['educ_centered', 'exp_centered']])
model_r = sm.OLS(y, X_restricted).fit()

# 非受限模型
X_unrestricted = sm.add_constant(df[['educ_centered', 'exp_centered', 'educ_x_exp_centered']])
model_u = sm.OLS(y, X_unrestricted).fit()

# F 统计量
rss_r = model_r.ssr
rss_u = model_u.ssr
q = 1
n = len(df)
k = X_unrestricted.shape[1] - 1

f_stat = ((rss_r - rss_u) / q) / (rss_u / (n - k - 1))
p_value = 1 - stats.f.cdf(f_stat, q, n - k - 1)

print(f"F 统计量: {f_stat:.4f}, p-value: {p_value:.4f}")

可视化交互效应

python
import matplotlib.pyplot as plt

# 模拟数据
np.random.seed(42)
educ = np.linspace(8, 20, 100)

# 男性和女性的预测工资
wage_male = 8 + 0.08 * educ
wage_female = 8.2 + 0.06 * educ

# 绘图
plt.figure(figsize=(10, 6))
plt.plot(educ, wage_male, label='男性', linewidth=2, color='blue')
plt.plot(educ, wage_female, label='女性', linewidth=2, color='red')
plt.xlabel('教育年限', fontsize=12)
plt.ylabel('对数工资', fontsize=12)
plt.title('教育-收入关系的性别差异(交互效应)', fontsize=14)
plt.legend(fontsize=12)
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

滞后算子(Lag Operators)

理论:动态面板数据模型

滞后算子定义

性质

  1. 线性
  2. 乘法
  3. 多项式

差分算子

应用

  • 单位根检验(ADF 检验)
  • 误差修正模型

动态面板模型

Arellano-Bond 模型

问题 相关(内生性)

解决方案:一阶差分 + GMM

使用 作为工具变量。

创建滞后项

python
# 示例:面板数据
df = pd.DataFrame({
    'person_id': [1, 1, 1, 1, 2, 2, 2, 2],
    'year': [2017, 2018, 2019, 2020, 2017, 2018, 2019, 2020],
    'income': [50000, 52000, 55000, 58000, 60000, 62000, 65000, 68000]
})

# 按个体和时间排序
df = df.sort_values(['person_id', 'year'])

# 创建 1 期滞后
df['income_lag1'] = df.groupby('person_id')['income'].shift(1)

# 创建 2 期滞后
df['income_lag2'] = df.groupby('person_id')['income'].shift(2)

# 创建差分
df['income_diff'] = df.groupby('person_id')['income'].diff()

# 创建对数差分(增长率)
df['log_income'] = np.log(df['income'])
df['income_growth'] = df.groupby('person_id')['log_income'].diff()

print(df)
#    person_id  year  income  income_lag1  income_lag2  income_diff  log_income  income_growth
# 0          1  2017   50000          NaN          NaN          NaN   10.819778            NaN
# 1          1  2018   52000      50000.0          NaN       2000.0   10.859073       0.039295
# 2          1  2019   55000      52000.0      50000.0       3000.0   10.914785       0.055713
# 3          1  2020   58000      55000.0      52000.0       3000.0   10.968557       0.053771
# 4          2  2017   60000          NaN          NaN          NaN   11.002100            NaN
# 5          2  2018   62000      60000.0          NaN       2000.0   11.034699       0.032599
# 6          2  2019   65000      62000.0      60000.0       3000.0   11.082414       0.047715
# 7          2  2020   68000      65000.0      62000.0       3000.0   11.127384       0.044970

前导项(Lead Variables)

应用:事件研究(Event Study)

检验政策实施前是否有预期效应(Anticipation Effects)

其中 表示事件发生前 期()或后 期()。

python
# 创建 1 期前导(未来值)
df['income_lead1'] = df.groupby('person_id')['income'].shift(-1)
df['income_lead2'] = df.groupby('person_id')['income'].shift(-2)

# 事件研究:政策实施前后的效应
df['treat_post'] = (df['year'] >= 2019) & (df['treated'] == 1)
df['treat_pre1'] = (df['year'] == 2018) & (df['treated'] == 1)
df['treat_pre2'] = (df['year'] == 2017) & (df['treated'] == 1)

分组统计变量(Group Statistics)

理论:控制群体固定效应

Within-Group Variation

其中 是组固定效应(Group Fixed Effects)。

组均值去中心化(Within Transformation)

去中心化后的模型

被消除,避免遗漏变量偏误(Omitted Variable Bias)。

方法 1:groupby + transform

python
# 计算各地区的平均收入
df['region_mean_income'] = df.groupby('region')['income'].transform('mean')

# 计算个体与地区均值的差异(去中心化)
df['income_deviation'] = df['income'] - df['region_mean_income']

# 其他统计量
df['region_median_income'] = df.groupby('region')['income'].transform('median')
df['region_std_income'] = df.groupby('region')['income'].transform('std')
df['region_count'] = df.groupby('region')['income'].transform('count')
df['region_min_income'] = df.groupby('region')['income'].transform('min')
df['region_max_income'] = df.groupby('region')['income'].transform('max')

# 百分位数
df['region_p25_income'] = df.groupby('region')['income'].transform(lambda x: x.quantile(0.25))
df['region_p75_income'] = df.groupby('region')['income'].transform(lambda x: x.quantile(0.75))

方法 2:排名与百分位

python
# 个体在地区内的收入排名(百分位)
df['income_pct_rank'] = df.groupby('region')['income'].transform(
    lambda x: x.rank(pct=True)
)

# 解释:0.75 表示该个体收入高于地区内 75% 的人

# 分位数组
df['income_quartile'] = df.groupby('region')['income'].transform(
    lambda x: pd.qcut(x, q=4, labels=['Q1', 'Q2', 'Q3', 'Q4'])
)

方法 3:滚动窗口统计

python
# 时间序列:过去 3 年的平均收入
df = df.sort_values(['person_id', 'year'])
df['income_rolling_mean_3y'] = df.groupby('person_id')['income'].transform(
    lambda x: x.rolling(window=3, min_periods=1).mean()
)

# 过去 3 年的标准差(收入波动性)
df['income_rolling_std_3y'] = df.groupby('person_id')['income'].transform(
    lambda x: x.rolling(window=3, min_periods=1).std()
)

# 过去 3 年的增长率
df['income_growth_3y'] = df.groupby('person_id')['income'].pct_change(periods=3)

️ 生产级 VariableConstructor 类

python
import pandas as pd
import numpy as np
from typing import List, Union, Tuple, Optional
import statsmodels.api as sm
from sklearn.preprocessing import StandardScaler

class VariableConstructor:
    """
    生产级变量构造器 - Nobel Prize Standards

    功能:
    -----
    1. 虚拟变量创建 (Dummy Variables)
    2. 交互项构造 (Interaction Terms)
    3. 滞后算子 (Lag Operators)
    4. 分组统计变量 (Group Statistics)
    5. Mincer 方程变量集
    6. DID 变量构造
    7. 处理效应分解

    作者:StatsPai Team
    参考:Angrist & Pischke (2009), Wooldridge (2010)
    """

    def __init__(self, df: pd.DataFrame, id_col: Optional[str] = None,
                 time_col: Optional[str] = None):
        """
        初始化变量构造器

        参数:
        ------
        df : pd.DataFrame
            原始数据
        id_col : str, optional
            面板数据的个体标识列
        time_col : str, optional
            面板数据的时间标识列
        """
        self.df = df.copy()
        self.id_col = id_col
        self.time_col = time_col
        self.constructed_vars = []

        # 验证面板数据结构
        if id_col and time_col:
            self._validate_panel_structure()

    def _validate_panel_structure(self):
        """验证面板数据结构"""
        if self.id_col not in self.df.columns:
            raise ValueError(f"id_col '{self.id_col}' 不在数据中")
        if self.time_col not in self.df.columns:
            raise ValueError(f"time_col '{self.time_col}' 不在数据中")

        # 检查是否平衡面板
        counts = self.df.groupby(self.id_col)[self.time_col].count()
        if counts.nunique() == 1:
            print(f" 平衡面板:每个个体有 {counts.iloc[0]} 期观测")
        else:
            print(f"️  非平衡面板:观测期数范围 {counts.min()} - {counts.max()}")

    def create_dummies(self, var: str, prefix: Optional[str] = None,
                      drop_first: bool = True, ref_category: Optional[str] = None) -> pd.DataFrame:
        """
        创建虚拟变量

        参数:
        ------
        var : str
            分类变量名
        prefix : str, optional
            变量名前缀,默认使用 var
        drop_first : bool, default=True
            是否删除第一个类别(避免多重共线性)
        ref_category : str, optional
            指定参照组(优先级高于 drop_first)

        返回:
        ------
        pd.DataFrame
            添加虚拟变量后的数据

        示例:
        ------
        >>> constructor.create_dummies('education', prefix='edu', ref_category='高中')
        """
        if var not in self.df.columns:
            raise ValueError(f"变量 '{var}' 不在数据中")

        prefix = prefix or var

        if ref_category:
            # 指定参照组
            categories = self.df[var].unique()
            for cat in categories:
                if cat != ref_category:
                    dummy_name = f'{prefix}_{cat}'
                    self.df[dummy_name] = (self.df[var] == cat).astype(int)
                    self.constructed_vars.append(dummy_name)
            print(f" 创建虚拟变量:参照组 = {ref_category}")
        else:
            # 使用 pd.get_dummies
            dummies = pd.get_dummies(self.df[var], prefix=prefix, drop_first=drop_first)
            for col in dummies.columns:
                self.df[col] = dummies[col]
                self.constructed_vars.append(col)
            print(f" 创建虚拟变量:共 {len(dummies.columns)} 个")

        return self.df

    def create_interactions(self, var1: str, var2: str, center: bool = False,
                          name: Optional[str] = None) -> pd.DataFrame:
        """
        创建交互项

        理论:
        -----
        对于模型:y = β₀ + β₁x₁ + β₂x₂ + β₃(x₁×x₂) + ε

        边际效应:
        ∂y/∂x₁ = β₁ + β₃x₂

        参数:
        ------
        var1 : str
            第一个变量
        var2 : str
            第二个变量
        center : bool, default=False
            是否中心化(减少多重共线性)
        name : str, optional
            交互项名称,默认为 'var1_x_var2'

        返回:
        ------
        pd.DataFrame
            添加交互项后的数据

        示例:
        ------
        >>> constructor.create_interactions('education', 'female', center=True)
        """
        if var1 not in self.df.columns or var2 not in self.df.columns:
            raise ValueError(f"变量 '{var1}' 或 '{var2}' 不在数据中")

        name = name or f'{var1}_x_{var2}'

        if center:
            # 中心化
            var1_centered = self.df[var1] - self.df[var1].mean()
            var2_centered = self.df[var2] - self.df[var2].mean()
            self.df[name] = var1_centered * var2_centered
            print(f" 创建中心化交互项:{name}")
        else:
            self.df[name] = self.df[var1] * self.df[var2]
            print(f" 创建交互项:{name}")

        self.constructed_vars.append(name)
        return self.df

    def create_lags(self, var: str, lags: Union[int, List[int]] = 1,
                   fill_method: str = 'drop') -> pd.DataFrame:
        """
        创建滞后项(面板数据)

        要求:必须设置 id_col 和 time_col

        数学:
        -----
        L(Y_it, k) = Y_{i,t-k}

        参数:
        ------
        var : str
            变量名
        lags : int or list of int
            滞后期数,如 1 或 [1, 2, 3]
        fill_method : {'drop', 'zero', 'ffill'}
            缺失值处理方式

        返回:
        ------
        pd.DataFrame
            添加滞后项后的数据

        示例:
        ------
        >>> constructor.create_lags('income', lags=[1, 2])
        """
        if not self.id_col or not self.time_col:
            raise ValueError("创建滞后项需要设置 id_col 和 time_col")

        if var not in self.df.columns:
            raise ValueError(f"变量 '{var}' 不在数据中")

        # 确保排序
        self.df = self.df.sort_values([self.id_col, self.time_col])

        # 转换为列表
        if isinstance(lags, int):
            lags = [lags]

        for lag in lags:
            lag_name = f'{var}_lag{lag}'
            self.df[lag_name] = self.df.groupby(self.id_col)[var].shift(lag)

            # 处理缺失值
            if fill_method == 'drop':
                pass  # 保留 NaN,后续分析时删除
            elif fill_method == 'zero':
                self.df[lag_name].fillna(0, inplace=True)
            elif fill_method == 'ffill':
                self.df[lag_name] = self.df.groupby(self.id_col)[lag_name].fillna(method='ffill')

            self.constructed_vars.append(lag_name)
            print(f" 创建滞后项:{lag_name}")

        return self.df

    def create_leads(self, var: str, leads: Union[int, List[int]] = 1) -> pd.DataFrame:
        """
        创建前导项(面板数据)

        应用:事件研究(Event Study)

        参数:
        ------
        var : str
            变量名
        leads : int or list of int
            前导期数

        返回:
        ------
        pd.DataFrame
            添加前导项后的数据
        """
        if not self.id_col or not self.time_col:
            raise ValueError("创建前导项需要设置 id_col 和 time_col")

        if var not in self.df.columns:
            raise ValueError(f"变量 '{var}' 不在数据中")

        self.df = self.df.sort_values([self.id_col, self.time_col])

        if isinstance(leads, int):
            leads = [leads]

        for lead in leads:
            lead_name = f'{var}_lead{lead}'
            self.df[lead_name] = self.df.groupby(self.id_col)[var].shift(-lead)
            self.constructed_vars.append(lead_name)
            print(f" 创建前导项:{lead_name}")

        return self.df

    def create_group_stats(self, var: str, groupby: Union[str, List[str]],
                          stats: List[str] = ['mean', 'std']) -> pd.DataFrame:
        """
        创建分组统计变量

        示例:
        -----
        - 各省平均工资
        - 个体工资与省均值的偏差

        参数:
        ------
        var : str
            变量名
        groupby : str or list of str
            分组变量
        stats : list of str
            统计量,可选:'mean', 'std', 'min', 'max', 'median', 'count'

        返回:
        ------
        pd.DataFrame
            添加分组统计变量后的数据

        示例:
        ------
        >>> constructor.create_group_stats('wage', 'region', stats=['mean', 'std'])
        """
        if var not in self.df.columns:
            raise ValueError(f"变量 '{var}' 不在数据中")

        if isinstance(groupby, str):
            groupby = [groupby]

        for gb in groupby:
            if gb not in self.df.columns:
                raise ValueError(f"分组变量 '{gb}' 不在数据中")

        group_name = '_'.join(groupby)

        for stat in stats:
            stat_name = f'{group_name}_{stat}_{var}'

            if stat == 'mean':
                self.df[stat_name] = self.df.groupby(groupby)[var].transform('mean')
            elif stat == 'std':
                self.df[stat_name] = self.df.groupby(groupby)[var].transform('std')
            elif stat == 'min':
                self.df[stat_name] = self.df.groupby(groupby)[var].transform('min')
            elif stat == 'max':
                self.df[stat_name] = self.df.groupby(groupby)[var].transform('max')
            elif stat == 'median':
                self.df[stat_name] = self.df.groupby(groupby)[var].transform('median')
            elif stat == 'count':
                self.df[stat_name] = self.df.groupby(groupby)[var].transform('count')
            else:
                raise ValueError(f"未知统计量:{stat}")

            self.constructed_vars.append(stat_name)
            print(f" 创建分组统计变量:{stat_name}")

        # 创建偏差变量
        if 'mean' in stats:
            deviation_name = f'{var}_dev_{group_name}'
            mean_name = f'{group_name}_mean_{var}'
            self.df[deviation_name] = self.df[var] - self.df[mean_name]
            self.constructed_vars.append(deviation_name)
            print(f" 创建偏差变量:{deviation_name}")

        return self.df

    def mincer_equation_vars(self, wage_col: str, education_col: str,
                            age_col: str, start_age: int = 6) -> pd.DataFrame:
        """
        构造完整 Mincer 工资方程变量

        模型:
        -----
        ln(wage) = β₀ + β₁·education + β₂·experience + β₃·experience² + ε

        需要变量:
        --------
        - education: 教育年限
        - age: 年龄
        - experience = age - education - start_age
        - experience_sq = experience²
        - log_wage = ln(wage)

        参数:
        ------
        wage_col : str
            工资变量名
        education_col : str
            教育年限变量名
        age_col : str
            年龄变量名
        start_age : int, default=6
            开始上学年龄

        返回:
        ------
        pd.DataFrame
            添加 Mincer 变量后的数据

        示例:
        ------
        >>> constructor.mincer_equation_vars('wage', 'education', 'age')
        """
        required_cols = [wage_col, education_col, age_col]
        for col in required_cols:
            if col not in self.df.columns:
                raise ValueError(f"变量 '{col}' 不在数据中")

        # 创建 experience
        self.df['experience'] = self.df[age_col] - self.df[education_col] - start_age
        self.df['experience'] = self.df['experience'].clip(lower=0)  # 确保非负

        # 创建 experience²
        self.df['experience_sq'] = self.df['experience'] ** 2

        # 创建 log(wage)
        self.df['log_wage'] = np.log(self.df[wage_col])

        mincer_vars = ['experience', 'experience_sq', 'log_wage']
        self.constructed_vars.extend(mincer_vars)

        print(" 创建 Mincer 方程变量:")
        print(f"   - experience = {age_col} - {education_col} - {start_age}")
        print(f"   - experience_sq = experience²")
        print(f"   - log_wage = ln({wage_col})")

        return self.df

    def did_variables(self, treat_col: str, post_period: Union[int, str],
                     time_col: Optional[str] = None) -> pd.DataFrame:
        """
        构造 DID 变量

        变量:
        -----
        - treat: 处理组指示变量(已存在)
        - post: 政策后指示变量
        - treat_post: 交互项(DID 估计量)

        参数:
        ------
        treat_col : str
            处理组变量名
        post_period : int or str
            政策实施时间点
        time_col : str, optional
            时间变量名,默认使用 self.time_col

        返回:
        ------
        pd.DataFrame
            添加 DID 变量后的数据

        示例:
        ------
        >>> constructor.did_variables('treated', post_period=2010)
        """
        if treat_col not in self.df.columns:
            raise ValueError(f"处理组变量 '{treat_col}' 不在数据中")

        time_col = time_col or self.time_col
        if not time_col:
            raise ValueError("必须指定 time_col")

        if time_col not in self.df.columns:
            raise ValueError(f"时间变量 '{time_col}' 不在数据中")

        # 创建 post 变量
        self.df['post'] = (self.df[time_col] >= post_period).astype(int)

        # 创建 DID 交互项
        self.df['treat_post'] = self.df[treat_col] * self.df['post']

        did_vars = ['post', 'treat_post']
        self.constructed_vars.extend(did_vars)

        print(" 创建 DID 变量:")
        print(f"   - post = ({time_col} >= {post_period})")
        print(f"   - treat_post = {treat_col} × post")
        print(f"\nDID 回归方程:")
        print(f"   Y = β₀ + β₁·{treat_col} + β₂·post + β₃·treat_post + ε")
        print(f"   β₃ = DID 估计量(处理效应)")

        return self.df

    def decompose_treatment_effect(self, outcome: str, treatment: str,
                                   covariates: Optional[List[str]] = None) -> dict:
        """
        处理效应分解

        理论:
        -----
        ATE = E[Y(1) - Y(0)]
            = [E[Y|D=1] - E[Y|D=0]]
              - {E[Y(0)|D=1] - E[Y(0)|D=0]}
            = Observed Difference - Selection Bias

        参数:
        ------
        outcome : str
            结果变量
        treatment : str
            处理变量(0/1)
        covariates : list of str, optional
            控制变量

        返回:
        ------
        dict
            包含分解结果的字典

        示例:
        ------
        >>> results = constructor.decompose_treatment_effect('wage', 'college_degree')
        """
        if outcome not in self.df.columns:
            raise ValueError(f"结果变量 '{outcome}' 不在数据中")
        if treatment not in self.df.columns:
            raise ValueError(f"处理变量 '{treatment}' 不在数据中")

        # 观测差异
        treated = self.df[self.df[treatment] == 1][outcome]
        control = self.df[self.df[treatment] == 0][outcome]
        observed_diff = treated.mean() - control.mean()

        # 回归调整(控制协变量)
        if covariates:
            X = sm.add_constant(self.df[[treatment] + covariates])
        else:
            X = sm.add_constant(self.df[[treatment]])

        y = self.df[outcome]
        model = sm.OLS(y, X).fit(cov_type='HC3')

        ate = model.params[treatment]
        se = model.bse[treatment]

        # 选择偏误
        selection_bias = observed_diff - ate

        results = {
            'observed_difference': observed_diff,
            'ate': ate,
            'se': se,
            'selection_bias': selection_bias,
            't_stat': ate / se,
            'p_value': model.pvalues[treatment],
            'ci_lower': ate - 1.96 * se,
            'ci_upper': ate + 1.96 * se
        }

        print(" 处理效应分解:")
        print(f"   观测差异:{observed_diff:.4f}")
        print(f"   ATE:{ate:.4f} (SE: {se:.4f})")
        print(f"   选择偏误:{selection_bias:.4f}")
        print(f"   95% CI:[{results['ci_lower']:.4f}, {results['ci_upper']:.4f}]")
        print(f"   t-stat:{results['t_stat']:.4f}, p-value:{results['p_value']:.4f}")

        return results

    def get_constructed_vars(self) -> List[str]:
        """返回所有构造的变量名"""
        return self.constructed_vars

    def summary(self):
        """打印变量构造摘要"""
        print("\n" + "="*60)
        print(" 变量构造摘要")
        print("="*60)
        print(f"原始变量数:{len(self.df.columns) - len(self.constructed_vars)}")
        print(f"构造变量数:{len(self.constructed_vars)}")
        print(f"总变量数:{len(self.df.columns)}")

        if self.id_col and self.time_col:
            n_id = self.df[self.id_col].nunique()
            n_time = self.df[self.time_col].nunique()
            print(f"\n面板结构:{n_id} 个体 × {n_time} 期 = {len(self.df)} 观测")

        print(f"\n构造的变量:")
        for var in self.constructed_vars:
            print(f"  - {var}")

        print("="*60)

完整案例:NLSY97 Mincer 工资方程

案例背景

数据:National Longitudinal Survey of Youth 1997 (NLSY97)

研究问题

  1. 教育年限对工资的回报率是多少?
  2. 教育回报率是否因性别而异?
  3. 工作经验对工资的影响是否呈现倒U型?
  4. 地区和行业差异如何影响工资?

数据模拟(替代真实 NLSY97)

python
import pandas as pd
import numpy as np
import statsmodels.api as sm
import matplotlib.pyplot as plt
import seaborn as sns

# 设置随机种子
np.random.seed(2024)

# 模拟 5000 个个体
n = 5000

# 生成基础特征
df = pd.DataFrame({
    'person_id': range(1, n+1),
    'age': np.random.uniform(25, 55, n),
    'education': np.random.choice([12, 14, 16, 18, 20], n, p=[0.3, 0.2, 0.3, 0.15, 0.05]),
    'female': np.random.binomial(1, 0.5, n),
    'race': np.random.choice(['White', 'Black', 'Hispanic', 'Other'], n, p=[0.6, 0.15, 0.2, 0.05]),
    'urban': np.random.binomial(1, 0.7, n),
    'married': np.random.binomial(1, 0.55, n),
    'region': np.random.choice(['Northeast', 'South', 'Midwest', 'West'], n, p=[0.18, 0.37, 0.21, 0.24]),
    'industry': np.random.choice(['Manufacturing', 'Service', 'Tech', 'Finance', 'Other'], n,
                                 p=[0.15, 0.35, 0.2, 0.15, 0.15])
})

# 计算工作经验
df['experience'] = df['age'] - df['education'] - 6
df['experience'] = df['experience'].clip(lower=0)

# 生成工资(基于真实经济学关系)
log_wage = (
    9.5 +                                          # 基础对数工资
    0.08 * df['education'] +                       # 教育回报率(8%)
    0.05 * df['experience'] +                      # 经验回报率
    -0.0008 * df['experience']**2 +                # 经验的边际递减
    -0.15 * df['female'] +                         # 性别工资差距
    -0.02 * df['education'] * df['female'] +       # 教育回报率的性别差异
    0.10 * (df['race'] == 'White') +               # 种族差异
    -0.08 * (df['race'] == 'Black') +
    -0.05 * (df['race'] == 'Hispanic') +
    0.05 * df['urban'] +                           # 城市溢价
    0.08 * df['married'] +                         # 婚姻溢价
    0.12 * (df['region'] == 'West') +              # 地区差异
    0.08 * (df['region'] == 'Northeast') +
    -0.05 * (df['region'] == 'South') +
    0.15 * (df['industry'] == 'Tech') +            # 行业差异
    0.12 * (df['industry'] == 'Finance') +
    -0.08 * (df['industry'] == 'Service') +
    np.random.normal(0, 0.3, n)                    # 误差项
)

df['wage'] = np.exp(log_wage)

print(" 数据生成完成")
print(f"样本量:{len(df)}")
print(f"\n描述统计:")
print(df[['age', 'education', 'experience', 'wage']].describe())

使用 VariableConstructor 构造变量

python
# 初始化构造器
constructor = VariableConstructor(df)

# 1. 创建 Mincer 方程基础变量
constructor.mincer_equation_vars('wage', 'education', 'age', start_age=6)

# 2. 创建虚拟变量
constructor.create_dummies('race', prefix='race', ref_category='White')
constructor.create_dummies('region', prefix='region', ref_category='Northeast')
constructor.create_dummies('industry', prefix='industry', ref_category='Other')

# 3. 创建交互项
constructor.create_interactions('education', 'female', center=True, name='educ_x_female')
constructor.create_interactions('experience', 'female', center=False, name='exp_x_female')
constructor.create_interactions('education', 'experience', center=True, name='educ_x_exp')

# 4. 创建分组统计变量
constructor.create_group_stats('wage', 'region', stats=['mean', 'std'])
constructor.create_group_stats('wage', 'industry', stats=['mean', 'median'])

# 5. 查看构造的变量
print("\n构造的变量:")
for var in constructor.get_constructed_vars():
    print(f"  - {var}")

# 获取构造后的数据
df_constructed = constructor.df

估计 Mincer 工资方程

python
# 模型 1:基础 Mincer 方程
X1 = sm.add_constant(df_constructed[['education', 'experience', 'experience_sq']])
y = df_constructed['log_wage']
model1 = sm.OLS(y, X1).fit(cov_type='HC3')

print("\n" + "="*80)
print("模型 1:基础 Mincer 工资方程")
print("="*80)
print(model1.summary().tables[1])

# 模型 2:加入人口统计学特征
X2 = sm.add_constant(df_constructed[[
    'education', 'experience', 'experience_sq',
    'female', 'race_Black', 'race_Hispanic', 'race_Other',
    'urban', 'married'
]])
model2 = sm.OLS(y, X2).fit(cov_type='HC3')

print("\n" + "="*80)
print("模型 2:加入人口统计学特征")
print("="*80)
print(model2.summary().tables[1])

# 模型 3:加入交互项
X3 = sm.add_constant(df_constructed[[
    'education', 'experience', 'experience_sq',
    'female', 'race_Black', 'race_Hispanic', 'race_Other',
    'urban', 'married',
    'educ_x_female', 'exp_x_female'
]])
model3 = sm.OLS(y, X3).fit(cov_type='HC3')

print("\n" + "="*80)
print("模型 3:加入交互项(性别异质性)")
print("="*80)
print(model3.summary().tables[1])

# 模型 4:完整模型(加入地区和行业)
X4 = sm.add_constant(df_constructed[[
    'education', 'experience', 'experience_sq',
    'female', 'race_Black', 'race_Hispanic', 'race_Other',
    'urban', 'married',
    'educ_x_female', 'exp_x_female',
    'region_South', 'region_Midwest', 'region_West',
    'industry_Manufacturing', 'industry_Service', 'industry_Tech', 'industry_Finance'
]])
model4 = sm.OLS(y, X4).fit(cov_type='HC3')

print("\n" + "="*80)
print("模型 4:完整模型(地区 + 行业)")
print("="*80)
print(model4.summary().tables[1])

结果解释

python
print("\n" + "="*80)
print(" 结果解释")
print("="*80)

# 教育回报率
beta_educ = model4.params['education']
print(f"\n1. 教育回报率")
print(f"   - 每增加 1 年教育,工资增长 {beta_educ*100:.2f}%")
print(f"   - 95% 置信区间:[{(beta_educ - 1.96*model4.bse['education'])*100:.2f}%, "
      f"{(beta_educ + 1.96*model4.bse['education'])*100:.2f}%]")

# 性别工资差距
beta_female = model4.params['female']
print(f"\n2. 性别工资差距")
print(f"   - 女性工资比男性低 {abs(beta_female)*100:.2f}%")
print(f"   - p-value: {model4.pvalues['female']:.4f}")

# 教育回报率的性别差异
beta_educ_female = model4.params['educ_x_female']
print(f"\n3. 教育回报率的性别差异")
print(f"   - 男性教育回报率:{beta_educ*100:.2f}%")
print(f"   - 女性教育回报率:{(beta_educ + beta_educ_female)*100:.2f}%")
print(f"   - 差异:{beta_educ_female*100:.2f}% (女性较低)")

# 经验的倒U型效应
beta_exp = model4.params['experience']
beta_exp_sq = model4.params['experience_sq']
optimal_exp = -beta_exp / (2 * beta_exp_sq)
print(f"\n4. 工作经验的倒U型效应")
print(f"   - 经验系数:{beta_exp:.6f}")
print(f"   - 经验平方系数:{beta_exp_sq:.6f}")
print(f"   - 工资最大化的经验年数:{optimal_exp:.1f} 年")

# 地区差异
print(f"\n5. 地区工资差异(相对于 Northeast)")
for region in ['South', 'Midwest', 'West']:
    var = f'region_{region}'
    if var in model4.params.index:
        print(f"   - {region}: {model4.params[var]*100:.2f}% ({model4.pvalues[var]:.4f})")

# 行业差异
print(f"\n6. 行业工资差异(相对于 Other)")
for industry in ['Manufacturing', 'Service', 'Tech', 'Finance']:
    var = f'industry_{industry}'
    if var in model4.params.index:
        print(f"   - {industry}: {model4.params[var]*100:.2f}% ({model4.pvalues[var]:.4f})")

# 模型拟合优度
print(f"\n7. 模型拟合优度")
print(f"   - R²: {model4.rsquared:.4f}")
print(f"   - Adjusted R²: {model4.rsquared_adj:.4f}")
print(f"   - F-statistic: {model4.fvalue:.2f} (p-value: {model4.f_pvalue:.4e})")

可视化分析

python
# 1. 教育-工资关系(分性别)
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
for gender, label in [(0, '男性'), (1, '女性')]:
    subset = df_constructed[df_constructed['female'] == gender]
    avg_wage = subset.groupby('education')['wage'].mean()
    plt.plot(avg_wage.index, avg_wage.values, marker='o', label=label, linewidth=2)
plt.xlabel('教育年限', fontsize=12)
plt.ylabel('平均工资(美元)', fontsize=12)
plt.title('教育-工资关系(分性别)', fontsize=14)
plt.legend(fontsize=12)
plt.grid(alpha=0.3)

# 2. 经验-工资关系(倒U型)
plt.subplot(1, 2, 2)
exp_range = np.linspace(0, 40, 100)
log_wage_pred = (model4.params['const'] +
                 model4.params['education'] * df_constructed['education'].mean() +
                 model4.params['experience'] * exp_range +
                 model4.params['experience_sq'] * exp_range**2)
wage_pred = np.exp(log_wage_pred)
plt.plot(exp_range, wage_pred, linewidth=2, color='green')
plt.axvline(optimal_exp, color='red', linestyle='--', label=f'最优经验 = {optimal_exp:.1f} 年')
plt.xlabel('工作经验(年)', fontsize=12)
plt.ylabel('预测工资(美元)', fontsize=12)
plt.title('经验-工资关系(倒U型)', fontsize=14)
plt.legend(fontsize=12)
plt.grid(alpha=0.3)

plt.tight_layout()
plt.show()

# 3. 地区和行业工资分布
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# 地区
df_constructed.boxplot(column='wage', by='region', ax=axes[0])
axes[0].set_title('各地区工资分布', fontsize=14)
axes[0].set_xlabel('地区', fontsize=12)
axes[0].set_ylabel('工资(美元)', fontsize=12)
plt.sca(axes[0])
plt.xticks(rotation=45)

# 行业
df_constructed.boxplot(column='wage', by='industry', ax=axes[1])
axes[1].set_title('各行业工资分布', fontsize=14)
axes[1].set_xlabel('行业', fontsize=12)
axes[1].set_ylabel('工资(美元)', fontsize=12)
plt.sca(axes[1])
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

# 4. 残差诊断
fig, axes = plt.subplots(2, 2, figsize=(12, 10))

# 残差 vs 拟合值
axes[0, 0].scatter(model4.fittedvalues, model4.resid, alpha=0.5)
axes[0, 0].axhline(y=0, color='r', linestyle='--')
axes[0, 0].set_xlabel('拟合值', fontsize=12)
axes[0, 0].set_ylabel('残差', fontsize=12)
axes[0, 0].set_title('残差 vs 拟合值', fontsize=14)
axes[0, 0].grid(alpha=0.3)

# QQ 图
from scipy import stats as sp_stats
sp_stats.probplot(model4.resid, dist="norm", plot=axes[0, 1])
axes[0, 1].set_title('QQ 图(正态性检验)', fontsize=14)
axes[0, 1].grid(alpha=0.3)

# 残差直方图
axes[1, 0].hist(model4.resid, bins=50, edgecolor='black', alpha=0.7)
axes[1, 0].set_xlabel('残差', fontsize=12)
axes[1, 0].set_ylabel('频数', fontsize=12)
axes[1, 0].set_title('残差分布', fontsize=14)
axes[1, 0].grid(alpha=0.3)

# 标准化残差
from statsmodels.stats.outliers_influence import OLSInfluence
influence = OLSInfluence(model4)
standardized_resid = influence.resid_studentized_internal
axes[1, 1].scatter(range(len(standardized_resid)), standardized_resid, alpha=0.5)
axes[1, 1].axhline(y=3, color='r', linestyle='--', label='±3σ')
axes[1, 1].axhline(y=-3, color='r', linestyle='--')
axes[1, 1].set_xlabel('观测序号', fontsize=12)
axes[1, 1].set_ylabel('标准化残差', fontsize=12)
axes[1, 1].set_title('标准化残差(异常值检测)', fontsize=14)
axes[1, 1].legend(fontsize=10)
axes[1, 1].grid(alpha=0.3)

plt.tight_layout()
plt.show()

异质性分析:教育回报率的地区差异

python
# 分地区估计教育回报率
print("\n" + "="*80)
print(" 异质性分析:教育回报率的地区差异")
print("="*80)

results_by_region = {}

for region in df_constructed['region'].unique():
    subset = df_constructed[df_constructed['region'] == region]
    X_subset = sm.add_constant(subset[['education', 'experience', 'experience_sq', 'female']])
    y_subset = subset['log_wage']
    model_subset = sm.OLS(y_subset, X_subset).fit(cov_type='HC3')

    educ_return = model_subset.params['education']
    educ_se = model_subset.bse['education']

    results_by_region[region] = {
        'return': educ_return,
        'se': educ_se,
        'ci_lower': educ_return - 1.96 * educ_se,
        'ci_upper': educ_return + 1.96 * educ_se
    }

    print(f"\n{region}:")
    print(f"  教育回报率: {educ_return*100:.2f}% (SE: {educ_se*100:.2f}%)")
    print(f"  95% CI: [{results_by_region[region]['ci_lower']*100:.2f}%, "
          f"{results_by_region[region]['ci_upper']*100:.2f}%]")

# 可视化地区差异
fig, ax = plt.subplots(figsize=(10, 6))
regions = list(results_by_region.keys())
returns = [results_by_region[r]['return'] * 100 for r in regions]
errors = [1.96 * results_by_region[r]['se'] * 100 for r in regions]

ax.barh(regions, returns, xerr=errors, capsize=5, color='skyblue', edgecolor='black')
ax.set_xlabel('教育回报率(%)', fontsize=12)
ax.set_ylabel('地区', fontsize=12)
ax.set_title('教育回报率的地区差异(95% CI)', fontsize=14)
ax.grid(alpha=0.3, axis='x')
plt.tight_layout()
plt.show()

其他常用变量构造

1. 多项式项(非线性关系)

python
# 研究非线性关系:收入 = f(经验, 经验²)
df['experience_sq'] = df['experience'] ** 2
df['experience_cube'] = df['experience'] ** 3

# 识别最优点
# ∂wage/∂experience = β₁ + 2β₂·experience = 0
# experience* = -β₁ / (2β₂)

2. 比率变量

python
# 计算比率
df['income_per_capita'] = df['household_income'] / df['household_size']
df['debt_to_income'] = df['debt'] / df['income']
df['saving_rate'] = df['saving'] / df['income']

# 对数比率
df['log_debt_to_income'] = np.log(df['debt'] / df['income'])

3. 对数差分(增长率)

python
# 对数差分 ≈ 增长率
df['log_income'] = np.log(df['income'])
df['income_growth'] = df.groupby('person_id')['log_income'].diff()

# 解释:income_growth = 0.05 → 收入增长约 5%

# 验证:
# ln(Y_t) - ln(Y_{t-1}) = ln(Y_t / Y_{t-1}) ≈ (Y_t - Y_{t-1}) / Y_{t-1}

4. 标准化变量(Z-score)

python
from scipy import stats

# Z-score 标准化(总体)
df['income_std'] = stats.zscore(df['income'])

# 组内标准化(Within-group Z-score)
df['income_std_within'] = df.groupby('region')['income'].transform(
    lambda x: stats.zscore(x)
)

# 解释:income_std = 2 → 该个体收入比均值高 2 个标准差

练习题

⭐⭐ 练习 1:创建教育水平虚拟变量

题目:给定教育年限变量,创建教育水平虚拟变量(高中、本科、研究生),参照组为高中。

python
# 数据
df = pd.DataFrame({
    'id': range(1, 11),
    'education_years': [12, 16, 18, 12, 14, 16, 20, 12, 16, 14]
})

# 任务:创建虚拟变量
# 高中:12 年
# 本科:14-16 年
# 研究生:18-20 年

提示:使用 pd.cut() 分组,再用 pd.get_dummies()

⭐⭐⭐ 练习 2:构造交互项并计算边际效应

题目:估计模型

  1. 创建交互项
  2. 运行回归
  3. 计算男性和女性的教育回报率
  4. 检验回报率差异是否显著(Wald 检验)

⭐⭐⭐⭐ 练习 3:动态面板模型

题目:使用 Arellano-Bond 方法估计动态面板模型

  1. 创建滞后项
  2. 一阶差分消除
  3. 使用 作为工具变量
  4. GMM 估计

⭐⭐⭐⭐⭐ 练习 4:异质性处理效应分解

题目:分析大学学位对工资的处理效应,并分解为:

  1. ATE(Average Treatment Effect):平均处理效应
  2. ATT(Average Treatment on the Treated):处理组平均处理效应
  3. ATE vs ATT 差异:选择偏误的证据

使用倾向得分匹配(Propensity Score Matching)进行敏感性分析。


小结

核心函数总结

任务方法数学表示应用
虚拟变量pd.get_dummies()分类变量数值化
交互项df['A'] * df['B']效应异质性
滞后项df.shift()动态模型
分组统计groupby().transform()控制群体效应
多项式df['x'] ** 2非线性关系

最佳实践

  1. 理解参照组:虚拟变量回归中明确参照组
  2. 避免多重共线性:使用 drop_first=True 或指定参照组
  3. 中心化交互项:提高解释性,减少共线性
  4. 面板数据排序:创建滞后项前先按 idtime 排序
  5. 变量命名规范:如 income_lag1, edu_x_female, region_mean_wage
  6. 文档化:记录每个变量的构造逻辑和理论依据
  7. 验证:检查构造变量的分布和缺失值

理论要点

  1. 虚拟变量陷阱 导致完全共线性
  2. 交互项解释 表示 的边际效应如何随 变化
  3. 滞后算子性质
  4. Within Transformation 消除组固定效应
  5. Mincer 方程

下一步

下一节:数据转换(标准化、对数转换、Box-Cox 变换、分箱)


参考文献

  1. Angrist, J. D., & Pischke, J. S. (2009). Mostly Harmless Econometrics: An Empiricist's Companion. Princeton University Press.

    • 第 3 章:Making Regression Make Sense
    • 交互项与异质性效应
  2. Wooldridge, J. M. (2010). Econometric Analysis of Cross Section and Panel Data (2nd ed.). MIT Press.

    • 第 10 章:Basic Linear Unobserved Effects Panel Data Models
    • 动态面板模型
  3. Mincer, J. (1974). Schooling, Experience, and Earnings. NBER.

    • Mincer 工资方程的原始文献
  4. Arellano, M., & Bond, S. (1991). Some Tests of Specification for Panel Data: Monte Carlo Evidence and an Application to Employment Equations. The Review of Economic Studies, 58(2), 277-297.

    • 动态面板 GMM 估计
  5. Cameron, A. C., & Trivedi, P. K. (2005). Microeconometrics: Methods and Applications. Cambridge University Press.

    • 第 21 章:Panel Data Models
    • 第 25 章:Treatment Evaluation

版本:StatsPai Module 3.4 - Nobel Prize Standards 最后更新:2024-01-15 作者:StatsPai Team 许可:CC BY-NC-SA 4.0

基于 MIT 许可证发布。内容版权归作者所有。