It's a common one, but I created it so I'll leave it. I will add a commentary when I feel like it.
――I'm wandering around strange places until I lock it for the first time. ――I put some outliers on purpose, but it seems that they are filtering properly.
Isn't it wrong to do this here? Please feel free to comment if you have any questions. Also, I'm not sure, so I welcome you to tell me.
** 2016/12/14 amendment ** --Rewritten that the source was messed up. I feel like it's relatively refreshing --It seems that turning numpy with for is slow, so I rewrote resampling with cython (not much different) --I tried to run it with cupy, but I couldn't install it properly and died safely (it seems that CUDA didn't work, not the GPU made by NVIDIA in the first place. Unfortunately) --Added Kalman filter as a bonus
--The likelihood calculation when updating the weight is returned to exp after taking the sum of the log-likelihoods. This is to prevent overflow when the exponents are accumulated.
--For resampling, first generate a list of indexes ʻidxs, and then re-spread it all at once with
pars [:, idxs]`.
particle.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Python implementation of particle filter
"""
import pyximport
pyximport.install()
import resample
import numpy as xp
# import cupy as xp
class GaussianNoiseModel(object):
"""Multidimensional Gaussian distribution
"""
def __init__(self, Cov):
"""constructor
@param ndarray(n,n) Cov :Covariance matrix
"""
self._Cov = Cov
def generate(self, num):
"""Noise generation
@param int num :Number of particles
@return ndarray(n,num) :Noise matrix
"""
n, _ = self._Cov.shape
return xp.random.multivariate_normal(xp.zeros(n), self._Cov, num).T
def logpdf(self, X):
"""Logarithmic probability density function
@param ndarray(n,num) X
@param int num :Number of particles
@return ndarray(num) :Logarithmic probability density
"""
Cov = self._Cov
k, _ = Cov.shape
det_Cov = xp.linalg.det(Cov)
Cov_inv = xp.linalg.inv(Cov)
coefs = xp.array([ x.dot(Cov_inv).dot(x.T) for x in X.T ])
return -k*xp.log(2.*xp.pi)/2. -xp.log(det_Cov)/2. -coefs/2.
class CauchyNoiseModel(object):
"""Multidimensional independent Cauchy distribution
Multidimensional Cauchy distribution assuming independence for each variable
"""
def __init__(self, gma):
"""constructor
@param ndarray(n) gma :Scale parameter
"""
self._gma = gma
def generate(self, num):
gma = self._gma
uni = xp.random.rand(gma.size, num)
Gma = gma.reshape(gma.size,1) * xp.ones(num)
return xp.arctan(uni / Gma) /xp.pi + 1./2.
def logpdf(self, X):
_, num = X.shape
gma = self._gma
Gma = gma.reshape(gma.size,1) * xp.ones(num)
return xp.sum(xp.log(Gma/xp.pi) - xp.log(X**2 + Gma**2), axis=0)
def _normalize(w):
"""Weight normalization
@param ndarray(num) w :Weight of each particle
@return ndarray(num) :Normalized weight for each particle
"""
return w / xp.sum(w)
class ParticleFilter(object):
"""Particle Filter (Particle filter)
"""
def __init__(self, f, g, h, t_noise, o_noise, pars_init):
"""constructor
@param ndarray(nx,num) function( ndarray(nx,num) ) f :State transition function
@param ndarray(nx,num) function( ndarray(nu,num) ) g :Input propagation function
@param ndarray(ny,num) function( ndarray(nx,num) ) h :Observation function
@param NoiseModel t_noise :System noise model
@param NoiseModel o_noise :Observation noise model
@param ndarray(nx,num) pars_init :Initial value of particles
"""
self._f = f
self._g = g
self._h = h
self._t_noise = t_noise
self._o_noise = o_noise
_, num = pars_init.shape
self._num = num
self._w = _normalize(xp.ones(num))
self._pars = pars_init
def update(self, y, u):
"""Filter update
@param ndarray(ny) y :Observation vector at n
@param ndarray(nu) u :Input vector at n
"""
self._update_pars(u)
self._update_weights(y)
self._resample()
def _update_pars(self, u):
"""Update particles according to state transition model
-Equation of state
x_n = f(x_n-1) + g(u_n-1) + w
-State vector x_n
-Input vector u_n
-State transition function f(x)
-Input propagation function g(u)
-System noise w
@param ndarray(nu) u : n-Input vector at 1(u_n-1)
"""
U = u.reshape(u.size,1) * xp.ones(self._num)
self._pars = self._f(self._pars) + self._g(U) + self._t_noise.generate(self._num)
def _update_weights(self, y):
"""Calculate the likelihood according to the observation model
-Observation equation
y_n = h(x_n) + v
-State vector x_n
-Observation vector y_n
-Observation function h(x)
-Observation noise v
@param ndarray(ny) y :Observation vector at n(y_n)
"""
Y = y.reshape(y.size,1) * xp.ones(self._num)
loglh = self._o_noise.logpdf( xp.absolute(Y - self._h(self._pars)) )
self._w = _normalize( xp.exp( xp.log(self._w) + loglh ) )
def _resample(self):
"""Resampling
"""
wcum = xp.cumsum(self._w)
num = self._num
# idxs = [ i for n in xp.sort(xp.random.rand(num))
# for i in range(num) if n <= wcum[i] ]
# start = 0
# idxs = [ 0 for i in xrange(num) ]
# for i, n in enumerate( xp.sort(xp.random.rand(num)) ):
# for j in range(start, num):
# if n <= wcum[j]:
# idxs[i] = start = j
# break
idxs = resample.resample(num, wcum)
self._pars = self._pars[:,idxs]
self._w = _normalize(self._w[idxs])
def estimate(self):
"""State estimation
@return ndarray(nx) :State vector estimate
"""
return xp.sum(self._pars * self._w, axis=1)
def particles(self):
"""particle
@return ndarray(nx,num)
"""
return self._pars
I devised a little here. If you do the following, it will be a double loop, but you can process almost a single loop. Since the number of loops is proportional to the first power of the number of particles, not the square, there is a big difference in speed as the number of particles is increased.
-Generate a uniform random number $ n_i $ of $ [0, 1] $ and sort in ascending order --Take a random number $ n_i $ --Take the cumulative weight $ w_j $ --If the random number $ n_i $ is larger than the cumulative weight $ w_j $, move to the next weight $ w_ {j + 1} $. --If it is small, the index $ j $ is adopted-> Move to the next random number $ n_ {i + 1} $ --Because the next random number $ n_ {i + 1} $ is $ w_ {j-1} <n_i <n_ {i + 1} $, you can start by comparing it with $ w_j $.
resample.pyx
# -*- coding: utf-8 -*-
#cython: boundscheck=False
#cython: wraparound=False
import numpy as xp
cimport numpy as xp
cimport cython
ctypedef xp.float64_t DOUBLE_t
ctypedef xp.int_t INT_t
def resample(int num, xp.ndarray[DOUBLE_t, ndim=1] wcum):
cdef int start, i, j, length
cdef double n
start = 0
cdef xp.ndarray[INT_t, ndim=1] idxs = xp.zeros(num).astype(xp.int)
cdef xp.ndarray[DOUBLE_t, ndim=1] rand = xp.sort(xp.random.rand(num))
length = rand.size
for i in xrange(length):
for j in xrange(start, num):
if rand[i] <= wcum[j]:
idxs[i] = start = j
break
return idxs
resample_setup.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#filename_setup.py
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext as build_pyx
import numpy
setup(
name = 'resample',
ext_modules = [Extension('resample', ['resample.pyx'])],
cmdclass = { 'build_ext': build_pyx },
include_dirs = [numpy.get_include()],
)
$ cython -a resample.pyx #compile
$ python resample_setup.py build_ext --inplace #Build
Cython It didn't get much faster, probably because I didn't understand it well. (Thank you for your comment)
When system noise and observation noise are multidimensional independent Cauchy distribution. Outlier resistance is added. Reference: Filtering of motion trajectory using self-organized state space model
py.test_particle.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""particle.py usage sample
"""
import particle as par
import utils
import numpy as xp
# import cupy as xp
#parameter settings--------------------
num = 3000 #Number of particles
v_sys = 0.01 #Covariance of system noise
v_obs = 0.1 #Covariance of observed noise
v_noise = 0.05
#Initial particles
mins = -5. * xp.ones(4)
maxs = +5. * xp.ones(4)
pars_init = utils.rand_uniform(mins, maxs, num)
# ---------------------------------
dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")
#State model generation(Secondary floor difference model)
A = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
f = lambda X: A.dot(X) #Definition of state transition function
B = xp.zeros(1)
g = lambda U: B.dot(U)
C = xp.ones(4) * v_sys
sysn_model = par.CauchyNoiseModel(C)
#Generation of observation model(Direct observation model)
D = xp.kron(xp.eye(2), xp.array([1, 0]))
h = lambda X: D.dot(X) #Definition of observation function
E = xp.ones(2) * v_obs
obsn_model = par.CauchyNoiseModel(E)
#Initial plot
lines = utils.init_plot_particle(dataset, pars_init)
#Generation of particle filter
pf = par.ParticleFilter(
f, g, h, sysn_model, obsn_model, pars_init)
x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
pf.update(y, xp.zeros(1))
state = pf.estimate()
x_est[i,:] = h(state)
#Data plot
pars = pf.particles()
utils.plot_each_particle(lines, x_est, pars, i)
# utils.save_as_gif_particle(dataset, pars_init, pf, "particle_selforganized.gif")
utiils.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import matplotlib.animation as animation
import matplotlib.pyplot as plt
from collections import namedtuple
import numpy as xp
# import cupy as xp
Dataset = namedtuple("Dataset", ["t", "x", "y"])
def load_data(dirname):
t = xp.loadtxt(dirname + "/t.txt")
x = xp.loadtxt(dirname + "/x.txt")
y = xp.loadtxt(dirname + "/y.txt")
dataset = Dataset(t=t, x=x, y=y)
return dataset
def save_data(dirname, dataset):
xp.savetxt(dirname + "/t.txt", dataset.t)
xp.savetxt(dirname + "/x.txt", dataset.x)
xp.savetxt(dirname + "/y.txt", dataset.y)
def generate_data(dirname):
# truth
t = xp.arange(0, 6*xp.pi, 0.05)
x = xp.c_[t*xp.cos(t), t*xp.sin(t)]
# noise
nr, nc = x.shape
idx = xp.random.randint(nr/2, nr, 5)
n = xp.random.normal(0., xp.sqrt(v_noise), x.shape)
n[nr/3:2*nr/3,:] = xp.random.normal(0., xp.sqrt(v_noise*10), (2*nr/3-nr/3,nc))
n[idx,:] += xp.random.normal(0., xp.sqrt(v_noise)*20, (5,nc))
# observation
y = x + n
x_est = xp.zeros(x.shape)
Dataset = namedtuple("Dataset", ["t", "x", "y"])
dataset = Dataset(t=t, x=x, y=y)
save_data(dirname, dataset)
return dataset
def init_plot_particle(dataset, pars):
fig, ax = plt.subplots(1, 1)
ax.hold(True)
ax.axis("equal")
lines = []
lines.append( ax.plot(pars[0,:], pars[2,:], "o") )
lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
lines.append( ax.plot([0], [0]) )
plt.legend(["particles", "truth", "observed", "estimated"])
plt.xlabel("x")
plt.ylabel("y")
plt.grid(True)
return lines
def plot_each_particle(lines, x_est, pars, i):
lines[0][0].set_data(pars[0,:], pars[2,:])
lines[3][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
plt.pause(1e-5)
def save_as_gif_particle(dataset, pars_init, pf, filename):
x_est = xp.zeros(dataset.x.shape)
lines = init_plot_particle(dataset, pars_init, x_est)
def draw(i):
pf.update(dataset.y[i])
state = pf.estimate()
x_est[i,:] = [state[0], state[2]]
#Data plot
pars = pf.particles()
plot_each_particle(lines, x_est, pars, i)
ny, _ = dataset.y.shape
ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False)
ani.save(filename, writer="imagemagick")
def init_plot_kalman(dataset):
fig, ax = plt.subplots(1, 1)
ax.hold(True)
ax.axis("equal")
lines = []
lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
lines.append( ax.plot([0], [0]) )
plt.legend(["truth", "observed", "estimated"])
plt.xlabel("x")
plt.ylabel("y")
plt.grid(True)
return lines
def plot_each_kalman(lines, x_est, i):
lines[2][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
plt.pause(1e-5)
def save_as_gif_kalman(dataset, kf, filename):
x_est = xp.zeros(dataset.x.shape)
lines = init_plot_kalman(dataset, x_est)
def draw(i):
kf.update(dataset.y[i])
state = pf.estimate()
x_est[i,:] = [state[0], state[2]]
#Data plot
plot_each_kalman(lines, x_est, i)
ny, _ = dataset.y.shape
ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False)
ani.save(filename, writer="imagemagick")
def rand_uniform(mins, maxs, num):
"""For the generation of initial particles
@param ndarray(nx) mins :Each minimum value of the particle
@param ndarray(nx) maxs :Each maximum value of the particle
@param int num :Number of particles
@return ndarray(nx,num) :Initial particles
"""
nx = mins.size
return (mins.reshape(nx, 1) * xp.ones(num)
+ (maxs - mins).reshape(nx, 1) * xp.random.rand(nx, num))
kalman.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Python implementation of Kalman filter
"""
import numpy as xp
# import cupy as xp
class KalmanFilter(object):
def __init__(self, F, H, Q, R, x_init, P_init):
self._x = x_init.reshape(-1,1)
self._P = P_init
self._update_x = lambda x : F.dot(x)
self._update_P = lambda P : Q + F.dot(P).dot(F.T)
self._error = lambda x,y : y - H.dot(x)
self._cov_P = lambda P : R + H.dot(P).dot(H.T)
self._kalman_gain = lambda P,S : P.dot(H.T).dot(xp.linalg.inv(S))
self._estimate_x = lambda x,K,e : x + K.dot(e)
self._estimate_P = lambda P,K : (xp.eye(*P.shape)-K.dot(H)).dot(P)
def update(self, y):
#Forecast
x_predicted = self._update_x(self._x)
P_predicted = self._update_P(self._P)
#Parameter calculation
e = self._error(x_predicted, y.reshape(-1,1))
S = self._cov_P(P_predicted)
K = self._kalman_gain(P_predicted, S)
#update
self._x = self._estimate_x(x_predicted, K, e)
self._P = self._estimate_P(P_predicted, K)
def estimate(self):
return self._x
test_kalman.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""kalman.py usage sample
"""
import kalman
import utils
import numpy as xp
# import cupy as xp
#parameter settings--------------------
var_Q = 0.01
var_R = 1
x_init = xp.zeros(4)
P_init = xp.ones(4)
# ---------------------------------
dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")
#State model generation(Secondary floor difference model)
F = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
Q = var_Q * xp.eye(4)
#Generation of observation model(Direct observation model)
H = xp.kron(xp.eye(2), xp.array([1, 0]))
R = var_R * xp.eye(2)
#Initial plot
lines = utils.init_plot_kalman(dataset)
kf = kalman.KalmanFilter(F, H, Q, R, x_init, P_init)
x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
kf.update(y)
state = kf.estimate()
x_est[i,:] = H.dot(state.reshape(-1,1)).T
#Data plot
utils.plot_each_kalman(lines, x_est, i)
# utils.save_as_gif_kalman(dataset, kf, "kalman.gif")
Recommended Posts