Skip to content

12.3 异质性处理效应的估计与推断

"Treatment effect heterogeneity is the rule, not the exception.""处理效应异质性是常态,而非例外。"— Guido Imbens, 2021 Nobel Laureate in Economics (2021年诺贝尔经济学奖得主)

从CATE到政策学习:量化异质性的价值

难度应用导向


本节目标

完成本节后,你将能够:

  • 估计和解释条件平均处理效应(CATE)
  • 构造CATE的置信区间并进行假设检验
  • 识别最受益和最不受益的子群体
  • 实现政策学习(Policy Learning)
  • 量化个性化决策的价值
  • 理解异质性的经济学含义

条件平均处理效应(CATE)的估计

定义回顾

CATE:给定协变量 的条件下的平均处理效应

为什么重要?

  • ATE () 只告诉我们平均效应
  • CATE () 告诉我们受益最多

估计方法

方法1:Causal Forest直接估计

python
from econml.grf import CausalForest
import numpy as np
import pandas as pd

# 模拟数据(与12.2节相同)
np.random.seed(42)
n = 3000

# 协变量
age = np.random.uniform(18, 65, n)
income = np.random.uniform(20, 150, n)  # 单位:千美元
education = np.random.choice([12, 14, 16, 18], n)  # 教育年限

X = np.column_stack([age, income, education])
X_df = pd.DataFrame(X, columns=['age', 'income', 'education'])

# 处理分配(随机化)
propensity = 0.5
D = np.random.binomial(1, propensity, n)

# 真实的异质性处理效应
# 年轻人和高收入者受益更多
true_tau = (
    10 +  # 基础效应
    0.5 * (65 - age) +  # 年轻人受益多
    0.3 * income -  # 高收入者受益多
    0.5 * education  # 高学历者受益少(已经很好了)
)

# 结果变量
Y0 = 50 + 0.8 * age + 0.5 * income + 2 * education + np.random.normal(0, 5, n)
Y1 = Y0 + true_tau
Y = D * Y1 + (1 - D) * Y0

# 训练Causal Forest
cf = CausalForest(
    n_estimators=3000,
    min_samples_leaf=15,
    max_depth=None,
    random_state=42,
    verbose=0
)

cf.fit(X, D.reshape(-1, 1), Y)

# 预测CATE
tau_pred = cf.predict(X)

print("=" * 70)
print("CATE估计结果")
print("=" * 70)
print(f"样本量: {n}")
print(f"真实ATE: {true_tau.mean():.2f}")
print(f"估计ATE: {tau_pred.mean():.2f}")
print(f"\nCATE预测R²: {r2_score(true_tau, tau_pred):.3f}")
print(f"CATE预测MSE: {mean_squared_error(true_tau, tau_pred):.3f}")

# 按特征分组分析
X_df['true_tau'] = true_tau
X_df['pred_tau'] = tau_pred
X_df['D'] = D
X_df['Y'] = Y

# 年龄分组
X_df['age_group'] = pd.cut(age, bins=[18, 30, 45, 65], labels=['青年', '中年', '老年'])
age_effects = X_df.groupby('age_group')[['true_tau', 'pred_tau']].mean()

print("\n按年龄组的处理效应:")
print(age_effects.round(2))

# 收入分组
X_df['income_group'] = pd.cut(income, bins=[20, 50, 100, 150], labels=['低收入', '中收入', '高收入'])
income_effects = X_df.groupby('income_group')[['true_tau', 'pred_tau']].mean()

print("\n按收入组的处理效应:")
print(income_effects.round(2))

可视化CATE

python
import matplotlib.pyplot as plt
import seaborn as sns

fig, axes = plt.subplots(2, 2, figsize=(15, 12))

# 1. 真实 vs 预测
axes[0, 0].scatter(true_tau, tau_pred, alpha=0.3, s=20)
axes[0, 0].plot([true_tau.min(), true_tau.max()],
                [true_tau.min(), true_tau.max()],
                'r--', linewidth=2)
