Ich habe das selbstorganisierende Zustandsraummodell im folgenden Artikel in Python implementiert. Indem Sie für das Systemrauschen / Beobachtungsrauschen des Partikelfilters ein nicht-Gaußsches Rauschen annehmen, optimieren Sie die Hyperparameter selbst.
Die Suche nach Super-Parametern ist ziemlich mühsam, daher ist diese, die sich selbst optimiert, sehr praktisch. (Als nächstes besteht das Problem darin, nach Ultra-Super-Parametern zu suchen ...)
Referenz: Filtern der Bewegungsbahn mithilfe eines selbstorganisierten Zustandsraummodells
Ist es nicht falsch, dies hier zu tun? Bitte zögern Sie nicht zu kommentieren, wenn Sie Fragen haben. Ich bin mir auch nicht sicher, also begrüße ich Sie, es mir zu sagen.
** Fest am 14. Dezember 2016 **
In beiden Fällen wird die Geräuschstreuung unterwegs erhöht.
――Der normale Partikelfilter scheint in dem Bereich, in dem die Dispersion klein ist, gut zu folgen, aber er tobt in dem Bereich, in dem die Dispersion groß ist. ――Der selbstorganisierende Typ scheint fehlertoleranter zu sein, da die Partikelverteilung in dem Bereich, in dem die Dispersion groß ist, etwas größer ist.
** Achtung ** Die Utility-Funktionen sind die gleichen wie unten. Python-Implementierung des Partikelfilters --Qiita
particle_selforganized.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Python-Implementierung eines selbstorganisierenden Partikelfilters
"""
import pyximport
pyximport.install()
import resample
import numpy as xp
# import cupy as xp
def _rand_cauchy(gma):
nr, nc = gma.shape
uni = xp.random.rand(nr, nc)
return xp.arctan(uni/gma) /xp.pi + 1./2.
def _rand_normal(sgm, shape):
return xp.random.normal(0, sgm, shape)
def _logpdf_cauchy(gma, x):
y = xp.log(gma/xp.pi) -xp.log(x**2. + gma**2.)
return xp.sum(y, axis=0)
def _normalize(w):
return w / xp.sum(w)
class ParticleFilterSelfOrganized(object):
_sgm = 0.001
def __init__(self, f, h, pars_init, pars_hpt_init, pars_hpo_init):
self._f = f
self._h = h
_, num = pars_init.shape
self._num = num
self._w = _normalize(xp.ones(num))
self._pars = pars_init
self._pars_hpt = pars_hpt_init
self._pars_hpo = pars_hpo_init
def update(self, y):
self._update_pars()
self._update_weights(y)
self._resample()
def _update_pars(self):
self._pars = self._f(self._pars) + _rand_cauchy(xp.exp(self._pars_hpt/2.))
self._pars_hpt = self._pars_hpt + _rand_normal(self._sgm, self._pars_hpt.shape)
self._pars_hpo = self._pars_hpo + _rand_normal(self._sgm, self._pars_hpo.shape)
def _update_weights(self, y):
Y = y.reshape(y.size,1) * xp.ones(self._num)
pars_hpo = xp.exp(self._pars_hpo/2.)
loglh = _logpdf_cauchy( pars_hpo, xp.absolute(Y - self._h(self._pars)) )
self._w = _normalize( xp.exp( xp.log(self._w) + loglh ) )
def _resample(self):
wcum = xp.r_[0, xp.cumsum(self._w)]
num = self._num
idxs = resample.resample(num, wcum)
# start = 0
# idxs = num*[0]
# for i, n in enumerate( sorted(xp.random.rand(num)) ):
# for j in range(start, num):
# if n <= wcum[j+1]:
# idxs[i] = start = j
# break
self._pars = self._pars[:,idxs]
self._pars_hpt = self._pars_hpt[:,idxs]
self._pars_hpo = self._pars_hpo[:,idxs]
self._w = _normalize(self._w[idxs])
def estimate(self):
return xp.sum(self._pars * self._w, axis=1)
def particles(self):
return self._pars
test_particle_selforganized.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""particle_selforganized.py Verwendungsbeispiel
"""
import particle_selforganized as par
import utils
import numpy as xp
# import cupy as xp
#Parametereinstellungen--------------------
num = 5000 #Anzahl der Partikel
v_noise = 0.05
#Anfängliche Partikel
mins = -5. * xp.ones(4)
maxs = +5. * xp.ones(4)
pars_init = utils.rand_uniform(mins, maxs, num)
pars_hpt_init = xp.random.normal(0, 0.01, pars_init.shape)
pars_hpo_init = xp.random.normal(0, 0.01, (2,num))
# ---------------------------------
dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")
#Generierung von Zustandsmodellen(Differenzialmodell für den zweiten Stock)
A = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
f_trans = lambda X: A.dot(X)
#Generierung eines Beobachtungsmodells(Direktes Beobachtungsmodell)
B = xp.kron(xp.eye(2), xp.array([1, 0]))
f_obs = lambda X: B.dot(X)
#Erste Handlung
lines = utils.init_plot_particle(dataset, pars_init)
#Erzeugung eines Partikelfilters
pf = par.ParticleFilterSelfOrganized(
f_trans, f_obs, pars_init, pars_hpt_init, pars_hpo_init)
x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
pf.update(y)
state = pf.estimate()
x_est[i,:] = f_obs(state)
#Datenplot
pars = pf.particles()
utils.plot_each_particle(lines, x_est, pars, i)
# utils.save_as_gif_particle(dataset, pars_init, pf, "particle_selforganized.gif")
Recommended Posts