** 2016/12/14 amendment ** --Rewritten that the source was messed up. I feel like it's relatively refreshing --It seems that turning numpy with for is slow, so I rewrote resampling with cython (not much different) --I tried to run it with cupy, but I couldn't install it properly and died safely (it seems that CUDA didn't work, not the GPU made by NVIDIA in the first place. Unfortunately) --Added Kalman filter as a bonus
--The likelihood calculation when updating the weight is returned to exp after taking the sum of the log-likelihoods. This is to prevent overflow when the exponents are accumulated.
--For resampling, first generate a list of indexes ʻidxs, and then re-spread it all at once with
pars [:, idxs]`.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Python implementation of particle filter
import pyximport
import resample
import numpy as xp
# import cupy as xp
class GaussianNoiseModel(object):
"""Multidimensional Gaussian distribution
def __init__(self, Cov):
@param ndarray(n,n) Cov :Covariance matrix
self._Cov = Cov
def generate(self, num):
"""Noise generation
@param int num :Number of particles
@return ndarray(n,num) :Noise matrix
n, _ = self._Cov.shape
return xp.random.multivariate_normal(xp.zeros(n), self._Cov, num).T
def logpdf(self, X):
"""Logarithmic probability density function
@param ndarray(n,num) X
@param int num :Number of particles
@return ndarray(num) :Logarithmic probability density
Cov = self._Cov
k, _ = Cov.shape
det_Cov = xp.linalg.det(Cov)
Cov_inv = xp.linalg.inv(Cov)
coefs = xp.array([ for x in X.T ])
return -k*xp.log(2.*xp.pi)/2. -xp.log(det_Cov)/2. -coefs/2.
class CauchyNoiseModel(object):
"""Multidimensional independent Cauchy distribution
Multidimensional Cauchy distribution assuming independence for each variable
def __init__(self, gma):
@param ndarray(n) gma :Scale parameter
self._gma = gma
def generate(self, num):
gma = self._gma
uni = xp.random.rand(gma.size, num)
Gma = gma.reshape(gma.size,1) * xp.ones(num)
return xp.arctan(uni / Gma) /xp.pi + 1./2.
def logpdf(self, X):
_, num = X.shape
gma = self._gma
Gma = gma.reshape(gma.size,1) * xp.ones(num)
return xp.sum(xp.log(Gma/xp.pi) - xp.log(X**2 + Gma**2), axis=0)
def _normalize(w):
"""Weight normalization
@param ndarray(num) w :Weight of each particle
@return ndarray(num) :Normalized weight for each particle
return w / xp.sum(w)
class ParticleFilter(object):
"""Particle Filter (Particle filter)
def __init__(self, f, g, h, t_noise, o_noise, pars_init):
@param ndarray(nx,num) function( ndarray(nx,num) ) f :State transition function
@param ndarray(nx,num) function( ndarray(nu,num) ) g :Input propagation function
@param ndarray(ny,num) function( ndarray(nx,num) ) h :Observation function
@param NoiseModel t_noise :System noise model
@param NoiseModel o_noise :Observation noise model
@param ndarray(nx,num) pars_init :Initial value of particles
self._f = f
self._g = g
self._h = h
self._t_noise = t_noise
self._o_noise = o_noise
_, num = pars_init.shape
self._num = num
self._w = _normalize(xp.ones(num))
self._pars = pars_init
def update(self, y, u):
"""Filter update
@param ndarray(ny) y :Observation vector at n
@param ndarray(nu) u :Input vector at n
def _update_pars(self, u):
"""Update particles according to state transition model
-Equation of state
x_n = f(x_n-1) + g(u_n-1) + w
-State vector x_n
-Input vector u_n
-State transition function f(x)
-Input propagation function g(u)
-System noise w
@param ndarray(nu) u : n-Input vector at 1(u_n-1)
U = u.reshape(u.size,1) * xp.ones(self._num)
self._pars = self._f(self._pars) + self._g(U) + self._t_noise.generate(self._num)
def _update_weights(self, y):
"""Calculate the likelihood according to the observation model
-Observation equation
y_n = h(x_n) + v
-State vector x_n
-Observation vector y_n
-Observation function h(x)
-Observation noise v
@param ndarray(ny) y :Observation vector at n(y_n)
Y = y.reshape(y.size,1) * xp.ones(self._num)
loglh = self._o_noise.logpdf( xp.absolute(Y - self._h(self._pars)) )
self._w = _normalize( xp.exp( xp.log(self._w) + loglh ) )
def _resample(self):
wcum = xp.cumsum(self._w)
num = self._num
# idxs = [ i for n in xp.sort(xp.random.rand(num))
# for i in range(num) if n <= wcum[i] ]
# start = 0
# idxs = [ 0 for i in xrange(num) ]
# for i, n in enumerate( xp.sort(xp.random.rand(num)) ):
# for j in range(start, num):
# if n <= wcum[j]:
# idxs[i] = start = j
# break
idxs = resample.resample(num, wcum)
self._pars = self._pars[:,idxs]
self._w = _normalize(self._w[idxs])
def estimate(self):
"""State estimation
@return ndarray(nx) :State vector estimate
return xp.sum(self._pars * self._w, axis=1)
def particles(self):
@return ndarray(nx,num)
return self._pars
I devised a little here. If you do the following, it will be a double loop, but you can process almost a single loop. Since the number of loops is proportional to the first power of the number of particles, not the square, there is a big difference in speed as the number of particles is increased.
-Generate a uniform random number $ n_i $ of $ [0, 1] $ and sort in ascending order --Take a random number $ n_i $ --Take the cumulative weight $ w_j $ --If the random number $ n_i $ is larger than the cumulative weight $ w_j $, move to the next weight $ w_ {j + 1} $. --If it is small, the index $ j $ is adopted-> Move to the next random number $ n_ {i + 1} $ --Because the next random number $ n_ {i + 1} $ is $ w_ {j-1} <n_i <n_ {i + 1} $, you can start by comparing it with $ w_j $.
# -*- coding: utf-8 -*-
#cython: boundscheck=False
#cython: wraparound=False
import numpy as xp
cimport numpy as xp
cimport cython
ctypedef xp.float64_t DOUBLE_t
ctypedef xp.int_t INT_t
def resample(int num, xp.ndarray[DOUBLE_t, ndim=1] wcum):
cdef int start, i, j, length
cdef double n
start = 0
cdef xp.ndarray[INT_t, ndim=1] idxs = xp.zeros(num).astype(
cdef xp.ndarray[DOUBLE_t, ndim=1] rand = xp.sort(xp.random.rand(num))
length = rand.size
for i in xrange(length):
for j in xrange(start, num):
if rand[i] <= wcum[j]:
idxs[i] = start = j
return idxs
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext as build_pyx
import numpy
name = 'resample',
ext_modules = [Extension('resample', ['resample.pyx'])],
cmdclass = { 'build_ext': build_pyx },
include_dirs = [numpy.get_include()],
$ cython -a resample.pyx #compile
$ python build_ext --inplace #Build
Cython It didn't get much faster, probably because I didn't understand it well. (Thank you for your comment)
When system noise and observation noise are multidimensional independent Cauchy distribution. Outlier resistance is added. Reference: Filtering of motion trajectory using self-organized state space model
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" usage sample
import particle as par
import utils
import numpy as xp
# import cupy as xp
#parameter settings--------------------
num = 3000 #Number of particles
v_sys = 0.01 #Covariance of system noise
v_obs = 0.1 #Covariance of observed noise
v_noise = 0.05
#Initial particles
mins = -5. * xp.ones(4)
maxs = +5. * xp.ones(4)
pars_init = utils.rand_uniform(mins, maxs, num)
# ---------------------------------
dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")
#State model generation(Secondary floor difference model)
A = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
f = lambda X: #Definition of state transition function
B = xp.zeros(1)
g = lambda U:
C = xp.ones(4) * v_sys
sysn_model = par.CauchyNoiseModel(C)
#Generation of observation model(Direct observation model)
D = xp.kron(xp.eye(2), xp.array([1, 0]))
h = lambda X: #Definition of observation function
E = xp.ones(2) * v_obs
obsn_model = par.CauchyNoiseModel(E)
#Initial plot
lines = utils.init_plot_particle(dataset, pars_init)
#Generation of particle filter
pf = par.ParticleFilter(
f, g, h, sysn_model, obsn_model, pars_init)
x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
pf.update(y, xp.zeros(1))
state = pf.estimate()
x_est[i,:] = h(state)
#Data plot
pars = pf.particles()
utils.plot_each_particle(lines, x_est, pars, i)
# utils.save_as_gif_particle(dataset, pars_init, pf, "particle_selforganized.gif")
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import matplotlib.animation as animation
import matplotlib.pyplot as plt
from collections import namedtuple
import numpy as xp
# import cupy as xp
Dataset = namedtuple("Dataset", ["t", "x", "y"])
def load_data(dirname):
t = xp.loadtxt(dirname + "/t.txt")
x = xp.loadtxt(dirname + "/x.txt")
y = xp.loadtxt(dirname + "/y.txt")
dataset = Dataset(t=t, x=x, y=y)
return dataset
def save_data(dirname, dataset):
xp.savetxt(dirname + "/t.txt", dataset.t)
xp.savetxt(dirname + "/x.txt", dataset.x)
xp.savetxt(dirname + "/y.txt", dataset.y)
def generate_data(dirname):
# truth
t = xp.arange(0, 6*xp.pi, 0.05)
x = xp.c_[t*xp.cos(t), t*xp.sin(t)]
# noise
nr, nc = x.shape
idx = xp.random.randint(nr/2, nr, 5)
n = xp.random.normal(0., xp.sqrt(v_noise), x.shape)
n[nr/3:2*nr/3,:] = xp.random.normal(0., xp.sqrt(v_noise*10), (2*nr/3-nr/3,nc))
n[idx,:] += xp.random.normal(0., xp.sqrt(v_noise)*20, (5,nc))
# observation
y = x + n
x_est = xp.zeros(x.shape)
Dataset = namedtuple("Dataset", ["t", "x", "y"])
dataset = Dataset(t=t, x=x, y=y)
save_data(dirname, dataset)
return dataset
def init_plot_particle(dataset, pars):
fig, ax = plt.subplots(1, 1)
lines = []
lines.append( ax.plot(pars[0,:], pars[2,:], "o") )
lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
lines.append( ax.plot([0], [0]) )
plt.legend(["particles", "truth", "observed", "estimated"])
return lines
def plot_each_particle(lines, x_est, pars, i):
lines[0][0].set_data(pars[0,:], pars[2,:])
lines[3][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
def save_as_gif_particle(dataset, pars_init, pf, filename):
x_est = xp.zeros(dataset.x.shape)
lines = init_plot_particle(dataset, pars_init, x_est)
def draw(i):
state = pf.estimate()
x_est[i,:] = [state[0], state[2]]
#Data plot
pars = pf.particles()
plot_each_particle(lines, x_est, pars, i)
ny, _ = dataset.y.shape
ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False), writer="imagemagick")
def init_plot_kalman(dataset):
fig, ax = plt.subplots(1, 1)
lines = []
lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
lines.append( ax.plot([0], [0]) )
plt.legend(["truth", "observed", "estimated"])
return lines
def plot_each_kalman(lines, x_est, i):
lines[2][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
def save_as_gif_kalman(dataset, kf, filename):
x_est = xp.zeros(dataset.x.shape)
lines = init_plot_kalman(dataset, x_est)
def draw(i):
state = pf.estimate()
x_est[i,:] = [state[0], state[2]]
#Data plot
plot_each_kalman(lines, x_est, i)
ny, _ = dataset.y.shape
ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False), writer="imagemagick")
def rand_uniform(mins, maxs, num):
"""For the generation of initial particles
@param ndarray(nx) mins :Each minimum value of the particle
@param ndarray(nx) maxs :Each maximum value of the particle
@param int num :Number of particles
@return ndarray(nx,num) :Initial particles
nx = mins.size
return (mins.reshape(nx, 1) * xp.ones(num)
+ (maxs - mins).reshape(nx, 1) * xp.random.rand(nx, num))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Python implementation of Kalman filter
import numpy as xp
# import cupy as xp
class KalmanFilter(object):
def __init__(self, F, H, Q, R, x_init, P_init):
self._x = x_init.reshape(-1,1)
self._P = P_init
self._update_x = lambda x :
self._update_P = lambda P : Q +
self._error = lambda x,y : y -
self._cov_P = lambda P : R +
self._kalman_gain = lambda P,S :
self._estimate_x = lambda x,K,e : x +
self._estimate_P = lambda P,K : (xp.eye(*P.shape)
def update(self, y):
x_predicted = self._update_x(self._x)
P_predicted = self._update_P(self._P)
#Parameter calculation
e = self._error(x_predicted, y.reshape(-1,1))
S = self._cov_P(P_predicted)
K = self._kalman_gain(P_predicted, S)
self._x = self._estimate_x(x_predicted, K, e)
self._P = self._estimate_P(P_predicted, K)
def estimate(self):
return self._x
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" usage sample
import kalman
import utils
import numpy as xp
# import cupy as xp
#parameter settings--------------------
var_Q = 0.01
var_R = 1
x_init = xp.zeros(4)
P_init = xp.ones(4)
# ---------------------------------
dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")
#State model generation(Secondary floor difference model)
F = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
Q = var_Q * xp.eye(4)
#Generation of observation model(Direct observation model)
H = xp.kron(xp.eye(2), xp.array([1, 0]))
R = var_R * xp.eye(2)
#Initial plot
lines = utils.init_plot_kalman(dataset)
kf = kalman.KalmanFilter(F, H, Q, R, x_init, P_init)
x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
state = kf.estimate()
x_est[i,:] =,1)).T
#Data plot
utils.plot_each_kalman(lines, x_est, i)
# utils.save_as_gif_kalman(dataset, kf, "kalman.gif")
