I needed a highly accurate time-series prediction model for change point detection, so I tried writing the code myself. To state the result up front: I wanted to be able to say "I got good accuracy!", but what I ended up with was half-finished code with strange behavior... orz. I will fix it and post an updated version as soon as I can.

The paper "Forefront of financial time series analysis by a self-organizing state space model with initial distribution search: application to a stochastic volatility model with t-distribution" summarizes the problems of the Monte Carlo filter and goes on to propose a method for searching for an optimal initial distribution; I found it very interesting material. I intended to build something that included this optimization of the initial distribution, but what I produced fell far short of that...
The Monte Carlo filter is a very flexible state estimation algorithm that can handle nonstationary, non-Gaussian models. However, the following problems have been pointed out:

> In parameter estimation for the Monte Carlo filter, (1) the estimate of the likelihood function with respect to the parameters contains error from the Monte Carlo method, and (2) the derivative of the likelihood function is difficult to compute. ("Forefront of financial time series analysis by a self-organizing state space model with initial distribution search", p. 4)

To address this, a method using the Nelder-Mead method (a direct-search nonlinear optimization method) has been proposed. It leaves the problem, however, that unless the convergence criterion is relaxed considerably it fails to converge, and the variance of the parameter estimates grows. The self-organizing state space model, by contrast, performs Bayesian estimation by incorporating the parameters into the state space. "Forefront of financial time series analysis by a self-organizing state space model with initial distribution search" proposes constructing the initial distribution by maximum likelihood estimation using the NM method. For the Monte Carlo filter itself, see "Implementation of a simple particle filter".
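In my own words (a sketch of the idea, not the paper's exact formulation): given a state space model with an unknown parameter $\theta$, the self-organizing version appends $\theta$ to the state vector and lets the same filter estimate it jointly with the state:

$$
z_t = \begin{pmatrix} x_t \\ \theta_t \end{pmatrix}, \qquad
x_t = f(x_{t-1}, v_t; \theta_t), \qquad
\theta_t = \theta_{t-1}, \qquad
y_t = h(x_t, w_t; \theta_t)
$$

Running the ordinary Monte Carlo filter on $z_t$ then yields posterior distributions for the parameters as a by-product. In practice a small artificial noise is often added to $\theta_t$ so that the parameter particles do not collapse onto a few values; the code below does exactly that, with Cauchy noise of scales $\xi$ and $\nu$.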
I wrote code for the following model.
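Spelled out from the code below (for the $k=1$ case that is actually run), the model is:

$$
\begin{aligned}
x_t &= x_{t-1} + v_t, & v_t &\sim \mathrm{Cauchy}(0,\, 10^{\tau_t}) \\
y_t &= x_t + w_t, & w_t &\sim \mathrm{Cauchy}(0,\, 10^{\sigma_t}) \\
\tau_t &= \tau_{t-1} + \eta_t^{(\tau)}, & \sigma_t &= \sigma_{t-1} + \eta_t^{(\sigma)}
\end{aligned}
$$

where $\eta_t^{(\tau)}$ and $\eta_t^{(\sigma)}$ are Cauchy noises with fixed scales $\xi$ and $\nu$, so the augmented state vector is $(x_t, \tau_t, \sigma_t)^\top$ and $\tau_t$, $\sigma_t$ are the $\log_{10}$ noise scales.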
As described in "Knowledge discovery and self-organizing statistical models", a quantity such as $\sigma^2$ must be kept positive during estimation, so I estimate $\log_{10}\sigma^2$ instead.

The code follows.
```python
# coding: utf-8
from math import log
import numpy as np
from scipy.stats import norm, cauchy
from numpy.random import uniform, multivariate_normal
from multiprocessing import Pool
import matplotlib.pyplot as plt


class ParticleFilter:
    nu = 0.01   # hyperparameter: Cauchy scale of the observation-parameter random walk
    xi = 0.03   # hyperparameter: Cauchy scale of the system-parameter random walk
    log_likelihood = 0.0  # accumulated log likelihood
    TIME = 1
    PR = 8  # number of worker processes (for the unused multiprocessing variant below)

    def __init__(self, PARTICLES_NUM, k=1, ydim=1, pdim=2):
        self.PARTICLES_NUM = PARTICLES_NUM  # number of particles
        self.TEETH_OF_COMB = np.arange(0, 1, 1.0 / self.PARTICLES_NUM)
        self.weights = np.zeros((ydim, self.PARTICLES_NUM))
        self.particles = np.zeros((k * ydim + pdim, self.PARTICLES_NUM))
        self.predicted_particles = np.zeros((k * ydim + pdim, self.PARTICLES_NUM))
        np.random.seed(555)
        self.predicted_value = []
        self.filtered_value = []
        self.LSM = np.zeros(ydim)  # accumulated squared error
        self.F, self.G, self.H = self.FGHset(k, ydim, pdim)
        self.k = k
        self.ydim = ydim
        self.pdim = pdim

    def init_praticles_distribution(self, P, r):
        """Initialize particles: x_0|0, tau_0|0, sigma_0|0."""
        data_particles = multivariate_normal([1] * self.ydim * self.k,
                                             np.eye(self.ydim * self.k) * 10,
                                             self.PARTICLES_NUM).T
        param_particles = np.zeros((self.pdim, self.PARTICLES_NUM))
        for i in range(self.pdim):
            # log-scale parameters start from a uniform distribution on [P-r, P+r]
            param_particles[i, :] = uniform(P - r, P + r, self.PARTICLES_NUM)
        self.particles = np.vstack((data_particles, param_particles))

    def get_system_noise(self):
        """v_t vector"""
        # system noise for x_t: Cauchy with scale 10^tau, where tau itself is in the
        # state (row k*ydim is the first parameter row; the original indexed row ydim,
        # which is only correct for k = 1)
        data_noise = cauchy.rvs(loc=np.zeros(self.PARTICLES_NUM),
                                scale=np.power(10, self.particles[self.k * self.ydim]))
        data_noise[data_noise == float("-inf")] = -1e308
        data_noise[data_noise == float("inf")] = 1e308
        # random-walk noise for the two log-scale parameters
        parameta_noise_sys = cauchy.rvs(loc=0, scale=self.xi, size=self.PARTICLES_NUM)
        parameta_noise_ob = cauchy.rvs(loc=0, scale=self.nu, size=self.PARTICLES_NUM)
        return np.vstack((data_noise, parameta_noise_sys, parameta_noise_ob))

    def calc_pred_particles(self):
        """Apply the system model: x_t|t-1 = F x_t-1 + G v_t (linear, non-Gaussian)."""
        return self.F.dot(self.particles) + self.G.dot(self.get_system_noise())

    def calc_particles_weight(self, y):
        """Compute particle weights w_t from the observation density."""
        locs = self.calc_pred_particles()
        self.predicted_particles = locs
        # observation noise scale 10^sigma; sigma is the last state row
        scale = np.power(10, locs[-1])
        scale[scale == 0] = 1e-308
        self.weights = cauchy.pdf(np.array([y] * self.PARTICLES_NUM) - self.H.dot(locs),
                                  loc=np.zeros(self.PARTICLES_NUM),
                                  scale=scale).flatten()

    def calc_likelihood(self):
        """Accumulate the log likelihood p(y_t | y_1:t-1)."""
        res = np.sum(self.weights) / self.PARTICLES_NUM
        self.log_likelihood += log(max(res, 1e-308))  # guard against log(0)

    def normalize_weights(self):
        """wtilda_t"""
        self.weights = self.weights / np.sum(self.weights)

    def resample(self, y):
        """Systematic resampling: draw x_t|t from the weighted predicted particles."""
        self.normalize_weights()
        self.memorize_predicted_value()
        # cumulative weights and evenly spaced "comb" pointers with a random offset
        cum = np.cumsum(self.weights)
        base = uniform(0, 1.0 / self.PARTICLES_NUM)
        pointers = self.TEETH_OF_COMB + base
        # select the first particle whose cumulative weight covers each pointer
        selected_idx = [np.where(cum >= p)[0][0] for p in pointers]
        # unused multiprocessing variant:
        # pool = Pool(processes=self.PR)
        # selected_idx = pool.map(get_slected_particles_idx, ((cum, p) for p in pointers))
        # pool.close()
        # pool.join()
        self.particles = self.predicted_particles[:, selected_idx]
        self.memorize_filtered_value(selected_idx, y)

    def memorize_predicted_value(self):
        predicted_value = np.sum(self.predicted_particles * self.weights, axis=1)[0]
        self.predicted_value.append(predicted_value)

    def memorize_filtered_value(self, selected_idx, y):
        # weighted mean over the resampled particles
        filtered_value = np.sum(self.particles * self.weights[selected_idx], axis=1) \
                         / np.sum(self.weights[selected_idx])
        self.filtered_value.append(filtered_value[0])
        self.calculate_LSM(y, filtered_value[:self.ydim])

    def calculate_LSM(self, y, filterd_value):
        self.LSM += (y - filterd_value) ** 2

    def forward(self, y):
        """Advance one step: system model, observation model, resampling."""
        print('calculating time at %d' % self.TIME)
        # calc_particles_weight() draws the predicted particles itself, so the
        # original extra call to calc_pred_particles() here was redundant
        self.calc_particles_weight(y)
        self.calc_likelihood()
        self.resample(y)
        self.TIME += 1

    def FGHset(self, k, vn_y, n_ss_parameters):
        """Build F, G, H for a k-th order trend model with the parameters appended.

        vn_y: observation dimension
        n_ss_parameters: number of hyperparameters in the state
        k: order of the trend difference
        """
        G_upper_block = np.zeros((k * vn_y, vn_y + n_ss_parameters))
        G_lower_block = np.zeros((n_ss_parameters, vn_y + n_ss_parameters))
        G_lower_block[-n_ss_parameters:, -n_ss_parameters:] = np.eye(n_ss_parameters)
        G_upper_block[:vn_y, :vn_y] = np.eye(vn_y)
        G = np.vstack((G_upper_block, G_lower_block))
        H = np.hstack((np.eye(vn_y),
                       np.zeros((vn_y, vn_y * (k - 1) + n_ss_parameters))))
        F_upper_block = np.zeros((k * vn_y, k * vn_y + n_ss_parameters))
        F_lower_block = np.zeros((n_ss_parameters, k * vn_y + n_ss_parameters))
        F_lower_block[-n_ss_parameters:, -n_ss_parameters:] = np.eye(n_ss_parameters)
        if k == 1:
            F_upper_block[:vn_y, :vn_y] = np.eye(vn_y)
        elif k == 2:
            F_upper_block[:vn_y, :vn_y] = np.eye(vn_y) * 2
            F_upper_block[:vn_y, vn_y:k * vn_y] = np.eye(vn_y) * -1
            F_upper_block[vn_y:k * vn_y, :vn_y] = np.eye(vn_y)
        F = np.vstack((F_upper_block, F_lower_block))
        return F, G, H


def get_slected_particles_idx(args):
    """Helper for the multiprocessing variant of resample()."""
    try:
        cum, p = args
        return np.where(cum >= p)[0][0]
    except Exception:
        import sys
        import traceback
        sys.stderr.write(traceback.format_exc())


if __name__ == '__main__':
    n_particle = 10000
    pf = ParticleFilter(n_particle, k=1, ydim=1, pdim=2)
    pf.init_praticles_distribution(0, 8)  # P, r
    # piecewise-constant test data with two change points
    data = np.hstack((norm.rvs(0, 1, size=20),
                      norm.rvs(10, 1, size=60),
                      norm.rvs(-30, 0.5, size=20)))
    for d in data:
        pf.forward(d)
    print('log likelihood:', pf.log_likelihood)
    print('LSM:', pf.LSM)
    rng = range(100)
    plt.plot(rng, data, label="training data")
    plt.plot(rng, pf.predicted_value, label="predicted data")
    plt.plot(rng, pf.filtered_value, label="filtered data")
    plt.xlabel('TIME', fontsize=18)
    plt.ylabel('Value', fontsize=18)
    plt.legend()
    plt.show()
```
The code is so full of flaws that there is little point in even showing a plot of the results...
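As a note to my future self, one standard diagnostic worth adding when I revisit this: with heavy-tailed Cauchy weights a handful of particles can dominate everything, and the effective sample size makes that visible. A minimal sketch (my addition; `effective_sample_size` is not part of the code above):

```python
import numpy as np

def effective_sample_size(weights):
    """ESS = 1 / sum(w_i^2) for normalized weights.

    Values near the particle count mean healthy diversity;
    values near 1 mean almost all weight sits on a few particles.
    """
    w = np.asarray(weights, dtype=float)
    w = w / w.sum()
    return 1.0 / np.sum(w ** 2)

# e.g. call it right after normalize_weights() inside resample():
#   if effective_sample_size(self.weights) < 0.5 * self.PARTICLES_NUM:
#       print('weights degenerating at t=%d' % self.TIME)
```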
It is a pity that I ran out of time partway through this time. I will correct it so that it produces decent results and post it again. (I apologize for publishing something half-finished and unhelpful; this time it is mainly a memo to myself.)

If you find any mistakes, I would appreciate it if you could point them out.