7.4 ARIMA 模型(AutoRegressive Integrated Moving Average)
"Remember that all models are wrong; the practical question is how wrong do they have to be to not be useful.""记住,所有模型都是错的;实际问题是它们要错到什么程度才会变得无用。"— George E. P. Box, Statistician (统计学家)
时间序列预测的经典方法:从理论到实践
本节目标
完成本节后,你将能够:
- 理解 AR、MA、ARMA、ARIMA 模型的数学原理
- 掌握模型识别方法(ACF/PACF 分析)
- 使用 Box-Jenkins 方法进行模型选择
- 实施 ARIMA 模型估计和诊断
- 进行时间序列预测和预测区间计算
- 掌握 SARIMA(季节性 ARIMA)
- 使用 Python 完整实现 ARIMA 建模流程
ARIMA 家族概览
模型家族树
ARIMA 家族
├── AR(p) - 自回归模型
│ └── 当前值依赖过去 p 个值
│
├── MA(q) - 移动平均模型
│ └── 当前值依赖过去 q 个误差
│
├── ARMA(p,q) - 自回归移动平均
│ └── AR + MA 的组合
│
├── ARIMA(p,d,q) - 差分后的 ARMA
│ └── d 次差分 + ARMA(p,q)
│
└── SARIMA(p,d,q)(P,D,Q)s - 季节性 ARIMA
└── ARIMA + 季节性成分为什么需要 ARIMA?
时间序列的特点:
- 当前值与过去值相关(自相关)
- OLS 回归假设独立性,不适用
ARIMA 的优势:
- 显式建模时间依赖结构
- 灵活适应不同数据特征
- 预测能力强
AR 模型(Autoregressive Model)
AR(p) 模型定义
数学表达:
其中:
- :当前值
- :自回归系数
- :常数项
- :白噪声
滞后算子表示:
其中 是滞后算子:
AR(1) 模型详解
最简单的 AR 模型:
平稳性条件:
自相关函数(ACF):
特征:指数衰减
Python 实现:AR 模型
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import acorr_ljungbox
# 设置
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")
np.random.seed(42)
# 生成 AR(1) 数据
def generate_ar1(phi, n=200, c=0):
"""生成 AR(1) 过程"""
y = np.zeros(n)
epsilon = np.random.normal(0, 1, n)
y[0] = epsilon[0]
for t in range(1, n):
y[t] = c + phi * y[t-1] + epsilon[t]
return y
# 不同 φ 值的 AR(1)
phis = [0.3, 0.7, 0.95, -0.7]
fig, axes = plt.subplots(len(phis), 3, figsize=(16, 12))
for i, phi in enumerate(phis):
y = generate_ar1(phi)
# 时间序列图
axes[i, 0].plot(y, linewidth=1.5)
axes[i, 0].set_title(f'AR(1): φ={phi}', fontsize=12, fontweight='bold')
axes[i, 0].set_ylabel('值')
axes[i, 0].grid(True, alpha=0.3)
# ACF
plot_acf(y, lags=20, ax=axes[i, 1], alpha=0.05)
axes[i, 1].set_title(f'ACF (φ={phi})', fontsize=12, fontweight='bold')
# PACF
plot_pacf(y, lags=20, ax=axes[i, 2], alpha=0.05, method='ywm')
axes[i, 2].set_title(f'PACF (φ={phi})', fontsize=12, fontweight='bold')
axes[-1, 0].set_xlabel('时间')
axes[-1, 1].set_xlabel('滞后阶数')
axes[-1, 2].set_xlabel('滞后阶数')
plt.tight_layout()
plt.savefig('ar1_different_phi.png', dpi=300, bbox_inches='tight')
plt.show()
print("="*70)
print("AR(1) 模型特征")
print("="*70)
print("观察:")
print(" 1. ACF: 指数衰减(φ > 0)或交替衰减(φ < 0)")
print(" 2. PACF: 滞后 1 阶显著,其余不显著")
print(" 3. 这是识别 AR(1) 的关键特征!")AR(p) 模型识别
PACF 截尾法则:
- AR(p) 的 PACF 在滞后 p 阶后截尾
- ACF 缓慢衰减
| 模型 | ACF | PACF |
|---|---|---|
| AR(1) | 指数衰减 | 滞后1显著,其余不显著 |
| AR(2) | 指数/正弦衰减 | 滞后1,2显著,其余不显著 |
| AR(p) | 逐渐衰减 | 滞后p显著,其余不显著 |
MA 模型(Moving Average Model)
MA(q) 模型定义
数学表达:
其中:
- :移动平均系数
- :白噪声
滞后算子表示:
MA(1) 模型详解
可逆性条件:
自相关函数(ACF):
特征:ACF 截尾
Python 实现:MA 模型
def generate_ma1(theta, n=200, mu=0):
"""生成 MA(1) 过程"""
epsilon = np.random.normal(0, 1, n+1)
y = np.zeros(n)
for t in range(n):
y[t] = mu + epsilon[t] + theta * epsilon[t-1] if t > 0 else mu + epsilon[t]
return y
# 不同 θ 值的 MA(1)
thetas = [0.3, 0.7, 0.95, -0.7]
fig, axes = plt.subplots(len(thetas), 3, figsize=(16, 12))
for i, theta in enumerate(thetas):
y = generate_ma1(theta)
# 时间序列图
axes[i, 0].plot(y, linewidth=1.5)
axes[i, 0].set_title(f'MA(1): θ={theta}', fontsize=12, fontweight='bold')
axes[i, 0].set_ylabel('值')
axes[i, 0].grid(True, alpha=0.3)
# ACF
plot_acf(y, lags=20, ax=axes[i, 1], alpha=0.05)
axes[i, 1].set_title(f'ACF (θ={theta})', fontsize=12, fontweight='bold')
# PACF
plot_pacf(y, lags=20, ax=axes[i, 2], alpha=0.05, method='ywm')
axes[i, 2].set_title(f'PACF (θ={theta})', fontsize=12, fontweight='bold')
axes[-1, 0].set_xlabel('时间')
axes[-1, 1].set_xlabel('滞后阶数')
axes[-1, 2].set_xlabel('滞后阶数')
plt.tight_layout()
plt.savefig('ma1_different_theta.png', dpi=300, bbox_inches='tight')
plt.show()
print("\n" + "="*70)
print("MA(1) 模型特征")
print("="*70)
print("观察:")
print(" 1. ACF: 滞后 1 阶显著,其余不显著(截尾)")
print(" 2. PACF: 逐渐衰减")
print(" 3. 这与 AR(1) 正好相反!")MA(q) 模型识别
ACF 截尾法则:
- MA(q) 的 ACF 在滞后 q 阶后截尾
- PACF 缓慢衰减
| 模型 | ACF | PACF |
|---|---|---|
| MA(1) | 滞后1显著,其余不显著 | 指数衰减 |
| MA(2) | 滞后1,2显著,其余不显著 | 指数/正弦衰减 |
| MA(q) | 滞后q显著,其余不显著 | 逐渐衰减 |
ARMA 模型(Autoregressive Moving Average)
ARMA(p,q) 模型定义
数学表达:
滞后算子形式:
其中:
ARMA 模型识别
ACF 和 PACF 特征:
| 模型 | ACF | PACF |
|---|---|---|
| AR(p) | 逐渐衰减 | p 阶后截尾 |
| MA(q) | q 阶后截尾 | 逐渐衰减 |
| ARMA(p,q) | 逐渐衰减 | 逐渐衰减 |
问题:ARMA 的 ACF 和 PACF 都不截尾,如何识别?
解决:
- 使用信息准则(AIC/BIC)
- 比较多个候选模型
- 诊断残差
ARIMA 模型(Integrated ARMA)
ARIMA(p,d,q) 模型定义
核心思想:对非平稳序列进行 d 次差分,然后建立 ARMA(p,q)
步骤:
- 差分 d 次:
- 对 建立 ARMA(p,q)
数学表达:
差分阶数 d 的确定
原则:
- :序列已平稳
- :序列有单位根(I(1))
- :序列有两个单位根(罕见)
方法:
- ADF 检验(见 7.2 节)
- 观察差分后的 ACF
- 避免过度差分
from statsmodels.tsa.stattools import adfuller, kpss
def determine_differencing(series, max_diff=2):
"""
确定差分阶数
"""
print("="*70)
print("确定差分阶数")
print("="*70)
current_series = series.copy()
for d in range(max_diff + 1):
# ADF 检验
adf_result = adfuller(current_series.dropna(), autolag='AIC')
# KPSS 检验
kpss_result = kpss(current_series.dropna(), regression='c', nlags='auto')
print(f"\n{d} 阶差分:")
print(f" ADF 统计量: {adf_result[0]:.4f}, p-value: {adf_result[1]:.4f}")
print(f" KPSS 统计量: {kpss_result[0]:.4f}, p-value: {kpss_result[1]:.4f}")
# 判断平稳性
adf_stationary = adf_result[1] < 0.05
kpss_stationary = kpss_result[1] > 0.05
if adf_stationary and kpss_stationary:
print(f" 结论: 序列平稳 → 建议 d={d}")
return d
# 进行差分
if d < max_diff:
current_series = current_series.diff().dropna()
print(f"\n建议 d={max_diff}")
return max_diff
# 示例:随机游走
np.random.seed(123)
random_walk = np.cumsum(np.random.randn(200))
ts_rw = pd.Series(random_walk)
d_optimal = determine_differencing(ts_rw)Box-Jenkins 建模流程
完整流程图
Box-Jenkins 方法
├── 1. 识别(Identification)
│ ├── 绘制时间序列图
│ ├── 平稳性检验(ADF)
│ ├── 确定差分阶数 d
│ ├── 绘制 ACF/PACF
│ └── 初步确定 p, q
│
├── 2. 估计(Estimation)
│ ├── 最大似然估计
│ ├── 拟合多个候选模型
│ └── 计算 AIC/BIC
│
├── 3. 诊断(Diagnostic Checking)
│ ├── 残差白噪声检验
│ ├── Ljung-Box 检验
│ ├── 残差正态性检验
│ └── 残差 ACF/PACF
│
└── 4. 预测(Forecasting)
├── 点预测
├── 预测区间
└── 预测评估Python 完整实现
class ARIMAModeler:
"""ARIMA 建模完整流程"""
def __init__(self, data):
self.data = data
self.model = None
self.results = None
def step1_identification(self):
"""步骤1: 识别"""
print("\n" + "="*70)
print("步骤 1: 模型识别")
print("="*70)
# 1.1 绘制时间序列
fig, axes = plt.subplots(3, 1, figsize=(14, 10))
axes[0].plot(self.data, linewidth=1.5)
axes[0].set_title('原始时间序列', fontsize=14, fontweight='bold')
axes[0].set_ylabel('值')
axes[0].grid(True, alpha=0.3)
# 1.2 ACF
plot_acf(self.data.dropna(), lags=40, ax=axes[1], alpha=0.05)
axes[1].set_title('自相关函数(ACF)', fontsize=12, fontweight='bold')
# 1.3 PACF
plot_pacf(self.data.dropna(), lags=40, ax=axes[2], alpha=0.05, method='ywm')
axes[2].set_title('偏自相关函数(PACF)', fontsize=12, fontweight='bold')
axes[2].set_xlabel('滞后阶数')
plt.tight_layout()
plt.show()
# 1.4 平稳性检验
d = determine_differencing(self.data)
return d
def step2_estimation(self, order_list):
"""步骤2: 估计多个候选模型"""
print("\n" + "="*70)
print("步骤 2: 模型估计")
print("="*70)
results_summary = []
for order in order_list:
p, d, q = order
try:
model = ARIMA(self.data, order=(p, d, q))
fitted = model.fit()
results_summary.append({
'order': f'ARIMA({p},{d},{q})',
'AIC': fitted.aic,
'BIC': fitted.bic,
'HQIC': fitted.hqic,
'Log-Likelihood': fitted.llf
})
print(f"ARIMA({p},{d},{q}): AIC={fitted.aic:.2f}, BIC={fitted.bic:.2f}")
except Exception as e:
print(f"ARIMA({p},{d},{q}): 估计失败 - {str(e)}")
# 转为 DataFrame
df_results = pd.DataFrame(results_summary)
df_results = df_results.sort_values('AIC')
print("\n" + "="*70)
print("模型比较(按 AIC 排序)")
print("="*70)
print(df_results.to_string(index=False))
# 选择最优模型
best_order = df_results.iloc[0]['order']
print(f"\n最优模型: {best_order}")
return df_results
def step3_diagnostics(self, model_fit):
"""步骤3: 诊断检验"""
print("\n" + "="*70)
print("步骤 3: 模型诊断")
print("="*70)
# 残差
residuals = model_fit.resid
# 3.1 残差统计
print(f"\n残差统计:")
print(f" 均值: {residuals.mean():.6f} (应接近0)")
print(f" 标准差: {residuals.std():.4f}")
print(f" 偏度: {residuals.skew():.4f} (应接近0)")
print(f" 峰度: {residuals.kurtosis():.4f} (应接近3)")
# 3.2 Ljung-Box 检验
lb_test = acorr_ljungbox(residuals, lags=20, return_df=True)
print(f"\nLjung-Box 检验(H0: 残差无自相关):")
print(f" 滞后20阶 p-value: {lb_test['lb_pvalue'].iloc[-1]:.4f}")
if lb_test['lb_pvalue'].iloc[-1] > 0.05:
print(" 结论: 残差无显著自相关 ")
else:
print(" 结论: 残差存在自相关 ")
# 3.3 可视化诊断
fig = model_fit.plot_diagnostics(figsize=(14, 10))
plt.tight_layout()
plt.show()
return residuals
def step4_forecast(self, model_fit, steps=12):
"""步骤4: 预测"""
print("\n" + "="*70)
print("步骤 4: 预测")
print("="*70)
# 预测
forecast_result = model_fit.get_forecast(steps=steps)
forecast_mean = forecast_result.predicted_mean
forecast_ci = forecast_result.conf_int(alpha=0.05)
# 可视化
fig, ax = plt.subplots(figsize=(14, 6))
# 历史数据
ax.plot(self.data.index, self.data.values, linewidth=1.5, label='历史数据')
# 预测值
forecast_index = pd.date_range(start=self.data.index[-1],
periods=steps+1, freq=self.data.index.freq)[1:]
ax.plot(forecast_index, forecast_mean, linewidth=2, color='red',
label='预测值', marker='o')
# 预测区间
ax.fill_between(forecast_index,
forecast_ci.iloc[:, 0],
forecast_ci.iloc[:, 1],
alpha=0.3, color='red', label='95% 预测区间')
ax.set_title('ARIMA 预测', fontsize=14, fontweight='bold')
ax.set_xlabel('时间')
ax.set_ylabel('值')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print(f"\n未来 {steps} 期预测:")
for i, (date, value) in enumerate(zip(forecast_index, forecast_mean)):
print(f" {date.strftime('%Y-%m')}: {value:.2f} "
f"[{forecast_ci.iloc[i, 0]:.2f}, {forecast_ci.iloc[i, 1]:.2f}]")
return forecast_mean, forecast_ci
# 使用示例
print("\n" + "="*70)
print("完整 ARIMA 建模流程示例")
print("="*70)
# 生成示例数据:ARIMA(1,1,1)
np.random.seed(42)
n = 200
y = np.zeros(n)
epsilon = np.random.normal(0, 1, n)
phi, theta = 0.7, -0.5
for t in range(1, n):
if t == 1:
y[t] = epsilon[t] + theta * epsilon[t-1]
else:
y[t] = y[t-1] + phi * (y[t-1] - y[t-2]) + epsilon[t] + theta * epsilon[t-1]
dates = pd.date_range('2010-01', periods=n, freq='M')
ts_example = pd.Series(y, index=dates)
# 初始化建模器
modeler = ARIMAModeler(ts_example)
# 步骤1: 识别
d_suggested = modeler.step1_identification()
# 步骤2: 估计候选模型
candidate_orders = [
(0, 1, 1), (1, 1, 0), (1, 1, 1),
(2, 1, 1), (1, 1, 2), (2, 1, 2)
]
comparison = modeler.step2_estimation(candidate_orders)
# 拟合最优模型
best_model = ARIMA(ts_example, order=(1, 1, 1))
best_fit = best_model.fit()
print("\n" + "="*70)
print("最优模型摘要")
print("="*70)
print(best_fit.summary())
# 步骤3: 诊断
residuals = modeler.step3_diagnostics(best_fit)
# 步骤4: 预测
forecast_mean, forecast_ci = modeler.step4_forecast(best_fit, steps=12)SARIMA 模型(Seasonal ARIMA)
SARIMA(p,d,q)(P,D,Q)s 定义
完整形式:
参数说明:
- :非季节部分
- :季节部分
- :季节周期(月度=12,季度=4)
SARIMA 识别
季节性特征:
- ACF/PACF 在季节滞后(s, 2s, 3s...)显著
- 需要季节差分(D=1)
Python 实现 SARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
# 生成季节性数据
np.random.seed(123)
n = 120
dates_seasonal = pd.date_range('2010-01', periods=n, freq='M')
# 趋势 + 季节性 + 随机
trend = 100 + 0.5 * np.arange(n)
seasonal = 10 * np.sin(2 * np.pi * np.arange(n) / 12)
random_component = np.random.normal(0, 3, n)
y_seasonal = trend + seasonal + random_component
ts_seasonal = pd.Series(y_seasonal, index=dates_seasonal)
# SARIMA 建模
print("\n" + "="*70)
print("SARIMA 建模")
print("="*70)
# 拟合 SARIMA(1,1,1)(1,1,1,12)
sarima_model = SARIMAX(ts_seasonal,
order=(1, 1, 1),
seasonal_order=(1, 1, 1, 12),
enforce_stationarity=False,
enforce_invertibility=False)
sarima_fit = sarima_model.fit(disp=False)
print(sarima_fit.summary())
# 预测
sarima_forecast = sarima_fit.get_forecast(steps=24)
forecast_mean_s = sarima_forecast.predicted_mean
forecast_ci_s = sarima_forecast.conf_int()
# 可视化
fig, ax = plt.subplots(figsize=(14, 6))
ax.plot(ts_seasonal, linewidth=1.5, label='历史数据')
ax.plot(forecast_mean_s.index, forecast_mean_s, linewidth=2,
color='red', label='SARIMA 预测', marker='o')
ax.fill_between(forecast_mean_s.index,
forecast_ci_s.iloc[:, 0],
forecast_ci_s.iloc[:, 1],
alpha=0.3, color='red', label='95% 预测区间')
ax.set_title('SARIMA 季节性预测', fontsize=14, fontweight='bold')
ax.set_xlabel('时间')
ax.set_ylabel('值')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('sarima_forecast.png', dpi=300, bbox_inches='tight')
plt.show()模型选择:信息准则
AIC、BIC、HQIC
AIC(Akaike Information Criterion):
BIC(Bayesian Information Criterion):
HQIC(Hannan-Quinn IC):
其中:
- :似然函数
- :参数个数
- :样本量
选择原则:值越小越好
对比:
- AIC:偏向复杂模型
- BIC:惩罚参数更严格,偏向简约模型
- 实践中:两者结合
实战案例:失业率预测
# 模拟美国失业率数据
np.random.seed(2024)
n_months = 180
dates_unemp = pd.date_range('2010-01', periods=n_months, freq='M')
# 生成 ARIMA(2,1,2) 数据
phi1, phi2 = 0.5, -0.3
theta1, theta2 = -0.4, 0.2
unemployment = np.zeros(n_months)
epsilon = np.random.normal(0, 0.3, n_months)
unemployment[0] = 5.0 + epsilon[0]
unemployment[1] = 5.0 + epsilon[1]
for t in range(2, n_months):
diff = (unemployment[t-1] - unemployment[t-2])
unemployment[t] = (unemployment[t-1] + phi1 * diff + phi2 * (unemployment[t-2] - unemployment[t-3] if t > 2 else 0) +
epsilon[t] + theta1 * epsilon[t-1] + theta2 * epsilon[t-2])
ts_unemp = pd.Series(unemployment, index=dates_unemp, name='失业率')
print("\n" + "="*70)
print("案例:失业率预测(完整流程)")
print("="*70)
# 分割训练集和测试集
train_size = int(n_months * 0.85)
train = ts_unemp[:train_size]
test = ts_unemp[train_size:]
print(f"\n训练集: {len(train)} 个月 ({train.index[0].strftime('%Y-%m')} - {train.index[-1].strftime('%Y-%m')})")
print(f"测试集: {len(test)} 个月 ({test.index[0].strftime('%Y-%m')} - {test.index[-1].strftime('%Y-%m')})")
# 自动选择最优 ARIMA
from pmdarima import auto_arima
print("\n使用 auto_arima 自动选择模型...")
auto_model = auto_arima(train,
start_p=0, start_q=0,
max_p=5, max_q=5,
d=None, # 自动确定
seasonal=False,
stepwise=True,
suppress_warnings=True,
information_criterion='aic',
trace=True)
print("\n" + "="*70)
print("最优模型")
print("="*70)
print(auto_model.summary())
# 预测
n_forecast = len(test)
forecast_auto = auto_model.predict(n_periods=n_forecast, return_conf_int=True)
forecast_values = forecast_auto[0]
forecast_ci = forecast_auto[1]
# 评估预测误差
from sklearn.metrics import mean_absolute_error, mean_squared_error
mae = mean_absolute_error(test, forecast_values)
rmse = np.sqrt(mean_squared_error(test, forecast_values))
mape = np.mean(np.abs((test - forecast_values) / test)) * 100
print("\n" + "="*70)
print("预测误差评估")
print("="*70)
print(f"MAE (平均绝对误差): {mae:.4f}")
print(f"RMSE (均方根误差): {rmse:.4f}")
print(f"MAPE (平均绝对百分比误差): {mape:.2f}%")
# 可视化
fig, ax = plt.subplots(figsize=(14, 6))
ax.plot(train.index, train, linewidth=2, label='训练集', color='blue')
ax.plot(test.index, test, linewidth=2, label='测试集(真实值)', color='green')
ax.plot(test.index, forecast_values, linewidth=2, label='预测值',
color='red', marker='o', markersize=4)
ax.fill_between(test.index,
forecast_ci[:, 0],
forecast_ci[:, 1],
alpha=0.3, color='red', label='95% 预测区间')
ax.axvline(x=train.index[-1], color='black', linestyle='--', alpha=0.5,
label='训练/测试分割点')
ax.set_title('失业率 ARIMA 预测', fontsize=14, fontweight='bold')
ax.set_xlabel('时间')
ax.set_ylabel('失业率 (%)')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('unemployment_forecast.png', dpi=300, bbox_inches='tight')
plt.show()本节小结
ARIMA 模型族
| 模型 | 公式 | 识别特征 |
|---|---|---|
| AR(p) | PACF 截尾 | |
| MA(q) | ACF 截尾 | |
| ARMA(p,q) | AR + MA | 均衰减 |
| ARIMA(p,d,q) | 差分 + ARMA | 非平稳数据 |
| SARIMA | ARIMA + 季节性 | 季节性数据 |
Box-Jenkins 流程
- 识别:ACF/PACF、平稳性检验
- 估计:最大似然估计、信息准则
- 诊断:残差检验、Ljung-Box
- 预测:点预测、预测区间
实践要点
- 始终检查平稳性
- 对比多个候选模型
- 诊断残差(白噪声检验)
- 使用测试集验证
下节预告
在下一节中,我们将学习如何使用事件研究方法评估政策效应。
延伸阅读
Box, G. E. P., Jenkins, G. M., & Reinsel, G. C. (2015). Time Series Analysis: Forecasting and Control (5th ed.). Wiley.
Hamilton, J. D. (1994). Time Series Analysis. Princeton University Press.
Hyndman, R. J., & Athanasopoulos, G. (2021). Forecasting: Principles and Practice (3rd ed.). Chapters 8-9.
掌握 ARIMA,预测未来!