I am trying to model a dynamic factor model with time-varying loadings in Python, specifically to build a TVP-FAVAR model. I have looked at the statsmodels statespace classes sm.tsa.DynamicFactor and sm.tsa.statespace.MLEModel. I found worked examples of TVP-VAR and FAVAR in statsmodels, but none for TVP-FAVAR. For the simple AR(1) case, the model can be expressed as:
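(A sketch of the specification, following the Koop–Korobilis setup: time-varying loadings in the measurement equation, a TVP-AR(1) for the factor, and random walks for the loadings and coefficients.)

$$x_t = \lambda_t f_t + u_t, \qquad u_t \sim N(0, V_t)$$
$$f_t = \beta_t f_{t-1} + \varepsilon_t, \qquad \varepsilon_t \sim N(0, Q_t)$$
$$\lambda_t = \lambda_{t-1} + v_t, \qquad \beta_t = \beta_{t-1} + \eta_t$$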
I tried putting the beta terms into the state equation, but the resulting model no longer looks linear...
The paper I am reading implements the model in Matlab with a Kalman filter, but if statsmodels cannot handle it, I may have to fall back on MCMC (PyMC can probably help with modeling the nonlinear parts). Any hints would be much appreciated.
So, after reading the papers of Gary Koop and Dimitris Korobilis, and in particular "A New Index of Financial Conditions", which contains the exact model I need, I realized the authors use an indirect estimation approach, specifically a three-step algorithm. First, they initialize the parameters and estimate PCA with a single component (a proxy index). They then use it to obtain some of the time-varying parameters. Next, they use "variance discounting" to obtain the time-varying covariance matrices. Finally, using these, the final (expected) time-varying coefficients are obtained via Kalman filtering and smoothing. Once the model is estimated, we can generate/plot the f_t states. I implemented the model in Python from the Matlab code they shared. See the complete code below:
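Concretely, "variance discounting" here is just an EWMA filter on outer products of residuals, of the form

$$\hat{V}_t = \delta_1 \hat{V}_{t-1} + (1-\delta_1)\,\hat{e}_t \hat{e}_t', \qquad \hat{Q}_t = \delta_2 \hat{Q}_{t-1} + (1-\delta_2)\,\hat{\varepsilon}_t \hat{\varepsilon}_t',$$

with decay factors close to one; these are the entries of the decays array in the usage snippet further down.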
import numpy as np
import pandas as pd
from feature_engine.timeseries.forecasting import LagFeatures

class TVP_FAVAR():
# Function to estimate time-varying loadings, coefficients, and covariances
# from a TVP-FAVAR, conditional on feeding in an estimate of the factors
# (Principal Components). This function runs the Kalman filter and smoother
# for all time-varying parameters using an adaptive algorithm (EWMA filter
# for the covariances).
# Written by Dimitris Korobilis, 2012, edited by YV
def __init__(self, endog, exog, nlags=4, nfactors=1):
nstates = exog.shape[1]
t, nvars = endog.shape
self.nlags = nlags
self.nmacro = nstates - nfactors
nstates_all = nstates*self.nlags
nVAR = self.nlags*(nstates**2)
# setup: lambda
self.lambda_pred = np.zeros((nvars, nstates, t))
self.lambda_update = np.zeros((nvars, nstates, t))
for i in range(t):
self.lambda_pred[:nstates, :nstates, i] = np.eye(nstates)
self.lambda_update[:nstates, :nstates, i] = np.eye(nstates)
# setup: beta
self.beta_pred = np.zeros((nVAR, t))
self.beta_update = np.zeros((nVAR, t))
# setup: states
self.Rl_t = np.zeros((nstates, nstates, nvars, t))
self.Sl_t = np.zeros((nstates, nstates, nvars, t))
self.Rb_t = np.zeros((nVAR, nVAR, t))
self.Sb_t = np.zeros((nVAR, nVAR, t))
# setup: expectations
self.x_t_pred = np.zeros((t, nvars))
self.e_t = np.zeros((nvars, t))
self.lambda_t = np.zeros((nvars, nstates, t))
self.beta_t = np.zeros((nstates_all, nstates_all, t))
# setup: covariance
self.Q_t = np.zeros((nstates, nstates, t))
self.V_t = np.zeros((nvars, nvars, t))
# define lags of the factors to be used in the state (VAR) equation
lf = LagFeatures(periods=list(range(1, self.nlags+1)), drop_original=True)
xx = lf.fit_transform(exog).dropna()
yy = exog.loc[xx.index].copy()
self.xx = np.array(xx)
self.yy = np.array(yy)
self.exog = np.array(exog)
self.endog = np.array(endog)
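        # Flag stacks the SUR-form regressors kron(I, x_t') for the VAR state
        # equation; the first nlags row blocks are zero padding, since the lags
        # are undefined there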
Flagtemp, m = create_RHS_NI(self.xx.copy(), nstates, self.nlags, t)
self.Flag = np.concatenate((np.zeros((nstates_all, nVAR)), Flagtemp), axis=0)
def kalman_setup(self, priors):
# init
t, nvars = self.endog.shape
self.Q_t[:, :, 0] = priors['Q_0']
self.V_t[:, :, 0] = np.diag(np.diag(priors['V_0']))
self.lambda_pred[:, :, 0] = priors['lambda_0_mean']
self.beta_pred[:, 0] = priors['beta_0_mean']
self.beta_update[:, 0] = self.beta_pred[:, 0]
self.Rb_t[:, :, 0] = priors['beta_0_var']
for i in range(self.nmacro, nvars):
self.Rl_t[:, :, i, 0] = priors['lambda_0_var']
def kalman_update_coeffs(self, t):
nstates = self.exog.shape[1]
        # Update VAR coefficients conditional on Principal Components estimates
Rx = self.Rb_t[:, :, t] @ self.Flag[t*nstates:(t+1)*nstates, :].T
KV_b = self.Q_t[:, :, t] + self.Flag[t*nstates:(t+1)*nstates, :] @ Rx
KG = Rx @ np.linalg.inv(KV_b)
beta_adj = KG @ (self.exog[t, :].T - self.Flag[t*nstates:(t+1)*nstates, :] @ self.beta_pred[:, t])
self.beta_update[:, t] = self.beta_pred[:, t] + beta_adj
Sb_adj = KG @ (self.Flag[t*nstates:(t+1)*nstates, :] @ self.Rb_t[:, :, t])
self.Sb_t[:, :, t] = self.Rb_t[:, :, t] - Sb_adj
def kalman_filter_early(self, t, decays, priors):
nstates = self.exog.shape[1]
exog_i = self.exog[t, :]
# priors
self.beta_pred[:, t] = priors['beta_0_mean']
self.beta_update[:, t] = self.beta_pred[:, t]
self.Rb_t[:, :, t] = priors['beta_0_var']
# expectations
Gf_t = 0.1*(exog_i.reshape(-1, 1) @ exog_i.reshape(1, -1))
if t > 0:
self.Q_t[:, :, t] = decays[1]*self.Q_t[:, :, t-1] + (1-decays[1])*Gf_t[:nstates, :nstates]
def kalman_filter(self, t, decays):
nstates = self.exog.shape[1]
nstates_all = nstates*self.nlags
yy_i = self.yy[t-self.nlags]
xx_i = self.xx[t-self.nlags]
        # VAR residuals and EWMA (variance discounting) update of Q_t
ee_i = (yy_i - xx_i @ self.B[:nstates, :nstates_all].T)
Gf_t = ee_i.reshape(-1, 1) @ ee_i.reshape(1, -1)
self.Q_t[:, :, t] = decays[1]*self.Q_t[:, :, t-1] + (1-decays[1])*Gf_t[:nstates, :nstates]
        # Random-walk prediction for beta: carry the posterior mean forward and
        # inflate its variance by the forgetting factor (no explicit state noise)
self.beta_pred[:, t] = self.beta_update[:, t-1]
self.Rb_t[:, :, t] = (1 / decays[3]) * self.Sb_t[:, :, t-1]
def kalman_predict(self, t, decays):
nstates = self.exog.shape[1]
nvars = self.endog.shape[1]
n_undefined_rows = nstates*(self.nlags-1)
p = self.nmacro
# Kalman predict steps
if t > 0:
self.lambda_pred[:, :, t] = self.lambda_update[:, :, t-1]
self.Rl_t[:, :, :, t] = (1/decays[2])*self.Sl_t[:, :, :, t-1]
# One step ahead prediction based on PC factor
self.x_t_pred[t, :] = self.lambda_pred[:, :, t] @ self.exog[t, :].T
# Prediction error
self.e_t[:, t] = self.endog[t, :] - self.x_t_pred[t, :]
# Get the measurement error variance
self.A_t = self.e_t[self.nmacro:, t].reshape(-1, 1) @ self.e_t[self.nmacro:, t].reshape(1, -1)
if t > 0:
self.V_t[p:, p:, t] = decays[0]*self.V_t[p:, p:, t-1] + (1-decays[0])*np.diag(np.diag(self.A_t))
# Update loadings conditional on Principal Components estimates
for i in range(self.nmacro, nvars):
Rx = self.Rl_t[:nstates, :nstates, i, t] @ self.exog[t, :nstates].T
KV_l = self.V_t[i, i, t] + self.exog[t, :nstates] @ Rx
KG = Rx / KV_l
lambda_adj = KG * (self.endog[t, i].T - self.lambda_pred[i, :nstates, t] @ self.exog[t, :nstates].T)
self.lambda_update[i, :nstates, t] = self.lambda_pred[i, :nstates, t] + lambda_adj
Sl_adj = KG.reshape(-1, 1) @ (self.exog[t, :nstates] @ self.Rl_t[:nstates, :nstates, i, t]).reshape(1, -1)
self.Sl_t[:nstates, :nstates, i, t] = self.Rl_t[:nstates, :nstates, i, t] - Sl_adj
# update coeffs
if t >= self.nlags:
self.kalman_update_coeffs(t)
        # Rebuild the VAR companion matrix B from the stacked coefficient vector
biga = self.beta_update[:, t].reshape(nstates, self.nlags, nstates).transpose(1, 0, 2).reshape(nstates, -1)
b_other = np.concatenate((np.eye(n_undefined_rows), np.zeros((n_undefined_rows, nstates))), axis=1)
self.B = np.concatenate((biga, b_other), axis=0)
self.lambda_t[:, :, t] = self.lambda_update[:, :, t]
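        # Stability check: accept B only if its largest eigenvalue lies inside
        # the unit circle; otherwise keep the previous coefficients and shrink
        # beta toward zero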
if np.max(np.abs(np.linalg.eig(self.B)[0])) < 0.9999:
self.beta_t[:, :, t] = self.B
else:
self.beta_t[:, :, t] = self.beta_t[:, :, t-1]
self.beta_update[:, t] = 0.95*self.beta_update[:, t-1]
def smoother_setup(self, t):
self.lambda_new = 0*self.lambda_update
self.lambda_new[:, :, t] = self.lambda_update[:, :, t]
self.beta_new = 0*self.beta_update
self.beta_new[:, t] = self.beta_update[:, t]
self.Q_t_new = 0*self.Q_t
self.Q_t_new[:, :, t] = self.Q_t[:, :, t]
self.V_t_new = 0*self.V_t
self.V_t_new[:, :, t] = self.V_t[:, :, t]
def kalman_smoother(self, t):
# init
nstates = self.exog.shape[1]
nvars = self.endog.shape[1]
p = self.nmacro
# smooth lambda
self.lambda_new[:nstates, :, t] = self.lambda_update[:nstates, :, t]
for i in range(nstates, nvars):
Ul_t = self.Sl_t[:nstates, :nstates, i, t] @ np.linalg.inv(self.Rl_t[:nstates, :nstates, i, t+1])
lambda_adj = (self.lambda_new[i, :nstates, t+1] - self.lambda_pred[i, :nstates, t+1]) @ Ul_t.T
self.lambda_new[i, :nstates, t] = self.lambda_update[i, :nstates, t] + lambda_adj
# smooth beta
if np.sum(self.Rb_t[:, :, t+1]) == 0:
self.beta_new[:, t] = self.beta_update[:, t]
else:
Ub_t = self.Sb_t[:, :, t] @ np.linalg.inv(self.Rb_t[:, :, t+1])
self.beta_new[:, t] = self.beta_update[:, t] + Ub_t @ (self.beta_new[:, t+1] - self.beta_pred[:, t+1])
# smooth Q_t
self.Q_t_new[:, :, t] = 0.9*self.Q_t[:, :, t] + 0.1*self.Q_t_new[:, :, t+1]
# smooth V_t
self.V_t_new[p:, p:, t] = 0.9*self.V_t[p:, p:, t] + 0.1*self.V_t_new[p:, p:, t+1]
def kalman_reassign_coefficients(self, t):
nstates = self.exog.shape[1]
n_undefined_rows = nstates*(self.nlags-1)
biga = self.beta_new[:, t].reshape(nstates, self.nlags, nstates).transpose(1, 0, 2).reshape(nstates, -1)
b_other = np.concatenate((np.eye(n_undefined_rows), np.zeros((n_undefined_rows, nstates))), axis=1)
B = np.concatenate((biga, b_other), axis=0)
self.lambda_t[:, :, t] = self.lambda_new[:, :, t]
self.beta_t[:, :, t] = B
    def fit(self, decays, priors):
# initialize matrices for filtering
t = self.endog.shape[0]
self.kalman_setup(priors)
# perform predictions at time t=0
self.kalman_predict(0, decays)
# 1. Kalman filter
for irep in range(1, t):
# Update the state covariances,
# get the variance of the factor and update Q[t]
if irep <= self.nlags:
self.kalman_filter_early(irep, decays, priors)
else:
self.kalman_filter(irep, decays)
self.kalman_predict(irep, decays)
# 2. Kalman smoother
self.smoother_setup(t-1)
for irep in range(t-2, -1, -1):
self.kalman_smoother(irep)
# using smoothed values for beta and lambda
for irep in range(t):
self.kalman_reassign_coefficients(irep)
return self.beta_t, self.beta_new, self.lambda_t, self.V_t, self.Q_t
def kalman_factor_update(self, t):
# setup
nstates = self.exog.shape[1]
# One step ahead prediction based on Kalman factor
self.x_t_predf[t, :] = self.lambda_t[:, :, t] @ self.factor_pred[:nstates, t]
# Prediction error
self.ef_t[:, t] = self.endog[t, :] - self.x_t_predf[t, :]
# Update the factors conditional on the estimate of lambda_t and beta_t
KV_f = self.V_t[:, :, t] + self.lambda_t[:, :, t] @ self.Rf_t[:nstates, :nstates, t] @ self.lambda_t[:, :, t].T
KG = (self.Rf_t[:nstates, :nstates, t] @ self.lambda_t[:, :, t].T) @ np.linalg.inv(KV_f)
self.factor_update[:nstates, t] = self.factor_pred[:nstates, t] + KG @ self.ef_t[:, t]
self.Sf_t[:nstates, :nstates, t] = self.Rf_t[:nstates, :nstates, t] - KG @ (self.lambda_t[:, :, t] @ self.Rf_t[:nstates, :nstates, t])
def factor_filter(self, priors):
# setup
t, nvars = self.endog.shape
nstates = self.exog.shape[1]
nstates_all = nstates*self.nlags
# init
self.factor_pred = np.zeros((nstates_all, t))
self.factor_update = np.zeros((nstates_all, t))
self.Rf_t = np.zeros((nstates_all, nstates_all, t))
self.Sf_t = np.zeros((nstates_all, nstates_all, t))
self.x_t_predf = np.zeros((t, nvars))
self.ef_t = np.zeros((nvars, t))
# priors
self.factor_pred[:, 0] = priors['factor_0_mean']
self.Rf_t[:, :, 0] = priors['factor_0_var']
self.kalman_factor_update(0)
# filtering step
for irep in range(1, t):
self.factor_pred[:, irep] = self.beta_t[:, :, irep-1] @ self.factor_update[:, irep-1]
Sfb_t = self.beta_t[:, :, irep-1] @ self.Sf_t[:, :, irep-1] @ self.beta_t[:, :, irep-1].T
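            # Embed Q_t into the companion-form state covariance (zeros elsewhere)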
Q_t_sparse = np.concatenate((self.Q_t[:, :, irep], np.zeros((nstates, nstates*(self.nlags-1)))), axis=1)
            Q_t_sparse = np.concatenate((Q_t_sparse, np.zeros((nstates*(self.nlags-1), nstates*self.nlags))), axis=0)
self.Rf_t[:, :, irep] = Sfb_t + Q_t_sparse
self.kalman_factor_update(irep)
def factor_smoother(self):
# setup
t, nvars = self.endog.shape
nstates = self.exog.shape[1]
# fixed-interval smoother for the factors
self.factor_new = 0*self.factor_update
self.factor_new[:, t-1] = self.factor_update[:, t-1]
self.Sf_t_new = 0*self.Sf_t
self.Sf_t_new[:, :, t-1] = self.Sf_t[:, :, t-1]
# smoothing
for irep in range(t-2, -1, -1):
self.Z_t = self.Sf_t[:, :, irep] @ self.beta_t[:, :, irep].T
U_t = np.squeeze(self.Z_t[:nstates, :nstates] @ np.linalg.inv(self.Rf_t[:nstates, :nstates, irep+1]))
factor_adj = U_t @ (self.factor_new[:nstates, irep+1] - self.factor_pred[:nstates, irep+1])
self.factor_new[:nstates, irep] = self.factor_update[:nstates, irep] + factor_adj
Sf_adj = U_t @ (self.Sf_t[:nstates, :nstates, irep+1] - self.Rf_t[:nstates, :nstates, irep+1]) @ U_t.T
self.Sf_t_new[:nstates, :nstates, irep] = self.Sf_t[:nstates, :nstates, irep] + Sf_adj
self.factor_new = self.factor_new[:nstates].T
def gen_factors(self, priors):
self.factor_filter(priors)
self.factor_smoother()
return self.factor_new, self.Sf_t_new
def create_RHS_NI(YY, nstates, nlags, t):
K = nlags*(nstates**2) # K is the number of elements in the state vector
# Create x_t matrix
# first find the zeros in matrix x_t
x_t = np.zeros(((t-nlags)*nstates, K))
for i in range(t-nlags):
ztemp = np.array([]).reshape(nstates, -1)
for j in range(nlags):
xtemp = YY[i, j*nstates:(j+1)*nstates]
xtemp = np.kron(np.eye(nstates), xtemp)
ztemp = np.concatenate((ztemp, xtemp), axis=1)
x_t[i*nstates:(i+1)*nstates, :] = ztemp
return x_t, K
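# Illustrative shape check for create_RHS_NI: each row block stacks
# kron(I_nstates, lagged x'), so with nstates=3, nlags=2, t=10 and YY of
# shape (8, 6) it returns x_t of shape ((10-2)*3, 2*3**2) = (24, 18)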
def Minn_prior_KOOP(gamma, nstates, nlags, nvars):
# This is the version of the Minnesota prior with no dependence on the
# standard deviations of the univariate regressions. This prior allows
# online estimation and forecasting of the large TVP-VAR.
# 1. Minnesota Mean on VAR regression coefficients
A_prior = np.concatenate((0.9*np.eye(nstates),
np.zeros(((nlags-1)*nstates, nstates))), axis=0).T
a_prior = A_prior.T.reshape(-1)
# 2. Minnesota Variance on VAR regression coefficients
    # Create an array of dimension (nvars/nstates) x nstates, which will hold the
    # prior variances of the coefficients in each of the nstates equations.
V_i = np.zeros((int(nvars/nstates), nstates))
for i in range(nstates):
for j in range(int(nvars/nstates)):
V_i[j, i] = gamma / (np.ceil((j+1)/nstates))**2
# 3. Now V (MINNESOTA VARIANCE) is a diagonal matrix with diagonal elements of V_i
V_prior = np.diag(V_i.reshape(-1)) # this is the prior variance of the vector alpha
return a_prior, V_prior
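A quick sanity check on the Minnesota prior (a minimal, illustrative example; the values follow directly from the formulas above): the prior mean puts 0.9 on each variable's own first lag and 0 elsewhere, while the prior variances shrink with the square of the lag.

b, Vb = Minn_prior_KOOP(gamma=0.1, nstates=2, nlags=2, nvars=8)
print(b)            # [0.9, 0, 0, 0.9, 0, 0, 0, 0]
print(np.diag(Vb))  # [0.1, 0.1, 0.1, 0.1, 0.025, 0.025, 0.025, 0.025]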
Usage:
# step 1: Update Parameters Conditional on PC
# the decays include:
# decays[0]: decay factor for the measurement error variance V_t
# decays[1]: decay factor for the factor error variance Q_t
# decays[2]: decay factor for the loadings error variance
# decays[3]: decay factor for the VAR coefficients error variance
decays = np.array([0.96, 0.96, 0.99, 0.99])
nfactors = 1
nlags = 4
nstates = YF.shape[1]
nmacro = nstates - nfactors
t, nvars = YX.shape
nstates_all = nstates*nlags
nVAR = nlags*(nstates**2) # number of VAR parameters
# Priors
priors = {}
# initial condition on the factors
priors['factor_0_mean'] = np.zeros(nstates_all)
priors['factor_0_var'] = 10*np.eye(nstates_all)
# initial condition on lambda_t
priors['lambda_0_mean'] = np.zeros((nvars, nstates))
priors['lambda_0_var'] = 1*np.eye(nstates)
# initial condition on beta_t
gamma = 0.1
b_prior, Vb_prior = Minn_prior_KOOP(gamma, nstates, nlags, nVAR) # Obtain a Minnesota-type prior
priors['beta_0_mean'] = b_prior
priors['beta_0_var'] = Vb_prior
# initial condition on the covariance matrices
priors['V_0'] = 0.1*np.eye(nvars)
priors['V_0'][:nmacro, :nmacro] = 0
priors['Q_0'] = 0.1*np.eye(nstates)
# modeling
tvp_favar_korobilis = TVP_FAVAR(YX, YF, nlags, nfactors)
beta_t, beta_new, lambda_t, V_t, Q_t = tvp_favar_korobilis.fit(decays, priors)
factor_new, Sf_t_new = tvp_favar_korobilis.gen_factors(priors)
S_knk = pd.DataFrame(factor_new, index=YF.index, columns=YF.columns)
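To inspect the estimated factor, the smoothed states can be plotted directly (a minimal sketch; assumes the S_knk DataFrame built above and that matplotlib is installed):

import matplotlib.pyplot as plt

# plot the smoothed state estimates, including the factor f_t
S_knk.plot(subplots=True, figsize=(10, 8))
plt.suptitle('Smoothed TVP-FAVAR states')
plt.tight_layout()
plt.show()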