Python implementation of particle filters

Introduction

Particle filters are a well-worn topic, but I wrote this implementation myself, so I'm leaving it here. I may add more commentary when I get around to it.

- The estimate wanders around at first, until the filter locks onto the trajectory.
- I put in some outliers on purpose, and the filter appears to handle them properly.

sample.gif

If anything here looks wrong, please feel free to comment. I'm not fully sure about some parts myself, so corrections are very welcome.

**Update 2016/12/14**

- The source was a mess, so I rewrote it. It feels relatively cleaner now.
- Looping over NumPy arrays with a Python for loop is slow, so I rewrote the resampling in Cython (it didn't make much of a difference).
- I tried to run it with cupy, but I couldn't get it installed and gave up (apparently CUDA wasn't working; the GPU wasn't made by NVIDIA in the first place. Unfortunate).
- Added a Kalman filter as a bonus.

Source overview

Particle filter

Kalman filter (bonus)

Source code

Particle filter body

- When updating the weights, the likelihood is computed by summing the log-likelihoods and exponentiating only at the end. This prevents overflow when many exponentials would otherwise be multiplied together. (A small sketch of this log-domain trick follows below.)
- For resampling, a list of indexes `idxs` is generated first, and then all particles are re-spread at once with `pars[:, idxs]`.
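As a minimal sketch of the log-domain trick (my own illustration of a common refinement, not the exact code below): subtracting the maximum log-weight before exponentiating keeps every term in a safe numeric range, and the shift cancels in the normalization.

import numpy as xp

def update_weights_log(w, loglh):
    """Log-domain weight update with max-subtraction (log-sum-exp trick)."""
    logw = xp.log(w) + loglh   # accumulate in log space instead of multiplying
    logw -= xp.max(logw)       # shift so the largest term becomes exp(0) = 1
    w_new = xp.exp(logw)
    return w_new / xp.sum(w_new)  # normalization cancels the shift

# Hypothetical example: log-likelihoods this extreme would underflow a naive exp()
w = xp.ones(3) / 3.
loglh = xp.array([-1000., -1001., -1002.])
print(update_weights_log(w, loglh))  # ~ [0.665, 0.245, 0.090]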

particle.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Python implementation of particle filter
"""

import pyximport
pyximport.install()
import resample
import numpy as xp
# import cupy as xp


class GaussianNoiseModel(object):
    """Multidimensional Gaussian distribution
    """

    def __init__(self, Cov):
        """constructor

        @param ndarray(n,n) Cov :Covariance matrix
        """
        self._Cov = Cov

    def generate(self, num):
        """Noise generation

        @param  int num :Number of particles
        @return ndarray(n,num) :Noise matrix
        """
        n, _ = self._Cov.shape
        return xp.random.multivariate_normal(xp.zeros(n), self._Cov, num).T

    def logpdf(self, X):
        """Logarithmic probability density function

        @param  ndarray(n,num) X :Particle matrix
        @return ndarray(num) :Logarithmic probability density
        """
        Cov = self._Cov
        k, _ = Cov.shape
        det_Cov = xp.linalg.det(Cov)
        Cov_inv = xp.linalg.inv(Cov)
        coefs = xp.array([ x.dot(Cov_inv).dot(x.T) for x in X.T ])
        return -k*xp.log(2.*xp.pi)/2. -xp.log(det_Cov)/2. -coefs/2.


class CauchyNoiseModel(object):
    """Multidimensional independent Cauchy distribution

    A multidimensional Cauchy distribution assuming independence between variables
    """

    def __init__(self, gma):
        """constructor

        @param ndarray(n) gma :Scale parameter
        """
        self._gma = gma

    def generate(self, num):
        """Noise generation

        @param  int num :Number of particles
        @return ndarray(n,num) :Noise matrix
        """
        gma = self._gma
        uni = xp.random.rand(gma.size, num)
        Gma = gma.reshape(gma.size,1) * xp.ones(num)
        # Inverse-CDF sampling: gamma * tan(pi*(u - 1/2)) follows Cauchy(0, gamma)
        return Gma * xp.tan(xp.pi * (uni - 1./2.))

    def logpdf(self, X):
        """Logarithmic probability density (summed over the independent dimensions)

        @param  ndarray(n,num) X :Particle matrix
        @return ndarray(num) :Logarithmic probability density
        """
        _, num = X.shape
        gma = self._gma
        Gma = gma.reshape(gma.size,1) * xp.ones(num)
        return xp.sum(xp.log(Gma/xp.pi) - xp.log(X**2 + Gma**2), axis=0)


def _normalize(w):
    """Weight normalization

    @param  ndarray(num) w :Weight of each particle
    @return ndarray(num) :Normalized weight for each particle
    """
    return w / xp.sum(w)


class ParticleFilter(object):
    """Particle Filter (Particle filter)
    """

    def __init__(self, f, g, h, t_noise, o_noise, pars_init):
        """constructor

        @param ndarray(nx,num) function( ndarray(nx,num) ) f :State transition function
        @param ndarray(nx,num) function( ndarray(nu,num) ) g :Input propagation function
        @param ndarray(ny,num) function( ndarray(nx,num) ) h :Observation function
        @param NoiseModel t_noise :System noise model
        @param NoiseModel o_noise :Observation noise model
        @param ndarray(nx,num)  pars_init :Initial value of particles
        """
        self._f = f
        self._g = g
        self._h = h

        self._t_noise = t_noise
        self._o_noise = o_noise

        _, num = pars_init.shape
        self._num = num
        self._w = _normalize(xp.ones(num))
        self._pars = pars_init

    def update(self, y, u):
        """Filter update

        @param ndarray(ny) y :Observation vector at n
        @param ndarray(nu) u :Input vector at n
        """
        self._update_pars(u)
        self._update_weights(y)
        self._resample()

    def _update_pars(self, u):
        """Update particles according to state transition model

        - State equation
            x_n = f(x_{n-1}) + g(u_{n-1}) + w
        - State vector x_n
        - Input vector u_n
        - State transition function f(x)
        - Input propagation function g(u)
        - System noise w

        @param  ndarray(nu) u :Input vector at n-1 (u_{n-1})
        """
        U = u.reshape(u.size,1) * xp.ones(self._num)
        self._pars = self._f(self._pars) + self._g(U) + self._t_noise.generate(self._num)

    def _update_weights(self, y):
        """Calculate the likelihood according to the observation model

        - Observation equation
            y_n = h(x_n) + v
        - State vector x_n
        - Observation vector y_n
        - Observation function h(x)
        - Observation noise v

        @param  ndarray(ny) y :Observation vector at n (y_n)
        """
        Y = y.reshape(y.size,1) * xp.ones(self._num)
        loglh = self._o_noise.logpdf( xp.absolute(Y - self._h(self._pars)) )
        self._w = _normalize( xp.exp( xp.log(self._w) + loglh ) )

    def _resample(self):
        """Resampling
        """
        wcum = xp.cumsum(self._w)
        num = self._num

        # idxs = [ i for n in xp.sort(xp.random.rand(num))
        #     for i in range(num) if n <= wcum[i] ]

        # start = 0
        # idxs = [ 0 for i in xrange(num) ]
        # for i, n in enumerate( xp.sort(xp.random.rand(num)) ):
        #     for j in range(start, num):
        #         if n <= wcum[j]:
        #             idxs[i] = start = j
        #             break
        idxs = resample.resample(num, wcum)

        self._pars = self._pars[:,idxs]
        self._w = _normalize(self._w[idxs])

    def estimate(self):
        """State estimation

        @return ndarray(nx) :State vector estimate
        """
        return xp.sum(self._pars * self._w, axis=1)

    def particles(self):
        """particle

        @return ndarray(nx,num)
        """
        return self._pars

Resampling with Cython

I put a little thought into this part. Written as below it is nominally a double loop, but it effectively runs in a single pass: since the total number of loop iterations is proportional to the number of particles rather than to its square, the speed advantage grows as the particle count increases.

- Generate uniform random numbers $n_i \in [0, 1]$ and sort them in ascending order.
- Take a random number $n_i$ and a cumulative weight $w_j$.
- If the random number $n_i$ is larger than the cumulative weight $w_j$, move on to the next weight $w_{j+1}$.
- If it is smaller, adopt the index $j$ and move on to the next random number $n_{i+1}$.
- Because the random numbers are sorted, the next one satisfies $n_{i+1} \ge n_i > w_{j-1}$, so the comparison can resume at $w_j$.

(A vectorized NumPy alternative is sketched right below.)
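A minimal vectorized sketch of the same index search using NumPy's searchsorted (an alternative I'm adding for comparison, not part of the original code): for each uniform draw $u$ it binary-searches the first index $j$ with $u \le w_j$, so no Python-level loop or pre-sorting is needed.

import numpy as xp

def resample_numpy(num, wcum):
    """Multinomial resampling via binary search on the cumulative weights."""
    # side='left' (the default) returns the first index j with rand <= wcum[j]
    return xp.searchsorted(wcum, xp.random.rand(num))

# Tiny usage example with four particles of increasing weight
wcum = xp.cumsum(xp.array([0.1, 0.2, 0.3, 0.4]))
print(resample_numpy(10, wcum))  # indexes drawn in proportion to the weights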

resample.pyx


# -*- coding: utf-8 -*-
#cython: boundscheck=False
#cython: wraparound=False

import numpy as xp
cimport numpy as xp
cimport cython

ctypedef xp.float64_t DOUBLE_t
ctypedef xp.int64_t INT_t

def resample(int num, xp.ndarray[DOUBLE_t, ndim=1] wcum):
    cdef int start, i, j, length
    cdef double n

    start = 0
    cdef xp.ndarray[INT_t, ndim=1] idxs = xp.zeros(num, dtype=xp.int64)
    cdef xp.ndarray[DOUBLE_t, ndim=1] rand = xp.sort(xp.random.rand(num))
    length = rand.size

    for i in range(length):
        for j in range(start, num):
            if rand[i] <= wcum[j]:
                idxs[i] = start = j
                break

    return idxs

resample_setup.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext as build_pyx
import numpy

setup(
    name = 'resample',
    ext_modules = [Extension('resample', ['resample.pyx'])],
    cmdclass = { 'build_ext': build_pyx },
    include_dirs = [numpy.get_include()],
)
$ cython -a resample.pyx                         # translate to C (with an annotated HTML report)
$ python resample_setup.py build_ext --inplace   # build the extension module

The Cython version didn't end up much faster, probably because I don't understand Cython well enough yet. (Thanks for the helpful comments.)
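For reference, a rough micro-benchmark sketch comparing the compiled resample with the searchsorted one-liner above (this assumes resample.pyx has been built as described; absolute numbers will vary by machine):

import time
import numpy as xp
import pyximport; pyximport.install()
import resample

num = 100000
w = xp.random.rand(num)
wcum = xp.cumsum(w / xp.sum(w))  # normalized cumulative weights

t0 = time.perf_counter()
idxs_cy = resample.resample(num, wcum)
t1 = time.perf_counter()
idxs_np = xp.searchsorted(wcum, xp.random.rand(num))
t2 = time.perf_counter()

print("cython: %.4f s, searchsorted: %.4f s" % (t1 - t0, t2 - t1))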

Sample usage program

This sample uses multidimensional independent Cauchy distributions for both the system noise and the observation noise, which adds robustness against outliers (a small sketch of why follows below). Reference: Filtering of motion trajectory using self-organized state space model
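A minimal sketch of why the Cauchy noise model tolerates outliers, using the two noise classes from particle.py above: at a large residual the Gaussian log-density falls off quadratically (so a single outlier would annihilate nearly every particle weight), while the Cauchy log-density falls off only logarithmically.

import numpy as xp
import particle as par

gauss  = par.GaussianNoiseModel(xp.eye(1) * 0.1)
cauchy = par.CauchyNoiseModel(xp.ones(1) * 0.1)

# one typical residual and one outlier-sized residual
X = xp.array([[0.1, 5.0]])
print(gauss.logpdf(X))   # outlier log-density drops like -x^2 (huge penalty)
print(cauchy.logpdf(X))  # outlier log-density drops like -2*log|x| (mild penalty)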

test_particle.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""particle.py usage sample
"""

import particle as par
import utils
import numpy as xp
# import cupy as xp


#parameter settings--------------------
num = 3000  #Number of particles
v_sys = 0.01  #Scale of the system noise
v_obs = 0.1  #Scale of the observation noise
v_noise = 0.05  #Variance used when generating the test data (see utils.generate_data)

#Initial particles
mins = -5. * xp.ones(4)
maxs = +5. * xp.ones(4)
pars_init = utils.rand_uniform(mins, maxs, num)
# ---------------------------------

dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")

#State model generation (second-order difference model):
#each axis follows x_n = 2*x_{n-1} - x_{n-2}, i.e. locally constant velocity
A = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
f = lambda X: A.dot(X)  #State transition function
B = xp.zeros(1)
g = lambda U: B.dot(U)  #Input propagation function (no input in this sample)

C = xp.ones(4) * v_sys
sysn_model = par.CauchyNoiseModel(C)

#Observation model generation (direct observation of the position components)
D = xp.kron(xp.eye(2), xp.array([1, 0]))
h = lambda X: D.dot(X)  #Observation function

E = xp.ones(2) * v_obs
obsn_model = par.CauchyNoiseModel(E)

#Initial plot
lines = utils.init_plot_particle(dataset, pars_init)

#Generation of particle filter
pf = par.ParticleFilter(
    f, g, h, sysn_model, obsn_model, pars_init)

x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
    pf.update(y, xp.zeros(1))
    state = pf.estimate()
    x_est[i,:] = h(state)

    #Data plot
    pars = pf.particles()
    utils.plot_each_particle(lines, x_est, pars, i)

# utils.save_as_gif_particle(dataset, pars_init, pf, "particle_selforganized.gif")

Plotting and utility helpers

utils.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-


import matplotlib.animation as animation
import matplotlib.pyplot as plt
from collections import namedtuple
import numpy as xp
# import cupy as xp


Dataset = namedtuple("Dataset", ["t", "x", "y"])


def load_data(dirname):
    t = xp.loadtxt(dirname + "/t.txt")
    x = xp.loadtxt(dirname + "/x.txt")
    y = xp.loadtxt(dirname + "/y.txt")
    dataset = Dataset(t=t, x=x, y=y)
    return dataset


def save_data(dirname, dataset):
    xp.savetxt(dirname + "/t.txt", dataset.t)
    xp.savetxt(dirname + "/x.txt", dataset.x)
    xp.savetxt(dirname + "/y.txt", dataset.y)


def generate_data(dirname, v_noise=0.05):
    # truth: a spiral trajectory
    t = xp.arange(0, 6*xp.pi, 0.05)
    x = xp.c_[t*xp.cos(t), t*xp.sin(t)]

    # noise: larger variance over the middle third, plus a few deliberate outliers
    nr, nc = x.shape
    idx = xp.random.randint(nr//2, nr, 5)
    n = xp.random.normal(0., xp.sqrt(v_noise), x.shape)
    n[nr//3:2*nr//3,:] = xp.random.normal(0., xp.sqrt(v_noise*10), (2*nr//3 - nr//3, nc))
    n[idx,:] += xp.random.normal(0., xp.sqrt(v_noise)*20, (5,nc))

    # observation
    y = x + n
    dataset = Dataset(t=t, x=x, y=y)

    save_data(dirname, dataset)
    return dataset


def init_plot_particle(dataset, pars):
    fig, ax = plt.subplots(1, 1)
    ax.axis("equal")
    lines = []
    lines.append( ax.plot(pars[0,:], pars[2,:], "o") )
    lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
    lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
    lines.append( ax.plot([0], [0]) )
    plt.legend(["particles", "truth", "observed", "estimated"])
    plt.xlabel("x")
    plt.ylabel("y")
    plt.grid(True)
    return lines


def plot_each_particle(lines, x_est, pars, i):
    lines[0][0].set_data(pars[0,:], pars[2,:])
    lines[3][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
    plt.pause(1e-5)


def save_as_gif_particle(dataset, pars_init, pf, filename):
    x_est = xp.zeros(dataset.x.shape)
    lines = init_plot_particle(dataset, pars_init)
    fig = plt.gcf()  #FuncAnimation below needs the figure created in init_plot_particle

    def draw(i):
        pf.update(dataset.y[i], xp.zeros(1))  #no input in this sample
        state = pf.estimate()
        x_est[i,:] = [state[0], state[2]]

        #Data plot
        pars = pf.particles()
        plot_each_particle(lines, x_est, pars, i)

    ny, _ = dataset.y.shape
    ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False)
    ani.save(filename, writer="imagemagick")


def init_plot_kalman(dataset):
    fig, ax = plt.subplots(1, 1)
    ax.axis("equal")
    lines = []
    lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
    lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
    lines.append( ax.plot([0], [0]) )
    plt.legend(["truth", "observed", "estimated"])
    plt.xlabel("x")
    plt.ylabel("y")
    plt.grid(True)
    return lines


def plot_each_kalman(lines, x_est, i):
    lines[2][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
    plt.pause(1e-5)


def save_as_gif_kalman(dataset, kf, filename):
    x_est = xp.zeros(dataset.x.shape)
    lines = init_plot_kalman(dataset)
    fig = plt.gcf()  #FuncAnimation below needs the figure created in init_plot_kalman

    def draw(i):
        kf.update(dataset.y[i])
        state = kf.estimate()
        x_est[i,:] = [state[0], state[2]]

        #Data plot
        plot_each_kalman(lines, x_est, i)

    ny, _ = dataset.y.shape
    ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False)
    ani.save(filename, writer="imagemagick")


def rand_uniform(mins, maxs, num):
    """For the generation of initial particles

    @param  ndarray(nx) mins :Each minimum value of the particle
    @param  ndarray(nx) maxs :Each maximum value of the particle
    @param  int num :Number of particles
    @return ndarray(nx,num)  :Initial particles
    """
    nx = mins.size
    return (mins.reshape(nx, 1) * xp.ones(num)
        + (maxs - mins).reshape(nx, 1) * xp.random.rand(nx, num))

Bonus

Kalman filter body
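The class below is the textbook linear-Gaussian recursion; the lambdas in its constructor correspond one-to-one to these equations (prediction, innovation, gain, update):

- Predict: $\hat{x}_n = F x_{n-1}$, $\quad \hat{P}_n = F P_{n-1} F^T + Q$
- Innovation: $e = y_n - H \hat{x}_n$, $\quad S = H \hat{P}_n H^T + R$
- Kalman gain: $K = \hat{P}_n H^T S^{-1}$
- Update: $x_n = \hat{x}_n + K e$, $\quad P_n = (I - K H) \hat{P}_n$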

kalman.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Python implementation of Kalman filter
"""

import numpy as xp
# import cupy as xp


class KalmanFilter(object):
    """Linear Kalman filter

    @param ndarray(nx,nx) F :State transition matrix
    @param ndarray(ny,nx) H :Observation matrix
    @param ndarray(nx,nx) Q :System noise covariance
    @param ndarray(ny,ny) R :Observation noise covariance
    @param ndarray(nx) x_init :Initial state estimate
    @param ndarray(nx,nx) P_init :Initial error covariance
    """

    def __init__(self, F, H, Q, R, x_init, P_init):
        self._x = x_init.reshape(-1,1)
        self._P = P_init

        self._update_x    = lambda x     : F.dot(x)
        self._update_P    = lambda P     : Q + F.dot(P).dot(F.T)
        self._error       = lambda x,y   : y - H.dot(x)
        self._cov_P       = lambda P     : R + H.dot(P).dot(H.T)
        self._kalman_gain = lambda P,S   : P.dot(H.T).dot(xp.linalg.inv(S))
        self._estimate_x  = lambda x,K,e : x + K.dot(e)
        self._estimate_P  = lambda P,K   : (xp.eye(*P.shape)-K.dot(H)).dot(P)

    def update(self, y):
        #Forecast
        x_predicted = self._update_x(self._x)
        P_predicted = self._update_P(self._P)

        #Parameter calculation
        e = self._error(x_predicted, y.reshape(-1,1))
        S = self._cov_P(P_predicted)
        K = self._kalman_gain(P_predicted, S)

        #update
        self._x = self._estimate_x(x_predicted, K, e)
        self._P = self._estimate_P(P_predicted, K)

    def estimate(self):
        return self._x

Sample usage program

test_kalman.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""kalman.py usage sample
"""

import kalman
import utils
import numpy as xp
# import cupy as xp


#parameter settings--------------------
var_Q = 0.01
var_R = 1

x_init = xp.zeros(4)
P_init = xp.eye(4)  #initial error covariance (a matrix, not a vector)
# ---------------------------------

dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")

#State model generation (second-order difference model)
F = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
Q = var_Q * xp.eye(4)

#Observation model generation (direct observation of the position components)
H = xp.kron(xp.eye(2), xp.array([1, 0]))
R = var_R * xp.eye(2)

#Initial plot
lines = utils.init_plot_kalman(dataset)

kf = kalman.KalmanFilter(F, H, Q, R, x_init, P_init)

x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
    kf.update(y)
    state = kf.estimate()
    x_est[i,:] = H.dot(state.reshape(-1,1)).T

    #Data plot
    utils.plot_each_kalman(lines, x_est, i)

# utils.save_as_gif_kalman(dataset, kf, "kalman.gif")
