[PYTHON] Code for self-organizing state-space model (unfinished)

Motivation

I needed a highly accurate time-series prediction model for change point detection, so I tried to create a code. Write the result first. I wanted to say, "I was able to get good accuracy! (Don't do it!", But I got a half-finished code with strange behavior ... orz In the future, we will raise it again as soon as it can be corrected. ..

References

"Forefront of financial time series analysis by self-organized state space model with initial distribution search: Application to stochastic volatility fluctuation model with t distribution" summarizes the issues of Monte Carlo filter, and further searches for optimal initial distribution. The method was also proposed and it was a very interesting material. I intended to create a product that included optimization of the initial distribution with reference to this, but I ended up with a product that was far from that. ..

Problems with Monte Carlo filters

The Monte Carlo filter is a very flexible state estimation algorithm that allows you to build unsteady non-Gaussian models. However, the following are raised as problems.

Regarding parameter estimation in the Monte Carlo filter, (1) the estimator of the likelihood function for the parameter includes an error by the Monte Carlo method, and (2) the differentiation of the likelihood function is difficult to calculate (self-organization with initial distribution search). Forefront of financial time series analysis by state space model P.4)

To solve this, a method using an optimization method called the Nelder-Mead method (linear search type nonlinear optimization method) has been proposed. However, this method leaves the problem that if the convergence condition is not relaxed significantly, it will not converge and the variance of the parameter estimator will increase. On the other hand, the self-organized state space model is a method of making Bayesian inference by incorporating parameters into the state space. In "Forefront of financial time series analysis by self-organized state space model with initial distribution search", a method of constructing the initial distribution (maximum likelihood estimation method) using the NM method is proposed. For the Monte Carlo filter, please refer to Implementation of a simple particle filter.

model

I created the code for the following model. x_t=Fx_{t-1}+Gv_t y_t=Hx_{t}+w_t, w_t~C(0,σ_t^2) v_{t,t_t}~C(0,τ_t^2) v_{t,{τ_t^2}}~C(0,ν^2) v_{t,{σ_t^2}}~C(0,ξ^2) x_t=[t_t,log_{10}τ_t^2,log_{10}σ_t^2]^T v_{t}=[v_{t,t_t},v_{t,{τ_t^2}},v_{t,{σ_t^2}}]^T

As described in "Knowledge discovery and self-organization type statistical model", for example, when estimating $ σ ^ 2 $, it is necessary to guarantee positiveness, so estimate $ log_ {10} σ ^ 2 $. going.

Then, I will write the code below.

Reference code

python


# coding: utf-8

from math import log, pow, sqrt
import numpy as np
from scipy.stats import norm, cauchy
from numpy.random import uniform, multivariate_normal
from multiprocessing import Pool
import matplotlib.pyplot as plt


