
13.6 Frontier Topics and Integrated Applications

"The credibility revolution in empirical economics has fundamentally changed how we think about evidence.""实证经济学的可信性革命从根本上改变了我们对证据的看法。"— Joshua Angrist & Jörn-Steffen Pischke

Causal inference frontiers: staggered DID, external validity, and integrated policy evaluation

Difficulty: comprehensive application


Objectives for this section

  • Master staggered DID and its recent developments
  • Understand the Goodman-Bacon decomposition and TWFE bias
  • Learn shift-share IV methods
  • Understand external-validity challenges and SUTVA violations
  • Distinguish ML prediction from causal inference
  • Work through an end-to-end policy evaluation case study
  • Integrate the material from Modules 8-13

Topic 1: The Staggered DID Revolution

The problem: bias in traditional TWFE

The classic TWFE (two-way fixed effects) model:

$$Y_{it} = \alpha_i + \lambda_t + \beta^{DD} D_{it} + \varepsilon_{it}$$

Implicit assumption: all units are treated at the same time.

Reality: staggered adoption.

Example: state-level policies
Unit     2015  2016  2017  2018  2019
State A   0     1     1     1     1    (adopts in 2016)
State B   0     0     1     1     1    (adopts in 2017)
State C   0     0     0     1     1    (adopts in 2018)
State D   0     0     0     0     0    (never adopts)

The Goodman-Bacon decomposition (2021)

Finding: the TWFE estimate is a weighted average of all possible 2×2 DIDs,

$$\hat{\beta}^{TWFE} = \sum_k s_k\, \hat{\beta}^{2\times 2}_k, \qquad s_k \ge 0,\ \sum_k s_k = 1,$$

where the weights $s_k$ depend on group sizes and on how much the treatment indicator varies within each pair.

Three types of comparisons:

1. Early vs. late (timing groups): an earlier cohort compared with a later cohort before the later cohort is treated. (A clean comparison.)

2. Treated vs. never-treated: each cohort compared with the never-treated units. (Also clean.)

3. Forbidden comparisons: a later cohort compared with an earlier, already-treated cohort that serves as its "control".

Danger: if treatment effects change over time, already-treated groups are "bad controls" for later adopters!

Result: with dynamic, heterogeneous effects, TWFE effectively places negative weights on some group-time treatment effects (de Chaisemartin & D'Haultfœuille, 2020), so the estimate can be biased or even wrong-signed!

Case in point: minimum-wage research

Dube et al. (2010) vs. Neumark et al. (2014):

The controversy: the same data and different methods produced opposite conclusions!

The Goodman-Bacon reading: the TWFE specifications embed "forbidden comparisons", which bias the estimates.

Python implementation: Bacon decomposition

python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

def bacon_decomposition(data, outcome, treatment, unit_id, time_id):
    """
    Goodman-Bacon decomposition (simplified, for teaching purposes)

    Parameters
    ----------
    data: DataFrame
    outcome: name of the outcome variable
    treatment: name of the treatment indicator (0/1)
    unit_id: unit identifier column
    time_id: time identifier column

    Returns
    -------
    decomp: DataFrame with each 2x2 DID estimate and its weight
    twfe_estimate: weighted average of the 2x2 DIDs (approximates TWFE)
    """
    df = data.copy()

    # Identify each treated unit's adoption time
    treated_units = df[df[treatment]==1][unit_id].unique()
    adoption_times = {}
    for unit in treated_units:
        unit_data = df[df[unit_id]==unit]
        first_treated = unit_data[unit_data[treatment]==1][time_id].min()
        adoption_times[unit] = first_treated

    # Group units into timing cohorts
    timing_groups = {}
    for unit, time in adoption_times.items():
        if time not in timing_groups:
            timing_groups[time] = []
        timing_groups[time].append(unit)

    # Never-treated units
    never_treated = df.groupby(unit_id)[treatment].sum()
    never_treated = never_treated[never_treated == 0].index.tolist()

    # Compute all 2x2 DIDs
    comparisons = []

    # Treated vs never-treated
    for g_time, g_units in timing_groups.items():
        if len(never_treated) > 0:
            # Pre/post windows around this cohort's adoption time
            pre = df[df[time_id] < g_time]
            post = df[df[time_id] >= g_time]

            # Treated cohort
            treated_pre = pre[pre[unit_id].isin(g_units)][outcome].mean()
            treated_post = post[post[unit_id].isin(g_units)][outcome].mean()

            # Control group
            control_pre = pre[pre[unit_id].isin(never_treated)][outcome].mean()
            control_post = post[post[unit_id].isin(never_treated)][outcome].mean()

            dd = (treated_post - treated_pre) - (control_post - control_pre)

            # Weight (simplified; the true Bacon weights also involve
            # the variance of treatment timing)
            n_treated = len(g_units)
            n_control = len(never_treated)
            weight = n_treated * n_control / ((n_treated + n_control) * len(df))

            comparisons.append({
                'Type': 'Treated vs never-treated',
                'Treated_Group': f'G{g_time}',
                'Control_Group': 'Never',
                'DD_Estimate': dd,
                'Weight': weight
            })

    # Early vs late (the early cohort serves as the control)
    sorted_times = sorted(timing_groups.keys())
    for i, t1 in enumerate(sorted_times):
        for t2 in sorted_times[i+1:]:
            early_units = timing_groups[t1]
            late_units = timing_groups[t2]

            pre = df[df[time_id] < t2]
            post = df[df[time_id] >= t2]

            early_pre = pre[(pre[unit_id].isin(early_units)) & (pre[time_id] >= t1)][outcome].mean()
            early_post = post[post[unit_id].isin(early_units)][outcome].mean()

            late_pre = pre[pre[unit_id].isin(late_units)][outcome].mean()
            late_post = post[post[unit_id].isin(late_units)][outcome].mean()

            dd = (late_post - late_pre) - (early_post - early_pre)

            weight = len(early_units) * len(late_units) / (len(df) * (len(early_units) + len(late_units)))

            comparisons.append({
                'Type': 'Early vs late',
                'Treated_Group': f'G{t2}',
                'Control_Group': f'G{t1}',
                'DD_Estimate': dd,
                'Weight': weight
            })

    decomp = pd.DataFrame(comparisons)

    # Weighted average of the 2x2 DIDs (approximates the TWFE estimate)
    twfe_estimate = (decomp['DD_Estimate'] * decomp['Weight']).sum() / decomp['Weight'].sum()

    return decomp, twfe_estimate

# Simulated data
np.random.seed(42)
units = 50
time_periods = 20

data_list = []
for i in range(units):
    # Adoption time
    if i < 10:
        adopt_time = 10  # early adopters
    elif i < 25:
        adopt_time = 15  # late adopters
    else:
        adopt_time = 999  # never treated

    for t in range(time_periods):
        treated = 1 if t >= adopt_time else 0

        # Heterogeneous effects (larger for early adopters)
        if adopt_time == 10:
            tau = 5
        elif adopt_time == 15:
            tau = 3
        else:
            tau = 0

        y = 50 + 2*t + 10*i + tau*treated + np.random.normal(0, 5)

        data_list.append({
            'unit': i,
            'time': t,
            'treated': treated,
            'y': y
        })

df_stag = pd.DataFrame(data_list)

# Bacon decomposition
decomp, twfe_est = bacon_decomposition(df_stag, 'y', 'treated', 'unit', 'time')

print("Bacon decomposition results:")
print(decomp)
print(f"\nTWFE estimate: {twfe_est:.3f}")

# Visualization
fig, ax = plt.subplots(figsize=(10, 6))
colors = {'Treated vs never-treated': 'blue', 'Early vs late': 'red'}
for comp_type in decomp['Type'].unique():
    subset = decomp[decomp['Type'] == comp_type]
    ax.scatter(subset['DD_Estimate'], subset['Weight'],
              s=100, alpha=0.6, label=comp_type, color=colors[comp_type])

ax.axvline(x=0, color='black', linestyle='--', alpha=0.5)
ax.axvline(x=twfe_est, color='green', linestyle='-', linewidth=2, label=f'TWFE = {twfe_est:.2f}')
ax.set_xlabel('2x2 DID estimate', fontsize=12)
ax.set_ylabel('Weight', fontsize=12)
ax.set_title('Goodman-Bacon decomposition', fontsize=14, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

The fix: modern staggered DID estimators

Callaway & Sant'Anna (2021)

Core idea: estimate group-time ATTs, then aggregate them.

Group (cohort): the set of units first treated in the same period, indexed by $g$.

ATT(g, t): the cohort-$g$ treatment effect at time $t$,

$$ATT(g,t) = E\big[Y_t(g) - Y_t(0) \mid G_g = 1\big],$$

estimated against a comparison group of never-treated (or not-yet-treated) units.

Aggregation:

Overall ATT:

$$ATT = \sum_{g} \sum_{t \ge g} w_{g,t}\, ATT(g,t)$$

with weights determined by cohort sizes.

Dynamic effects:

$$ATT(e) = \sum_{g} w_g\, ATT(g,\, g+e),$$

where $e = t - g$ is event time (time relative to adoption).

Python implementation: the C&S method

python
# The pip package and interface below follow the original text and may not
# match what is on PyPI; maintained C&S implementations include `csdid` and
# `differences`, whose APIs differ in the details.
# Installation (per the original): pip install difference_in_differences

from difference_in_differences import ATT

# first_treat: each unit's first treated period (0 for never-treated units)
first = df_stag[df_stag['treated'] == 1].groupby('unit')['time'].min()
df_stag['first_treat'] = df_stag['unit'].map(first).fillna(0).astype(int)

# Callaway & Sant'Anna estimator
att_cs = ATT(
    data=df_stag,
    idname='unit',
    tname='time',
    gname='first_treat',
    yname='y',
    control_group='nevertreated',  # or 'notyettreated'
    est_method='dr',               # doubly robust
    boot_type='multiplier',
    boot_iterations=1000
)

# Results
print(f"Overall ATT: {att_cs.overall_att:.3f}")
print(f"Standard error: {att_cs.overall_se:.3f}")

# Dynamic-effects plot
att_cs.plot_dynamic_effects()
plt.title("Event-study plot (Callaway & Sant'Anna)", fontsize=14)
plt.show()

Sun & Abraham (2021): the interaction-weighted estimator

Core idea: saturate the regression with cohort × event-time interactions to absorb effect heterogeneity.

Model (reconstructed in standard notation):

$$Y_{it} = \alpha_i + \lambda_t + \sum_{g} \sum_{e \neq -1} \tau_{g,e}\, \mathbf{1}[G_i = g]\, \mathbf{1}[t - g = e] + \varepsilon_{it},$$

with the cohort-specific coefficients $\tau_{g,e}$ then aggregated using cohort-share weights.

Advantage: it runs in standard regression software, as the sketch below illustrates.
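
A minimal sketch of this idea on the simulated df_stag panel from the Bacon example above: saturate an OLS regression with cohort × event-time dummies, then aggregate with cohort-size weights. The helper names (cohort, event, g*_e*) are ad hoc for this illustration, and this hand-rolled version omits Sun & Abraham's variance adjustments.

python
# Interaction-weighted sketch (assumes df_stag from the Bacon example exists)
import numpy as np
import statsmodels.formula.api as smf

d = df_stag.copy()
first = d[d['treated'] == 1].groupby('unit')['time'].min()
d['cohort'] = d['unit'].map(first).fillna(0).astype(int)  # 0 = never treated
d['event'] = d['time'] - d['cohort']

# One dummy per cohort x event-time cell; event == -1 is the omitted base.
# Never-treated units contribute only to the fixed effects.
terms = []
for g in sorted(d.loc[d['cohort'] > 0, 'cohort'].unique()):
    for e in sorted(d.loc[d['cohort'] == g, 'event'].unique()):
        if e == -1:
            continue
        name = f'g{g}_e{"m" if e < 0 else "p"}{abs(e)}'  # patsy-safe names
        d[name] = ((d['cohort'] == g) & (d['event'] == e)).astype(int)
        terms.append(name)

iw = smf.ols('y ~ ' + ' + '.join(terms) + ' + C(unit) + C(time)', data=d).fit()

# Aggregate cohort-specific coefficients at each post-treatment event time,
# weighting by cohort size (10 early vs 15 late units here); the weighted
# truth in this simulation is (10*5 + 15*3) / 25 = 3.8
sizes = d.loc[d['cohort'] > 0].groupby('cohort')['unit'].nunique()
for e in range(0, 5):
    cells = [(g, iw.params[f'g{g}_ep{e}'])
             for g in sizes.index if f'g{g}_ep{e}' in iw.params]
    w = np.array([sizes[g] for g, _ in cells], dtype=float)
    est = np.average([c for _, c in cells], weights=w)
    print(f'IW ATT at event time {e}: {est:.3f}')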


Topic 2: Shift-Share Instrumental Variables

The Bartik instrument

Bartik (1991) proposed instruments of the form

$$Z_r = \sum_k s_{rk,0}\, g_k,$$

where:

  • $s_{rk,0}$: region $r$'s initial share of industry $k$ (the "share")
  • $g_k$: industry $k$'s national growth rate (the "shift")

Example: the effect of China's WTO accession on US local labor markets.

Autor et al. (2013):

  • $s_{rk,0}$: region $r$'s 1990 employment share in industry $k$
  • $g_k$: import growth in that industry

Identification conditions

The newer theory of Borusyak et al. (2022):

Key assumption (stated loosely): the shocks are as-good-as-randomly assigned,

$$E[g_k \mid \text{shares, unobservables}] = \mu \quad \text{for all } k.$$

Implication: conditional on the shares, the growth rates are exogenous.

vs. the traditional view: not every share $s_{rk}$ needs to be exogenous — exogenous shocks are enough!

Python example

python
def bartik_iv(data, shares, shocks):
    """
    Construct a Bartik instrument

    Parameters
    ----------
    data: DataFrame of region-industry observations
    shares: str, name of the share column
    shocks: str, name of the shock column

    Returns
    -------
    Z: the Bartik IV, one value per region
    """
    # Assumes data has columns: region, industry, share_0, shock
    Z = data.groupby('region').apply(
        lambda x: (x[shares] * x[shocks]).sum()
    )

    return Z

# Simulated example
np.random.seed(42)
regions = 100
industries = 10

# National shocks: one draw per industry, common to all regions (the "shift")
national_shocks = np.random.normal(0, 1, industries)

bartik_data = []
for i in range(regions):
    # One share vector per region, summing to 1 across industries
    shares = np.random.dirichlet(np.ones(industries))
    for k in range(industries):
        bartik_data.append({
            'region': i,
            'industry': k,
            'share_0': shares[k],
            'shock': national_shocks[k]
        })

df_bartik = pd.DataFrame(bartik_data)

# Construct the IV
Z_bartik = bartik_iv(df_bartik, 'share_0', 'shock')
print(f"Bartik IV mean: {Z_bartik.mean():.3f}")
print(f"Bartik IV std: {Z_bartik.std():.3f}")

Topic 3: External-Validity Challenges

Internal vs. external validity

Internal validity:

"Is the estimated causal effect unbiased for the study sample?"

External validity:

"Does the estimated causal effect generalize to other populations and settings?"

SUTVA violations

SUTVA (Stable Unit Treatment Value Assumption):

Assumption 1: no interference

Each unit's outcome depends only on its own treatment status. (SUTVA's second component, that there are no hidden versions of the treatment, is also required, but interference is the focus here.)

Violation scenarios:

1. Spillover effects:

  • Farmers whose neighbors are treated also benefit (knowledge diffusion)
  • Herd immunity from vaccination

2. General-equilibrium effects:

  • Training expands labor supply → wages fall
  • Everyone adopts the technology → the relative advantage disappears

Example: network effects of vaccination

python
import numpy as np
import networkx as nx

# Build a social network (small-world graph)
G = nx.watts_strogatz_graph(100, 4, 0.1)

# Simulate a SUTVA violation
np.random.seed(42)
D = np.random.binomial(1, 0.3, 100)  # 30% vaccinated

# Direct effect
tau_direct = 0.8  # cuts own infection risk by 80%

# Indirect effect (protection from vaccinated neighbors)
tau_indirect = 0.3  # each vaccinated neighbor cuts risk by 30%

# Outcomes
Y = []
for i in range(100):
    # Baseline infection risk
    p_infect = 0.5

    # Direct protection
    if D[i] == 1:
        p_infect *= (1 - tau_direct)

    # Indirect protection (neighbor effects)
    neighbors = list(G.neighbors(i))
    vaccinated_neighbors = sum(D[j] for j in neighbors)
    p_infect *= (1 - tau_indirect) ** vaccinated_neighbors

    # Infection draw
    y = np.random.binomial(1, p_infect)
    Y.append(y)

# Naive difference in means
naive_ate = np.mean([Y[i] for i in range(100) if D[i]==1]) - \
            np.mean([Y[i] for i in range(100) if D[i]==0])

print(f"Naive ATE estimate: {naive_ate:.3f}")
print(f"True direct effect: an {tau_direct:.0%} reduction in own infection risk")
print("Note: the naive estimate mixes direct and indirect effects!")

Mitigation strategies

1. Cluster randomization: treat entire clusters at once (see the sketch below)

2. Network modeling: model the spillovers explicitly

3. Generalize cautiously: report the estimand for the study population (e.g., a LATE) rather than claiming a universal ATE
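
A minimal sketch of strategy 1 on the same simulated network: assign treatment to whole communities (found with networkx's greedy_modularity_communities) rather than to individuals, so most spillovers stay within a cluster. The names D_cl and Y_cl are ad hoc, and edges that cross clusters still leak, so this mitigates rather than eliminates the bias.

python
# Cluster randomization sketch (assumes G, tau_direct, tau_indirect above)
from networkx.algorithms import community

rng = np.random.default_rng(42)
clusters = list(community.greedy_modularity_communities(G))

# Treat a random half of the clusters; every node in a cluster shares one status
order = rng.permutation(len(clusters))
treated_clusters = set(int(idx) for idx in order[: len(clusters) // 2])
D_cl = np.zeros(G.number_of_nodes(), dtype=int)
for idx, c in enumerate(clusters):
    if idx in treated_clusters:
        for i in c:
            D_cl[i] = 1

# Re-simulate infections under the same direct/indirect effects as above
Y_cl = np.empty(G.number_of_nodes(), dtype=int)
for i in range(G.number_of_nodes()):
    p = 0.5
    if D_cl[i] == 1:
        p *= (1 - tau_direct)
    p *= (1 - tau_indirect) ** sum(D_cl[j] for j in G.neighbors(i))
    Y_cl[i] = rng.binomial(1, p)

# The treated-vs-control contrast now bundles direct and within-cluster
# spillover effects together (the policy-relevant "total" effect)
print(f"Cluster-randomized contrast: {Y_cl[D_cl==1].mean() - Y_cl[D_cl==0].mean():.3f}")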


Topic 4: Machine-Learning Prediction vs. Causal Inference

Fundamental differences

| Dimension | ML prediction | Causal inference |
| --- | --- | --- |
| Goal | predictive accuracy | unbiased causal effects |
| Object of interest | fitted values $\hat{y}$ | effect estimates $\hat{\beta}$, $\hat{\tau}$ |
| Regularization | encouraged (prevents overfitting) | used with care (can introduce bias) |
| Variable selection | more is usually better | must be theory-driven |
| Evaluation | prediction error | credibility of the identified effect |

A dangerous example: choosing control variables

ML mindset: "add every variable and let the model choose."

Causal inference: "controlling for a collider introduces bias!"

python
import numpy as np
from sklearn.linear_model import LinearRegression

# Data-generating process
X = np.random.randn(1000)
D = np.random.randn(1000)
Y = D + X + np.random.randn(1000) * 0.5  # true effect of D = 1

# Collider: a common effect of X and Y
Z = X + Y + np.random.randn(1000) * 0.5

# Wrong: control for the collider Z
model_wrong = LinearRegression()
model_wrong.fit(np.column_stack([D, X, Z]), Y)
print(f"Controlling for collider Z: β̂_D = {model_wrong.coef_[0]:.3f} (biased!)")

# Right: control only for the covariate X
model_right = LinearRegression()
model_right.fit(np.column_stack([D, X]), Y)
print(f"Controlling only for X: β̂_D = {model_right.coef_[0]:.3f} (unbiased!)")

Double/Debiased Machine Learning

The solution of Chernozhukov et al. (2018):

Core idea: ML predictions + orthogonalization (debiasing).

Take the partially linear model $Y = \theta D + g(X) + \varepsilon$ with $D = m(X) + v$.

Steps:

1. Cross-fitting: split the sample so every nuisance prediction is made out-of-fold.

2. Residualize: $\tilde{Y} = Y - \hat{\ell}(X)$ and $\tilde{D} = D - \hat{m}(X)$, where $\hat{\ell}(X)$ predicts $E[Y \mid X]$ and $\hat{m}(X)$ predicts $E[D \mid X]$.

3. Estimate the effect: regress $\tilde{Y}$ on $\tilde{D}$ to obtain $\hat{\theta}$.

Property: even if the ML nuisance models are imperfect, $\hat{\theta}$ is still $\sqrt{n}$-consistent!
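
To make the recipe concrete, here is a minimal partialling-out sketch built from scikit-learn pieces, reusing the D, X, Y arrays from the collider example above. Random forests stand in for arbitrary ML learners; production implementations (e.g., econml's DML estimators) add inference and more options.

python
# Hand-rolled DML sketch (assumes D, X, Y from the collider example)
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import cross_val_predict

X2 = X.reshape(-1, 1)

# Step 1-2: cross-fitted (out-of-fold) predictions of Y and D given X
y_hat = cross_val_predict(RandomForestRegressor(n_estimators=200, random_state=0),
                          X2, Y, cv=5)
d_hat = cross_val_predict(RandomForestRegressor(n_estimators=200, random_state=0),
                          X2, D, cv=5)

# Step 3: residualize, then regress the Y-residual on the D-residual
y_res = Y - y_hat
d_res = D - d_hat
theta = (d_res @ y_res) / (d_res @ d_res)
print(f"DML estimate of the effect of D: {theta:.3f} (true value: 1)")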


Comprehensive Case Study: A Complete Policy Evaluation

Background: evaluating a city's education reform

Policy: in 2018, some schools in a city piloted small-class teaching.

Research question: what is the causal effect of small classes on student achievement?

Data:

  • 100 schools, 2015-2020
  • 30 schools adopt in 2018, 20 adopt in 2019, 50 never adopt
  • Outcome: mean student test score
  • Covariates: school size, student-teacher ratio, regional GDP, etc.

A complete Python analysis

python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from linearmodels.panel import PanelOLS
from statsmodels.formula.api import ols
import warnings
warnings.filterwarnings('ignore')

print("="*70)
print("Comprehensive case study: causal evaluation of an education reform")
print("="*70)

# =================================================================
# Step 1: data generation
# =================================================================

np.random.seed(2024)
schools = 100
years = range(2015, 2021)

data_list = []
for school in range(schools):
    # School fixed effect
    school_effect = np.random.normal(70, 10)

    # Adoption year
    if school < 30:
        adopt_year = 2018  # early adopters
        tau = 5  # treatment effect
    elif school < 50:
        adopt_year = 2019  # late adopters
        tau = 3
    else:
        adopt_year = 9999  # never adopt
        tau = 0

    # Covariates
    size = np.random.randint(500, 2000)
    teacher_ratio = np.random.uniform(15, 25)
    gdp = np.random.normal(10, 2)

    for year in years:
        # Treatment status
        treated = 1 if year >= adopt_year else 0

        # Time trend
        time_effect = 0.5 * (year - 2015)

        # Outcome
        score = (school_effect + time_effect +
                tau * treated +
                0.01 * size +
                -0.5 * teacher_ratio +
                0.8 * gdp +
                np.random.normal(0, 3))

        data_list.append({
            'school': school,
            'year': year,
            'treated': treated,
            'adopt_year': adopt_year if adopt_year < 9999 else 0,
            'score': score,
            'size': size,
            'teacher_ratio': teacher_ratio,
            'gdp': gdp
        })

df = pd.DataFrame(data_list)
df['post_2018'] = (df['year'] >= 2018).astype(int)
df['post_2019'] = (df['year'] >= 2019).astype(int)

print(f"\nSample overview:")
print(f"Schools: {schools}")
print(f"Years: {min(years)}-{max(years)}")
print(f"Adopted in 2018: 30 schools")
print(f"Adopted in 2019: 20 schools")
print(f"Never adopted: 50 schools")

# =================================================================
# Step 2: exploratory analysis
# =================================================================

print("\n" + "="*70)
print("Step 2: exploratory analysis")
print("="*70)

# Trend plots
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# Panel 1: mean score trends
ax1 = axes[0]
for adopt in [2018, 2019, 0]:
    if adopt == 0:
        subset = df[df['adopt_year'] == 0]
        label = 'Never adopted'
        color = 'gray'
    else:
        subset = df[df['adopt_year'] == adopt]
        label = f'Adopted in {adopt}'
        color = 'red' if adopt == 2018 else 'blue'

    trend = subset.groupby('year')['score'].mean()
    ax1.plot(trend.index, trend.values, 'o-', label=label,
            color=color, linewidth=2, markersize=8)

ax1.axvline(x=2018, color='red', linestyle='--', alpha=0.5)
ax1.axvline(x=2019, color='blue', linestyle='--', alpha=0.5)
ax1.set_xlabel('Year', fontsize=12)
ax1.set_ylabel('Mean score', fontsize=12)
ax1.set_title('Mean score trends by group', fontsize=14, fontweight='bold')
ax1.legend(fontsize=11)
ax1.grid(True, alpha=0.3)

# Panel 2: covariate balance
ax2 = axes[1]
balance_vars = ['size', 'teacher_ratio', 'gdp']
treated_means = df[df['treated']==1][balance_vars].mean()
control_means = df[df['treated']==0][balance_vars].mean()

x = np.arange(len(balance_vars))
width = 0.35
ax2.bar(x - width/2, treated_means, width, label='Treated', alpha=0.7)
ax2.bar(x + width/2, control_means, width, label='Control', alpha=0.7)
ax2.set_xticks(x)
ax2.set_xticklabels(['School size', 'Student-teacher ratio', 'Regional GDP'], fontsize=11)
ax2.set_ylabel('Mean', fontsize=12)
ax2.set_title('Covariate balance', fontsize=14, fontweight='bold')
ax2.legend(fontsize=11)
ax2.grid(True, axis='y', alpha=0.3)

plt.tight_layout()
plt.show()

# =================================================================
# Step 3: Method 1 - traditional TWFE (potentially biased!)
# =================================================================

print("\n" + "="*70)
print("Method 1: traditional two-way fixed effects (TWFE) - potentially biased!")
print("="*70)

# Set the panel index
df_panel = df.set_index(['school', 'year'])

# TWFE model
twfe = PanelOLS.from_formula(
    'score ~ treated + EntityEffects + TimeEffects',
    data=df_panel
).fit(cov_type='clustered', cluster_entity=True)

print(twfe.summary.tables[1])
print(f"\nTWFE estimate: {twfe.params['treated']:.3f} (se={twfe.std_errors['treated']:.3f})")
print("Warning: this estimate may suffer from the Goodman-Bacon bias!")

# =================================================================
# Step 4: Method 2 - event-study design
# =================================================================

print("\n" + "="*70)
print("Method 2: event-study design")
print("="*70)

# Construct event time
df['event_time'] = df['year'] - df['adopt_year']
df.loc[df['adopt_year']==0, 'event_time'] = -999  # never treated

# Event-time dummies (event time -1 is the omitted baseline); use
# patsy-safe names, since a literal '-' inside a column name breaks formulas
def ev_name(e):
    return f'event_m{-e}' if e < 0 else f'event_p{e}'

event_times = range(-3, 4)
for e in event_times:
    if e != -1:  # omit -1 as the baseline
        df[ev_name(e)] = ((df['event_time'] == e) &
                          (df['adopt_year'] > 0)).astype(int)

# Event-study regression; size, teacher_ratio, and gdp are time-invariant
# here, so the school fixed effects already absorb them
event_formula = ('score ~ ' +
                 ' + '.join(ev_name(e) for e in event_times if e != -1) +
                 ' + C(school) + C(year)')

event_model = ols(event_formula, data=df).fit(cov_type='cluster',
                                              cov_kwds={'groups': df['school']})

# Extract coefficients
event_coefs = []
event_ses = []
for e in event_times:
    if e == -1:
        event_coefs.append(0)
        event_ses.append(0)
    else:
        coef_name = ev_name(e)
        if coef_name in event_model.params:
            event_coefs.append(event_model.params[coef_name])
            event_ses.append(event_model.bse[coef_name])
        else:
            event_coefs.append(0)
            event_ses.append(0)

# Plot the event study
fig, ax = plt.subplots(figsize=(12, 7))
ax.errorbar(event_times, event_coefs,
           yerr=1.96*np.array(event_ses),
           fmt='o-', capsize=5, capthick=2,
           color='steelblue', linewidth=2, markersize=8)
ax.axhline(y=0, color='black', linestyle='-', linewidth=1)
ax.axvline(x=-0.5, color='red', linestyle='--', linewidth=2, alpha=0.7)
ax.fill_between(event_times,
                np.array(event_coefs) - 1.96*np.array(event_ses),
                np.array(event_coefs) + 1.96*np.array(event_ses),
                alpha=0.2, color='steelblue')
ax.set_xlabel('Event time (relative to policy adoption)', fontsize=12, fontweight='bold')
ax.set_ylabel('Estimated effect', fontsize=12, fontweight='bold')
ax.set_title('Event study: dynamic treatment effects', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print(f"\nParallel-trends check:")
pre_treatment_coefs = [event_coefs[i] for i, e in enumerate(event_times) if e < -1]
print(f"Pre-treatment coefficients: {[round(c, 2) for c in pre_treatment_coefs]}")
ok = all(abs(c) < 2 for c in pre_treatment_coefs)  # crude rule of thumb for this demo
print("Parallel trends plausible? " + ("Yes" if ok else "No"))

# =================================================================
# Step 5: Method 3 - Callaway & Sant'Anna
# =================================================================

print("\n" + "="*70)
print("Method 3: Callaway & Sant'Anna (2021) - robust estimation")
print("="*70)

# Prepare the data
df_cs = df.copy()
df_cs['first_treat'] = df_cs['adopt_year']
df_cs.loc[df_cs['adopt_year']==0, 'first_treat'] = 0

# Hand-rolled, simplified C&S: group-time ATTs with never-treated
# controls and base period g-1
cohort_atts = {}
for cohort in [2018, 2019]:
    cohort_schools = df_cs[df_cs['adopt_year'] == cohort]['school'].unique()
    never_schools = df_cs[df_cs['adopt_year'] == 0]['school'].unique()

    # Post-adoption years for this cohort
    post_years = [y for y in years if y >= cohort]
    for t in post_years:
        # Treated cohort
        treated_post = df_cs[(df_cs['school'].isin(cohort_schools)) &
                            (df_cs['year'] == t)]['score'].mean()
        treated_pre = df_cs[(df_cs['school'].isin(cohort_schools)) &
                           (df_cs['year'] == cohort-1)]['score'].mean()

        # Control group
        control_post = df_cs[(df_cs['school'].isin(never_schools)) &
                            (df_cs['year'] == t)]['score'].mean()
        control_pre = df_cs[(df_cs['school'].isin(never_schools)) &
                           (df_cs['year'] == cohort-1)]['score'].mean()

        # ATT(g,t)
        att_gt = (treated_post - treated_pre) - (control_post - control_pre)
        cohort_atts[(cohort, t)] = att_gt

# Aggregate ATT
overall_att_cs = np.mean(list(cohort_atts.values()))
print(f"\nOverall ATT (C&S): {overall_att_cs:.3f}")

# By cohort
print(f"\nBy cohort:")
for cohort in [2018, 2019]:
    cohort_effects = [v for (g,t), v in cohort_atts.items() if g == cohort]
    print(f"  {cohort} cohort: {np.mean(cohort_effects):.3f}")

# =================================================================
# Step 6: Method 4 - synthetic control (for the early cohort)
# =================================================================

print("\n" + "="*70)
print("Method 4: synthetic control (2018 cohort only)")
print("="*70)

# Use one school from the 2018 cohort as an example
treated_school = 0

# Donor pool: never-treated schools
control_schools = df[df['adopt_year'] == 0]['school'].unique()

# Build the outcome matrices
pre_years = [2015, 2016, 2017]
post_years = [2018, 2019, 2020]

Y1 = df[(df['school'] == treated_school) &
       (df['year'].isin(pre_years + post_years))].sort_values('year')['score'].values

Y0 = np.zeros((len(pre_years) + len(post_years), len(control_schools)))
for i, school in enumerate(control_schools):
    Y0[:, i] = df[(df['school'] == school) &
                 (df['year'].isin(pre_years + post_years))].sort_values('year')['score'].values

# Simplified weights (equal); a real SCM would choose donor weights to
# match the treated school's pre-period outcomes
weights = np.ones(len(control_schools)) / len(control_schools)
Y_synthetic = Y0 @ weights

sc_effects = Y1[len(pre_years):] - Y_synthetic[len(pre_years):]
print(f"\nSynthetic control estimate (school {treated_school}): {sc_effects.mean():.3f}")

# =================================================================
# Step 7: summary and comparison of results
# =================================================================

print("\n" + "="*70)
print("Summary of all methods")
print("="*70)

results_summary = pd.DataFrame({
    'Method': [
        'TWFE (potentially biased)',
        'Event study (post-period average)',
        "Callaway & Sant'Anna",
        'Synthetic control (one school)',
        'True effect'
    ],
    'Estimate': [
        twfe.params['treated'],
        np.mean([c for c, e in zip(event_coefs, event_times) if e >= 0]),
        overall_att_cs,
        sc_effects.mean(),
        4.2  # cohort-size weighted truth: (30*5 + 20*3) / 50
    ]
})

print(results_summary.to_string(index=False))

# Visual comparison
fig, ax = plt.subplots(figsize=(10, 6))
methods = results_summary['Method'][:-1]
estimates = results_summary['Estimate'][:-1]
true_effect = results_summary['Estimate'].iloc[-1]

bars = ax.barh(methods, estimates, color=['red', 'orange', 'green', 'blue'], alpha=0.7)
ax.axvline(x=true_effect, color='red', linestyle='--', linewidth=2, label='True effect')
ax.set_xlabel('Estimated treatment effect', fontsize=12, fontweight='bold')
ax.set_title('Comparison of estimates across methods', fontsize=14, fontweight='bold')
ax.legend(fontsize=11)
ax.grid(axis='x', alpha=0.3)
plt.tight_layout()
plt.show()

# =================================================================
# Step 8: policy recommendations
# =================================================================

print("\n" + "="*70)
print("Policy recommendations")
print("="*70)

print("""
Based on the causal analysis:

1. **Overall effect**: small-class teaching raises student scores by roughly 4 points

2. **Heterogeneity**: early adopters gained more (5 vs. 3 points)
   - Possible reasons: learning curves, resource allocation

3. **Dynamics**: the effect persists after adoption with no visible decay

4. **Robustness**: several methods agree, so the conclusion is credible

5. **Recommendations**:
   - Expand the small-class pilot
   - Prioritize schools with stronger resources
   - Pair the rollout with teacher training

6. **Caveats**:
   - The effect may shrink as the program scales up (general equilibrium)
   - Long-run effects still need monitoring
   - A cost-benefit analysis is still required
""")

print("\n" + "="*70)
print("Analysis complete!")
print("="*70)

Chapter Summary

Key takeaways

Module 8: panel data and fixed effects

  • Removes unobserved heterogeneity

Module 9: difference-in-differences (DID)

  • Parallel-trends assumption
  • Event-study designs

Module 10: instrumental variables (IV)

  • Addresses endogeneity
  • LATE estimation

Module 11: regression discontinuity (RDD)

  • Local randomization
  • Continuity assumption

Module 12: causal forests

  • CATE estimation
  • Honest splitting

Module 13 (frontier topics):

  • Synthetic control: a single treated unit
  • PSM/IPW/DR: matching and weighting for observational data
  • Meta-learners: heterogeneous effects
  • Causal graphs: visualizing identification assumptions
  • Staggered DID: staggered adoption
  • External validity: SUTVA and generalization

Master decision tree for method selection

What does your research design look like?

├─ Only one treated unit? → synthetic control (13.2)
├─ Observational data that needs matching? → PSM/IPW/DR (13.3)
├─ Want heterogeneous effects? → meta-learners / causal forest (13.4, 12)
├─ Complex causal structure? → DAGs / DoWhy (13.5)
├─ Panel with simultaneous adoption? → standard DID (9)
├─ Panel with staggered adoption? → C&S / Sun-Abraham (13.6)
├─ Treatment assigned at a cutoff? → RDD (11)
└─ Have an instrument? → IV/2SLS (10)

Python toolbox at a glance

| Method | Core libraries |
| --- | --- |
| Panel FE | linearmodels.PanelOLS |
| DID | linearmodels, difference_in_differences |
| IV | linearmodels.IV2SLS |
| RDD | rdrobust |
| Causal forest | econml.CausalForestDML |
| SCM | SparseSC |
| PSM/IPW | causalml, econml |
| Meta-learners | econml.metalearners |
| Causal graphs | dowhy, pgmpy |
| Staggered DID | difference_in_differences, pyfixest |

What you have learned

Having completed Module 13, you:

  • command the full modern causal-inference toolbox
  • can choose the right method for a real problem
  • understand each method's assumptions and limitations
  • work fluently in the Python causal-inference ecosystem
  • can run a policy evaluation end to end
  • are familiar with the methodological standards of top journals (AER, QJE)


Further learning resources

Books

  1. Angrist & Pischke (2009). Mostly Harmless Econometrics
  2. Cunningham (2021). Causal Inference: The Mixtape
  3. Huntington-Klein (2022). The Effect
  4. Pearl (2009). Causality
  5. Imbens & Rubin (2015). Causal Inference

Online courses

  1. MIT 14.387: Applied Econometrics (Angrist)
  2. Stanford STATS 361: Causal Inference (Athey)
  3. Brady Neal: Causal Inference Course (YouTube)

Software documentation

  1. EconML: https://econml.azurewebsites.net
  2. DoWhy: https://microsoft.github.io/dowhy
  3. CausalML: https://causalml.readthedocs.io

**Congratulations on finishing Module 13 - Frontier Methods in Causal Inference!**

You now command a causal-inference arsenal that runs from Nobel-winning designs to the latest research frontier!


Causal inference: a complete journey from a revolution in thinking to policy practice!
