12.3 异质性处理效应的估计与推断
"Treatment effect heterogeneity is the rule, not the exception.""处理效应异质性是常态,而非例外。"— Guido Imbens, 2021 Nobel Laureate in Economics (2021年诺贝尔经济学奖得主)
从CATE到政策学习:量化异质性的价值
本节目标
完成本节后,你将能够:
- 估计和解释条件平均处理效应(CATE)
- 构造CATE的置信区间并进行假设检验
- 识别最受益和最不受益的子群体
- 实现政策学习(Policy Learning)
- 量化个性化决策的价值
- 理解异质性的经济学含义
条件平均处理效应(CATE)的估计
定义回顾
CATE:给定协变量 的条件下的平均处理效应
为什么重要?
- ATE () 只告诉我们平均效应
- CATE () 告诉我们谁受益最多
估计方法
方法1:Causal Forest直接估计
python
from econml.grf import CausalForest
import numpy as np
import pandas as pd
# 模拟数据(与12.2节相同)
np.random.seed(42)
n = 3000
# 协变量
age = np.random.uniform(18, 65, n)
income = np.random.uniform(20, 150, n) # 单位:千美元
education = np.random.choice([12, 14, 16, 18], n) # 教育年限
X = np.column_stack([age, income, education])
X_df = pd.DataFrame(X, columns=['age', 'income', 'education'])
# 处理分配(随机化)
propensity = 0.5
D = np.random.binomial(1, propensity, n)
# 真实的异质性处理效应
# 年轻人和高收入者受益更多
true_tau = (
10 + # 基础效应
0.5 * (65 - age) + # 年轻人受益多
0.3 * income - # 高收入者受益多
0.5 * education # 高学历者受益少(已经很好了)
)
# 结果变量
Y0 = 50 + 0.8 * age + 0.5 * income + 2 * education + np.random.normal(0, 5, n)
Y1 = Y0 + true_tau
Y = D * Y1 + (1 - D) * Y0
# 训练Causal Forest
cf = CausalForest(
n_estimators=3000,
min_samples_leaf=15,
max_depth=None,
random_state=42,
verbose=0
)
cf.fit(X, D.reshape(-1, 1), Y)
# 预测CATE
tau_pred = cf.predict(X)
print("=" * 70)
print("CATE估计结果")
print("=" * 70)
print(f"样本量: {n}")
print(f"真实ATE: {true_tau.mean():.2f}")
print(f"估计ATE: {tau_pred.mean():.2f}")
print(f"\nCATE预测R²: {r2_score(true_tau, tau_pred):.3f}")
print(f"CATE预测MSE: {mean_squared_error(true_tau, tau_pred):.3f}")
# 按特征分组分析
X_df['true_tau'] = true_tau
X_df['pred_tau'] = tau_pred
X_df['D'] = D
X_df['Y'] = Y
# 年龄分组
X_df['age_group'] = pd.cut(age, bins=[18, 30, 45, 65], labels=['青年', '中年', '老年'])
age_effects = X_df.groupby('age_group')[['true_tau', 'pred_tau']].mean()
print("\n按年龄组的处理效应:")
print(age_effects.round(2))
# 收入分组
X_df['income_group'] = pd.cut(income, bins=[20, 50, 100, 150], labels=['低收入', '中收入', '高收入'])
income_effects = X_df.groupby('income_group')[['true_tau', 'pred_tau']].mean()
print("\n按收入组的处理效应:")
print(income_effects.round(2))可视化CATE
python
import matplotlib.pyplot as plt
import seaborn as sns
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 1. 真实 vs 预测
axes[0, 0].scatter(true_tau, tau_pred, alpha=0.3, s=20)
axes[0, 0].plot([true_tau.min(), true_tau.max()],
[true_tau.min(), true_tau.max()],
'r--', linewidth=2)
axes[0, 0].set_xlabel('真实CATE', fontsize=12)
axes[0, 0].set_ylabel('预测CATE', fontsize=12)
axes[0, 0].set_title(f'CATE预测准确性 (R² = {r2_score(true_tau, tau_pred):.3f})',
fontsize=13, fontweight='bold')
axes[0, 0].grid(True, alpha=0.3)
# 2. 按年龄的CATE
age_sorted = np.argsort(age)
axes[0, 1].scatter(age, tau_pred, alpha=0.3, s=15, label='预测CATE')
axes[0, 1].scatter(age, true_tau, alpha=0.2, s=10, color='green', label='真实CATE')
axes[0, 1].set_xlabel('年龄', fontsize=12)
axes[0, 1].set_ylabel('处理效应', fontsize=12)
axes[0, 1].set_title('年龄与处理效应', fontsize=13, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 3. 按收入的CATE
axes[1, 0].scatter(income, tau_pred, alpha=0.3, s=15, label='预测CATE')
axes[1, 0].scatter(income, true_tau, alpha=0.2, s=10, color='green', label='真实CATE')
axes[1, 0].set_xlabel('收入(千美元)', fontsize=12)
axes[1, 0].set_ylabel('处理效应', fontsize=12)
axes[1, 0].set_title('收入与处理效应', fontsize=13, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 4. CATE分布
axes[1, 1].hist(true_tau, bins=50, alpha=0.5, label='真实CATE', color='green', density=True)
axes[1, 1].hist(tau_pred, bins=50, alpha=0.5, label='预测CATE', color='blue', density=True)
axes[1, 1].axvline(x=true_tau.mean(), color='green', linestyle='--', linewidth=2, label=f'真实ATE = {true_tau.mean():.1f}')
axes[1, 1].axvline(x=tau_pred.mean(), color='blue', linestyle='--', linewidth=2, label=f'估计ATE = {tau_pred.mean():.1f}')
axes[1, 1].set_xlabel('处理效应', fontsize=12)
axes[1, 1].set_ylabel('密度', fontsize=12)
axes[1, 1].set_title('CATE分布', fontsize=13, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.show()统计推断:置信区间与假设检验
置信区间的构造
问题: 的不确定性有多大?
Wager & Athey (2018) 的方法:
1. 基于渐近正态性的置信区间
python
# 预测CATE及其置信区间
tau_pred = cf.predict(X)
tau_interval = cf.predict_interval(X, alpha=0.05) # 95% CI
# 计算置信区间宽度
ci_width = tau_interval[1] - tau_interval[0]
print("=" * 70)
print("CATE置信区间")
print("=" * 70)
print(f"平均置信区间宽度: {ci_width.mean():.3f}")
print(f"置信区间宽度范围: [{ci_width.min():.3f}, {ci_width.max():.3f}]")
# 检验覆盖率
coverage = np.mean(
(true_tau >= tau_interval[0]) & (true_tau <= tau_interval[1])
)
print(f"\n实际覆盖率: {coverage:.1%}(理论值应接近95%)")
# 可视化置信区间
sorted_idx = np.argsort(tau_pred)[:200] # 显示200个样本
fig, ax = plt.subplots(figsize=(14, 6))
x_plot = np.arange(len(sorted_idx))
ax.scatter(x_plot, true_tau[sorted_idx], s=30, color='green',
alpha=0.6, label='真实CATE', zorder=3)
ax.plot(x_plot, tau_pred[sorted_idx], 'b-', linewidth=1.5,
label='预测CATE', zorder=2)
ax.fill_between(x_plot,
tau_interval[0][sorted_idx],
tau_interval[1][sorted_idx],
alpha=0.25, color='blue', label='95% CI', zorder=1)
ax.set_xlabel('样本(按预测CATE排序)', fontsize=12)
ax.set_ylabel('处理效应', fontsize=12)
ax.set_title('CATE估计与置信区间', fontsize=14, fontweight='bold')
ax.legend(fontsize=11)
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()假设检验
检验1:处理效应是否为零?
零假设:
检验统计量:
拒绝域:(5%显著性水平)
python
# 计算标准误
tau_se = (tau_interval[1] - tau_interval[0]) / (2 * 1.96)
# 计算z统计量
z_stat = tau_pred / tau_se
# p值
from scipy.stats import norm
p_values = 2 * (1 - norm.cdf(np.abs(z_stat)))
# 显著性
significant = p_values < 0.05
print("=" * 70)
print("假设检验:H₀: τ(x) = 0")
print("=" * 70)
print(f"显著的CATE比例: {significant.mean():.1%}")
print(f"平均|z统计量|: {np.abs(z_stat).mean():.2f}")
# 按显著性分组
X_df['significant'] = significant
X_df['z_stat'] = z_stat
X_df['p_value'] = p_values
print("\n显著 vs 不显著的CATE:")
print(X_df.groupby('significant')['pred_tau'].describe()[['mean', 'std', 'min', 'max']])检验2:不同子群体的处理效应是否不同?
问题:年轻人和老年人的处理效应有显著差异吗?
零假设:
python
# 定义子群体
young = age <= 30
old = age >= 50
# 估计各子群体的ATE
tau_young = tau_pred[young]
tau_old = tau_pred[old]
# 计算差异
diff_mean = tau_young.mean() - tau_old.mean()
# Bootstrap标准误(简化方法)
n_boot = 1000
boot_diffs = []
for _ in range(n_boot):
idx_young = np.random.choice(np.where(young)[0], size=young.sum(), replace=True)
idx_old = np.random.choice(np.where(old)[0], size=old.sum(), replace=True)
boot_diff = tau_pred[idx_young].mean() - tau_pred[idx_old].mean()
boot_diffs.append(boot_diff)
boot_diffs = np.array(boot_diffs)
se_diff = boot_diffs.std()
# 检验统计量
z_stat_diff = diff_mean / se_diff
p_value_diff = 2 * (1 - norm.cdf(abs(z_stat_diff)))
print("\n" + "=" * 70)
print("子群体差异检验:年轻人 vs 老年人")
print("=" * 70)
print(f"年轻人平均CATE: {tau_young.mean():.2f}")
print(f"老年人平均CATE: {tau_old.mean():.2f}")
print(f"差异: {diff_mean:.2f}")
print(f"标准误: {se_diff:.2f}")
print(f"z统计量: {z_stat_diff:.2f}")
print(f"p值: {p_value_diff:.4f}")
if p_value_diff < 0.05:
print("\n 结论:年轻人和老年人的处理效应有显著差异")
else:
print("\n 结论:无法拒绝两组处理效应相同的假设")识别受益最多和最少的子群体
Best Linear Projection (BLP)
Chernozhukov et al. (2018) 提出的方法:将 投影到协变量上
回归模型:
解释:
- :特征 与处理效应的相关性
- : 越大,处理效应越大
python
from sklearn.linear_model import LinearRegression
# BLP回归
blp_model = LinearRegression()
blp_model.fit(X, tau_pred)
# 系数
blp_coefs = pd.DataFrame({
'特征': ['age', 'income', 'education'],
'系数': blp_model.coef_,
'解释': ['年龄增加1岁,处理效应变化', '收入增加1千美元,处理效应变化', '教育增加1年,处理效应变化']
})
print("=" * 70)
print("Best Linear Projection (BLP)")
print("=" * 70)
print(blp_coefs.to_string(index=False))
# 预测BLP
tau_blp = blp_model.predict(X)
blp_r2 = r2_score(tau_pred, tau_blp)
print(f"\nBLP R²: {blp_r2:.3f}(特征能解释 {blp_r2:.1%} 的CATE变异)")
# 可视化
fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(tau_blp, tau_pred, alpha=0.3, s=15)
ax.plot([tau_blp.min(), tau_blp.max()],
[tau_blp.min(), tau_blp.max()],
'r--', linewidth=2)
ax.set_xlabel('BLP预测的CATE', fontsize=12)
ax.set_ylabel('Causal Forest估计的CATE', fontsize=12)
ax.set_title(f'Best Linear Projection (R² = {blp_r2:.3f})', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()识别Top/Bottom受益者
python
# 按CATE排序
X_df_sorted = X_df.sort_values('pred_tau', ascending=False)
# Top 10%
top_10 = X_df_sorted.head(int(0.1 * n))
print("=" * 70)
print("受益最多的10%人群特征")
print("=" * 70)
print(top_10[['age', 'income', 'education', 'pred_tau']].describe())
# Bottom 10%
bottom_10 = X_df_sorted.tail(int(0.1 * n))
print("\n" + "=" * 70)
print("受益最少的10%人群特征")
print("=" * 70)
print(bottom_10[['age', 'income', 'education', 'pred_tau']].describe())
# 对比可视化
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
features = ['age', 'income', 'education']
labels = ['年龄', '收入(千美元)', '教育年限']
for idx, (feat, label) in enumerate(zip(features, labels)):
ax = axes[idx]
ax.hist(top_10[feat], bins=20, alpha=0.5, label='Top 10%', color='green', density=True)
ax.hist(bottom_10[feat], bins=20, alpha=0.5, label='Bottom 10%', color='red', density=True)
ax.set_xlabel(label, fontsize=12)
ax.set_ylabel('密度', fontsize=12)
ax.set_title(f'{label}分布', fontsize=13, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.show()政策学习(Policy Learning)
问题设定
决策问题:给定个体特征 ,是否应该给予处理?
目标:最大化总福利
其中 是政策函数。
最优政策:
或者考虑成本 :
实现:基于CATE的最优政策
python
# 场景:培训项目
# 成本:每人$5,000
# 收益:CATE(收入增加)
cost = 5.0 # 单位:千美元
# 最优政策:只给CATE > cost的人培训
optimal_policy = (tau_pred > cost).astype(int)
# 当前政策:给所有人培训(或随机50%)
current_policy_all = np.ones(n, dtype=int)
current_policy_random = D
print("=" * 70)
print("政策学习:培训项目")
print("=" * 70)
print(f"成本: ${cost * 1000} 每人")
# 计算各政策的价值
def policy_value(policy, tau, cost):
"""计算政策的净收益"""
# 收益 = 处理效应 - 成本
benefit = tau * policy
total_cost = cost * policy
net_benefit = benefit - total_cost
return net_benefit.mean()
# 计算价值
value_optimal = policy_value(optimal_policy, true_tau, cost)
value_all = policy_value(current_policy_all, true_tau, cost)
value_random = policy_value(current_policy_random, true_tau, cost)
print(f"\n最优政策价值: ${value_optimal * 1000:.0f} 每人")
print(f"全员培训价值: ${value_all * 1000:.0f} 每人")
print(f"随机50%价值: ${value_random * 1000:.0f} 每人")
# 提升幅度
improvement_vs_all = (value_optimal - value_all) / abs(value_all) * 100
improvement_vs_random = (value_optimal - value_random) / abs(value_random) * 100
print(f"\n相比全员培训提升: {improvement_vs_all:.1f}%")
print(f"相比随机50%提升: {improvement_vs_random:.1f}%")
# 处理率
treatment_rate_optimal = optimal_policy.mean()
print(f"\n最优政策处理率: {treatment_rate_optimal:.1%}")
print(f"当前政策处理率: 100%")
# 成本节约
cost_saving = (1 - treatment_rate_optimal) * cost * n
print(f"\n成本节约: ${cost_saving * 1000:,.0f} (总计)")可视化政策规则
python
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 左图:CATE分布与决策阈值
axes[0].hist(tau_pred, bins=50, alpha=0.6, edgecolor='black')
axes[0].axvline(x=cost, color='red', linestyle='--', linewidth=3,
label=f'决策阈值 (成本 = ${cost * 1000})')
axes[0].axvline(x=tau_pred.mean(), color='blue', linestyle='--', linewidth=2,
label=f'平均CATE = ${tau_pred.mean() * 1000:.0f}')
axes[0].set_xlabel('预测CATE(千美元)', fontsize=12)
axes[0].set_ylabel('频数', fontsize=12)
axes[0].set_title('CATE分布与政策决策', fontsize=13, fontweight='bold')
axes[0].legend()
axes[0].grid(True, alpha=0.3, axis='y')
# 标注区域
ylim = axes[0].get_ylim()
axes[0].fill_betweenx(ylim, cost, tau_pred.max(), alpha=0.2, color='green', label='接受处理')
axes[0].fill_betweenx(ylim, tau_pred.min(), cost, alpha=0.2, color='red', label='不接受处理')
axes[0].set_ylim(ylim)
# 右图:按年龄的最优政策
age_bins = np.linspace(18, 65, 20)
policy_by_age = []
for i in range(len(age_bins) - 1):
mask = (age >= age_bins[i]) & (age < age_bins[i+1])
policy_by_age.append(optimal_policy[mask].mean())
axes[1].bar(age_bins[:-1], policy_by_age, width=np.diff(age_bins),
alpha=0.7, edgecolor='black', align='edge')
axes[1].axhline(y=0.5, color='red', linestyle='--', linewidth=2, label='随机政策')
axes[1].set_xlabel('年龄', fontsize=12)
axes[1].set_ylabel('处理率', fontsize=12)
axes[1].set_title('最优政策:按年龄的处理率', fontsize=13, fontweight='bold')
axes[1].set_ylim([0, 1])
axes[1].legend()
axes[1].grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.show()量化异质性的价值
异质性的价值(Value of Heterogeneity)
定义:个性化政策相比平均政策的额外收益
公式:
- 第一项:个性化决策的价值
- 第二项:基于ATE的统一决策价值
python
# 计算异质性的价值
# 个性化政策
individualized_benefit = np.maximum(true_tau - cost, 0).mean()
# 基于ATE的政策
ate = true_tau.mean()
uniform_benefit = max(ate - cost, 0)
# 异质性价值
heterogeneity_value = individualized_benefit - uniform_benefit
print("=" * 70)
print("异质性的价值")
print("=" * 70)
print(f"ATE: ${ate * 1000:.0f}")
print(f"成本: ${cost * 1000:.0f}")
print(f"\n基于ATE的统一政策:")
if ate > cost:
print(f" 决策:给所有人处理")
print(f" 净收益:${uniform_benefit * 1000:.0f} 每人")
else:
print(f" 决策:不给任何人处理")
print(f" 净收益:$0 每人")
print(f"\n个性化政策:")
print(f" 平均净收益:${individualized_benefit * 1000:.0f} 每人")
print(f"\n异质性的价值:${heterogeneity_value * 1000:.0f} 每人")
print(f"总体价值提升:${heterogeneity_value * n * 1000:,.0f}")
# 相对提升
if uniform_benefit > 0:
relative_gain = (heterogeneity_value / uniform_benefit) * 100
print(f"相对提升:{relative_gain:.1f}%")社会福利分析
python
# 不同成本水平下的政策分析
costs = np.linspace(0, 30, 50)
values_optimal = []
values_uniform = []
treatment_rates = []
for c in costs:
# 最优政策
opt_policy = (tau_pred > c).astype(int)
val_opt = policy_value(opt_policy, true_tau, c)
values_optimal.append(val_opt)
# 统一政策
if ate > c:
val_uni = ate - c
else:
val_uni = 0
values_uniform.append(val_uni)
# 处理率
treatment_rates.append(opt_policy.mean())
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 左图:价值曲线
axes[0].plot(costs, np.array(values_optimal) * 1000, 'b-', linewidth=2.5,
label='个性化政策', marker='o', markersize=4)
axes[0].plot(costs, np.array(values_uniform) * 1000, 'r--', linewidth=2.5,
label='统一政策(基于ATE)', marker='s', markersize=4)
axes[0].fill_between(costs,
np.array(values_uniform) * 1000,
np.array(values_optimal) * 1000,
alpha=0.3, color='green', label='异质性价值')
axes[0].axvline(x=ate, color='gray', linestyle=':', linewidth=2, label=f'ATE = ${ate * 1000:.0f}')
axes[0].set_xlabel('成本(千美元)', fontsize=12)
axes[0].set_ylabel('净收益(美元/人)', fontsize=12)
axes[0].set_title('政策价值随成本变化', fontsize=13, fontweight='bold')
axes[0].legend(fontsize=10)
axes[0].grid(True, alpha=0.3)
# 右图:处理率
axes[1].plot(costs, np.array(treatment_rates) * 100, 'g-', linewidth=2.5,
marker='o', markersize=4)
axes[1].axhline(y=100, color='red', linestyle='--', linewidth=2, label='统一政策(100%)')
axes[1].axvline(x=ate, color='gray', linestyle=':', linewidth=2, label=f'ATE = ${ate * 1000:.0f}')
axes[1].set_xlabel('成本(千美元)', fontsize=12)
axes[1].set_ylabel('处理率 (%)', fontsize=12)
axes[1].set_title('最优处理率随成本变化', fontsize=13, fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()高级话题:条件政策学习
Athey & Wager (2021):带约束的政策学习
问题:预算有限,只能处理 的人
约束优化:
解:选择CATE最大的 的人
python
# 预算约束:只能处理30%的人
budget_rate = 0.30
# 按CATE排序,选择Top 30%
k = int(budget_rate * n)
top_k_idx = np.argsort(tau_pred)[-k:]
budget_policy = np.zeros(n, dtype=int)
budget_policy[top_k_idx] = 1
# 计算价值
value_budget_optimal = policy_value(budget_policy, true_tau, cost)
value_budget_random = policy_value((D * budget_rate).astype(int), true_tau, cost)
print("=" * 70)
print(f"预算约束政策(处理率 = {budget_rate:.0%})")
print("=" * 70)
print(f"最优选择价值: ${value_budget_optimal * 1000:.0f} 每人")
print(f"随机选择价值: ${value_budget_random * 1000:.0f} 每人")
print(f"提升: {(value_budget_optimal - value_budget_random) / abs(value_budget_random) * 100:.1f}%")
# 被选中人群的特征
selected = budget_policy == 1
print(f"\n被选中人群特征:")
print(X_df[selected][['age', 'income', 'education']].describe())本节小结
核心要点
CATE估计:
- Causal Forest直接估计
- 提供点估计和置信区间
- 可以识别异质性的来源
统计推断:
- 渐近正态性 → 置信区间
- 假设检验: 或
- BLP:理解哪些特征驱动异质性
政策学习:
- 最优政策:
- 量化个性化决策的价值
- 预算约束下的最优分配
异质性的价值:
- 个性化政策 vs 统一政策
- 可以带来显著的福利提升
- 对成本敏感
关键公式
CATE:
最优政策:
异质性价值:
Python工具
python
# CATE估计
from econml.grf import CausalForest
cf = CausalForest(n_estimators=3000)
cf.fit(X, T, Y)
tau = cf.predict(X)
tau_interval = cf.predict_interval(X, alpha=0.05)
# BLP
from sklearn.linear_model import LinearRegression
blp = LinearRegression().fit(X, tau)
# 政策学习
optimal_policy = (tau > cost).astype(int)下一节预告
在第4节:Python实现与经典案例中,我们将:
- 使用真实数据集(401k计划、就业培训)
- 完整实现Causal Forest工作流
- 可视化和解释结果
- 对比不同方法的性能
从估计到决策:让数据驱动个性化政策!