class ParticleFilter:
    nu = 0.01 #System noise scale ultra-super parameter
    xi = 0.03 #Observation noise scale ultra-super parameter
    log_likelihood = 0.0 #Log likelihood
    TIME = 1
    PR=8 # unmber of processing
    
    def __init__(self, PARTICLES_NUM, k=1, ydim=1, pdim=2):
        self.PARTICLES_NUM = PARTICLES_NUM #Number of particles
        self.TEETH_OF_COMB = np.arange(0, 1, float(1.0)/self.PARTICLES_NUM)
        self.weights = np.zeros((ydim, self.PARTICLES_NUM))
        self.particles = np.zeros((k*ydim+pdim ,self.PARTICLES_NUM))
        self.predicted_particles = np.zeros((k*ydim+pdim , self.PARTICLES_NUM))
        np.random.seed(555)
        self.predicted_value = []
        self.filtered_value = []
        self.LSM = np.zeros(ydim) #Square error

        self.F, self.G, self.H = self.FGHset(k, ydim, pdim)
        self.k = k
        self.ydim = ydim
        self.pdim = pdim
    
    def init_praticles_distribution(self, P, r):
        """initialize particles
        x_0|0
        tau_0|0
        sigma_0|0
        """
        data_particles = multivariate_normal([1]*self.ydim*self.k,
                            np.eye(self.ydim*self.k)*10, self.PARTICLES_NUM).T
        param_particles = np.zeros((self.pdim, self.PARTICLES_NUM))
        for i in xrange(self.pdim):
            param_particles[i,:] = uniform(P-r, P+r, self.PARTICLES_NUM)
        self.particles = np.vstack((data_particles, param_particles))
        
    def get_system_noise(self):
        """v_t vector"""
        data_noise = cauchy.rvs(loc=[0]*self.PARTICLES_NUM, scale=np.power(10,self.particles[self.ydim])) #size=self.PARTICLES_NUM
        data_noise[data_noise==float("-inf")] = -1e308
        data_noise[data_noise==float("inf")] = 1e308
        parameta_noise_sys = cauchy.rvs(loc=0, scale=self.xi, size=self.PARTICLES_NUM) # noise of hyper parameter in system model
        parameta_noise_ob = cauchy.rvs(loc=0, scale=self.nu, size=self.PARTICLES_NUM) # noise of hyper parameter in observation model

        #print np.vstack((data_noise, parameta_noise_sys, parameta_noise_ob))
        return np.vstack((data_noise, parameta_noise_sys, parameta_noise_ob))
        
    def calc_pred_particles(self):
        """calculate system function
        x_t|t-1 = F*x_t-1 + Gv_t
        """
        return self.F.dot(self.particles) + self.G.dot(self.get_system_noise()) # linear non-Gaussian  
        
    def calc_particles_weight(self,y):
        """calculate fitness probabilities between observation value and predicted value
        w_t
        """
        locs = self.calc_pred_particles()
        self.predicted_particles = locs
        scale=np.power(10,locs[-1])
        scale[scale==0] = 1e-308
        
        self.weights = cauchy.pdf( np.array([y]*self.PARTICLES_NUM) - self.H.dot(locs), loc=[0]*self.PARTICLES_NUM,
                                scale=scale ).flatten()
        
    def calc_likelihood(self):
        """calculate likelihood at that point
        p(y_t|y_1:t-1)
        """
        res = np.sum(self.weights)/self.PARTICLES_NUM
        #print res
        self.log_likelihood += log(res)
      
    def normalize_weights(self):
        """wtilda_t"""
        self.weights = self.weights/np.sum(self.weights)
      
    def resample(self,y):
        """x_t|t"""
        self.normalize_weights()

        self.memorize_predicted_value()

        # accumulate weight
        cum = np.cumsum(self.weights)
        
        # create roulette pointer 
        base = uniform(0,float(1.0)/self.PARTICLES_NUM)
        pointers = self.TEETH_OF_COMB + base

        # select particles
        selected_idx = [np.where(cum>=p)[0][0] for p in pointers]
        """
        pool = Pool(processes=self.PR)
        selected_idx = pool.map(get_slected_particles_idx, ((cum,p) for p in pointers))
        pool.close()
        pool.join()     
        """

        self.particles = self.predicted_particles[:,selected_idx]
        self.memorize_filtered_value(selected_idx, y)
    
    def memorize_predicted_value(self):
        predicted_value = np.sum(self.predicted_particles*self.weights, axis=1)[0]
        self.predicted_value.append(predicted_value)

    def memorize_filtered_value(self, selected_idx, y):
        filtered_value = np.sum(self.particles*self.weights[selected_idx] , axis=1) \
                            /np.sum(self.weights[selected_idx])
        self.filtered_value.append(filtered_value[0])
        self.calculate_LSM(y,filtered_value[:self.ydim])

    def calculate_LSM(self,y,filterd_value):
        self.LSM += pow(y-filterd_value,2)

    def forward(self,y):
        """compute system model and observation model"""
        print 'calculating time at %d' % self.TIME
        self.calc_pred_particles()
        self.calc_particles_weight(y)
        self.calc_likelihood()
        self.resample(y)
        self.TIME += 1

    def FGHset(self, k, vn_y, n_ss_parameters):
        """
        vn_y:input dimension
        n_ss_parameters:number of hyper parameter 
        k:difference
        """
        G_upper_block = np.zeros((k*vn_y, vn_y+n_ss_parameters))
        G_lower_block = np.zeros((n_ss_parameters, vn_y+n_ss_parameters))
        G_lower_block[-n_ss_parameters:, -n_ss_parameters:] = np.eye(n_ss_parameters)
        G_upper_block[:vn_y, :vn_y] = np.eye(vn_y)
        G = np.vstack( (G_upper_block, G_lower_block) )
        
        H = np.hstack( (np.eye(vn_y), 
                            np.zeros((vn_y, vn_y*(k-1)+n_ss_parameters))
                        ) )
              
        F_upper_block = np.zeros((k*vn_y, k*vn_y+n_ss_parameters))
        F_lower_block = np.zeros((n_ss_parameters, k*vn_y+n_ss_parameters))
        F_lower_block[-n_ss_parameters:, -n_ss_parameters:] = np.eye(n_ss_parameters)
        if k==1:
            F_upper_block[:vn_y, :vn_y] = np.eye(vn_y)
        elif k==2:
            F_upper_block[:vn_y, :vn_y] = np.eye(vn_y)*2
            F_upper_block[:vn_y, vn_y:k*vn_y] = np.eye(vn_y)*-1
            F_upper_block[vn_y:k*vn_y, :vn_y] = np.eye(vn_y)
        F = np.vstack((F_upper_block, F_lower_block))
        
        return F, G, H

def get_slected_particles_idx((cum,p)):
    """multiprocessing function"""
    try:
        return np.where(cum>=p)[0][0]
    
    except Exception, e:
        import sys
        import traceback
        sys.stderr.write(traceback.format_exc())    

if __name__=='__main__':
    n_particle = 10000
    pf = ParticleFilter(n_particle, k=1, ydim=1, pdim=2)
    pf.init_praticles_distribution(0, 8) # P, r
    
    data = np.hstack((norm.rvs(0,1,size=20),norm.rvs(10,1,size=60),norm.rvs(-30,0.5,size=20)))
    
    for d in data:
        pf.forward(d)
    print 'log likelihood:', pf.log_likelihood
    print 'LSM:', pf.LSM
    
    rng = range(100)
    plt.plot(rng,data,label=u"training data")
    plt.plot(rng,pf.predicted_value,label=u"predicted data")
    plt.plot(rng,pf.filtered_value,label=u"filtered data")
    plt.xlabel('TIME',fontsize=18)
    plt.ylabel('Value',fontsize=18)    
    plt.legend() 
    plt.show()

Experimental result

It's a code full of flaws that doesn't need to have a diagram of the results ...

comment

It's a pity that the time has run out halfway this time. Correct it so that you can get decent results in the future, and raise it again. (I'm sorry to publish a half-finished thing that is not helpful, but this time I uploaded it as a memo for myself.)

We apologize for the inconvenience, but if you make a mistake, we would appreciate it if you could point it out.

Recommended Posts

Code for self-organizing state-space model (unfinished)
Code for self-organizing state-space model (modified version)
Techniques for code testing?
Python code memo for yourself
Test code for evaluating decorators
[Python] Sample code for Python grammar