12.2 Causal Forest的数学原理与算法
"Machine learning and econometrics are complementary tools for causal inference.""机器学习和计量经济学是因果推断的互补工具。"— Susan Athey, Stanford Economist (斯坦福经济学家)
从CART到诚实分割:理解Causal Forest的数学基础
本节目标
完成本节后,你将能够:
- 理解CART(分类回归树)的分裂准则
- 掌握Causal Tree的核心创新
- 深刻理解诚实分割(Honest Splitting)的必要性
- 理解Causal Forest的渐近理论
- 比较不同的异质性处理效应估计方法
- 用Python实现基础的Causal Tree算法
从预测到因果:CART vs Causal Tree
CART回顾:预测的艺术
**CART(Breiman et al., 1984)**的目标是最小化预测误差。
标准回归树的分裂准则
对于节点 (父节点),我们寻找最优分裂 :
- :特征索引
- :分裂点
目标:最小化分裂后的MSE
其中:
- (左子节点)
- (右子节点)
- 、:左右节点的均值
Python示例:标准CART
import numpy as np
from sklearn.tree import DecisionTreeRegressor
import matplotlib.pyplot as plt
# 生成模拟数据
np.random.seed(42)
n = 1000
X = np.random.uniform(0, 10, n).reshape(-1, 1)
y = np.sin(X.ravel()) + 0.5 * np.random.randn(n)
# 训练CART
tree = DecisionTreeRegressor(max_depth=3, random_state=42)
tree.fit(X, y)
# 可视化
X_test = np.linspace(0, 10, 300).reshape(-1, 1)
y_pred = tree.predict(X_test)
plt.figure(figsize=(12, 5))
plt.scatter(X, y, alpha=0.3, label='观测数据')
plt.plot(X_test, y_pred, 'r-', linewidth=2, label='CART预测')
plt.xlabel('X', fontsize=12)
plt.ylabel('Y', fontsize=12)
plt.title('标准回归树(CART)', fontsize=14, fontweight='bold')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print(f"树的叶子节点数: {tree.get_n_leaves()}")
print(f"树的深度: {tree.get_depth()}")Causal Tree的创新:估计处理效应而非预测结果
核心区别
| 维度 | CART | Causal Tree |
|---|---|---|
| 目标 | 预测 | 估计 |
| 分裂准则 | 最小化MSE | 最大化处理效应方差 |
| 叶子节点估计 | ||
| 偏差问题 | 正则化偏差 | 需要诚实估计 |
Athey & Imbens (2016) 的Causal Tree
目标:在每个叶子节点 内,估计CATE
其中:
- :叶子节点中处理组样本数
- :叶子节点中对照组样本数
关键创新1:新的分裂准则
不再最小化MSE,而是最大化处理效应的异质性:
直觉:
- 如果分裂后两个子节点的处理效应差异大,说明找到了有价值的异质性
- 类似于CART中的信息增益,但针对处理效应
关键创新2:诚实估计(Honest Estimation)
这是Causal Tree最重要的贡献!
诚实分割(Honest Splitting):解决过拟合偏差
问题:为什么需要诚实分割?
标准CART的问题:同一数据既用于建树(选择分裂点),又用于估计(计算叶子节点值)。
后果:
- 过拟合:树会找到数据中的噪声模式
- 估计偏差:叶子节点的估计值是有偏的
在因果推断中更严重!
数学示例:偏差的来源
假设真实的 (无处理效应),但由于随机噪声:
- 某个子集中 (纯属偶然)
- CART会选择这个分裂(因为它最大化了"效应")
- 结果:估计的 ,但真实
偏差 =
解决方案:样本分割
Athey & Imbens (2016) 的诚实分割算法:
步骤1:随机分割样本
- 样本 (大小 )随机分为两部分:
- :训练集(用于建树),大小
- :估计集(用于估计叶子节点效应),大小
步骤2:用 建树
- 使用训练集选择最优分裂点
- 确定树的结构
步骤3:用 估计
- 对于每个叶子节点 ,使用估计集中的样本:
为什么有效?
- 估计集的数据从未参与分裂决策
- 因此估计值是无偏的(在条件期望意义下)
Python实现:诚实分割的简单示例
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
# 模拟数据生成函数
def simulate_heterogeneous_data(n=2000, seed=42):
"""
生成具有异质性处理效应的数据
真实的CATE: τ(x) = 2*x1 + 5*x2
"""
np.random.seed(seed)
# 协变量
X1 = np.random.uniform(0, 1, n)
X2 = np.random.uniform(0, 1, n)
# 处理分配(随机)
D = np.random.binomial(1, 0.5, n)
# 真实的异质性处理效应
true_tau = 2 * X1 + 5 * X2
# 结果变量
Y0 = 10 + 3*X1 + 2*X2 + np.random.normal(0, 1, n) # 潜在结果Y(0)
Y1 = Y0 + true_tau # 潜在结果Y(1)
# 观测结果
Y = D * Y1 + (1 - D) * Y0
df = pd.DataFrame({
'X1': X1,
'X2': X2,
'D': D,
'Y': Y,
'true_tau': true_tau
})
return df
# 生成数据
df = simulate_heterogeneous_data(n=2000)
print("=" * 70)
print("数据生成完成")
print("=" * 70)
print(f"样本量: {len(df)}")
print(f"处理组: {df['D'].sum()}")
print(f"对照组: {len(df) - df['D'].sum()}")
print(f"\n真实平均处理效应 (ATE): {df['true_tau'].mean():.4f}")
print(f"真实处理效应范围: [{df['true_tau'].min():.4f}, {df['true_tau'].max():.4f}]")
# 诚实分割的Causal Tree(简化版)
class HonestCausalTree:
"""
诚实分割的因果树(简化实现)
仅用于教学演示,实际应用请使用econml或grf包
"""
def __init__(self, max_depth=3, min_samples_leaf=50):
self.max_depth = max_depth
self.min_samples_leaf = min_samples_leaf
self.tree_structure = None
def fit(self, X, D, Y):
"""
训练诚实因果树
参数:
- X: 协变量 (n x p)
- D: 处理指示 (n,)
- Y: 结果变量 (n,)
"""
n = len(X)
# 步骤1: 样本分割
indices = np.arange(n)
train_idx, est_idx = train_test_split(
indices, test_size=0.5, random_state=42
)
# 训练集和估计集
X_train, D_train, Y_train = X[train_idx], D[train_idx], Y[train_idx]
X_est, D_est, Y_est = X[est_idx], D[est_idx], Y[est_idx]
# 步骤2: 用训练集建树
self.tree_structure = self._build_tree(
X_train, D_train, Y_train, depth=0
)
# 步骤3: 用估计集估计叶子节点效应
self._estimate_leaf_effects(X_est, D_est, Y_est)
return self
def _build_tree(self, X, D, Y, depth):
"""递归构建树结构(仅使用训练集)"""
n = len(X)
# 停止条件
if depth >= self.max_depth or n < 2 * self.min_samples_leaf:
return {'leaf': True, 'samples': n, 'tau': None}
# 寻找最优分裂
best_split = self._find_best_split(X, D, Y)
if best_split is None:
return {'leaf': True, 'samples': n, 'tau': None}
feature, threshold = best_split
# 分裂样本
left_mask = X[:, feature] <= threshold
right_mask = ~left_mask
# 递归构建子树
left_tree = self._build_tree(
X[left_mask], D[left_mask], Y[left_mask], depth + 1
)
right_tree = self._build_tree(
X[right_mask], D[right_mask], Y[right_mask], depth + 1
)
return {
'leaf': False,
'feature': feature,
'threshold': threshold,
'left': left_tree,
'right': right_tree
}
def _find_best_split(self, X, D, Y):
"""寻找最优分裂(最大化处理效应方差)"""
n, p = X.shape
best_gain = -np.inf
best_split = None
# 父节点的处理效应
tau_parent = self._estimate_tau(D, Y)
# 遍历所有特征和分裂点
for feature in range(p):
thresholds = np.percentile(X[:, feature], [25, 50, 75])
for threshold in thresholds:
left_mask = X[:, feature] <= threshold
right_mask = ~left_mask
# 检查最小样本量
if left_mask.sum() < self.min_samples_leaf or right_mask.sum() < self.min_samples_leaf:
continue
# 计算左右节点的处理效应
tau_left = self._estimate_tau(D[left_mask], Y[left_mask])
tau_right = self._estimate_tau(D[right_mask], Y[right_mask])
# 计算增益(处理效应方差的增加)
n_left, n_right = left_mask.sum(), right_mask.sum()
gain = (n_left/n * tau_left**2 + n_right/n * tau_right**2 - tau_parent**2)
if gain > best_gain:
best_gain = gain
best_split = (feature, threshold)
return best_split
def _estimate_tau(self, D, Y):
"""估计处理效应"""
treated = D == 1
control = D == 0
if treated.sum() == 0 or control.sum() == 0:
return 0
return Y[treated].mean() - Y[control].mean()
def _estimate_leaf_effects(self, X, D, Y):
"""用估计集估计叶子节点效应(诚实估计)"""
self._recursive_estimate(self.tree_structure, X, D, Y)
def _recursive_estimate(self, node, X, D, Y):
"""递归估计每个叶子节点的效应"""
if node['leaf']:
# 叶子节点:用估计集计算处理效应
if len(Y) > 0:
node['tau'] = self._estimate_tau(D, Y)
node['est_samples'] = len(Y)
else:
node['tau'] = 0
node['est_samples'] = 0
else:
# 内部节点:递归到子节点
mask = X[:, node['feature']] <= node['threshold']
self._recursive_estimate(
node['left'], X[mask], D[mask], Y[mask]
)
self._recursive_estimate(
node['right'], X[~mask], D[~mask], Y[~mask]
)
def predict(self, X):
"""预测CATE"""
n = len(X)
predictions = np.zeros(n)
for i in range(n):
predictions[i] = self._predict_one(X[i])
return predictions
def _predict_one(self, x):
"""预测单个样本的CATE"""
node = self.tree_structure
while not node['leaf']:
if x[node['feature']] <= node['threshold']:
node = node['left']
else:
node = node['right']
return node['tau']
# 训练诚实因果树
X = df[['X1', 'X2']].values
D = df['D'].values
Y = df['Y'].values
honest_tree = HonestCausalTree(max_depth=3, min_samples_leaf=50)
honest_tree.fit(X, D, Y)
# 预测CATE
df['predicted_tau'] = honest_tree.predict(X)
print("\n" + "=" * 70)
print("诚实因果树训练完成")
print("=" * 70)
# 评估
from sklearn.metrics import mean_squared_error, r2_score
mse = mean_squared_error(df['true_tau'], df['predicted_tau'])
r2 = r2_score(df['true_tau'], df['predicted_tau'])
print(f"CATE预测MSE: {mse:.4f}")
print(f"CATE预测R²: {r2:.4f}")
# 可视化:真实 vs 预测的CATE
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 左图:真实CATE
scatter1 = axes[0].scatter(df['X1'], df['X2'], c=df['true_tau'],
cmap='RdYlGn', s=20, alpha=0.6)
axes[0].set_xlabel('X1', fontsize=12)
axes[0].set_ylabel('X2', fontsize=12)
axes[0].set_title('真实CATE: τ(x) = 2X₁ + 5X₂', fontsize=13, fontweight='bold')
plt.colorbar(scatter1, ax=axes[0], label='处理效应')
# 右图:预测CATE
scatter2 = axes[1].scatter(df['X1'], df['X2'], c=df['predicted_tau'],
cmap='RdYlGn', s=20, alpha=0.6)
axes[1].set_xlabel('X1', fontsize=12)
axes[1].set_ylabel('X2', fontsize=12)
axes[1].set_title(f'诚实因果树预测CATE (R² = {r2:.3f})',
fontsize=13, fontweight='bold')
plt.colorbar(scatter2, ax=axes[1], label='处理效应')
plt.tight_layout()
plt.show()输出解读:
- 诚实分割确保估计的处理效应是无偏的
- R²表示模型捕捉异质性的能力
- 可视化显示树如何在特征空间中划分不同的处理效应区域
从Causal Tree到Causal Forest
Wager & Athey (2018) 的核心贡献
Random Forest的优势:
- 降低方差(通过平均多棵树)
- 提高稳定性
Causal Forest = Honest Causal Trees + Random Forest
算法:Causal Forest
输入:
- 数据
- 树的数量 (通常1000-5000)
步骤:
For :
Bootstrap抽样:从 个样本中有放回抽取 个样本,记为
样本分割:将 随机分为两半
- :用于建树
- :用于估计
随机特征:每次分裂时,随机选择 个特征()
构建诚实因果树 :
- 用 选择分裂点
- 用 估计叶子节点效应
预测:对于新样本 ,
关键参数
| 参数 | 含义 | 推荐值 | 影响 |
|---|---|---|---|
n_estimators (B) | 树的数量 | 2000-5000 | 越多越稳定,但计算慢 |
max_depth | 树的最大深度 | 不限制 | 太深过拟合,太浅欠拟合 |
min_samples_leaf | 叶子节点最小样本量 | 5-20 | 太小方差大,太大偏差大 |
max_features | 每次分裂考虑的特征数 | 或 | 增加随机性 |
honest | 是否诚实分割 | True | 必须True! |
渐近理论:为什么Causal Forest有效?
Wager & Athey (2018) 的理论结果
定理1:渐近正态性
在正则性条件下,当 时:
其中 可以被一致估计。
含义:
- 是一致的(consistent):
- 可以构建置信区间
- 可以进行假设检验
定理2:收敛速度
在适当的光滑性条件下:
对比:
- 标准非参数方法:(维度诅咒)
- Causal Forest:(不受维度影响!)
这是自适应的结果:树会自动找到重要的特征。
置信区间的构造
问题:如何量化不确定性?
Wager & Athey的方法:
步骤1:估计方差
步骤2:构造置信区间
其中 是有效样本量(考虑到树之间的相关性)。
Python实现(使用econml):
from econml.grf import CausalForest
import numpy as np
import matplotlib.pyplot as plt
# 使用之前生成的数据
X = df[['X1', 'X2']].values
D = df['D'].values.reshape(-1, 1)
Y = df['Y'].values
# 训练Causal Forest(econml实现)
cf = CausalForest(
n_estimators=2000,
min_samples_leaf=10,
max_depth=None,
random_state=42,
verbose=0
)
cf.fit(X, D, Y)
# 预测CATE和置信区间
tau_pred = cf.predict(X)
tau_pred_interval = cf.predict_interval(X, alpha=0.05) # 95% CI
print("=" * 70)
print("Causal Forest (econml) 结果")
print("=" * 70)
print(f"预测的平均CATE: {tau_pred.mean():.4f}")
print(f"真实的平均CATE: {df['true_tau'].mean():.4f}")
print(f"\nCate预测MSE: {mean_squared_error(df['true_tau'], tau_pred):.4f}")
print(f"CATE预测R²: {r2_score(df['true_tau'], tau_pred):.4f}")
# 可视化:CATE预测 + 置信区间
fig, ax = plt.subplots(figsize=(12, 6))
# 按真实CATE排序
sorted_idx = np.argsort(df['true_tau'].values)
x_plot = np.arange(len(sorted_idx))
ax.plot(x_plot, df['true_tau'].values[sorted_idx],
'g-', linewidth=2, label='真实CATE', alpha=0.7)
ax.plot(x_plot, tau_pred[sorted_idx],
'b-', linewidth=1.5, label='预测CATE', alpha=0.8)
ax.fill_between(x_plot,
tau_pred_interval[0][sorted_idx],
tau_pred_interval[1][sorted_idx],
alpha=0.2, color='blue', label='95% 置信区间')
ax.set_xlabel('样本(按真实CATE排序)', fontsize=12)
ax.set_ylabel('处理效应', fontsize=12)
ax.set_title('Causal Forest: CATE预测与置信区间', fontsize=14, fontweight='bold')
ax.legend(fontsize=11)
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 统计覆盖率
coverage = np.mean(
(df['true_tau'].values >= tau_pred_interval[0]) &
(df['true_tau'].values <= tau_pred_interval[1])
)
print(f"\n置信区间覆盖率: {coverage:.2%}(理论值应接近95%)")与其他方法的比较
Meta-learners家族
1. S-learner(Single Model)
思路:用一个模型预测 ,把 作为特征
优点:简单 缺点:处理效应可能被 的主效应淹没
2. T-learner(Two Model)
思路:分别对处理组和对照组建模
优点:允许完全的异质性 缺点:两个模型之间没有信息共享
3. X-learner(Künzel et al., 2019)
思路:结合S和T的优点
步骤1:估计 和 (如T-learner)
步骤2:计算"伪处理效应"
步骤3:对伪处理效应建模
步骤4:加权平均
其中 (倾向得分)
4. Causal Forest
思路:直接优化处理效应估计
对比表:
| 方法 | 偏差控制 | 方差控制 | 不平衡数据 | 高维特征 |
|---|---|---|---|---|
| S-learner | 差 | 好 | 好 | 好 |
| T-learner | 好 | 差 | 差 | ️ 一般 |
| X-learner | 好 | 好 | 好 | ️ 一般 |
| Causal Forest | 好 | 好 | 好 | 好 |
Python对比实验:
from sklearn.ensemble import RandomForestRegressor
from econml.metalearners import TLearner, SLearner, XLearner
from econml.grf import CausalForest
# 使用之前的模拟数据
X = df[['X1', 'X2']].values
D = df['D'].values
Y = df['Y'].values
# 1. S-learner
s_learner = SLearner(overall_model=RandomForestRegressor(n_estimators=100, random_state=42))
s_learner.fit(Y, D, X=X)
tau_s = s_learner.effect(X)
# 2. T-learner
t_learner = TLearner(models=RandomForestRegressor(n_estimators=100, random_state=42))
t_learner.fit(Y, D, X=X)
tau_t = t_learner.effect(X)
# 3. X-learner
x_learner = XLearner(
models=RandomForestRegressor(n_estimators=100, random_state=42),
propensity_model=RandomForestRegressor(n_estimators=100, random_state=42)
)
x_learner.fit(Y, D, X=X)
tau_x = x_learner.effect(X)
# 4. Causal Forest
cf = CausalForest(n_estimators=2000, random_state=42)
cf.fit(X, D.reshape(-1, 1), Y)
tau_cf = cf.predict(X)
# 比较性能
methods = ['S-learner', 'T-learner', 'X-learner', 'Causal Forest']
predictions = [tau_s, tau_t, tau_x, tau_cf]
print("=" * 70)
print("Meta-learners 性能对比")
print("=" * 70)
results_comparison = []
for method, pred in zip(methods, predictions):
mse = mean_squared_error(df['true_tau'], pred)
r2 = r2_score(df['true_tau'], pred)
bias = np.mean(pred - df['true_tau'])
results_comparison.append({
'方法': method,
'MSE': mse,
'R²': r2,
'偏差': bias
})
comparison_df = pd.DataFrame(results_comparison)
print(comparison_df.to_string(index=False))
# 可视化对比
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
axes = axes.ravel()
for idx, (method, pred) in enumerate(zip(methods, predictions)):
ax = axes[idx]
ax.scatter(df['true_tau'], pred, alpha=0.3, s=10)
ax.plot([df['true_tau'].min(), df['true_tau'].max()],
[df['true_tau'].min(), df['true_tau'].max()],
'r--', linewidth=2)
r2 = r2_score(df['true_tau'], pred)
ax.set_xlabel('真实CATE', fontsize=11)
ax.set_ylabel('预测CATE', fontsize=11)
ax.set_title(f'{method} (R² = {r2:.3f})', fontsize=12, fontweight='bold')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()本节小结
核心要点
CART → Causal Tree:
- 目标从预测 变为估计
- 分裂准则从最小化MSE变为最大化处理效应异质性
诚实分割的必要性:
- 解决过拟合偏差
- 确保估计的无偏性
- 样本分为训练集(建树)和估计集(估计)
Causal Forest = Honest Causal Trees + Random Forest:
- Bootstrap + 随机特征 → 降低方差
- 诚实分割 → 控制偏差
- 渐近正态性 → 允许统计推断
理论保证:
- 一致性:
- 渐近正态性:可以构造置信区间
- 收敛速度:,不受维度影响
与Meta-learners对比:
- S-learner:简单但可能有偏差
- T-learner:灵活但方差大
- X-learner:平衡偏差和方差
- Causal Forest:直接优化处理效应,综合性能最好
关键公式
CATE估计:
置信区间:
Python工具
# 主要库
from econml.grf import CausalForest # 推荐!
from econml.metalearners import TLearner, SLearner, XLearner
from causalml.inference.tree import UpliftRandomForestClassifier
# 基本用法
cf = CausalForest(n_estimators=2000, min_samples_leaf=10)
cf.fit(X, T, Y)
tau = cf.predict(X)
tau_interval = cf.predict_interval(X, alpha=0.05)下一节预告
在第3节:异质性处理效应的估计与推断中,我们将深入探讨:
- 如何估计和解释CATE
- 如何进行统计推断和假设检验
- 政策学习(Policy Learning):基于估计的CATE制定最优决策规则
- 异质性的价值:量化个性化决策的收益
从理论到实践:让我们用Causal Forest发现真正的异质性!