Skip to content

12.2 Causal Forest的数学原理与算法

"Machine learning and econometrics are complementary tools for causal inference.""机器学习和计量经济学是因果推断的互补工具。"— Susan Athey, Stanford Economist (斯坦福经济学家)

从CART到诚实分割:理解Causal Forest的数学基础

难度数学密集


本节目标

完成本节后,你将能够:

  • 理解CART(分类回归树)的分裂准则
  • 掌握Causal Tree的核心创新
  • 深刻理解诚实分割(Honest Splitting)的必要性
  • 理解Causal Forest的渐近理论
  • 比较不同的异质性处理效应估计方法
  • 用Python实现基础的Causal Tree算法

从预测到因果:CART vs Causal Tree

CART回顾:预测的艺术

**CART(Breiman et al., 1984)**的目标是最小化预测误差。

标准回归树的分裂准则

对于节点 (父节点),我们寻找最优分裂

  • :特征索引
  • :分裂点

目标:最小化分裂后的MSE

其中:

  • (左子节点)
  • (右子节点)
  • :左右节点的均值

Python示例:标准CART

python
import numpy as np
from sklearn.tree import DecisionTreeRegressor
import matplotlib.pyplot as plt

# 生成模拟数据
np.random.seed(42)
n = 1000
X = np.random.uniform(0, 10, n).reshape(-1, 1)
y = np.sin(X.ravel()) + 0.5 * np.random.randn(n)

# 训练CART
tree = DecisionTreeRegressor(max_depth=3, random_state=42)
tree.fit(X, y)

# 可视化
X_test = np.linspace(0, 10, 300).reshape(-1, 1)
y_pred = tree.predict(X_test)

plt.figure(figsize=(12, 5))
plt.scatter(X, y, alpha=0.3, label='观测数据')
plt.plot(X_test, y_pred, 'r-', linewidth=2, label='CART预测')
plt.xlabel('X', fontsize=12)
plt.ylabel('Y', fontsize=12)
plt.title('标准回归树(CART)', fontsize=14, fontweight='bold')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print(f"树的叶子节点数: {tree.get_n_leaves()}")
print(f"树的深度: {tree.get_depth()}")

Causal Tree的创新:估计处理效应而非预测结果

核心区别

维度CARTCausal Tree
目标预测 估计
分裂准则最小化MSE最大化处理效应方差
叶子节点估计
偏差问题正则化偏差需要诚实估计

Athey & Imbens (2016) 的Causal Tree

目标:在每个叶子节点 内,估计CATE

其中:

  • :叶子节点中处理组样本数
  • :叶子节点中对照组样本数

关键创新1:新的分裂准则

不再最小化MSE,而是最大化处理效应的异质性

直觉

  • 如果分裂后两个子节点的处理效应差异大,说明找到了有价值的异质性
  • 类似于CART中的信息增益,但针对处理效应

关键创新2:诚实估计(Honest Estimation)

这是Causal Tree最重要的贡献!


诚实分割(Honest Splitting):解决过拟合偏差

问题:为什么需要诚实分割?

标准CART的问题:同一数据既用于建树(选择分裂点),又用于估计(计算叶子节点值)。

后果

  1. 过拟合:树会找到数据中的噪声模式
  2. 估计偏差:叶子节点的估计值是有偏的

在因果推断中更严重

数学示例:偏差的来源

假设真实的 (无处理效应),但由于随机噪声:

  • 某个子集中 (纯属偶然)
  • CART会选择这个分裂(因为它最大化了"效应")
  • 结果:估计的 ,但真实

偏差 =

解决方案:样本分割

Athey & Imbens (2016) 的诚实分割算法

步骤1:随机分割样本

  • 样本 (大小 )随机分为两部分:
    • :训练集(用于建树),大小
    • :估计集(用于估计叶子节点效应),大小

步骤2:用 建树

  • 使用训练集选择最优分裂点
  • 确定树的结构

步骤3:用 估计

  • 对于每个叶子节点 ,使用估计集中的样本:

为什么有效?

  • 估计集的数据从未参与分裂决策
  • 因此估计值是无偏的(在条件期望意义下)

Python实现:诚实分割的简单示例

