14.3 因果发现与前沿方法
从"给定因果图"到"学习因果图":因果推断的下一个前沿
本节目标
- 了解因果发现算法的基本原理(PC, FCI, GES)
- 掌握Shift-Share IV(Bartik工具变量)方法
- 理解现代Staggered DID的最新进展
- 了解DoWhy因果推断框架
- 认识LLM/AI在因果推断中的新兴应用
专题1:因果发现算法
从"假设因果图"到"学习因果图"
在前面所有章节中,我们的因果推断都基于一个前提:因果图(DAG)由研究者根据领域知识假设给定。但在很多实际场景中,尤其是高维数据和复杂系统中,研究者可能不知道变量之间的因果关系方向。
因果发现(Causal Discovery)的目标:从观察数据中学习因果结构。
PC算法(Peter-Clark)
核心思想:通过条件独立性检验逐步消除因果图中的边。
步骤:
- 从完全图开始(所有变量两两相连)
- 检验无条件独立性:如果,删除的边
- 检验条件独立性:如果,删除的边
- 确定边的方向(基于v-structure)
假设:
- 因果马尔可夫条件:每个变量条件于其父节点,独立于非后代
- 忠实性(Faithfulness):所有条件独立性都由因果结构产生
- 因果充分性:无隐变量
FCI算法(Fast Causal Inference)
扩展:允许隐变量(latent confounders)存在
输出:不是DAG,而是PAG(Partial Ancestral Graph),表示因果关系的等价类
GES算法(Greedy Equivalence Search)
核心思想:在等价类空间中贪心搜索,优化BIC得分
优势:基于得分(score-based),比基于约束(constraint-based)的PC更稳定
Python实现:因果发现
from causallearn.search.ConstraintBased.PC import pc
from causallearn.search.ScoreBased.GES import ges
from causallearn.utils.cit import fisherz
import numpy as np
# 生成数据:X -> Y -> Z, X -> Z (直接效应)
np.random.seed(42)
n = 1000
X = np.random.randn(n)
Y = 0.5 * X + np.random.randn(n) * 0.5
Z = 0.3 * X + 0.7 * Y + np.random.randn(n) * 0.5
data = np.column_stack([X, Y, Z])
# PC算法
cg = pc(data, alpha=0.05, indep_test=fisherz)
print("PC算法发现的因果图:")
print(cg.G) # 邻接矩阵
# GES算法
Record = ges(data, score_func='local_score_BIC')
print("\nGES算法发现的因果图:")
print(Record['G'])DoWhy因果推断框架
微软的DoWhy提供了从因果建模到推断的完整流程:
import dowhy
from dowhy import CausalModel
# 定义因果模型
model = CausalModel(
data=df,
treatment='D',
outcome='Y',
common_causes=['X1', 'X2', 'X3'],
instruments=['Z']
)
# 识别因果效应
identified_estimand = model.identify_effect()
# 估计因果效应
estimate = model.estimate_effect(
identified_estimand,
method_name="backdoor.linear_regression"
)
# 反驳检验
refutation = model.refute_estimate(
identified_estimand,
estimate,
method_name="random_common_cause"
)
print(estimate)
print(refutation)DoWhy的四步流程:
- Model:建立因果模型(DAG)
- Identify:识别因果效应(后门准则、前门准则、IV)
- Estimate:选择估计方法
- Refute:反驳检验(安慰剂、随机混淆等)
专题2:Shift-Share工具变量
Bartik工具变量
Bartik (1991) 提出的结构:
其中:
- : 地区在行业的初始份额(share)
- : 行业的全国增长率(shift)
例子: 中国加入WTO对美国地区劳动市场的影响
Autor et al. (2013):
- : 1990年地区在行业的就业份额
- : 该行业进口增长
识别条件
Borusyak et al. (2022) 的新理论:
关键假设:
含义: 条件于份额,增长率外生。
vs传统: 不需要每个都外生!
Python示例
def bartik_iv(data, shares, shocks):
"""
构造Bartik工具变量
参数:
------
data: DataFrame, 包含地区-行业数据
shares: str, 份额变量名
shocks: str, 冲击变量名
返回:
------
Z: Bartik IV
"""
# 假设data有列: region, industry, share_0, shock
Z = data.groupby('region').apply(
lambda x: (x[shares] * x[shocks]).sum()
)
return Z
# 模拟示例
regions = 100
industries = 10
bartik_data = []
for i in range(regions):
for k in range(industries):
share = np.random.dirichlet(np.ones(industries))[k] # 份额
shock = np.random.normal(0, 1) # 全国冲击
bartik_data.append({
'region': i,
'industry': k,
'share_0': share,
'shock': shock
})
df_bartik = pd.DataFrame(bartik_data)
# 构造IV
Z_bartik = bartik_iv(df_bartik, 'share_0', 'shock')
print(f"Bartik IV均值: {Z_bartik.mean():.3f}")
print(f"Bartik IV标准差: {Z_bartik.std():.3f}")专题3:Staggered DID的革命
问题:传统TWFE的偏误
**经典2WFE (Two-Way Fixed Effects)**模型:
假设: 所有单位在同一时间接受处理
现实: 交错采用(Staggered Adoption)
示例:州级政策
单位 2015 2016 2017 2018 2019
州A 0 1 1 1 1 (2016采用)
州B 0 0 1 1 1 (2017采用)
州C 0 0 0 1 1 (2018采用)
州D 0 0 0 0 0 (从不采用)Goodman-Bacon分解(2021)
发现: TWFE估计量是多个2x2 DID的加权平均:
问题: 权重可能为负!
三类比较:
1. 早vs晚(Timing Groups):
权重
2. 处理vs从不处理:
权重
3. 禁止比较(Forbidden Comparison):
危险: 如果效应随时间变化,组充当组的"坏对照"!
结果: TWFE可能有负权重,估计有偏甚至错符号!
Python实现:Bacon Decomposition
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']
def bacon_decomposition(data, outcome, treatment, unit_id, time_id):
"""
Goodman-Bacon分解
参数:
------
data: DataFrame
outcome: 结果变量名
treatment: 处理变量名(0/1)
unit_id: 单位ID
time_id: 时间ID
返回:
------
decomp: DataFrame, 包含各2x2 DID及权重
"""
df = data.copy()
# 识别采用时间
treated_units = df[df[treatment]==1][unit_id].unique()
adoption_times = {}
for unit in treated_units:
unit_data = df[df[unit_id]==unit]
first_treated = unit_data[unit_data[treatment]==1][time_id].min()
adoption_times[unit] = first_treated
# 分组
timing_groups = {}
for unit, time in adoption_times.items():
if time not in timing_groups:
timing_groups[time] = []
timing_groups[time].append(unit)
# 从不处理组
never_treated = df.groupby(unit_id)[treatment].sum()
never_treated = never_treated[never_treated == 0].index.tolist()
# 计算所有2x2 DID
comparisons = []
# 处理组 vs 从不处理
for g_time, g_units in timing_groups.items():
if len(never_treated) > 0:
pre = df[df[time_id] < g_time]
post = df[df[time_id] >= g_time]
treated_pre = pre[pre[unit_id].isin(g_units)][outcome].mean()
treated_post = post[post[unit_id].isin(g_units)][outcome].mean()
control_pre = pre[pre[unit_id].isin(never_treated)][outcome].mean()
control_post = post[post[unit_id].isin(never_treated)][outcome].mean()
dd = (treated_post - treated_pre) - (control_post - control_pre)
n_treated = len(g_units)
n_control = len(never_treated)
weight = n_treated * n_control / ((n_treated + n_control) * len(df))
comparisons.append({
'Type': '处理 vs 从不处理',
'Treated_Group': f'G{g_time}',
'Control_Group': '从不',
'DD_Estimate': dd,
'Weight': weight
})
# 早 vs 晚
sorted_times = sorted(timing_groups.keys())
for i, t1 in enumerate(sorted_times):
for t2 in sorted_times[i+1:]:
early_units = timing_groups[t1]
late_units = timing_groups[t2]
pre = df[df[time_id] < t2]
post = df[df[time_id] >= t2]
early_pre = pre[(pre[unit_id].isin(early_units)) & (pre[time_id] >= t1)][outcome].mean()
early_post = post[post[unit_id].isin(early_units)][outcome].mean()
late_pre = pre[pre[unit_id].isin(late_units)][outcome].mean()
late_post = post[post[unit_id].isin(late_units)][outcome].mean()
dd = (late_post - late_pre) - (early_post - early_pre)
weight = len(early_units) * len(late_units) / (len(df) * (len(early_units) + len(late_units)))
comparisons.append({
'Type': '早 vs 晚',
'Treated_Group': f'G{t2}',
'Control_Group': f'G{t1}',
'DD_Estimate': dd,
'Weight': weight
})
decomp = pd.DataFrame(comparisons)
# TWFE估计
twfe_estimate = (decomp['DD_Estimate'] * decomp['Weight']).sum() / decomp['Weight'].sum()
return decomp, twfe_estimate
# 模拟数据
np.random.seed(42)
units = 50
time_periods = 20
data_list = []
for i in range(units):
if i < 10:
adopt_time = 10 # 早期
elif i < 25:
adopt_time = 15 # 晚期
else:
adopt_time = 999 # 从不
for t in range(time_periods):
treated = 1 if t >= adopt_time else 0
if adopt_time == 10:
tau = 5
elif adopt_time == 15:
tau = 3
else:
tau = 0
y = 50 + 2*t + 10*i + tau*treated + np.random.normal(0, 5)
data_list.append({
'unit': i,
'time': t,
'treated': treated,
'y': y
})
df_stag = pd.DataFrame(data_list)
# Bacon分解
decomp, twfe_est = bacon_decomposition(df_stag, 'y', 'treated', 'unit', 'time')
print("Bacon分解结果:")
print(decomp)
print(f"\nTWFE估计: {twfe_est:.3f}")
# 可视化
fig, ax = plt.subplots(figsize=(10, 6))
colors = {'处理 vs 从不处理': 'blue', '早 vs 晚': 'red'}
for comp_type in decomp['Type'].unique():
subset = decomp[decomp['Type'] == comp_type]
ax.scatter(subset['DD_Estimate'], subset['Weight'],
s=100, alpha=0.6, label=comp_type, color=colors[comp_type])
ax.axvline(x=0, color='black', linestyle='--', alpha=0.5)
ax.axvline(x=twfe_est, color='green', linestyle='-', linewidth=2, label=f'TWFE = {twfe_est:.2f}')
ax.set_xlabel('2x2 DID估计', fontsize=12)
ax.set_ylabel('权重', fontsize=12)
ax.set_title('Goodman-Bacon分解', fontsize=14, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()解决方案:Callaway & Sant'Anna (2021)
核心思想: 计算群组-时间ATT,然后聚合
群组(Cohort): 同一时间采用处理的单位集合
ATT(g,t): 群组在时间的ATT
其中是对照组(从不处理或尚未处理)。
聚合:
整体ATT:
动态效应:
其中是事件时间(相对于采用)。
Sun & Abraham (2021):交互加权估计
核心思想: 用交互固定效应控制异质性
模型:
优势: 可用标准回归软件
专题4:外部效度与SUTVA
内部效度vs外部效度
内部效度(Internal Validity):
"估计的因果效应在研究样本中是否无偏?"
外部效度(External Validity):
"估计的因果效应能否推广到其他人群/情境?"
SUTVA违反
SUTVA (Stable Unit Treatment Value Assumption):
假设1: 无干扰
个体的结果只依赖自己的处理状态。
违反场景:
1. 溢出效应(Spillover):
- 处理邻居的农民也受益(知识传播)
- 疫苗接种的群体免疫
2. 一般均衡效应(General Equilibrium):
- 培训增加劳动供给 → 工资下降
- 所有人采用技术 → 相对优势消失
示例:疫苗的网络效应
import networkx as nx
# 构造社交网络
G = nx.watts_strogatz_graph(100, 4, 0.1)
# 模拟SUTVA违反
np.random.seed(42)
D = np.random.binomial(1, 0.3, 100) # 30%接种
# 直接效应
tau_direct = 0.8 # 降低80%感染风险
# 间接效应(邻居接种保护)
tau_indirect = 0.3 # 每个邻居接种降低30%
# 结果
Y = []
for i in range(100):
p_infect = 0.5
if D[i] == 1:
p_infect *= (1 - tau_direct)
neighbors = list(G.neighbors(i))
vaccinated_neighbors = sum(D[j] for j in neighbors)
p_infect *= (1 - tau_indirect) ** vaccinated_neighbors
y = np.random.binomial(1, p_infect)
Y.append(y)
# 估计
naive_ate = np.mean([Y[i] for i in range(100) if D[i]==1]) - \
np.mean([Y[i] for i in range(100) if D[i]==0])
print(f"简单ATE估计: {naive_ate:.3f}")
print(f"真实直接效应: -{tau_direct:.3f}")
print("注意: 简单估计混合了直接和间接效应!")缓解策略
1. 聚类随机化: 整个集群一起处理
2. 网络建模: 明确建模溢出效应
3. 谨慎推广: 报告LATE而非ATE
专题5:LLM/AI在因果推断中的应用
新兴方向
1. 因果图自动构建
- LLM基于领域文献建议DAG结构
- 结合专家知识和数据驱动方法
2. 变量选择辅助
- LLM帮助识别潜在的混淆变量
- 基于文献综述自动推荐控制变量
3. 研究设计评估
- LLM评估研究设计的识别策略
- 自动检查假设合理性
4. 但需要谨慎
- LLM基于相关性训练,不理解因果
- 可能"幻觉"出不存在的因果关系
- 仍需研究者进行最终判断
本节小结
核心要点
- 因果发现:PC、FCI、GES等算法可以从数据中学习因果结构
- Shift-Share IV:Bartik工具变量利用"份额x冲击"的结构进行识别
- Staggered DID:传统TWFE在交错采用下可能有偏,需用C&S或Sun-Abraham方法
- SUTVA违反:溢出效应和一般均衡效应是因果推断的重要威胁
- DoWhy框架:提供从建模到反驳的完整因果推断流程
- LLM辅助:可以辅助因果推断,但不能替代严格的识别策略