Skip to content

14.3 因果发现与前沿方法

从"给定因果图"到"学习因果图":因果推断的下一个前沿

难度综合性


本节目标

  • 了解因果发现算法的基本原理(PC, FCI, GES)
  • 掌握Shift-Share IV(Bartik工具变量)方法
  • 理解现代Staggered DID的最新进展
  • 了解DoWhy因果推断框架
  • 认识LLM/AI在因果推断中的新兴应用

专题1:因果发现算法

从"假设因果图"到"学习因果图"

在前面所有章节中,我们的因果推断都基于一个前提:因果图(DAG)由研究者根据领域知识假设给定。但在很多实际场景中,尤其是高维数据和复杂系统中,研究者可能不知道变量之间的因果关系方向。

因果发现(Causal Discovery)的目标:从观察数据中学习因果结构。

PC算法(Peter-Clark)

核心思想:通过条件独立性检验逐步消除因果图中的边。

步骤

  1. 从完全图开始(所有变量两两相连)
  2. 检验无条件独立性:如果,删除的边
  3. 检验条件独立性:如果,删除的边
  4. 确定边的方向(基于v-structure)

假设

  • 因果马尔可夫条件:每个变量条件于其父节点,独立于非后代
  • 忠实性(Faithfulness):所有条件独立性都由因果结构产生
  • 因果充分性:无隐变量

FCI算法(Fast Causal Inference)

扩展:允许隐变量(latent confounders)存在

输出:不是DAG,而是PAG(Partial Ancestral Graph),表示因果关系的等价类

核心思想:在等价类空间中贪心搜索,优化BIC得分

优势:基于得分(score-based),比基于约束(constraint-based)的PC更稳定

Python实现:因果发现

python
from causallearn.search.ConstraintBased.PC import pc
from causallearn.search.ScoreBased.GES import ges
from causallearn.utils.cit import fisherz
import numpy as np

# 生成数据:X -> Y -> Z, X -> Z (直接效应)
np.random.seed(42)
n = 1000

X = np.random.randn(n)
Y = 0.5 * X + np.random.randn(n) * 0.5
Z = 0.3 * X + 0.7 * Y + np.random.randn(n) * 0.5

data = np.column_stack([X, Y, Z])

# PC算法
cg = pc(data, alpha=0.05, indep_test=fisherz)
print("PC算法发现的因果图:")
print(cg.G)  # 邻接矩阵

# GES算法
Record = ges(data, score_func='local_score_BIC')
print("\nGES算法发现的因果图:")
print(Record['G'])

DoWhy因果推断框架

微软的DoWhy提供了从因果建模到推断的完整流程:

python
import dowhy
from dowhy import CausalModel

# 定义因果模型
model = CausalModel(
    data=df,
    treatment='D',
    outcome='Y',
    common_causes=['X1', 'X2', 'X3'],
    instruments=['Z']
)

# 识别因果效应
identified_estimand = model.identify_effect()

# 估计因果效应
estimate = model.estimate_effect(
    identified_estimand,
    method_name="backdoor.linear_regression"
)

# 反驳检验
refutation = model.refute_estimate(
    identified_estimand,
    estimate,
    method_name="random_common_cause"
)

print(estimate)
print(refutation)

DoWhy的四步流程

  1. Model:建立因果模型(DAG)
  2. Identify:识别因果效应(后门准则、前门准则、IV)
  3. Estimate:选择估计方法
  4. Refute:反驳检验(安慰剂、随机混淆等)

专题2:Shift-Share工具变量

Bartik工具变量

Bartik (1991) 提出的结构:

其中:

  • : 地区在行业初始份额(share)
  • : 行业全国增长率(shift)

例子: 中国加入WTO对美国地区劳动市场的影响

Autor et al. (2013):

  • : 1990年地区在行业的就业份额
  • : 该行业进口增长

识别条件

Borusyak et al. (2022) 的新理论:

关键假设:

含义: 条件于份额,增长率外生。

vs传统: 不需要每个都外生!

Python示例

python
def bartik_iv(data, shares, shocks):
    """
    构造Bartik工具变量

    参数:
    ------
    data: DataFrame, 包含地区-行业数据
    shares: str, 份额变量名
    shocks: str, 冲击变量名

    返回:
    ------
    Z: Bartik IV
    """
    # 假设data有列: region, industry, share_0, shock
    Z = data.groupby('region').apply(
        lambda x: (x[shares] * x[shocks]).sum()
    )

    return Z

# 模拟示例
regions = 100
industries = 10

bartik_data = []
for i in range(regions):
    for k in range(industries):
        share = np.random.dirichlet(np.ones(industries))[k]  # 份额
        shock = np.random.normal(0, 1)  # 全国冲击
        bartik_data.append({
            'region': i,
            'industry': k,
            'share_0': share,
            'shock': shock
        })

df_bartik = pd.DataFrame(bartik_data)

# 构造IV
Z_bartik = bartik_iv(df_bartik, 'share_0', 'shock')
print(f"Bartik IV均值: {Z_bartik.mean():.3f}")
print(f"Bartik IV标准差: {Z_bartik.std():.3f}")