python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# 模拟数据生成函数
def simulate_heterogeneous_data(n=2000, seed=42):
    """
    生成具有异质性处理效应的数据

    真实的CATE: τ(x) = 2*x1 + 5*x2
    """
    np.random.seed(seed)

    # 协变量
    X1 = np.random.uniform(0, 1, n)
    X2 = np.random.uniform(0, 1, n)

    # 处理分配(随机)
    D = np.random.binomial(1, 0.5, n)

    # 真实的异质性处理效应
    true_tau = 2 * X1 + 5 * X2

    # 结果变量
    Y0 = 10 + 3*X1 + 2*X2 + np.random.normal(0, 1, n)  # 潜在结果Y(0)
    Y1 = Y0 + true_tau  # 潜在结果Y(1)

    # 观测结果
    Y = D * Y1 + (1 - D) * Y0

    df = pd.DataFrame({
        'X1': X1,
        'X2': X2,
        'D': D,
        'Y': Y,
        'true_tau': true_tau
    })

    return df

# 生成数据
df = simulate_heterogeneous_data(n=2000)

print("=" * 70)
print("数据生成完成")
print("=" * 70)
print(f"样本量: {len(df)}")
print(f"处理组: {df['D'].sum()}")
print(f"对照组: {len(df) - df['D'].sum()}")
print(f"\n真实平均处理效应 (ATE): {df['true_tau'].mean():.4f}")
print(f"真实处理效应范围: [{df['true_tau'].min():.4f}, {df['true_tau'].max():.4f}]")


# 诚实分割的Causal Tree(简化版)
class HonestCausalTree:
    """
    诚实分割的因果树(简化实现)

    仅用于教学演示,实际应用请使用econml或grf包
    """

    def __init__(self, max_depth=3, min_samples_leaf=50):
        self.max_depth = max_depth
        self.min_samples_leaf = min_samples_leaf
        self.tree_structure = None

    def fit(self, X, D, Y):
        """
        训练诚实因果树

        参数:
        - X: 协变量 (n x p)
        - D: 处理指示 (n,)
        - Y: 结果变量 (n,)
        """
        n = len(X)

        # 步骤1: 样本分割
        indices = np.arange(n)
        train_idx, est_idx = train_test_split(
            indices, test_size=0.5, random_state=42
        )

        # 训练集和估计集
        X_train, D_train, Y_train = X[train_idx], D[train_idx], Y[train_idx]
        X_est, D_est, Y_est = X[est_idx], D[est_idx], Y[est_idx]

        # 步骤2: 用训练集建树
        self.tree_structure = self._build_tree(
            X_train, D_train, Y_train, depth=0
        )

        # 步骤3: 用估计集估计叶子节点效应
        self._estimate_leaf_effects(X_est, D_est, Y_est)

        return self

    def _build_tree(self, X, D, Y, depth):
        """递归构建树结构(仅使用训练集)"""
        n = len(X)

        # 停止条件
        if depth >= self.max_depth or n < 2 * self.min_samples_leaf:
            return {'leaf': True, 'samples': n, 'tau': None}

        # 寻找最优分裂
        best_split = self._find_best_split(X, D, Y)

        if best_split is None:
            return {'leaf': True, 'samples': n, 'tau': None}

        feature, threshold = best_split

        # 分裂样本
        left_mask = X[:, feature] <= threshold
        right_mask = ~left_mask

        # 递归构建子树
        left_tree = self._build_tree(
            X[left_mask], D[left_mask], Y[left_mask], depth + 1
        )
        right_tree = self._build_tree(
            X[right_mask], D[right_mask], Y[right_mask], depth + 1
        )

        return {
            'leaf': False,
            'feature': feature,
            'threshold': threshold,
            'left': left_tree,
            'right': right_tree
        }

    def _find_best_split(self, X, D, Y):
        """寻找最优分裂(最大化处理效应方差)"""
        n, p = X.shape
        best_gain = -np.inf
        best_split = None

        # 父节点的处理效应
        tau_parent = self._estimate_tau(D, Y)

        # 遍历所有特征和分裂点
        for feature in range(p):
            thresholds = np.percentile(X[:, feature], [25, 50, 75])

            for threshold in thresholds:
                left_mask = X[:, feature] <= threshold
                right_mask = ~left_mask

                # 检查最小样本量
                if left_mask.sum() < self.min_samples_leaf or right_mask.sum() < self.min_samples_leaf:
                    continue

                # 计算左右节点的处理效应
                tau_left = self._estimate_tau(D[left_mask], Y[left_mask])
                tau_right = self._estimate_tau(D[right_mask], Y[right_mask])

                # 计算增益(处理效应方差的增加)
                n_left, n_right = left_mask.sum(), right_mask.sum()
                gain = (n_left/n * tau_left**2 + n_right/n * tau_right**2 - tau_parent**2)

                if gain > best_gain:
                    best_gain = gain
                    best_split = (feature, threshold)

        return best_split

    def _estimate_tau(self, D, Y):
        """估计处理效应"""
        treated = D == 1
        control = D == 0

        if treated.sum() == 0 or control.sum() == 0:
            return 0

        return Y[treated].mean() - Y[control].mean()

    def _estimate_leaf_effects(self, X, D, Y):
        """用估计集估计叶子节点效应(诚实估计)"""
        self._recursive_estimate(self.tree_structure, X, D, Y)

    def _recursive_estimate(self, node, X, D, Y):
        """递归估计每个叶子节点的效应"""
        if node['leaf']:
            # 叶子节点:用估计集计算处理效应
            if len(Y) > 0:
                node['tau'] = self._estimate_tau(D, Y)
                node['est_samples'] = len(Y)
            else:
                node['tau'] = 0
                node['est_samples'] = 0
        else:
            # 内部节点:递归到子节点
            mask = X[:, node['feature']] <= node['threshold']
            self._recursive_estimate(
                node['left'], X[mask], D[mask], Y[mask]
            )
            self._recursive_estimate(
                node['right'], X[~mask], D[~mask], Y[~mask]
            )

    def predict(self, X):
        """预测CATE"""
        n = len(X)
        predictions = np.zeros(n)

        for i in range(n):
            predictions[i] = self._predict_one(X[i])

        return predictions

    def _predict_one(self, x):
        """预测单个样本的CATE"""
        node = self.tree_structure

        while not node['leaf']:
            if x[node['feature']] <= node['threshold']:
                node = node['left']
            else:
                node = node['right']

        return node['tau']


