Es ist weit verbreitet, aber ich habe es erstellt, also werde ich es verlassen. Ich werde einen Kommentar hinzufügen, wenn ich Lust dazu habe.
»Ich wandere durch seltsame Orte, bis ich es zum ersten Mal abschließe. ――Ich habe einige Ausreißer absichtlich eingesetzt, aber es scheint, dass sie richtig filtern.
Ist es nicht falsch, dies hier zu tun? Bitte zögern Sie nicht zu kommentieren, wenn Sie Fragen haben. Ich bin mir auch nicht sicher, also begrüße ich Sie, es mir zu sagen.
** Fest am 14. Dezember 2016 **
particle.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Python-Implementierung des Partikelfilters
"""
import pyximport
pyximport.install()
import resample
import numpy as xp
# import cupy as xp
class GaussianNoiseModel(object):
"""Mehrdimensionale Gaußsche Verteilung
"""
def __init__(self, Cov):
"""Konstrukteur
@param ndarray(n,n) Cov :Verteilte mitverteilte Matrix
"""
self._Cov = Cov
def generate(self, num):
"""Geräuschentwicklung
@param int num :Anzahl der Partikel
@return ndarray(n,num) :Rauschmatrix
"""
n, _ = self._Cov.shape
return xp.random.multivariate_normal(xp.zeros(n), self._Cov, num).T
def logpdf(self, X):
"""Logistische Wahrscheinlichkeitsdichtefunktion
@param ndarray(n,num) X
@param int num :Anzahl der Partikel
@return ndarray(num) :Log Wahrscheinlichkeitsdichte
"""
Cov = self._Cov
k, _ = Cov.shape
det_Cov = xp.linalg.det(Cov)
Cov_inv = xp.linalg.inv(Cov)
coefs = xp.array([ x.dot(Cov_inv).dot(x.T) for x in X.T ])
return -k*xp.log(2.*xp.pi)/2. -xp.log(det_Cov)/2. -coefs/2.
class CauchyNoiseModel(object):
"""Mehrdimensionale unabhängige Cauchy-Verteilung
Mehrdimensionale Cauchy-Verteilung unter der Annahme der Unabhängigkeit für jede Variable
"""
def __init__(self, gma):
"""Konstrukteur
@param ndarray(n) gma :Skalenpopulation
"""
self._gma = gma
def generate(self, num):
gma = self._gma
uni = xp.random.rand(gma.size, num)
Gma = gma.reshape(gma.size,1) * xp.ones(num)
return xp.arctan(uni / Gma) /xp.pi + 1./2.
def logpdf(self, X):
_, num = X.shape
gma = self._gma
Gma = gma.reshape(gma.size,1) * xp.ones(num)
return xp.sum(xp.log(Gma/xp.pi) - xp.log(X**2 + Gma**2), axis=0)
def _normalize(w):
"""Gewichtsnormalisierung
@param ndarray(num) w :Gewicht jedes Partikels
@return ndarray(num) :Normalisiertes Gewicht für jedes Partikel
"""
return w / xp.sum(w)
class ParticleFilter(object):
"""Particle Filter (Partikelfilter)
"""
def __init__(self, f, g, h, t_noise, o_noise, pars_init):
"""Konstrukteur
@param ndarray(nx,num) function( ndarray(nx,num) ) f :Zustandsübergangsfunktion
@param ndarray(nx,num) function( ndarray(nu,num) ) g :Eingangsausbreitungsfunktion
@param ndarray(ny,num) function( ndarray(nx,num) ) h :Beobachtungsfunktion
@param NoiseModel t_noise :Systemrauschmodell
@param NoiseModel o_noise :Beobachtungsrauschmodell
@param ndarray(nx,num) pars_init :Anfangswert der Partikel
"""
self._f = f
self._g = g
self._h = h
self._t_noise = t_noise
self._o_noise = o_noise
_, num = pars_init.shape
self._num = num
self._w = _normalize(xp.ones(num))
self._pars = pars_init
def update(self, y, u):
"""Filter Update
@param ndarray(ny) y :Beobachtungsvektor bei n
@param ndarray(nu) u :Eingabevektor bei n
"""
self._update_pars(u)
self._update_weights(y)
self._resample()
def _update_pars(self, u):
"""Aktualisieren Sie die Partikel gemäß dem Zustandsübergangsmodell
-Zustandsgleichung
x_n = f(x_n-1) + g(u_n-1) + w
-Zustandsvektor x_n
-Eingabevektor u_n
-Zustandsübergangsfunktion f(x)
-Eingangsausbreitungsfunktion g(u)
-Systemrauschen w
@param ndarray(nu) u : n-Eingabevektor bei 1(u_n-1)
"""
U = u.reshape(u.size,1) * xp.ones(self._num)
self._pars = self._f(self._pars) + self._g(U) + self._t_noise.generate(self._num)
def _update_weights(self, y):
"""Berechnen Sie die Wahrscheinlichkeit gemäß dem Beobachtungsmodell
-Beobachtungsgleichung
y_n = h(x_n) + v
-Zustandsvektor x_n
-Beobachtungsvektor y_n
-Beobachtungsfunktion h(x)
-Beobachtungsgeräusch v
@param ndarray(ny) y :Beobachtungsvektor bei n(y_n)
"""
Y = y.reshape(y.size,1) * xp.ones(self._num)
loglh = self._o_noise.logpdf( xp.absolute(Y - self._h(self._pars)) )
self._w = _normalize( xp.exp( xp.log(self._w) + loglh ) )
def _resample(self):
"""Resampling
"""
wcum = xp.cumsum(self._w)
num = self._num
# idxs = [ i for n in xp.sort(xp.random.rand(num))
# for i in range(num) if n <= wcum[i] ]
# start = 0
# idxs = [ 0 for i in xrange(num) ]
# for i, n in enumerate( xp.sort(xp.random.rand(num)) ):
# for j in range(start, num):
# if n <= wcum[j]:
# idxs[i] = start = j
# break
idxs = resample.resample(num, wcum)
self._pars = self._pars[:,idxs]
self._w = _normalize(self._w[idxs])
def estimate(self):
"""Zustandsschätzung
@return ndarray(nx) :Zustandsvektorschätzung
"""
return xp.sum(self._pars * self._w, axis=1)
def particles(self):
"""Partikel
@return ndarray(nx,num)
"""
return self._pars
Ich habe mir hier ein wenig ausgedacht. Wenn Sie Folgendes tun, handelt es sich um eine Doppelschleife, Sie können jedoch fast eine Einzelschleife verarbeiten. Da die Anzahl der Schleifen proportional zur ersten Potenz der Anzahl der Partikel und nicht zum Quadrat ist, gibt es einen großen Geschwindigkeitsunterschied, wenn die Anzahl der Partikel erhöht wird.
resample.pyx
# -*- coding: utf-8 -*-
#cython: boundscheck=False
#cython: wraparound=False
import numpy as xp
cimport numpy as xp
cimport cython
ctypedef xp.float64_t DOUBLE_t
ctypedef xp.int_t INT_t
def resample(int num, xp.ndarray[DOUBLE_t, ndim=1] wcum):
cdef int start, i, j, length
cdef double n
start = 0
cdef xp.ndarray[INT_t, ndim=1] idxs = xp.zeros(num).astype(xp.int)
cdef xp.ndarray[DOUBLE_t, ndim=1] rand = xp.sort(xp.random.rand(num))
length = rand.size
for i in xrange(length):
for j in xrange(start, num):
if rand[i] <= wcum[j]:
idxs[i] = start = j
break
return idxs
resample_setup.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#filename_setup.py
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext as build_pyx
import numpy
setup(
name = 'resample',
ext_modules = [Extension('resample', ['resample.pyx'])],
cmdclass = { 'build_ext': build_pyx },
include_dirs = [numpy.get_include()],
)
$ cython -a resample.pyx #kompilieren
$ python resample_setup.py build_ext --inplace #Bauen
Cython Es wurde nicht viel schneller, wahrscheinlich weil ich es nicht gut verstand. (Danke für deinen Kommentar)
Wenn Systemrauschen und Beobachtungsrauschen mehrdimensionale unabhängige Cauchy-Verteilung sind. Ausreißerwiderstand wird hinzugefügt. Referenz: Filtern der Bewegungsbahn mithilfe eines selbstorganisierten Zustandsraummodells
py.test_particle.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""particle.py Verwendungsbeispiel
"""
import particle as par
import utils
import numpy as xp
# import cupy as xp
#Parametereinstellungen--------------------
num = 3000 #Anzahl der Partikel
v_sys = 0.01 #Co-Dispersion von Systemrauschen
v_obs = 0.1 #Co-Dispersion des beobachteten Rauschens
v_noise = 0.05
#Anfängliche Partikel
mins = -5. * xp.ones(4)
maxs = +5. * xp.ones(4)
pars_init = utils.rand_uniform(mins, maxs, num)
# ---------------------------------
dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")
#Generierung von Zustandsmodellen(Differenzialmodell für den zweiten Stock)
A = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
f = lambda X: A.dot(X) #Definition der Zustandsübergangsfunktion
B = xp.zeros(1)
g = lambda U: B.dot(U)
C = xp.ones(4) * v_sys
sysn_model = par.CauchyNoiseModel(C)
#Generierung eines Beobachtungsmodells(Direktes Beobachtungsmodell)
D = xp.kron(xp.eye(2), xp.array([1, 0]))
h = lambda X: D.dot(X) #Definition der Beobachtungsfunktion
E = xp.ones(2) * v_obs
obsn_model = par.CauchyNoiseModel(E)
#Erste Handlung
lines = utils.init_plot_particle(dataset, pars_init)
#Erzeugung eines Partikelfilters
pf = par.ParticleFilter(
f, g, h, sysn_model, obsn_model, pars_init)
x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
pf.update(y, xp.zeros(1))
state = pf.estimate()
x_est[i,:] = h(state)
#Datenplot
pars = pf.particles()
utils.plot_each_particle(lines, x_est, pars, i)
# utils.save_as_gif_particle(dataset, pars_init, pf, "particle_selforganized.gif")
utiils.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import matplotlib.animation as animation
import matplotlib.pyplot as plt
from collections import namedtuple
import numpy as xp
# import cupy as xp
Dataset = namedtuple("Dataset", ["t", "x", "y"])
def load_data(dirname):
t = xp.loadtxt(dirname + "/t.txt")
x = xp.loadtxt(dirname + "/x.txt")
y = xp.loadtxt(dirname + "/y.txt")
dataset = Dataset(t=t, x=x, y=y)
return dataset
def save_data(dirname, dataset):
xp.savetxt(dirname + "/t.txt", dataset.t)
xp.savetxt(dirname + "/x.txt", dataset.x)
xp.savetxt(dirname + "/y.txt", dataset.y)
def generate_data(dirname):
# truth
t = xp.arange(0, 6*xp.pi, 0.05)
x = xp.c_[t*xp.cos(t), t*xp.sin(t)]
# noise
nr, nc = x.shape
idx = xp.random.randint(nr/2, nr, 5)
n = xp.random.normal(0., xp.sqrt(v_noise), x.shape)
n[nr/3:2*nr/3,:] = xp.random.normal(0., xp.sqrt(v_noise*10), (2*nr/3-nr/3,nc))
n[idx,:] += xp.random.normal(0., xp.sqrt(v_noise)*20, (5,nc))
# observation
y = x + n
x_est = xp.zeros(x.shape)
Dataset = namedtuple("Dataset", ["t", "x", "y"])
dataset = Dataset(t=t, x=x, y=y)
save_data(dirname, dataset)
return dataset
def init_plot_particle(dataset, pars):
fig, ax = plt.subplots(1, 1)
ax.hold(True)
ax.axis("equal")
lines = []
lines.append( ax.plot(pars[0,:], pars[2,:], "o") )
lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
lines.append( ax.plot([0], [0]) )
plt.legend(["particles", "truth", "observed", "estimated"])
plt.xlabel("x")
plt.ylabel("y")
plt.grid(True)
return lines
def plot_each_particle(lines, x_est, pars, i):
lines[0][0].set_data(pars[0,:], pars[2,:])
lines[3][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
plt.pause(1e-5)
def save_as_gif_particle(dataset, pars_init, pf, filename):
x_est = xp.zeros(dataset.x.shape)
lines = init_plot_particle(dataset, pars_init, x_est)
def draw(i):
pf.update(dataset.y[i])
state = pf.estimate()
x_est[i,:] = [state[0], state[2]]
#Datenplot
pars = pf.particles()
plot_each_particle(lines, x_est, pars, i)
ny, _ = dataset.y.shape
ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False)
ani.save(filename, writer="imagemagick")
def init_plot_kalman(dataset):
fig, ax = plt.subplots(1, 1)
ax.hold(True)
ax.axis("equal")
lines = []
lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
lines.append( ax.plot([0], [0]) )
plt.legend(["truth", "observed", "estimated"])
plt.xlabel("x")
plt.ylabel("y")
plt.grid(True)
return lines
def plot_each_kalman(lines, x_est, i):
lines[2][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
plt.pause(1e-5)
def save_as_gif_kalman(dataset, kf, filename):
x_est = xp.zeros(dataset.x.shape)
lines = init_plot_kalman(dataset, x_est)
def draw(i):
kf.update(dataset.y[i])
state = pf.estimate()
x_est[i,:] = [state[0], state[2]]
#Datenplot
plot_each_kalman(lines, x_est, i)
ny, _ = dataset.y.shape
ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False)
ani.save(filename, writer="imagemagick")
def rand_uniform(mins, maxs, num):
"""Zur Erzeugung von Ausgangspartikeln
@param ndarray(nx) mins :Jeder Mindestwert des Partikels
@param ndarray(nx) maxs :Jeder Maximalwert des Partikels
@param int num :Anzahl der Partikel
@return ndarray(nx,num) :Anfängliche Partikel
"""
nx = mins.size
return (mins.reshape(nx, 1) * xp.ones(num)
+ (maxs - mins).reshape(nx, 1) * xp.random.rand(nx, num))
kalman.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Python-Implementierung des Kalman-Filters
"""
import numpy as xp
# import cupy as xp
class KalmanFilter(object):
def __init__(self, F, H, Q, R, x_init, P_init):
self._x = x_init.reshape(-1,1)
self._P = P_init
self._update_x = lambda x : F.dot(x)
self._update_P = lambda P : Q + F.dot(P).dot(F.T)
self._error = lambda x,y : y - H.dot(x)
self._cov_P = lambda P : R + H.dot(P).dot(H.T)
self._kalman_gain = lambda P,S : P.dot(H.T).dot(xp.linalg.inv(S))
self._estimate_x = lambda x,K,e : x + K.dot(e)
self._estimate_P = lambda P,K : (xp.eye(*P.shape)-K.dot(H)).dot(P)
def update(self, y):
#Prognose
x_predicted = self._update_x(self._x)
P_predicted = self._update_P(self._P)
#Parameterberechnung
e = self._error(x_predicted, y.reshape(-1,1))
S = self._cov_P(P_predicted)
K = self._kalman_gain(P_predicted, S)
#aktualisieren
self._x = self._estimate_x(x_predicted, K, e)
self._P = self._estimate_P(P_predicted, K)
def estimate(self):
return self._x
test_kalman.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""kalman.py Verwendungsbeispiel
"""
import kalman
import utils
import numpy as xp
# import cupy as xp
#Parametereinstellungen--------------------
var_Q = 0.01
var_R = 1
x_init = xp.zeros(4)
P_init = xp.ones(4)
# ---------------------------------
dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")
#Generierung von Zustandsmodellen(Differenzialmodell für den zweiten Stock)
F = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
Q = var_Q * xp.eye(4)
#Generierung eines Beobachtungsmodells(Direktes Beobachtungsmodell)
H = xp.kron(xp.eye(2), xp.array([1, 0]))
R = var_R * xp.eye(2)
#Erste Handlung
lines = utils.init_plot_kalman(dataset)
kf = kalman.KalmanFilter(F, H, Q, R, x_init, P_init)
x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
kf.update(y)
state = kf.estimate()
x_est[i,:] = H.dot(state.reshape(-1,1)).T
#Datenplot
utils.plot_each_kalman(lines, x_est, i)
# utils.save_as_gif_kalman(dataset, kf, "kalman.gif")
Recommended Posts