13.3 倾向得分匹配 (Propensity Score Matching)
"The propensity score is the probability of treatment assignment conditional on observed baseline characteristics.""倾向得分是基于观察到的基线特征的处理分配概率。"— Paul Rosenbaum & Donald Rubin, 1983
Rosenbaum & Rubin的降维奇迹:从高维匹配到单一标量
本节目标
- 深刻理解条件独立性假设(CIA)和倾向得分定理
- 掌握PSM、IPW、Doubly Robust三大方法
- 进行平衡性检验和敏感性分析
- 完整复现LaLonde (1986)就业培训评估
- 使用Python实现(sklearn + causalml + econml)
- 理解倾向得分方法的优势与局限
引言:匹配的维度诅咒
观察性数据的根本问题
场景: 评估就业培训项目的效应
理想: RCT - 随机分配谁接受培训
现实: 观察性数据 - 人们自己选择是否参加
选择性偏差(Selection Bias):
问题: 参加培训的人本来就可能与不参加的人不同!
匹配的直觉
核心思想: 找到"可比"的对照个体
精确匹配(Exact Matching):
对于每个处理个体,找到特征完全相同的对照个体:
维度诅咒: 如果有个连续变量,找到精确匹配几乎不可能!
例子:
- 年龄: 25岁 vs 25.5岁?
- 收入: $30,000 vs $30,100?
- 教育: 12年 vs 12.5年?
问题: 随着,精确匹配个体数
倾向得分定理:降维的魔法
Rosenbaum & Rubin (1983)的伟大贡献
定义: 倾向得分(Propensity Score)
直觉: 给定特征,个体接受处理的概率。
Propensity Score Theorem:
定理1: 如果
则
惊人结论:
不需要在高维上匹配,只需要在单一标量上匹配!
证明草图:
最后一步用了: 是的函数,给定,的分布是确定的。
定理2 (平衡性):
含义: 给定倾向得分,处理组和对照组的协变量分布相同(平衡)!
核心假设
假设1: 条件独立性(CIA)
强无混淆假设(Strongly Ignorable Treatment Assignment):
白话: 给定观测到的协变量,处理分配与潜在结果独立。
等价表述:
含义: 控制后,没有未观测的混淆变量。
关键: CIA是不可检验的假设!依赖领域知识。
假设2: 共同支撑(Common Support)
含义:
- 每个个体都有正概率接受或不接受处理
- 排除"确定性处理"的情况
检查: 倾向得分分布重叠图
plt.hist(e[D==1], alpha=0.5, label='处理组', bins=30)
plt.hist(e[D==0], alpha=0.5, label='对照组', bins=30)
plt.xlabel('倾向得分')
plt.ylabel('频数')
plt.legend()如果违反:
- 处理组和对照组在某些区域完全不重叠
- 匹配/加权可能失效
方法1: 倾向得分匹配(PSM)
算法流程
步骤1: 估计倾向得分
使用逻辑回归(Logit):
或Probit:
Python:
from sklearn.linear_model import LogisticRegression
# 估计倾向得分
ps_model = LogisticRegression(penalty='l2', C=1.0)
ps_model.fit(X, D)
e_hat = ps_model.predict_proba(X)[:, 1]步骤2: 匹配
对于每个处理个体,找到倾向得分最接近的对照个体:
匹配方法:
A. 最近邻匹配(Nearest Neighbor):
- 1:1匹配(每个处理个体配1个对照)
- 1:匹配(每个处理个体配个对照)
B. 卡尺匹配(Caliper Matching):
其中是卡尺宽度(如)
C. 核匹配(Kernel Matching):
其中是核函数(如Epanechnikov核),是带宽。
步骤3: 估计ATT
其中是匹配得到的反事实结果。
Python实现:PSM
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import NearestNeighbors
def estimate_psm(X, D, Y, n_neighbors=1, caliper=None):
"""
倾向得分匹配(PSM)
参数:
------
X: (N, K) 协变量
D: (N,) 处理指示器
Y: (N,) 结果变量
n_neighbors: 匹配邻居数
caliper: 卡尺宽度(None表示无卡尺)
返回:
------
ATT: 处理组平均处理效应
matched_data: 匹配后数据
"""
# 第1步:估计倾向得分
ps_model = LogisticRegression(max_iter=1000)
ps_model.fit(X, D)
e_hat = ps_model.predict_proba(X)[:, 1]
# 第2步:匹配
treated_idx = np.where(D == 1)[0]
control_idx = np.where(D == 0)[0]
# 用最近邻匹配
nbrs = NearestNeighbors(n_neighbors=n_neighbors, metric='euclidean')
nbrs.fit(e_hat[control_idx].reshape(-1, 1))
distances, indices = nbrs.kneighbors(e_hat[treated_idx].reshape(-1, 1))
# 应用卡尺
matched_pairs = []
for i, (dist, idx) in enumerate(zip(distances, indices)):
for d, j in zip(dist, idx):
if caliper is None or d < caliper:
matched_pairs.append({
'treated_idx': treated_idx[i],
'control_idx': control_idx[j],
'distance': d
})
# 第3步:计算ATT
ATT_list = []
for pair in matched_pairs:
i_t = pair['treated_idx']
i_c = pair['control_idx']
ATT_list.append(Y[i_t] - Y[i_c])
ATT = np.mean(ATT_list)
# 构造匹配后数据
matched_data = pd.DataFrame({
'treated_idx': [p['treated_idx'] for p in matched_pairs],
'control_idx': [p['control_idx'] for p in matched_pairs],
'Y_treated': [Y[p['treated_idx']] for p in matched_pairs],
'Y_control': [Y[p['control_idx']] for p in matched_pairs],
'effect': ATT_list
})
return ATT, matched_data, e_hat
# 示例
np.random.seed(42)
N = 1000
X = np.random.randn(N, 5)
true_e = 1 / (1 + np.exp(-X[:, 0] - 0.5*X[:, 1]))
D = (np.random.rand(N) < true_e).astype(int)
Y0 = X[:, 0] + X[:, 1] + np.random.randn(N)
Y1 = Y0 + 2 # 真实ATE=2
Y = D * Y1 + (1-D) * Y0
ATT, matched_data, e_hat = estimate_psm(X, D, Y, n_neighbors=1)
print(f"PSM估计的ATT: {ATT:.3f}")方法2: 逆概率加权(IPW)
核心思想
Horvitz-Thompson估计量 (1952):
不是匹配,而是加权:
直觉:
- 处理组: 倾向得分低的个体权重高(因为稀有)
- 对照组: 倾向得分高的个体权重高
为什么有效?
类似地:
所以:
稳定IPW
问题: 当接近0或1时,权重或爆炸!
解决: 稳定权重(Stabilized Weights)
估计量:
权重截断
Truncation: 限制极端权重
或使用对称截断:
Python实现:IPW
def estimate_ipw(X, D, Y, stabilized=True, trim=None):
"""
逆概率加权(IPW)
参数:
------
X: (N, K) 协变量
D: (N,) 处理指示器
Y: (N,) 结果变量
stabilized: 是否使用稳定权重
trim: 倾向得分截断阈值,如(0.1, 0.9)
返回:
------
ATE: 平均处理效应
weights: 权重向量
"""
N = len(D)
# 估计倾向得分
ps_model = LogisticRegression(max_iter=1000)
ps_model.fit(X, D)
e_hat = ps_model.predict_proba(X)[:, 1]
# 截断
if trim is not None:
e_hat = np.clip(e_hat, trim[0], trim[1])
# 计算权重
if stabilized:
p_D = D.mean()
w = np.where(D == 1, p_D / e_hat, (1 - p_D) / (1 - e_hat))
else:
w = np.where(D == 1, 1 / e_hat, 1 / (1 - e_hat))
# 估计ATE
if stabilized:
ATE = (np.sum(w[D==1] * Y[D==1]) / np.sum(w[D==1]) -
np.sum(w[D==0] * Y[D==0]) / np.sum(w[D==0]))
else:
ATE = np.mean(w * (D * Y - (1-D) * Y))
return ATE, w
# 使用
ATE_ipw, weights = estimate_ipw(X, D, Y, stabilized=True, trim=(0.1, 0.9))
print(f"IPW估计的ATE: {ATE_ipw:.3f}")方法3: 双重稳健估计(Doubly Robust)
核心思想
结合: 结果回归(Outcome Regression) + 倾向得分加权
估计量 (Bang & Robins 2005):
其中:
- : 处理组结果模型
- : 对照组结果模型
神奇性质: Double Robustness
定理: 只要或之一正确,就一致!
证明直觉:
情况1: 如果正确
所以IPW部分消失,剩下
情况2: 如果正确
IPW部分给出一致估计,即使错误。
优势: "双重保险"!
Python实现:Doubly Robust
from sklearn.ensemble import RandomForestRegressor
def estimate_doubly_robust(X, D, Y, trim=(0.1, 0.9)):
"""
双重稳健估计
参数:
------
X: (N, K) 协变量
D: (N,) 处理指示器
Y: (N,) 结果变量
trim: 倾向得分截断
返回:
------
ATE: 平均处理效应
"""
N = len(D)
# 第1步:估计倾向得分
ps_model = LogisticRegression(max_iter=1000)
ps_model.fit(X, D)
e_hat = np.clip(ps_model.predict_proba(X)[:, 1], trim[0], trim[1])
# 第2步:估计结果模型
# 处理组
model_1 = RandomForestRegressor(n_estimators=100, random_state=42)
model_1.fit(X[D==1], Y[D==1])
mu_1 = model_1.predict(X)
# 对照组
model_0 = RandomForestRegressor(n_estimators=100, random_state=42)
model_0.fit(X[D==0], Y[D==0])
mu_0 = model_0.predict(X)
# 第3步:双重稳健估计
term1 = D * (Y - mu_1) / e_hat
term2 = (1 - D) * (Y - mu_0) / (1 - e_hat)
term3 = mu_1 - mu_0
ATE = np.mean(term1 - term2 + term3)
return ATE, {'e_hat': e_hat, 'mu_1': mu_1, 'mu_0': mu_0}
# 使用
ATE_dr, components = estimate_doubly_robust(X, D, Y)
print(f"双重稳健估计的ATE: {ATE_dr:.3f}")平衡性检验
为什么重要?
倾向得分定理:
实践: 需要检验匹配/加权后协变量是否平衡!
标准化均值差(SMD)
定义:
其中:
- : 处理组第个协变量均值
- : 对照组第个协变量均值
- : 方差
阈值:
- 优:
- 可接受:
- 差:
Love Plot
可视化: 所有协变量的SMD
def balance_table(X, D, w=None, varnames=None):
"""
计算平衡性统计
参数:
------
X: (N, K) 协变量
D: (N,) 处理指示器
w: (N,) 权重(None表示无加权)
varnames: 变量名列表
返回:
------
df: 平衡性统计表
"""
K = X.shape[1]
if varnames is None:
varnames = [f'X{k+1}' for k in range(K)]
results = []
for k in range(K):
x = X[:, k]
if w is None:
# 无加权
mean_t = x[D==1].mean()
mean_c = x[D==0].mean()
std_t = x[D==1].std()
std_c = x[D==0].std()
else:
# 加权
mean_t = np.average(x[D==1], weights=w[D==1])
mean_c = np.average(x[D==0], weights=w[D==0])
std_t = np.sqrt(np.average((x[D==1] - mean_t)**2, weights=w[D==1]))
std_c = np.sqrt(np.average((x[D==0] - mean_c)**2, weights=w[D==0]))
# SMD
smd = (mean_t - mean_c) / np.sqrt((std_t**2 + std_c**2) / 2) * 100
results.append({
'Variable': varnames[k],
'Mean_Treated': mean_t,
'Mean_Control': mean_c,
'SMD': smd
})
return pd.DataFrame(results)
def plot_love(balance_before, balance_after):
"""绘制Love Plot"""
fig, ax = plt.subplots(figsize=(10, 8))
y = np.arange(len(balance_before))
ax.scatter(balance_before['SMD'], y, s=100, marker='o',
color='red', label='匹配前', alpha=0.7)
ax.scatter(balance_after['SMD'], y, s=100, marker='s',
color='blue', label='匹配后', alpha=0.7)
ax.axvline(x=-10, color='gray', linestyle='--', linewidth=1, alpha=0.5)
ax.axvline(x=10, color='gray', linestyle='--', linewidth=1, alpha=0.5)
ax.axvline(x=0, color='black', linestyle='-', linewidth=1)
ax.set_yticks(y)
ax.set_yticklabels(balance_before['Variable'])
ax.set_xlabel('标准化均值差 (%)', fontsize=12, fontweight='bold')
ax.set_title('Love Plot: 协变量平衡性', fontsize=14, fontweight='bold')
ax.legend(fontsize=11)
ax.grid(True, axis='x', alpha=0.3)
plt.tight_layout()
return fig
# 使用
balance_before = balance_table(X, D, w=None)
balance_after = balance_table(X, D, w=weights)
print("匹配前:")
print(balance_before)
print("\n匹配后:")
print(balance_after)
fig = plot_love(balance_before, balance_after)
plt.show()敏感性分析
Rosenbaum Bounds
问题: CIA假设无法检验!
问题: 如果存在未观测混淆,结论如何改变?
Rosenbaum (2002) 的敏感性分析:
设定: 两个匹配个体,倾向得分的几率比:
解释:
- : 完美匹配(无隐藏偏差)
- : 匹配个体的接受处理几率相差2倍
- : 完全的选择性偏差
分析: 对于不同的,计算效应的上下界
Python:
def rosenbaum_bounds(matched_data, gamma_range):
"""
Rosenbaum边界敏感性分析
参数:
------
matched_data: 匹配数据(包含treated和control结果)
gamma_range: Gamma值列表
返回:
------
bounds: 上下界
"""
effects = matched_data['Y_treated'] - matched_data['Y_control']
n = len(effects)
results = []
for gamma in gamma_range:
# 计算Wilcoxon统计量的上下界
# (简化版,完整版需要更复杂的计算)
p_plus = gamma / (1 + gamma)
p_minus = 1 / (1 + gamma)
# 上界(最不利情况)
upper = np.mean(effects) + np.std(effects) / np.sqrt(n) * 1.96
# 下界
lower = np.mean(effects) - np.std(effects) / np.sqrt(n) * 1.96
results.append({
'Gamma': gamma,
'Lower': lower,
'Upper': upper
})
return pd.DataFrame(results)
# 使用
gammas = [1.0, 1.5, 2.0, 2.5, 3.0]
bounds = rosenbaum_bounds(matched_data, gammas)
print(bounds)经典案例:LaLonde (1986)
背景
Robert LaLonde (1986): "Evaluating the Econometric Evaluations of Training Programs"
研究问题: 非实验方法能复现实验结果吗?
NSW (National Supported Work Demonstration): 就业培训项目RCT
- 处理组: 185人接受培训
- 对照组: 260人(实验对照)
LaLonde的贡献: 用非实验数据(PSID, CPS)替换实验对照组,检验PSM等方法。
数据
结果变量: 1978年收入(培训后)
协变量:
age: 年龄education: 受教育年限black: 是否黑人hispanic: 是否西裔married: 是否已婚nodegree: 是否无高中文凭re74: 1974年收入(培训前2年)re75: 1975年收入(培训前1年)
真实效应(来自RCT):
Python实现:完整分析
# 加载数据(模拟)
np.random.seed(123)
n_treated = 185
n_control = 2490 # PSID对照
# 处理组(类似NSW参与者)
X_t = pd.DataFrame({
'age': np.random.normal(25, 7, n_treated),
'education': np.random.normal(10, 2, n_treated),
'black': np.random.binomial(1, 0.8, n_treated),
'hispanic': np.random.binomial(1, 0.1, n_treated),
'married': np.random.binomial(1, 0.2, n_treated),
'nodegree': np.random.binomial(1, 0.7, n_treated),
're74': np.random.gamma(2, 1500, n_treated),
're75': np.random.gamma(2, 1500, n_treated)
})
# 对照组(来自PSID,更高收入/教育)
X_c = pd.DataFrame({
'age': np.random.normal(35, 10, n_control),
'education': np.random.normal(12, 3, n_control),
'black': np.random.binomial(1, 0.25, n_control),
'hispanic': np.random.binomial(1, 0.05, n_control),
'married': np.random.binomial(1, 0.7, n_control),
'nodegree': np.random.binomial(1, 0.3, n_control),
're74': np.random.gamma(5, 3000, n_control),
're75': np.random.gamma(5, 3000, n_control)
})
X = pd.concat([X_t, X_c], ignore_index=True)
D = np.array([1]*n_treated + [0]*n_control)
# 生成结果(模拟真实效应$1794)
true_effect = 1794
Y_c = X['re75'] * 1.1 + X['education'] * 500 + np.random.normal(0, 3000, len(X))
Y_t = Y_c + true_effect
Y = D * Y_t + (1-D) * Y_c
# =================================================================
# 分析1:简单均值差(有偏!)
# =================================================================
naive = Y[D==1].mean() - Y[D==0].mean()
print(f"简单均值差(Naive): ${naive:.0f}")
# =================================================================
# 分析2:线性回归
# =================================================================
import statsmodels.formula.api as smf
data = X.copy()
data['D'] = D
data['Y'] = Y
ols = smf.ols('Y ~ D + age + education + black + hispanic + married + nodegree + re74 + re75', data=data).fit()
print(f"\n线性回归估计: ${ols.params['D']:.0f}")
# =================================================================
# 分析3:PSM
# =================================================================
ATT_psm, matched_data, e_hat = estimate_psm(X.values, D, Y, n_neighbors=1, caliper=0.1)
print(f"\nPSM估计(1:1匹配): ${ATT_psm:.0f}")
# =================================================================
# 分析4:IPW
# =================================================================
ATE_ipw, weights = estimate_ipw(X.values, D, Y, stabilized=True, trim=(0.1, 0.9))
print(f"IPW估计(稳定权重): ${ATE_ipw:.0f}")
# =================================================================
# 分析5:Doubly Robust
# =================================================================
ATE_dr, _ = estimate_doubly_robust(X.values, D, Y, trim=(0.1, 0.9))
print(f"双重稳健估计: ${ATE_dr:.0f}")
print(f"\n真实效应(RCT): ${true_effect:.0f}")
# =================================================================
# 平衡性检验
# =================================================================
balance_before = balance_table(X.values, D, w=None, varnames=X.columns)
balance_after_ipw = balance_table(X.values, D, w=weights, varnames=X.columns)
print("\n平衡性检验:")
print(balance_before[['Variable', 'SMD']])
print("\nIPW后:")
print(balance_after_ipw[['Variable', 'SMD']])️ 常见陷阱
陷阱1:忽略共同支撑
问题: 处理组和对照组倾向得分不重叠
后果: 外推,结果不可靠
诊断:
plt.figure(figsize=(10, 4))
plt.hist(e_hat[D==1], bins=30, alpha=0.5, label='处理组', density=True)
plt.hist(e_hat[D==0], bins=30, alpha=0.5, label='对照组', density=True)
plt.xlabel('倾向得分')
plt.ylabel('密度')
plt.legend()
plt.title('倾向得分分布:检查共同支撑')
plt.show()解决: 剔除不重叠区域的样本
陷阱2:忽略平衡性检验
错误: 只报告估计值,不检查协变量平衡
正确: 报告Love Plot和SMD表
陷阱3:过度自信
问题: CIA是强假设,无法检验!
缓解:
- 敏感性分析
- 报告多种方法
- 谨慎解释
陷阱4:模型依赖
PSM: 依赖倾向得分模型 IPW: 对极端权重敏感 DR: 虽然双重稳健,但两个模型都错则失效
建议: 使用机器学习估计和(如Random Forest)
高级主题
使用CausalML库
from causalml.propensity import ElasticNetPropensityModel
from causalml.match import NearestNeighborMatch
# 估计倾向得分
pm = ElasticNetPropensityModel()
pm.fit(X.values, D)
e_hat_causal = pm.predict(X.values)
# 匹配
matcher = NearestNeighborMatch(caliper=0.1, replace=False)
matched = matcher.match(data={'X': X.values, 'treatment': D, 'outcome': Y})
print(f"CausalML PSM ATT: ${matched['ATT']:.0f}")使用EconML库
from econml.metalearners import XLearner
from econml.dr import DRLearner
# X-learner
xl = XLearner(models=RandomForestRegressor())
xl.fit(Y, D, X=X.values)
ate_xl = xl.ate(X.values)
print(f"EconML X-Learner ATE: ${ate_xl:.0f}")
# DR-learner
dr = DRLearner(model_propensity=LogisticRegression(),
model_regression=RandomForestRegressor())
dr.fit(Y, D, X=X.values)
ate_dr_econml = dr.effect(X.values).mean()
print(f"EconML DR-Learner ATE: ${ate_dr_econml:.0f}")本节小结
核心要点
1. 倾向得分定理: 降维奇迹 - 从高维到单维
2. 三大方法:
- PSM: 基于匹配,估计ATT
- IPW: 加权创造"伪RCT",估计ATE
- Doubly Robust: 结合两者,双重保险
3. 核心假设:
- CIA(条件独立性): 不可检验!
- 共同支撑: 可检验,必须满足
4. 必做检验:
- 平衡性检验(Love Plot)
- 敏感性分析(Rosenbaum Bounds)
关键公式
倾向得分:
PSM-ATT:
IPW-ATE:
Doubly Robust:
Python工具
- sklearn:
LogisticRegression,NearestNeighbors - causalml:
ElasticNetPropensityModel,NearestNeighborMatch - econml:
DRLearner, Meta-learners - statsmodels: 回归基准
下一节: 13.4 异质性处理效应与Meta-learners - 从ATE到CATE
倾向得分:匹配的降维艺术!