# 训练诚实因果树
X = df[['X1', 'X2']].values
D = df['D'].values
Y = df['Y'].values

honest_tree = HonestCausalTree(max_depth=3, min_samples_leaf=50)
honest_tree.fit(X, D, Y)

# 预测CATE
df['predicted_tau'] = honest_tree.predict(X)

print("\n" + "=" * 70)
print("诚实因果树训练完成")
print("=" * 70)

# 评估
from sklearn.metrics import mean_squared_error, r2_score

mse = mean_squared_error(df['true_tau'], df['predicted_tau'])
r2 = r2_score(df['true_tau'], df['predicted_tau'])

print(f"CATE预测MSE: {mse:.4f}")
print(f"CATE预测R²: {r2:.4f}")

# 可视化:真实 vs 预测的CATE
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# 左图:真实CATE
scatter1 = axes[0].scatter(df['X1'], df['X2'], c=df['true_tau'],
                           cmap='RdYlGn', s=20, alpha=0.6)
axes[0].set_xlabel('X1', fontsize=12)
axes[0].set_ylabel('X2', fontsize=12)
axes[0].set_title('真实CATE: τ(x) = 2X₁ + 5X₂', fontsize=13, fontweight='bold')
plt.colorbar(scatter1, ax=axes[0], label='处理效应')

# 右图:预测CATE
scatter2 = axes[1].scatter(df['X1'], df['X2'], c=df['predicted_tau'],
                           cmap='RdYlGn', s=20, alpha=0.6)
axes[1].set_xlabel('X1', fontsize=12)
axes[1].set_ylabel('X2', fontsize=12)
axes[1].set_title(f'诚实因果树预测CATE (R² = {r2:.3f})',
                  fontsize=13, fontweight='bold')
plt.colorbar(scatter2, ax=axes[1], label='处理效应')

plt.tight_layout()
plt.show()

输出解读

  • 诚实分割确保估计的处理效应是无偏的
  • R²表示模型捕捉异质性的能力
  • 可视化显示树如何在特征空间中划分不同的处理效应区域

从Causal Tree到Causal Forest

Wager & Athey (2018) 的核心贡献

Random Forest的优势

  1. 降低方差(通过平均多棵树)
  2. 提高稳定性

Causal Forest = Honest Causal Trees + Random Forest

算法:Causal Forest

输入

  • 数据
  • 树的数量 (通常1000-5000)

步骤