axes[0, 0].set_xlabel('真实CATE', fontsize=12)
axes[0, 0].set_ylabel('预测CATE', fontsize=12)
axes[0, 0].set_title(f'CATE预测准确性 (R² = {r2_score(true_tau, tau_pred):.3f})',
                     fontsize=13, fontweight='bold')
axes[0, 0].grid(True, alpha=0.3)

# 2. 按年龄的CATE
age_sorted = np.argsort(age)
axes[0, 1].scatter(age, tau_pred, alpha=0.3, s=15, label='预测CATE')
axes[0, 1].scatter(age, true_tau, alpha=0.2, s=10, color='green', label='真实CATE')
axes[0, 1].set_xlabel('年龄', fontsize=12)
axes[0, 1].set_ylabel('处理效应', fontsize=12)
axes[0, 1].set_title('年龄与处理效应', fontsize=13, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# 3. 按收入的CATE
axes[1, 0].scatter(income, tau_pred, alpha=0.3, s=15, label='预测CATE')
axes[1, 0].scatter(income, true_tau, alpha=0.2, s=10, color='green', label='真实CATE')
axes[1, 0].set_xlabel('收入(千美元)', fontsize=12)
axes[1, 0].set_ylabel('处理效应', fontsize=12)
axes[1, 0].set_title('收入与处理效应', fontsize=13, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# 4. CATE分布
axes[1, 1].hist(true_tau, bins=50, alpha=0.5, label='真实CATE', color='green', density=True)
axes[1, 1].hist(tau_pred, bins=50, alpha=0.5, label='预测CATE', color='blue', density=True)
axes[1, 1].axvline(x=true_tau.mean(), color='green', linestyle='--', linewidth=2, label=f'真实ATE = {true_tau.mean():.1f}')
axes[1, 1].axvline(x=tau_pred.mean(), color='blue', linestyle='--', linewidth=2, label=f'估计ATE = {tau_pred.mean():.1f}')
axes[1, 1].set_xlabel('处理效应', fontsize=12)
axes[1, 1].set_ylabel('密度', fontsize=12)
axes[1, 1].set_title('CATE分布', fontsize=13, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

统计推断:置信区间与假设检验

置信区间的构造

问题 的不确定性有多大?

Wager & Athey (2018) 的方法

1. 基于渐近正态性的置信区间

python
# 预测CATE及其置信区间
tau_pred = cf.predict(X)
tau_interval = cf.predict_interval(X, alpha=0.05)  # 95% CI

# 计算置信区间宽度
ci_width = tau_interval[1] - tau_interval[0]

print("=" * 70)
print("CATE置信区间")
print("=" * 70)
print(f"平均置信区间宽度: {ci_width.mean():.3f}")
print(f"置信区间宽度范围: [{ci_width.min():.3f}, {ci_width.max():.3f}]")

# 检验覆盖率
coverage = np.mean(
    (true_tau >= tau_interval[0]) & (true_tau <= tau_interval[1])
)
print(f"\n实际覆盖率: {coverage:.1%}(理论值应接近95%)")

# 可视化置信区间
sorted_idx = np.argsort(tau_pred)[:200]  # 显示200个样本

fig, ax = plt.subplots(figsize=(14, 6))
x_plot = np.arange(len(sorted_idx))

ax.scatter(x_plot, true_tau[sorted_idx], s=30, color='green',
           alpha=0.6, label='真实CATE', zorder=3)
ax.plot(x_plot, tau_pred[sorted_idx], 'b-', linewidth=1.5,
        label='预测CATE', zorder=2)
ax.fill_between(x_plot,
                tau_interval[0][sorted_idx],
                tau_interval[1][sorted_idx],
                alpha=0.25, color='blue', label='95% CI', zorder=1)

ax.set_xlabel('样本(按预测CATE排序)', fontsize=12)
ax.set_ylabel('处理效应', fontsize=12)
ax.set_title('CATE估计与置信区间', fontsize=14, fontweight='bold')
ax.legend(fontsize=11)
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

假设检验

检验1:处理效应是否为零?

零假设

检验统计量

拒绝域(5%显著性水平)

python
# 计算标准误
tau_se = (tau_interval[1] - tau_interval[0]) / (2 * 1.96)

# 计算z统计量
z_stat = tau_pred / tau_se

# p值
from scipy.stats import norm
p_values = 2 * (1 - norm.cdf(np.abs(z_stat)))

# 显著性
significant = p_values < 0.05

print("=" * 70)
print("假设检验:H₀: τ(x) = 0")
print("=" * 70)
print(f"显著的CATE比例: {significant.mean():.1%}")
print(f"平均|z统计量|: {np.abs(z_stat).mean():.2f}")

# 按显著性分组
X_df['significant'] = significant
X_df['z_stat'] = z_stat
X_df['p_value'] = p_values

print("\n显著 vs 不显著的CATE:")
print(X_df.groupby('significant')['pred_tau'].describe()[['mean', 'std', 'min', 'max']])

检验2:不同子群体的处理效应是否不同?

问题:年轻人和老年人的处理效应有显著差异吗?

零假设

python
# 定义子群体
young = age <= 30
old = age >= 50

# 估计各子群体的ATE
tau_young = tau_pred[young]
tau_old = tau_pred[old]

# 计算差异
diff_mean = tau_young.mean() - tau_old.mean()

# Bootstrap标准误(简化方法)
n_boot = 1000
boot_diffs = []

for _ in range(n_boot):
    idx_young = np.random.choice(np.where(young)[0], size=young.sum(), replace=True)
    idx_old = np.random.choice(np.where(old)[0], size=old.sum(), replace=True)

    boot_diff = tau_pred[idx_young].mean() - tau_pred[idx_old].mean()
    boot_diffs.append(boot_diff)

boot_diffs = np.array(boot_diffs)
se_diff = boot_diffs.std()

# 检验统计量
z_stat_diff = diff_mean / se_diff
p_value_diff = 2 * (1 - norm.cdf(abs(z_stat_diff)))

print("\n" + "=" * 70)
print("子群体差异检验:年轻人 vs 老年人")
print("=" * 70)
print(f"年轻人平均CATE: {tau_young.mean():.2f}")
print(f"老年人平均CATE: {tau_old.mean():.2f}")
print(f"差异: {diff_mean:.2f}")
print(f"标准误: {se_diff:.2f}")
print(f"z统计量: {z_stat_diff:.2f}")
print(f"p值: {p_value_diff:.4f}")

if p_value_diff < 0.05:
    print("\n 结论:年轻人和老年人的处理效应有显著差异")
else:
    print("\n 结论:无法拒绝两组处理效应相同的假设")

识别受益最多和最少的子群体

Best Linear Projection (BLP)

Chernozhukov et al. (2018) 提出的方法:将 投影到协变量上

回归模型

解释

  • :特征 与处理效应的相关性
  • 越大,处理效应越大
python
from sklearn.linear_model import LinearRegression

# BLP回归
blp_model = LinearRegression()
blp_model.fit(X, tau_pred)

# 系数
blp_coefs = pd.DataFrame({
    '特征': ['age', 'income', 'education'],
    '系数': blp_model.coef_,
    '解释': ['年龄增加1岁,处理效应变化', '收入增加1千美元,处理效应变化', '教育增加1年,处理效应变化']
})

print("=" * 70)
print("Best Linear Projection (BLP)")
print("=" * 70)
print(blp_coefs.to_string(index=False))

# 预测BLP
tau_blp = blp_model.predict(X)
blp_r2 = r2_score(tau_pred, tau_blp)
print(f"\nBLP R²: {blp_r2:.3f}(特征能解释 {blp_r2:.1%} 的CATE变异)")

# 可视化
fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(tau_blp, tau_pred, alpha=0.3, s=15)
ax.plot([tau_blp.min(), tau_blp.max()],
        [tau_blp.min(), tau_blp.max()],
        'r--', linewidth=2)
ax.set_xlabel('BLP预测的CATE', fontsize=12)
ax.set_ylabel('Causal Forest估计的CATE', fontsize=12)
ax.set_title(f'Best Linear Projection (R² = {blp_r2:.3f})', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

识别Top/Bottom受益者

python
# 按CATE排序
X_df_sorted = X_df.sort_values('pred_tau', ascending=False)

# Top 10%
top_10 = X_df_sorted.head(int(0.1 * n))
print("=" * 70)
print("受益最多的10%人群特征")
print("=" * 70)
print(top_10[['age', 'income', 'education', 'pred_tau']].describe())

# Bottom 10%
bottom_10 = X_df_sorted.tail(int(0.1 * n))
print("\n" + "=" * 70)
print("受益最少的10%人群特征")
print("=" * 70)
print(bottom_10[['age', 'income', 'education', 'pred_tau']].describe())

# 对比可视化
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

features = ['age', 'income', 'education']
labels = ['年龄', '收入(千美元)', '教育年限']

for idx, (feat, label) in enumerate(zip(features, labels)):
    ax = axes[idx]

    ax.hist(top_10[feat], bins=20, alpha=0.5, label='Top 10%', color='green', density=True)
    ax.hist(bottom_10[feat], bins=20, alpha=0.5, label='Bottom 10%', color='red', density=True)

    ax.set_xlabel(label, fontsize=12)
    ax.set_ylabel('密度', fontsize=12)
    ax.set_title(f'{label}分布', fontsize=13, fontweight='bold')
    ax.legend()
    ax.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

政策学习(Policy Learning)

问题设定

决策问题:给定个体特征 ,是否应该给予处理?

目标:最大化总福利

其中 是政策函数。

最优政策

或者考虑成本

实现:基于CATE的最优政策

python
# 场景:培训项目
# 成本:每人$5,000
# 收益:CATE(收入增加)

cost = 5.0  # 单位:千美元

# 最优政策:只给CATE > cost的人培训
optimal_policy = (tau_pred > cost).astype(int)

# 当前政策:给所有人培训(或随机50%)
current_policy_all = np.ones(n, dtype=int)
current_policy_random = D

print("=" * 70)
print("政策学习:培训项目")
print("=" * 70)
print(f"成本: ${cost * 1000} 每人")

# 计算各政策的价值
def policy_value(policy, tau, cost):
    """计算政策的净收益"""
    # 收益 = 处理效应 - 成本
    benefit = tau * policy
    total_cost = cost * policy
    net_benefit = benefit - total_cost
    return net_benefit.mean()

# 计算价值
value_optimal = policy_value(optimal_policy, true_tau, cost)
value_all = policy_value(current_policy_all, true_tau, cost)
value_random = policy_value(current_policy_random, true_tau, cost)

print(f"\n最优政策价值: ${value_optimal * 1000:.0f} 每人")
print(f"全员培训价值: ${value_all * 1000:.0f} 每人")
print(f"随机50%价值: ${value_random * 1000:.0f} 每人")

# 提升幅度
improvement_vs_all = (value_optimal - value_all) / abs(value_all) * 100
improvement_vs_random = (value_optimal - value_random) / abs(value_random) * 100

print(f"\n相比全员培训提升: {improvement_vs_all:.1f}%")
print(f"相比随机50%提升: {improvement_vs_random:.1f}%")

# 处理率
treatment_rate_optimal = optimal_policy.mean()
print(f"\n最优政策处理率: {treatment_rate_optimal:.1%}")
print(f"当前政策处理率: 100%")

# 成本节约
cost_saving = (1 - treatment_rate_optimal) * cost * n
print(f"\n成本节约: ${cost_saving * 1000:,.0f} (总计)")

可视化政策规则

python
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# 左图:CATE分布与决策阈值
axes[0].hist(tau_pred, bins=50, alpha=0.6, edgecolor='black')
axes[0].axvline(x=cost, color='red', linestyle='--', linewidth=3,
                label=f'决策阈值 (成本 = ${cost * 1000})')
axes[0].axvline(x=tau_pred.mean(), color='blue', linestyle='--', linewidth=2,
                label=f'平均CATE = ${tau_pred.mean() * 1000:.0f}')
axes[0].set_xlabel('预测CATE(千美元)', fontsize=12)
axes[0].set_ylabel('频数', fontsize=12)
axes[0].set_title('CATE分布与政策决策', fontsize=13, fontweight='bold')
axes[0].legend()
axes[0].grid(True, alpha=0.3, axis='y')

# 标注区域
ylim = axes[0].get_ylim()
axes[0].fill_betweenx(ylim, cost, tau_pred.max(), alpha=0.2, color='green', label='接受处理')
axes[0].fill_betweenx(ylim, tau_pred.min(), cost, alpha=0.2, color='red', label='不接受处理')
axes[0].set_ylim(ylim)

# 右图:按年龄的最优政策
age_bins = np.linspace(18, 65, 20)
policy_by_age = []

for i in range(len(age_bins) - 1):
    mask = (age >= age_bins[i]) & (age < age_bins[i+1])
    policy_by_age.append(optimal_policy[mask].mean())

axes[1].bar(age_bins[:-1], policy_by_age, width=np.diff(age_bins),
            alpha=0.7, edgecolor='black', align='edge')
axes[1].axhline(y=0.5, color='red', linestyle='--', linewidth=2, label='随机政策')
axes[1].set_xlabel('年龄', fontsize=12)
axes[1].set_ylabel('处理率', fontsize=12)
axes[1].set_title('最优政策:按年龄的处理率', fontsize=13, fontweight='bold')
axes[1].set_ylim([0, 1])
axes[1].legend()
axes[1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

量化异质性的价值

异质性的价值(Value of Heterogeneity)

定义:个性化政策相比平均政策的额外收益

公式

  • 第一项:个性化决策的价值
  • 第二项:基于ATE的统一决策价值
python
# 计算异质性的价值
# 个性化政策
individualized_benefit = np.maximum(true_tau - cost, 0).mean()

# 基于ATE的政策
ate = true_tau.mean()
uniform_benefit = max(ate - cost, 0)

# 异质性价值
heterogeneity_value = individualized_benefit - uniform_benefit

print("=" * 70)
print("异质性的价值")
print("=" * 70)
print(f"ATE: ${ate * 1000:.0f}")
print(f"成本: ${cost * 1000:.0f}")
print(f"\n基于ATE的统一政策:")

if ate > cost:
    print(f"  决策:给所有人处理")
    print(f"  净收益:${uniform_benefit * 1000:.0f} 每人")
else:
    print(f"  决策:不给任何人处理")
    print(f"  净收益:$0 每人")

print(f"\n个性化政策:")
print(f"  平均净收益:${individualized_benefit * 1000:.0f} 每人")

print(f"\n异质性的价值:${heterogeneity_value * 1000:.0f} 每人")
print(f"总体价值提升:${heterogeneity_value * n * 1000:,.0f}")

# 相对提升
if uniform_benefit > 0:
    relative_gain = (heterogeneity_value / uniform_benefit) * 100
    print(f"相对提升:{relative_gain:.1f}%")

社会福利分析

python
# 不同成本水平下的政策分析
costs = np.linspace(0, 30, 50)

values_optimal = []
values_uniform = []
treatment_rates = []

for c in costs:
    # 最优政策
    opt_policy = (tau_pred > c).astype(int)
    val_opt = policy_value(opt_policy, true_tau, c)
    values_optimal.append(val_opt)

    # 统一政策
    if ate > c:
        val_uni = ate - c
    else:
        val_uni = 0
    values_uniform.append(val_uni)

    # 处理率
    treatment_rates.append(opt_policy.mean())

# 可视化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# 左图:价值曲线
axes[0].plot(costs, np.array(values_optimal) * 1000, 'b-', linewidth=2.5,
             label='个性化政策', marker='o', markersize=4)
axes[0].plot(costs, np.array(values_uniform) * 1000, 'r--', linewidth=2.5,
             label='统一政策(基于ATE)', marker='s', markersize=4)
axes[0].fill_between(costs,
                     np.array(values_uniform) * 1000,
                     np.array(values_optimal) * 1000,
                     alpha=0.3, color='green', label='异质性价值')
axes[0].axvline(x=ate, color='gray', linestyle=':', linewidth=2, label=f'ATE = ${ate * 1000:.0f}')
axes[0].set_xlabel('成本(千美元)', fontsize=12)
axes[0].set_ylabel('净收益(美元/人)', fontsize=12)
axes[0].set_title('政策价值随成本变化', fontsize=13, fontweight='bold')
axes[0].legend(fontsize=10)
axes[0].grid(True, alpha=0.3)

# 右图:处理率
axes[1].plot(costs, np.array(treatment_rates) * 100, 'g-', linewidth=2.5,
             marker='o', markersize=4)
axes[1].axhline(y=100, color='red', linestyle='--', linewidth=2, label='统一政策(100%)')
axes[1].axvline(x=ate, color='gray', linestyle=':', linewidth=2, label=f'ATE = ${ate * 1000:.0f}')
axes[1].set_xlabel('成本(千美元)', fontsize=12)
axes[1].set_ylabel('处理率 (%)', fontsize=12)
axes[1].set_title('最优处理率随成本变化', fontsize=13, fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

高级话题:条件政策学习

Athey & Wager (2021):带约束的政策学习

问题:预算有限,只能处理 的人

约束优化

:选择CATE最大的 的人

python
# 预算约束:只能处理30%的人
budget_rate = 0.30

# 按CATE排序,选择Top 30%
k = int(budget_rate * n)
top_k_idx = np.argsort(tau_pred)[-k:]

budget_policy = np.zeros(n, dtype=int)
budget_policy[top_k_idx] = 1

# 计算价值
value_budget_optimal = policy_value(budget_policy, true_tau, cost)
value_budget_random = policy_value((D * budget_rate).astype(int), true_tau, cost)

print("=" * 70)
print(f"预算约束政策(处理率 = {budget_rate:.0%})")
print("=" * 70)
print(f"最优选择价值: ${value_budget_optimal * 1000:.0f} 每人")
print(f"随机选择价值: ${value_budget_random * 1000:.0f} 每人")
print(f"提升: {(value_budget_optimal - value_budget_random) / abs(value_budget_random) * 100:.1f}%")

# 被选中人群的特征
selected = budget_policy == 1
print(f"\n被选中人群特征:")
print(X_df[selected][['age', 'income', 'education']].describe())

本节小结

核心要点

  1. CATE估计:

    • Causal Forest直接估计
    • 提供点估计和置信区间
    • 可以识别异质性的来源
  2. 统计推断:

    • 渐近正态性 → 置信区间
    • 假设检验:
    • BLP:理解哪些特征驱动异质性
  3. 政策学习:

    • 最优政策:
    • 量化个性化决策的价值
    • 预算约束下的最优分配
  4. 异质性的价值:

    • 个性化政策 vs 统一政策
    • 可以带来显著的福利提升
    • 对成本敏感

关键公式

CATE:

最优政策:

异质性价值:

Python工具

python
# CATE估计
from econml.grf import CausalForest
cf = CausalForest(n_estimators=3000)
cf.fit(X, T, Y)
tau = cf.predict(X)
tau_interval = cf.predict_interval(X, alpha=0.05)

# BLP
from sklearn.linear_model import LinearRegression
blp = LinearRegression().fit(X, tau)

# 政策学习
optimal_policy = (tau > cost).astype(int)

下一节预告

第4节:Python实现与经典案例中,我们将:

  • 使用真实数据集(401k计划、就业培训)
  • 完整实现Causal Forest工作流
  • 可视化和解释结果
  • 对比不同方法的性能

从估计到决策:让数据驱动个性化政策!

基于 MIT 许可证发布。内容版权归作者所有。