I implemented the Self-Organizing State Space Model in the following paper in Python. By assuming non-Gaussian noise for the system noise / observation noise of the particle filter, we will optimize the hyperparameters by ourselves.
Searching for hyperparameters is quite troublesome, so this one that optimizes itself is quite convenient. (Next, the search for ultra-super parameters will be a problem ...)
Reference: Filtering of motion trajectory using self-organized state space model
Isn't it wrong to do this here? Please feel free to comment if you have any questions. Also, I'm not sure, so I welcome you to tell me.
** 2016/12/14 amendment ** --Rewrite the source was messed up --It seems that turning numpy with for is slow, so I rewrote resampling with cython (not much different) --I tried to run it with cupy, but I couldn't install it properly and died safely.
In both cases, the noise dispersion is increased on the way.
――The normal particle filter seems to follow well in the area where the dispersion is small, but it rampages in the area where the dispersion is large. ――The self-organizing type seems to be more tolerant of errors because the particle spread is slightly larger in the region where the dispersion is large.
** Caution ** Utility functions are the same as below. Python implementation of particle filter-Qiita
particle_selforganized.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Python implementation of self-organizing particle filters
"""
import pyximport
pyximport.install()
import resample
import numpy as xp
# import cupy as xp
def _rand_cauchy(gma):
nr, nc = gma.shape
uni = xp.random.rand(nr, nc)
return xp.arctan(uni/gma) /xp.pi + 1./2.
def _rand_normal(sgm, shape):
return xp.random.normal(0, sgm, shape)
def _logpdf_cauchy(gma, x):
y = xp.log(gma/xp.pi) -xp.log(x**2. + gma**2.)
return xp.sum(y, axis=0)
def _normalize(w):
return w / xp.sum(w)
class ParticleFilterSelfOrganized(object):
_sgm = 0.001
def __init__(self, f, h, pars_init, pars_hpt_init, pars_hpo_init):
self._f = f
self._h = h
_, num = pars_init.shape
self._num = num
self._w = _normalize(xp.ones(num))
self._pars = pars_init
self._pars_hpt = pars_hpt_init
self._pars_hpo = pars_hpo_init
def update(self, y):
self._update_pars()
self._update_weights(y)
self._resample()
def _update_pars(self):
self._pars = self._f(self._pars) + _rand_cauchy(xp.exp(self._pars_hpt/2.))
self._pars_hpt = self._pars_hpt + _rand_normal(self._sgm, self._pars_hpt.shape)
self._pars_hpo = self._pars_hpo + _rand_normal(self._sgm, self._pars_hpo.shape)
def _update_weights(self, y):
Y = y.reshape(y.size,1) * xp.ones(self._num)
pars_hpo = xp.exp(self._pars_hpo/2.)
loglh = _logpdf_cauchy( pars_hpo, xp.absolute(Y - self._h(self._pars)) )
self._w = _normalize( xp.exp( xp.log(self._w) + loglh ) )
def _resample(self):
wcum = xp.r_[0, xp.cumsum(self._w)]
num = self._num
idxs = resample.resample(num, wcum)
# start = 0
# idxs = num*[0]
# for i, n in enumerate( sorted(xp.random.rand(num)) ):
# for j in range(start, num):
# if n <= wcum[j+1]:
# idxs[i] = start = j
# break
self._pars = self._pars[:,idxs]
self._pars_hpt = self._pars_hpt[:,idxs]
self._pars_hpo = self._pars_hpo[:,idxs]
self._w = _normalize(self._w[idxs])
def estimate(self):
return xp.sum(self._pars * self._w, axis=1)
def particles(self):
return self._pars
test_particle_selforganized.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""particle_selforganized.py usage sample
"""
import particle_selforganized as par
import utils
import numpy as xp
# import cupy as xp
#parameter settings--------------------
num = 5000 #Number of particles
v_noise = 0.05
#Initial particles
mins = -5. * xp.ones(4)
maxs = +5. * xp.ones(4)
pars_init = utils.rand_uniform(mins, maxs, num)
pars_hpt_init = xp.random.normal(0, 0.01, pars_init.shape)
pars_hpo_init = xp.random.normal(0, 0.01, (2,num))
# ---------------------------------
dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")
#State model generation(Secondary floor difference model)
A = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
f_trans = lambda X: A.dot(X)
#Generation of observation model(Direct observation model)
B = xp.kron(xp.eye(2), xp.array([1, 0]))
f_obs = lambda X: B.dot(X)
#Initial plot
lines = utils.init_plot_particle(dataset, pars_init)
#Generation of particle filter
pf = par.ParticleFilterSelfOrganized(
f_trans, f_obs, pars_init, pars_hpt_init, pars_hpo_init)
x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
pf.update(y)
state = pf.estimate()
x_est[i,:] = f_obs(state)
#Data plot
pars = pf.particles()
utils.plot_each_particle(lines, x_est, pars, i)
# utils.save_as_gif_particle(dataset, pars_init, pf, "particle_selforganized.gif")
Recommended Posts