For

  1. Bootstrap抽样:从 个样本中有放回抽取 个样本,记为

  2. 样本分割:将 随机分为两半

    • :用于建树
    • :用于估计
  3. 随机特征:每次分裂时,随机选择 个特征(

  4. 构建诚实因果树

    • 选择分裂点
    • 估计叶子节点效应

预测:对于新样本

关键参数

参数含义推荐值影响
n_estimators (B)树的数量2000-5000越多越稳定,但计算慢
max_depth树的最大深度不限制太深过拟合,太浅欠拟合
min_samples_leaf叶子节点最小样本量5-20太小方差大,太大偏差大
max_features每次分裂考虑的特征数增加随机性
honest是否诚实分割True必须True!

渐近理论:为什么Causal Forest有效?

Wager & Athey (2018) 的理论结果

定理1:渐近正态性

在正则性条件下,当 时:

其中 可以被一致估计。

含义

  1. 一致的(consistent):
  2. 可以构建置信区间
  3. 可以进行假设检验

定理2:收敛速度

在适当的光滑性条件下:

对比

  • 标准非参数方法:(维度诅咒)
  • Causal Forest:(不受维度影响!)

这是自适应的结果:树会自动找到重要的特征。

置信区间的构造

问题:如何量化不确定性?

Wager & Athey的方法

步骤1:估计方差

步骤2:构造置信区间

其中 是有效样本量(考虑到树之间的相关性)。

Python实现(使用econml):

python
from econml.grf import CausalForest
import numpy as np
import matplotlib.pyplot as plt

# 使用之前生成的数据
X = df[['X1', 'X2']].values
D = df['D'].values.reshape(-1, 1)
Y = df['Y'].values

# 训练Causal Forest(econml实现)
cf = CausalForest(
    n_estimators=2000,
    min_samples_leaf=10,
    max_depth=None,
    random_state=42,
    verbose=0
)

cf.fit(X, D, Y)

# 预测CATE和置信区间
tau_pred = cf.predict(X)
tau_pred_interval = cf.predict_interval(X, alpha=0.05)  # 95% CI

print("=" * 70)
print("Causal Forest (econml) 结果")
print("=" * 70)
print(f"预测的平均CATE: {tau_pred.mean():.4f}")
print(f"真实的平均CATE: {df['true_tau'].mean():.4f}")
print(f"\nCate预测MSE: {mean_squared_error(df['true_tau'], tau_pred):.4f}")
print(f"CATE预测R²: {r2_score(df['true_tau'], tau_pred):.4f}")

# 可视化:CATE预测 + 置信区间
fig, ax = plt.subplots(figsize=(12, 6))

# 按真实CATE排序
sorted_idx = np.argsort(df['true_tau'].values)
x_plot = np.arange(len(sorted_idx))

ax.plot(x_plot, df['true_tau'].values[sorted_idx],
        'g-', linewidth=2, label='真实CATE', alpha=0.7)
ax.plot(x_plot, tau_pred[sorted_idx],
        'b-', linewidth=1.5, label='预测CATE', alpha=0.8)
ax.fill_between(x_plot,
                tau_pred_interval[0][sorted_idx],
                tau_pred_interval[1][sorted_idx],
                alpha=0.2, color='blue', label='95% 置信区间')

ax.set_xlabel('样本(按真实CATE排序)', fontsize=12)
ax.set_ylabel('处理效应', fontsize=12)
ax.set_title('Causal Forest: CATE预测与置信区间', fontsize=14, fontweight='bold')
ax.legend(fontsize=11)
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# 统计覆盖率
coverage = np.mean(
    (df['true_tau'].values >= tau_pred_interval[0]) &
    (df['true_tau'].values <= tau_pred_interval[1])
)
print(f"\n置信区间覆盖率: {coverage:.2%}(理论值应接近95%)")

与其他方法的比较

Meta-learners家族

1. S-learner(Single Model)

思路:用一个模型预测 ,把 作为特征

优点:简单 缺点:处理效应可能被 的主效应淹没

2. T-learner(Two Model)

思路:分别对处理组和对照组建模

优点:允许完全的异质性 缺点:两个模型之间没有信息共享

3. X-learner(Künzel et al., 2019)

思路:结合S和T的优点

步骤1:估计 (如T-learner)

步骤2:计算"伪处理效应"

步骤3:对伪处理效应建模

步骤4:加权平均

其中 (倾向得分)

4. Causal Forest

思路:直接优化处理效应估计

对比表

方法偏差控制方差控制不平衡数据高维特征
S-learner
T-learner️ 一般
X-learner️ 一般
Causal Forest

Python对比实验

python
from sklearn.ensemble import RandomForestRegressor
from econml.metalearners import TLearner, SLearner, XLearner
from econml.grf import CausalForest

# 使用之前的模拟数据
X = df[['X1', 'X2']].values
D = df['D'].values
Y = df['Y'].values

# 1. S-learner
s_learner = SLearner(overall_model=RandomForestRegressor(n_estimators=100, random_state=42))
s_learner.fit(Y, D, X=X)
tau_s = s_learner.effect(X)

# 2. T-learner
t_learner = TLearner(models=RandomForestRegressor(n_estimators=100, random_state=42))
t_learner.fit(Y, D, X=X)
tau_t = t_learner.effect(X)

# 3. X-learner
x_learner = XLearner(
    models=RandomForestRegressor(n_estimators=100, random_state=42),
    propensity_model=RandomForestRegressor(n_estimators=100, random_state=42)
)
x_learner.fit(Y, D, X=X)
tau_x = x_learner.effect(X)

# 4. Causal Forest
cf = CausalForest(n_estimators=2000, random_state=42)
cf.fit(X, D.reshape(-1, 1), Y)
tau_cf = cf.predict(X)

# 比较性能
methods = ['S-learner', 'T-learner', 'X-learner', 'Causal Forest']
predictions = [tau_s, tau_t, tau_x, tau_cf]

print("=" * 70)
print("Meta-learners 性能对比")
print("=" * 70)

results_comparison = []
for method, pred in zip(methods, predictions):
    mse = mean_squared_error(df['true_tau'], pred)
    r2 = r2_score(df['true_tau'], pred)
    bias = np.mean(pred - df['true_tau'])

    results_comparison.append({
        '方法': method,
        'MSE': mse,
        'R²': r2,
        '偏差': bias
    })

comparison_df = pd.DataFrame(results_comparison)
print(comparison_df.to_string(index=False))

# 可视化对比
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
axes = axes.ravel()

for idx, (method, pred) in enumerate(zip(methods, predictions)):
    ax = axes[idx]
    ax.scatter(df['true_tau'], pred, alpha=0.3, s=10)
    ax.plot([df['true_tau'].min(), df['true_tau'].max()],
            [df['true_tau'].min(), df['true_tau'].max()],
            'r--', linewidth=2)

    r2 = r2_score(df['true_tau'], pred)
    ax.set_xlabel('真实CATE', fontsize=11)
    ax.set_ylabel('预测CATE', fontsize=11)
    ax.set_title(f'{method} (R² = {r2:.3f})', fontsize=12, fontweight='bold')
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

本节小结

核心要点

  1. CART → Causal Tree

    • 目标从预测 变为估计
    • 分裂准则从最小化MSE变为最大化处理效应异质性
  2. 诚实分割的必要性

    • 解决过拟合偏差
    • 确保估计的无偏性
    • 样本分为训练集(建树)和估计集(估计)
  3. Causal Forest = Honest Causal Trees + Random Forest

    • Bootstrap + 随机特征 → 降低方差
    • 诚实分割 → 控制偏差
    • 渐近正态性 → 允许统计推断
  4. 理论保证

    • 一致性:
    • 渐近正态性:可以构造置信区间
    • 收敛速度:,不受维度影响
  5. 与Meta-learners对比

    • S-learner:简单但可能有偏差
    • T-learner:灵活但方差大
    • X-learner:平衡偏差和方差
    • Causal Forest:直接优化处理效应,综合性能最好

关键公式

CATE估计

置信区间

Python工具

python
# 主要库
from econml.grf import CausalForest  # 推荐!
from econml.metalearners import TLearner, SLearner, XLearner
from causalml.inference.tree import UpliftRandomForestClassifier

# 基本用法
cf = CausalForest(n_estimators=2000, min_samples_leaf=10)
cf.fit(X, T, Y)
tau = cf.predict(X)
tau_interval = cf.predict_interval(X, alpha=0.05)

下一节预告

第3节:异质性处理效应的估计与推断中,我们将深入探讨:

  • 如何估计和解释CATE
  • 如何进行统计推断和假设检验
  • 政策学习(Policy Learning):基于估计的CATE制定最优决策规则
  • 异质性的价值:量化个性化决策的收益

从理论到实践:让我们用Causal Forest发现真正的异质性!

基于 MIT 许可证发布。内容版权归作者所有。