专题3:Staggered DID的革命

问题:传统TWFE的偏误

**经典2WFE (Two-Way Fixed Effects)**模型:

假设: 所有单位在同一时间接受处理

现实: 交错采用(Staggered Adoption)

示例:州级政策
单位    2015  2016  2017  2018  2019
州A      0     1     1     1     1    (2016采用)
州B      0     0     1     1     1    (2017采用)
州C      0     0     0     1     1    (2018采用)
州D      0     0     0     0     0    (从不采用)

Goodman-Bacon分解(2021)

发现: TWFE估计量是多个2x2 DID的加权平均:

问题: 权重可能为!

三类比较:

1. 早vs晚(Timing Groups):

权重

2. 处理vs从不处理:

权重

3. 禁止比较(Forbidden Comparison):

危险: 如果效应随时间变化,组充当组的"坏对照"!

结果: TWFE可能有负权重,估计有偏甚至错符号!

Python实现:Bacon Decomposition

python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']

def bacon_decomposition(data, outcome, treatment, unit_id, time_id):
    """
    Goodman-Bacon分解

    参数:
    ------
    data: DataFrame
    outcome: 结果变量名
    treatment: 处理变量名(0/1)
    unit_id: 单位ID
    time_id: 时间ID

    返回:
    ------
    decomp: DataFrame, 包含各2x2 DID及权重
    """
    df = data.copy()

    # 识别采用时间
    treated_units = df[df[treatment]==1][unit_id].unique()
    adoption_times = {}
    for unit in treated_units:
        unit_data = df[df[unit_id]==unit]
        first_treated = unit_data[unit_data[treatment]==1][time_id].min()
        adoption_times[unit] = first_treated

    # 分组
    timing_groups = {}
    for unit, time in adoption_times.items():
        if time not in timing_groups:
            timing_groups[time] = []
        timing_groups[time].append(unit)

    # 从不处理组
    never_treated = df.groupby(unit_id)[treatment].sum()
    never_treated = never_treated[never_treated == 0].index.tolist()

    # 计算所有2x2 DID
    comparisons = []

    # 处理组 vs 从不处理
    for g_time, g_units in timing_groups.items():
        if len(never_treated) > 0:
            pre = df[df[time_id] < g_time]
            post = df[df[time_id] >= g_time]

            treated_pre = pre[pre[unit_id].isin(g_units)][outcome].mean()
            treated_post = post[post[unit_id].isin(g_units)][outcome].mean()

            control_pre = pre[pre[unit_id].isin(never_treated)][outcome].mean()
            control_post = post[post[unit_id].isin(never_treated)][outcome].mean()

            dd = (treated_post - treated_pre) - (control_post - control_pre)

            n_treated = len(g_units)
            n_control = len(never_treated)
            weight = n_treated * n_control / ((n_treated + n_control) * len(df))

            comparisons.append({
                'Type': '处理 vs 从不处理',
                'Treated_Group': f'G{g_time}',
                'Control_Group': '从不',
                'DD_Estimate': dd,
                'Weight': weight
            })

    # 早 vs 晚
    sorted_times = sorted(timing_groups.keys())
    for i, t1 in enumerate(sorted_times):
        for t2 in sorted_times[i+1:]:
            early_units = timing_groups[t1]
            late_units = timing_groups[t2]

            pre = df[df[time_id] < t2]
            post = df[df[time_id] >= t2]

            early_pre = pre[(pre[unit_id].isin(early_units)) & (pre[time_id] >= t1)][outcome].mean()
            early_post = post[post[unit_id].isin(early_units)][outcome].mean()

            late_pre = pre[pre[unit_id].isin(late_units)][outcome].mean()
            late_post = post[post[unit_id].isin(late_units)][outcome].mean()

            dd = (late_post - late_pre) - (early_post - early_pre)

            weight = len(early_units) * len(late_units) / (len(df) * (len(early_units) + len(late_units)))

            comparisons.append({
                'Type': '早 vs 晚',
                'Treated_Group': f'G{t2}',
                'Control_Group': f'G{t1}',
                'DD_Estimate': dd,
                'Weight': weight
            })

    decomp = pd.DataFrame(comparisons)

    # TWFE估计
    twfe_estimate = (decomp['DD_Estimate'] * decomp['Weight']).sum() / decomp['Weight'].sum()

    return decomp, twfe_estimate

# 模拟数据
np.random.seed(42)
units = 50
time_periods = 20

data_list = []
for i in range(units):
    if i < 10:
        adopt_time = 10  # 早期
    elif i < 25:
        adopt_time = 15  # 晚期
    else:
        adopt_time = 999  # 从不

    for t in range(time_periods):
        treated = 1 if t >= adopt_time else 0

        if adopt_time == 10:
            tau = 5
        elif adopt_time == 15:
            tau = 3
        else:
            tau = 0

        y = 50 + 2*t + 10*i + tau*treated + np.random.normal(0, 5)

        data_list.append({
            'unit': i,
            'time': t,
            'treated': treated,
            'y': y
        })

