Dynamic factor model with time-varying loadings in Python


I am trying to model a dynamic factor model with time-varying loadings in Python, specifically to set up a TVP-FAVAR model. I have looked at the statsmodels statespace classes sm.tsa.DynamicFactor and sm.tsa.statespace.MLEModel. I found worked examples for TVP-VAR and FAVAR in statsmodels, but nothing for TVP-FAVAR. For the simple AR(1) case, the model can be written as:

x_t = lambda_t * f_t + u_t,        u_t ~ N(0, V_t)      (measurement)

f_t = beta_t * f_{t-1} + e_t,      e_t ~ N(0, Q_t)      (state)

lambda_t = lambda_{t-1} + v_t,     beta_t = beta_{t-1} + n_t      (random-walk parameters)

I tried putting the beta term into the state equation, but the result does not look linear: stacking the parameters into the state vector, e.g. alpha_t = (f_t, lambda_t, beta_t)', leaves products of state elements in both equations,

x_t = lambda_t * f_t + u_t

f_t = beta_t * f_{t-1} + e_t

so the system is bilinear in the states rather than linear-Gaussian, and the standard Kalman filter does not apply directly.

The paper I am reading implements the model in Matlab with a Kalman filter, but if statsmodels is a dead end I may have to fall back on MCMC (PyMC can probably handle the nonlinear part). Any hints would be appreciated.
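
For reference, this is how far I can get with MLEModel: conditional on a known factor series (e.g., a PCA estimate), the time-varying loadings alone are a linear state space, with the factor entering through a time-varying design matrix. A minimal sketch of that conditional piece only (class name and parameterization are my own, not the full joint model):

import numpy as np
import statsmodels.api as sm

class TVLoadings(sm.tsa.statespace.MLEModel):
    # x_t = lambda_t * f_t + u_t with random-walk loadings lambda_t,
    # conditional on an observed (e.g., PCA-estimated) factor f_t.
    def __init__(self, endog, factor):
        k_states = endog.shape[1]                # one loading per series
        super().__init__(endog, k_states=k_states, initialization='diffuse')
        # time-varying design matrix: row i loads series i on f_t
        design = np.zeros((self.k_endog, k_states, self.nobs))
        for i in range(self.k_endog):
            design[i, i, :] = factor
        self['design'] = design
        self['transition'] = np.eye(k_states)    # lambda_t is a random walk
        self['selection'] = np.eye(k_states)

    @property
    def start_params(self):
        return np.r_[np.ones(self.k_endog), 0.1*np.ones(self.k_states)]

    def transform_params(self, params):
        return params**2                         # keep variances positive

    def untransform_params(self, params):
        return params**0.5

    def update(self, params, **kwargs):
        params = super().update(params, **kwargs)
        self['obs_cov'] = np.diag(params[:self.k_endog])
        self['state_cov'] = np.diag(params[self.k_endog:])

This fits with TVLoadings(x, f).fit(), but the full TVP-FAVAR also needs time-varying beta_t and time-varying covariances, which is exactly where the linearity breaks down.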

kalman-filter dimensionality-reduction state-space structural-equation-model
1 Answer

So, after reading the paper by Gary Koop and Dimitris Korobilis, "A New Index of Financial Conditions", which contains the specific model I needed, I realized the authors use an indirect estimation method, namely a three-step algorithm. First, they initialize the parameters and estimate a one-component PCA to obtain a proxy index. Then they use that proxy to get preliminary time-varying parameters, applying "variance discounting" (an EWMA filter) to obtain the time-varying covariance matrices. Finally, using these, Kalman filtering and smoothing deliver the final (expected) time-varying coefficients. Once the model is estimated, we can generate/plot the f_t states. I used the Matlab code they share to implement the model in Python.
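
For step 1, the proxy index is just the first principal component of the standardized financial indicators. Here is a minimal sketch of how the YF and YX inputs used further below could be built (X_macro and X_fin are hypothetical DataFrames of macro series and financial indicators):

import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

# X_macro, X_fin: hypothetical input DataFrames (macro series, financial indicators)
pc = PCA(n_components=1).fit_transform(StandardScaler().fit_transform(X_fin))
factor = pd.Series(pc[:, 0], index=X_fin.index, name='FCI')

YF = pd.concat([X_macro, factor], axis=1)   # state variables: macro series + factor
YX = pd.concat([X_macro, X_fin], axis=1)    # observables fed to the filter

The full Python port is below: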

import numpy as np
import pandas as pd
from feature_engine.timeseries.forecasting import LagFeatures


class TVP_FAVAR:
    """Estimate time-varying loadings, coefficients, and covariances from a
    TVP-FAVAR, conditional on an estimate of the factors (principal
    components). Runs the Kalman filter and smoother for all time-varying
    parameters using an adaptive algorithm (an EWMA filter for the
    covariances).

    Written by Dimitris Korobilis, 2012; edited by YV.
    """
    def __init__(self, endog, exog, nlags=4, nfactors=1):
        nstates = exog.shape[1]
        t, nvars = endog.shape
        self.nlags = nlags
        self.nmacro = nstates - nfactors
        nstates_all = nstates*self.nlags
        nVAR = self.nlags*(nstates**2)

        # setup: lambda
        self.lambda_pred = np.zeros((nvars, nstates, t))
        self.lambda_update = np.zeros((nvars, nstates, t))

        for i in range(t):
            self.lambda_pred[:nstates, :nstates, i] = np.eye(nstates)
            self.lambda_update[:nstates, :nstates, i] = np.eye(nstates)

        # setup: beta
        self.beta_pred = np.zeros((nVAR, t))
        self.beta_update = np.zeros((nVAR, t))

        # setup: states
        self.Rl_t = np.zeros((nstates, nstates, nvars, t))
        self.Sl_t = np.zeros((nstates, nstates, nvars, t))
        self.Rb_t = np.zeros((nVAR, nVAR, t))
        self.Sb_t = np.zeros((nVAR, nVAR, t))

        # setup: expectations
        self.x_t_pred = np.zeros((t, nvars))
        self.e_t = np.zeros((nvars, t))
        self.lambda_t = np.zeros((nvars, nstates, t))
        self.beta_t = np.zeros((nstates_all, nstates_all, t))

        # setup: covariance
        self.Q_t = np.zeros((nstates, nstates, t))
        self.V_t = np.zeros((nvars, nvars, t))

        # define lags of the factors to be used in the state (VAR) equation
        lf = LagFeatures(periods=list(range(1, self.nlags+1)), drop_original=True)
        xx = lf.fit_transform(exog).dropna()
        yy = exog.loc[xx.index].copy()
        self.xx = np.array(xx)
        self.yy = np.array(yy)
        self.exog = np.array(exog)
        self.endog = np.array(endog)
        Flagtemp, m = create_RHS_NI(self.xx.copy(), nstates, self.nlags, t)
        self.Flag = np.concatenate((np.zeros((nstates_all, nVAR)), Flagtemp), axis=0)

    def kalman_setup(self, priors):
        # init
        t, nvars = self.endog.shape
        self.Q_t[:, :, 0] = priors['Q_0']
        self.V_t[:, :, 0] = np.diag(np.diag(priors['V_0']))
        self.lambda_pred[:, :, 0] = priors['lambda_0_mean']
        self.beta_pred[:, 0] = priors['beta_0_mean']
        self.beta_update[:, 0] = self.beta_pred[:, 0]
        self.Rb_t[:, :, 0] = priors['beta_0_var']
        for i in range(self.nmacro, nvars):
            self.Rl_t[:, :, i, 0] = priors['lambda_0_var']

    def kalman_update_coeffs(self, t):
        nstates = self.exog.shape[1]
        # Update VAR coefficients conditional on Principal Components estimates
        Rx = self.Rb_t[:, :, t] @ self.Flag[t*nstates:(t+1)*nstates, :].T
        KV_b = self.Q_t[:, :, t] + self.Flag[t*nstates:(t+1)*nstates, :]  @ Rx
        KG = Rx @ np.linalg.inv(KV_b)
        beta_adj = KG @ (self.exog[t, :].T - self.Flag[t*nstates:(t+1)*nstates, :] @ self.beta_pred[:, t])
        self.beta_update[:, t] = self.beta_pred[:, t] + beta_adj
        Sb_adj = KG @ (self.Flag[t*nstates:(t+1)*nstates, :] @ self.Rb_t[:, :, t])
        self.Sb_t[:, :, t] = self.Rb_t[:, :, t] - Sb_adj

    def kalman_filter_early(self, t, decays, priors):
        nstates = self.exog.shape[1]
        exog_i = self.exog[t, :]

        # priors
        self.beta_pred[:, t] = priors['beta_0_mean']
        self.beta_update[:, t] = self.beta_pred[:, t]
        self.Rb_t[:, :, t] = priors['beta_0_var']

        # expectations
        Gf_t = 0.1*(exog_i.reshape(-1, 1) @ exog_i.reshape(1, -1))
        if t > 0:
            self.Q_t[:, :, t] = decays[1]*self.Q_t[:, :, t-1] + (1-decays[1])*Gf_t[:nstates, :nstates]

    def kalman_filter(self, t, decays):
        nstates = self.exog.shape[1]
        nstates_all = nstates*self.nlags
        yy_i = self.yy[t-self.nlags]
        xx_i = self.xx[t-self.nlags]

        # expectations
        ee_i = (yy_i - xx_i @ self.B[:nstates, :nstates_all].T)
        Gf_t = ee_i.reshape(-1, 1) @ ee_i.reshape(1, -1)
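        # variance discounting (EWMA): Q_t = d*Q_{t-1} + (1 - d)*e_t e_t'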
        self.Q_t[:, :, t] = decays[1]*self.Q_t[:, :, t-1] + (1-decays[1])*Gf_t[:nstates, :nstates]

        # update beta
        self.beta_pred[:, t] = self.beta_update[:, t-1]
        self.Rb_t[:, :, t] = (1 / decays[3]) * self.Sb_t[:, :, t-1]

    def kalman_predict(self, t, decays):
        nstates = self.exog.shape[1]
        nvars = self.endog.shape[1]
        n_undefined_rows = nstates*(self.nlags-1)
        p = self.nmacro

        # Kalman predict steps
        if t > 0:
            self.lambda_pred[:, :, t] = self.lambda_update[:, :, t-1]
            self.Rl_t[:, :, :, t] = (1/decays[2])*self.Sl_t[:, :, :, t-1]

        # One step ahead prediction based on PC factor
        self.x_t_pred[t, :] = self.lambda_pred[:, :, t] @ self.exog[t, :].T

        # Prediction error
        self.e_t[:, t] = self.endog[t, :] - self.x_t_pred[t, :]

        # Get the measurement error variance
        self.A_t = self.e_t[self.nmacro:, t].reshape(-1, 1) @ self.e_t[self.nmacro:, t].reshape(1, -1)
        if t > 0:
            self.V_t[p:, p:, t] = decays[0]*self.V_t[p:, p:, t-1] + (1-decays[0])*np.diag(np.diag(self.A_t))

        # Update loadings conditional on Principal Components estimates
        for i in range(self.nmacro, nvars):
            Rx = self.Rl_t[:nstates, :nstates, i, t] @ self.exog[t, :nstates].T
            KV_l = self.V_t[i, i, t] + self.exog[t, :nstates] @ Rx
            KG = Rx / KV_l
            lambda_adj = KG * (self.endog[t, i].T - self.lambda_pred[i, :nstates, t] @ self.exog[t, :nstates].T)
            self.lambda_update[i, :nstates, t] = self.lambda_pred[i, :nstates, t] + lambda_adj
            Sl_adj = KG.reshape(-1, 1) @ (self.exog[t, :nstates] @ self.Rl_t[:nstates, :nstates, i, t]).reshape(1, -1)
            self.Sl_t[:nstates, :nstates, i, t] = self.Rl_t[:nstates, :nstates, i, t] - Sl_adj

        # update coeffs
        if t >= self.nlags:
            self.kalman_update_coeffs(t)

        # Assign coefficients
        biga = self.beta_update[:, t].reshape(nstates, self.nlags, nstates).transpose(1, 0, 2).reshape(nstates, -1)
        b_other = np.concatenate((np.eye(n_undefined_rows), np.zeros((n_undefined_rows, nstates))), axis=1)
        self.B = np.concatenate((biga, b_other), axis=0)
        self.lambda_t[:, :, t] = self.lambda_update[:, :, t]
        if np.max(np.abs(np.linalg.eig(self.B)[0])) < 0.9999:
            self.beta_t[:, :, t] = self.B
        else:
            self.beta_t[:, :, t] = self.beta_t[:, :, t-1]
            self.beta_update[:, t] = 0.95*self.beta_update[:, t-1]

    def smoother_setup(self, t):
        self.lambda_new = 0*self.lambda_update
        self.lambda_new[:, :, t] = self.lambda_update[:, :, t]
        self.beta_new = 0*self.beta_update
        self.beta_new[:, t] = self.beta_update[:, t]
        self.Q_t_new = 0*self.Q_t
        self.Q_t_new[:, :, t] = self.Q_t[:, :, t]
        self.V_t_new = 0*self.V_t
        self.V_t_new[:, :, t] = self.V_t[:, :, t]

    def kalman_smoother(self, t):
        # init
        nstates = self.exog.shape[1]
        nvars = self.endog.shape[1]
        p = self.nmacro

        # smooth lambda
        self.lambda_new[:nstates, :, t] = self.lambda_update[:nstates, :, t]
        for i in range(nstates, nvars):
            Ul_t = self.Sl_t[:nstates, :nstates, i, t] @ np.linalg.inv(self.Rl_t[:nstates, :nstates, i, t+1])
            lambda_adj = (self.lambda_new[i, :nstates, t+1] - self.lambda_pred[i, :nstates, t+1]) @ Ul_t.T
            self.lambda_new[i, :nstates, t] = self.lambda_update[i, :nstates, t] + lambda_adj

        # smooth beta
        if np.sum(self.Rb_t[:, :, t+1]) == 0:
            self.beta_new[:, t] = self.beta_update[:, t]
        else:
            Ub_t = self.Sb_t[:, :, t] @ np.linalg.inv(self.Rb_t[:, :, t+1])
            self.beta_new[:, t] = self.beta_update[:, t] + Ub_t @ (self.beta_new[:, t+1] - self.beta_pred[:, t+1])

        # smooth Q_t
        self.Q_t_new[:, :, t] = 0.9*self.Q_t[:, :, t] + 0.1*self.Q_t_new[:, :, t+1]

        # smooth V_t
        self.V_t_new[p:, p:, t] = 0.9*self.V_t[p:, p:, t] + 0.1*self.V_t_new[p:, p:, t+1]

    def kalman_reassign_coefficients(self, t):
        nstates = self.exog.shape[1]
        n_undefined_rows = nstates*(self.nlags-1)
        biga = self.beta_new[:, t].reshape(nstates, self.nlags, nstates).transpose(1, 0, 2).reshape(nstates, -1)
        b_other = np.concatenate((np.eye(n_undefined_rows), np.zeros((n_undefined_rows, nstates))), axis=1)
        B = np.concatenate((biga, b_other), axis=0)
        self.lambda_t[:, :, t] = self.lambda_new[:, :, t]
        self.beta_t[:, :, t] = B

    def fit(self, decays, priors):
        # initialize matrices for filtering
        t = self.endog.shape[0]
        self.kalman_setup(priors)

        # perform predictions at time t=0
        self.kalman_predict(0, decays)

        # 1. Kalman filter
        for irep in range(1, t):
            # Update the state covariances,
            # get the variance of the factor and update Q[t]
            if irep <= self.nlags:
                self.kalman_filter_early(irep, decays, priors)
            else:
                self.kalman_filter(irep, decays)
            self.kalman_predict(irep, decays)

        # 2. Kalman smoother
        self.smoother_setup(t-1)
        for irep in range(t-2, -1, -1):
            self.kalman_smoother(irep)

        # using smoothed values for beta and lambda
        for irep in range(t):
            self.kalman_reassign_coefficients(irep)
        return self.beta_t, self.beta_new, self.lambda_t, self.V_t, self.Q_t

    def kalman_factor_update(self, t):
        # setup
        nstates = self.exog.shape[1]

        # One step ahead prediction based on Kalman factor
        self.x_t_predf[t, :] = self.lambda_t[:, :, t] @ self.factor_pred[:nstates, t]

        # Prediction error
        self.ef_t[:, t] = self.endog[t, :] - self.x_t_predf[t, :]

        # Update the factors conditional on the estimate of lambda_t and beta_t
        KV_f = self.V_t[:, :, t] + self.lambda_t[:, :, t] @ self.Rf_t[:nstates, :nstates, t] @ self.lambda_t[:, :, t].T
        KG = (self.Rf_t[:nstates, :nstates, t] @ self.lambda_t[:, :, t].T) @ np.linalg.inv(KV_f)
        self.factor_update[:nstates, t] = self.factor_pred[:nstates, t] + KG @ self.ef_t[:, t]
        self.Sf_t[:nstates, :nstates, t] = self.Rf_t[:nstates, :nstates, t] - KG @ (self.lambda_t[:, :, t] @ self.Rf_t[:nstates, :nstates, t])

    def factor_filter(self, priors):
        # setup
        t, nvars = self.endog.shape
        nstates = self.exog.shape[1]
        nstates_all = nstates*self.nlags

        # init
        self.factor_pred = np.zeros((nstates_all, t))
        self.factor_update = np.zeros((nstates_all, t))
        self.Rf_t = np.zeros((nstates_all, nstates_all, t))
        self.Sf_t = np.zeros((nstates_all, nstates_all, t))
        self.x_t_predf = np.zeros((t, nvars))
        self.ef_t = np.zeros((nvars, t))

        # priors
        self.factor_pred[:, 0] = priors['factor_0_mean']
        self.Rf_t[:, :, 0] = priors['factor_0_var']
        self.kalman_factor_update(0)

        # filtering step
        for irep in range(1, t):
            self.factor_pred[:, irep] = self.beta_t[:, :, irep-1] @ self.factor_update[:, irep-1]
            Sfb_t = self.beta_t[:, :, irep-1] @ self.Sf_t[:, :, irep-1] @ self.beta_t[:, :, irep-1].T
            Q_t_sparse = np.concatenate((self.Q_t[:, :, irep], np.zeros((nstates, nstates*(self.nlags-1)))), axis=1)
            Q_t_sparse = np.concatenate((Q_t_sparse, np.zeros((nstates*(self.nlags-1), nstates*self.nlags))), axis=0)
            self.Rf_t[:, :, irep] = Sfb_t + Q_t_sparse
            self.kalman_factor_update(irep)

    def factor_smoother(self):
        # setup
        t, nvars = self.endog.shape
        nstates = self.exog.shape[1]

        # fixed-interval smoother for the factors
        self.factor_new = 0*self.factor_update
        self.factor_new[:, t-1] = self.factor_update[:, t-1]
        self.Sf_t_new = 0*self.Sf_t
        self.Sf_t_new[:, :, t-1] = self.Sf_t[:, :, t-1]

        # smoothing
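        # Rauch-Tung-Striebel backward pass: U_t = S_t B_t' inv(R_{t+1})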
        for irep in range(t-2, -1, -1):
            self.Z_t = self.Sf_t[:, :, irep] @ self.beta_t[:, :, irep].T
            U_t = np.squeeze(self.Z_t[:nstates, :nstates] @ np.linalg.inv(self.Rf_t[:nstates, :nstates, irep+1]))
            factor_adj = U_t @ (self.factor_new[:nstates, irep+1] - self.factor_pred[:nstates, irep+1])
            self.factor_new[:nstates, irep] = self.factor_update[:nstates, irep] + factor_adj
            Sf_adj = U_t @ (self.Sf_t[:nstates, :nstates, irep+1] - self.Rf_t[:nstates, :nstates, irep+1]) @ U_t.T
            self.Sf_t_new[:nstates, :nstates, irep] = self.Sf_t[:nstates, :nstates, irep] + Sf_adj
        self.factor_new = self.factor_new[:nstates].T

    def gen_factors(self, priors):
        self.factor_filter(priors)
        self.factor_smoother()
        return self.factor_new, self.Sf_t_new

def create_RHS_NI(YY, nstates, nlags, t):
    K = nlags*(nstates**2)  # K is the number of elements in the state vector
    # Create x_t matrix
    # first find the zeros in matrix x_t
    x_t = np.zeros(((t-nlags)*nstates, K))
    for i in range(t-nlags):
        ztemp = np.array([]).reshape(nstates, -1)
        for j in range(nlags):
            xtemp = YY[i, j*nstates:(j+1)*nstates]
            xtemp = np.kron(np.eye(nstates), xtemp)
            ztemp = np.concatenate((ztemp, xtemp), axis=1)
        x_t[i*nstates:(i+1)*nstates, :] = ztemp
    return x_t, K
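
# A tiny illustration of what create_RHS_NI builds (hypothetical numbers):
# with nstates=2 and nlags=1, each block row of x_t is kron(I_2, lagged row), e.g.
#   x_t, K = create_RHS_NI(np.array([[1., 2.], [3., 4.]]), 2, 1, 3)
# gives K = 1*2**2 = 4 and a first block [[1, 2, 0, 0], [0, 0, 1, 2]].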

def Minn_prior_KOOP(gamma, nstates, nlags, nvars):
    # This is the version of the Minnesota prior with no dependence on the
    # standard deviations of the univariate regressions. This prior allows
    # online estimation and forecasting of the large TVP-VAR.

    # 1. Minnesota Mean on VAR regression coefficients
    A_prior = np.concatenate((0.9*np.eye(nstates),
                              np.zeros(((nlags-1)*nstates, nstates))), axis=0).T
    a_prior = A_prior.T.reshape(-1)

    # 2. Minnesota Variance on VAR regression coefficients
    # Create an array of dimensions (nvars/nstates) x nstates, which will contain
    # the prior variances of the coefficients in each of the nstates equations.
    V_i = np.zeros((int(nvars/nstates), nstates))

    for i in range(nstates):
        for j in range(int(nvars/nstates)):
            V_i[j, i] = gamma / (np.ceil((j+1)/nstates))**2

    # 3. Now V (MINNESOTA VARIANCE) is a diagonal matrix with diagonal elements of V_i
    V_prior = np.diag(V_i.reshape(-1))  # this is the prior variance of the vector alpha
    return a_prior, V_prior
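
The Minnesota prior above centers each variable's own first-lag coefficient at 0.9 and shrinks longer lags harder (prior variance gamma / ceil(lag)^2). A quick check of that pattern with toy dimensions, assuming the definitions above:

a_prior, V_prior = Minn_prior_KOOP(0.1, 2, 2, 8)
print(a_prior)           # [0.9, 0, 0, 0.9, 0, 0, 0, 0]
print(np.diag(V_prior))  # 0.1 for lag-1 coefficients, 0.1/4 for lag-2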

Usage:

# step 1: Update Parameters Conditional on PC
# the decays include:
# 1) decay factor for measurement error variance
# 2) decay factor for factor error variance
# 3) decay factor for loadings error variance
# 4) decay factor for VAR coefficients error variance
decays = np.array([0.96, 0.96, 0.99, 0.99])
nfactors = 1
nlags = 4
nstates = YF.shape[1]
nmacro = nstates - nfactors
t, nvars = YX.shape
nstates_all = nstates*nlags
nVAR = nlags*(nstates**2)  # number of VAR parameters

# Priors
priors = {}

# initial condition on the factors
priors['factor_0_mean'] = np.zeros(nstates_all)
priors['factor_0_var'] = 10*np.eye(nstates_all)

# initial condition on lambda_t
priors['lambda_0_mean'] = np.zeros((nvars, nstates))
priors['lambda_0_var'] = 1*np.eye(nstates)

# initial condition on beta_t
gamma = 0.1
b_prior, Vb_prior = Minn_prior_KOOP(gamma, nstates, nlags, nVAR)  # Obtain a Minnesota-type prior
priors['beta_0_mean'] = b_prior
priors['beta_0_var'] = Vb_prior

# initial condition on the covariance matrices
priors['V_0'] = 0.1*np.eye(nvars)
priors['V_0'][:nmacro, :nmacro] = 0
priors['Q_0'] = 0.1*np.eye(nstates)

# modeling
tvp_favar_korobilis = TVP_FAVAR(YX, YF, nlags, nfactors)
beta_t, beta_new, lambda_t, V_t, Q_t = tvp_favar_korobilis.fit(decays, priors)
factor_new, Sf_t_new = tvp_favar_korobilis.gen_factors(priors)
S_knk = pd.DataFrame(factor_new, index=YF.index, columns=YF.columns)
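
Finally, the smoothed factor can be plotted straight from the DataFrame (a minimal sketch; assumes matplotlib is installed):

import matplotlib.pyplot as plt

S_knk.plot(title='Smoothed TVP-FAVAR factor')
plt.show()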