df_stag = pd.DataFrame(data_list)

# Bacon分解
decomp, twfe_est = bacon_decomposition(df_stag, 'y', 'treated', 'unit', 'time')

print("Bacon分解结果:")
print(decomp)
print(f"\nTWFE估计: {twfe_est:.3f}")

# 可视化
fig, ax = plt.subplots(figsize=(10, 6))
colors = {'处理 vs 从不处理': 'blue', '早 vs 晚': 'red'}
for comp_type in decomp['Type'].unique():
    subset = decomp[decomp['Type'] == comp_type]
    ax.scatter(subset['DD_Estimate'], subset['Weight'],
              s=100, alpha=0.6, label=comp_type, color=colors[comp_type])

ax.axvline(x=0, color='black', linestyle='--', alpha=0.5)
ax.axvline(x=twfe_est, color='green', linestyle='-', linewidth=2, label=f'TWFE = {twfe_est:.2f}')
ax.set_xlabel('2x2 DID估计', fontsize=12)
ax.set_ylabel('权重', fontsize=12)
ax.set_title('Goodman-Bacon分解', fontsize=14, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

解决方案:Callaway & Sant'Anna (2021)

核心思想: 计算群组-时间ATT,然后聚合

群组(Cohort): 同一时间采用处理的单位集合

ATT(g,t): 群组在时间的ATT

其中是对照组(从不处理或尚未处理)。

聚合:

整体ATT:

动态效应:

其中是事件时间(相对于采用)。

Sun & Abraham (2021):交互加权估计

核心思想: 用交互固定效应控制异质性

模型:

优势: 可用标准回归软件


专题4:外部效度与SUTVA

内部效度vs外部效度

内部效度(Internal Validity):

"估计的因果效应在研究样本中是否无偏?"

外部效度(External Validity):

"估计的因果效应能否推广到其他人群/情境?"

SUTVA违反

SUTVA (Stable Unit Treatment Value Assumption):

假设1: 无干扰

个体的结果只依赖自己的处理状态。

违反场景:

1. 溢出效应(Spillover):

  • 处理邻居的农民也受益(知识传播)
  • 疫苗接种的群体免疫

2. 一般均衡效应(General Equilibrium):

  • 培训增加劳动供给 → 工资下降
  • 所有人采用技术 → 相对优势消失

示例:疫苗的网络效应

python
import networkx as nx

# 构造社交网络
G = nx.watts_strogatz_graph(100, 4, 0.1)

# 模拟SUTVA违反
np.random.seed(42)
D = np.random.binomial(1, 0.3, 100)  # 30%接种

# 直接效应
tau_direct = 0.8  # 降低80%感染风险

# 间接效应(邻居接种保护)
tau_indirect = 0.3  # 每个邻居接种降低30%

# 结果
Y = []
for i in range(100):
    p_infect = 0.5

    if D[i] == 1:
        p_infect *= (1 - tau_direct)

    neighbors = list(G.neighbors(i))
    vaccinated_neighbors = sum(D[j] for j in neighbors)
    p_infect *= (1 - tau_indirect) ** vaccinated_neighbors

    y = np.random.binomial(1, p_infect)
    Y.append(y)

# 估计
naive_ate = np.mean([Y[i] for i in range(100) if D[i]==1]) - \
            np.mean([Y[i] for i in range(100) if D[i]==0])

print(f"简单ATE估计: {naive_ate:.3f}")
print(f"真实直接效应: -{tau_direct:.3f}")
print("注意: 简单估计混合了直接和间接效应!")

缓解策略

1. 聚类随机化: 整个集群一起处理

2. 网络建模: 明确建模溢出效应

3. 谨慎推广: 报告LATE而非ATE


专题5:LLM/AI在因果推断中的应用

新兴方向

1. 因果图自动构建

  • LLM基于领域文献建议DAG结构
  • 结合专家知识和数据驱动方法

2. 变量选择辅助

  • LLM帮助识别潜在的混淆变量
  • 基于文献综述自动推荐控制变量

3. 研究设计评估

  • LLM评估研究设计的识别策略
  • 自动检查假设合理性

4. 但需要谨慎

  • LLM基于相关性训练,不理解因果
  • 可能"幻觉"出不存在的因果关系
  • 仍需研究者进行最终判断

本节小结

核心要点

  1. 因果发现:PC、FCI、GES等算法可以从数据中学习因果结构
  2. Shift-Share IV:Bartik工具变量利用"份额x冲击"的结构进行识别
  3. Staggered DID:传统TWFE在交错采用下可能有偏,需用C&S或Sun-Abraham方法
  4. SUTVA违反:溢出效应和一般均衡效应是因果推断的重要威胁
  5. DoWhy框架:提供从建模到反驳的完整因果推断流程
  6. LLM辅助:可以辅助因果推断,但不能替代严格的识别策略

<< 上一节:14.2 Double Machine Learning | 下一节:14.4 本章小结与展望 >>

基于 MIT 许可证发布。内容版权归作者所有。