Python-Implementierung des Partikelfilters

Einführung

Es ist weit verbreitet, aber ich habe es erstellt, also werde ich es verlassen. Ich werde einen Kommentar hinzufügen, wenn ich Lust dazu habe.

»Ich wandere durch seltsame Orte, bis ich es zum ersten Mal abschließe. ――Ich habe einige Ausreißer absichtlich eingesetzt, aber es scheint, dass sie richtig filtern.

sample.gif

Ist es nicht falsch, dies hier zu tun? Bitte zögern Sie nicht zu kommentieren, wenn Sie Fragen haben. Ich bin mir auch nicht sicher, also begrüße ich Sie, es mir zu sagen.

** Fest am 14. Dezember 2016 **

Quellenübersicht

Partikelfilter

Kalman Filter (Bonus)

Quellcode

Partikelfilterkörper

particle.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Python-Implementierung des Partikelfilters
"""

import pyximport
pyximport.install()
import resample
import numpy as xp
# import cupy as xp


class GaussianNoiseModel(object):
    """Mehrdimensionale Gaußsche Verteilung
    """

    def __init__(self, Cov):
        """Konstrukteur

        @param ndarray(n,n) Cov :Verteilte mitverteilte Matrix
        """
        self._Cov = Cov

    def generate(self, num):
        """Geräuschentwicklung

        @param  int num :Anzahl der Partikel
        @return ndarray(n,num) :Rauschmatrix
        """
        n, _ = self._Cov.shape
        return xp.random.multivariate_normal(xp.zeros(n), self._Cov, num).T

    def logpdf(self, X):
        """Logistische Wahrscheinlichkeitsdichtefunktion

        @param  ndarray(n,num) X
        @param  int num :Anzahl der Partikel
        @return ndarray(num) :Log Wahrscheinlichkeitsdichte
        """
        Cov = self._Cov
        k, _ = Cov.shape
        det_Cov = xp.linalg.det(Cov)
        Cov_inv = xp.linalg.inv(Cov)
        coefs = xp.array([ x.dot(Cov_inv).dot(x.T) for x in X.T ])
        return -k*xp.log(2.*xp.pi)/2. -xp.log(det_Cov)/2. -coefs/2.


class CauchyNoiseModel(object):
    """Mehrdimensionale unabhängige Cauchy-Verteilung

Mehrdimensionale Cauchy-Verteilung unter der Annahme der Unabhängigkeit für jede Variable
    """

    def __init__(self, gma):
        """Konstrukteur

        @param ndarray(n) gma :Skalenpopulation
        """
        self._gma = gma

    def generate(self, num):
        gma = self._gma
        uni = xp.random.rand(gma.size, num)
        Gma = gma.reshape(gma.size,1) * xp.ones(num)
        return xp.arctan(uni / Gma) /xp.pi + 1./2.

    def logpdf(self, X):
        _, num = X.shape
        gma = self._gma
        Gma = gma.reshape(gma.size,1) * xp.ones(num)
        return xp.sum(xp.log(Gma/xp.pi) - xp.log(X**2 + Gma**2), axis=0)


def _normalize(w):
    """Gewichtsnormalisierung

    @param  ndarray(num) w :Gewicht jedes Partikels
    @return ndarray(num) :Normalisiertes Gewicht für jedes Partikel
    """
    return w / xp.sum(w)


class ParticleFilter(object):
    """Particle Filter (Partikelfilter)
    """

    def __init__(self, f, g, h, t_noise, o_noise, pars_init):
        """Konstrukteur

        @param ndarray(nx,num) function( ndarray(nx,num) ) f :Zustandsübergangsfunktion
        @param ndarray(nx,num) function( ndarray(nu,num) ) g :Eingangsausbreitungsfunktion
        @param ndarray(ny,num) function( ndarray(nx,num) ) h :Beobachtungsfunktion
        @param NoiseModel t_noise :Systemrauschmodell
        @param NoiseModel o_noise :Beobachtungsrauschmodell
        @param ndarray(nx,num)  pars_init :Anfangswert der Partikel
        """
        self._f = f
        self._g = g
        self._h = h

        self._t_noise = t_noise
        self._o_noise = o_noise

        _, num = pars_init.shape
        self._num = num
        self._w = _normalize(xp.ones(num))
        self._pars = pars_init

    def update(self, y, u):
        """Filter Update

        @param ndarray(ny) y :Beobachtungsvektor bei n
        @param ndarray(nu) u :Eingabevektor bei n
        """
        self._update_pars(u)
        self._update_weights(y)
        self._resample()

    def _update_pars(self, u):
        """Aktualisieren Sie die Partikel gemäß dem Zustandsübergangsmodell

        -Zustandsgleichung
            x_n = f(x_n-1) + g(u_n-1) + w
        -Zustandsvektor x_n
        -Eingabevektor u_n
        -Zustandsübergangsfunktion f(x)
        -Eingangsausbreitungsfunktion g(u)
        -Systemrauschen w

        @param  ndarray(nu) u : n-Eingabevektor bei 1(u_n-1)
        """
        U = u.reshape(u.size,1) * xp.ones(self._num)
        self._pars = self._f(self._pars) + self._g(U) + self._t_noise.generate(self._num)

    def _update_weights(self, y):
        """Berechnen Sie die Wahrscheinlichkeit gemäß dem Beobachtungsmodell

        -Beobachtungsgleichung
            y_n = h(x_n) + v
        -Zustandsvektor x_n
        -Beobachtungsvektor y_n
        -Beobachtungsfunktion h(x)
        -Beobachtungsgeräusch v

        @param  ndarray(ny) y :Beobachtungsvektor bei n(y_n)
        """
        Y = y.reshape(y.size,1) * xp.ones(self._num)
        loglh = self._o_noise.logpdf( xp.absolute(Y - self._h(self._pars)) )
        self._w = _normalize( xp.exp( xp.log(self._w) + loglh ) )

    def _resample(self):
        """Resampling
        """
        wcum = xp.cumsum(self._w)
        num = self._num

        # idxs = [ i for n in xp.sort(xp.random.rand(num))
        #     for i in range(num) if n <= wcum[i] ]

        # start = 0
        # idxs = [ 0 for i in xrange(num) ]
        # for i, n in enumerate( xp.sort(xp.random.rand(num)) ):
        #     for j in range(start, num):
        #         if n <= wcum[j]:
        #             idxs[i] = start = j
        #             break
        idxs = resample.resample(num, wcum)

        self._pars = self._pars[:,idxs]
        self._w = _normalize(self._w[idxs])

    def estimate(self):
        """Zustandsschätzung

        @return ndarray(nx) :Zustandsvektorschätzung
        """
        return xp.sum(self._pars * self._w, axis=1)

    def particles(self):
        """Partikel

        @return ndarray(nx,num)
        """
        return self._pars

Resampling mit Cython

Ich habe mir hier ein wenig ausgedacht. Wenn Sie Folgendes tun, handelt es sich um eine Doppelschleife, Sie können jedoch fast eine Einzelschleife verarbeiten. Da die Anzahl der Schleifen proportional zur ersten Potenz der Anzahl der Partikel und nicht zum Quadrat ist, gibt es einen großen Geschwindigkeitsunterschied, wenn die Anzahl der Partikel erhöht wird.

resample.pyx


# -*- coding: utf-8 -*-
#cython: boundscheck=False
#cython: wraparound=False

import numpy as xp
cimport numpy as xp
cimport cython

ctypedef xp.float64_t DOUBLE_t
ctypedef xp.int_t INT_t

def resample(int num, xp.ndarray[DOUBLE_t, ndim=1] wcum):
    cdef int start, i, j, length
    cdef double n

    start = 0
    cdef xp.ndarray[INT_t, ndim=1] idxs = xp.zeros(num).astype(xp.int)
    cdef xp.ndarray[DOUBLE_t, ndim=1] rand = xp.sort(xp.random.rand(num))
    length = rand.size

    for i in xrange(length):
        for j in xrange(start, num):
            if rand[i] <= wcum[j]:
                idxs[i] = start = j
                break

    return idxs

resample_setup.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-
#filename_setup.py
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext as build_pyx
import numpy

setup(
    name = 'resample',
    ext_modules = [Extension('resample', ['resample.pyx'])],
    cmdclass = { 'build_ext': build_pyx },
    include_dirs = [numpy.get_include()],
)
$ cython -a resample.pyx #kompilieren
$ python resample_setup.py build_ext --inplace #Bauen

Cython Es wurde nicht viel schneller, wahrscheinlich weil ich es nicht gut verstand. (Danke für deinen Kommentar)

Anwendungsprogrammbeispiel

Wenn Systemrauschen und Beobachtungsrauschen mehrdimensionale unabhängige Cauchy-Verteilung sind. Ausreißerwiderstand wird hinzugefügt. Referenz: Filtern der Bewegungsbahn mithilfe eines selbstorganisierten Zustandsraummodells

py.test_particle.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""particle.py Verwendungsbeispiel
"""

import particle as par
import utils
import numpy as xp
# import cupy as xp


#Parametereinstellungen--------------------
num = 3000  #Anzahl der Partikel
v_sys = 0.01  #Co-Dispersion von Systemrauschen
v_obs = 0.1  #Co-Dispersion des beobachteten Rauschens
v_noise = 0.05

#Anfängliche Partikel
mins = -5. * xp.ones(4)
maxs = +5. * xp.ones(4)
pars_init = utils.rand_uniform(mins, maxs, num)
# ---------------------------------

dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")

#Generierung von Zustandsmodellen(Differenzialmodell für den zweiten Stock)
A = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
f = lambda X: A.dot(X)  #Definition der Zustandsübergangsfunktion
B = xp.zeros(1)
g = lambda U: B.dot(U)

C = xp.ones(4) * v_sys
sysn_model = par.CauchyNoiseModel(C)

#Generierung eines Beobachtungsmodells(Direktes Beobachtungsmodell)
D = xp.kron(xp.eye(2), xp.array([1, 0]))
h = lambda X: D.dot(X)  #Definition der Beobachtungsfunktion

E = xp.ones(2) * v_obs
obsn_model = par.CauchyNoiseModel(E)

#Erste Handlung
lines = utils.init_plot_particle(dataset, pars_init)

#Erzeugung eines Partikelfilters
pf = par.ParticleFilter(
    f, g, h, sysn_model, obsn_model, pars_init)

x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
    pf.update(y, xp.zeros(1))
    state = pf.estimate()
    x_est[i,:] = h(state)

    #Datenplot
    pars = pf.particles()
    utils.plot_each_particle(lines, x_est, pars, i)

# utils.save_as_gif_particle(dataset, pars_init, pf, "particle_selforganized.gif")

Grundstück oder Versorgungssystem

utiils.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-


import matplotlib.animation as animation
import matplotlib.pyplot as plt
from collections import namedtuple
import numpy as xp
# import cupy as xp


Dataset = namedtuple("Dataset", ["t", "x", "y"])


def load_data(dirname):
    t = xp.loadtxt(dirname + "/t.txt")
    x = xp.loadtxt(dirname + "/x.txt")
    y = xp.loadtxt(dirname + "/y.txt")
    dataset = Dataset(t=t, x=x, y=y)
    return dataset


def save_data(dirname, dataset):
    xp.savetxt(dirname + "/t.txt", dataset.t)
    xp.savetxt(dirname + "/x.txt", dataset.x)
    xp.savetxt(dirname + "/y.txt", dataset.y)


def generate_data(dirname):
    # truth
    t = xp.arange(0, 6*xp.pi, 0.05)
    x = xp.c_[t*xp.cos(t), t*xp.sin(t)]

    # noise
    nr, nc = x.shape
    idx = xp.random.randint(nr/2, nr, 5)
    n = xp.random.normal(0., xp.sqrt(v_noise), x.shape)
    n[nr/3:2*nr/3,:] = xp.random.normal(0., xp.sqrt(v_noise*10), (2*nr/3-nr/3,nc))
    n[idx,:] += xp.random.normal(0., xp.sqrt(v_noise)*20, (5,nc))

    # observation
    y = x + n
    x_est = xp.zeros(x.shape)
    Dataset = namedtuple("Dataset", ["t", "x", "y"])
    dataset = Dataset(t=t, x=x, y=y)

    save_data(dirname, dataset)
    return dataset


def init_plot_particle(dataset, pars):
    fig, ax = plt.subplots(1, 1)
    ax.hold(True)
    ax.axis("equal")
    lines = []
    lines.append( ax.plot(pars[0,:], pars[2,:], "o") )
    lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
    lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
    lines.append( ax.plot([0], [0]) )
    plt.legend(["particles", "truth", "observed", "estimated"])
    plt.xlabel("x")
    plt.ylabel("y")
    plt.grid(True)
    return lines


def plot_each_particle(lines, x_est, pars, i):
    lines[0][0].set_data(pars[0,:], pars[2,:])
    lines[3][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
    plt.pause(1e-5)


def save_as_gif_particle(dataset, pars_init, pf, filename):
    x_est = xp.zeros(dataset.x.shape)
    lines = init_plot_particle(dataset, pars_init, x_est)

    def draw(i):
        pf.update(dataset.y[i])
        state = pf.estimate()
        x_est[i,:] = [state[0], state[2]]

        #Datenplot
        pars = pf.particles()
        plot_each_particle(lines, x_est, pars, i)

    ny, _ = dataset.y.shape
    ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False)
    ani.save(filename, writer="imagemagick")


def init_plot_kalman(dataset):
    fig, ax = plt.subplots(1, 1)
    ax.hold(True)
    ax.axis("equal")
    lines = []
    lines.append( ax.plot(dataset.x.T[0], dataset.x.T[1], ":"))
    lines.append( ax.plot(dataset.y.T[0], dataset.y.T[1], ".-"))
    lines.append( ax.plot([0], [0]) )
    plt.legend(["truth", "observed", "estimated"])
    plt.xlabel("x")
    plt.ylabel("y")
    plt.grid(True)
    return lines


def plot_each_kalman(lines, x_est, i):
    lines[2][0].set_data(x_est.T[0][:i], x_est.T[1][:i])
    plt.pause(1e-5)


def save_as_gif_kalman(dataset, kf, filename):
    x_est = xp.zeros(dataset.x.shape)
    lines = init_plot_kalman(dataset, x_est)

    def draw(i):
        kf.update(dataset.y[i])
        state = pf.estimate()
        x_est[i,:] = [state[0], state[2]]

        #Datenplot
        plot_each_kalman(lines, x_est, i)

    ny, _ = dataset.y.shape
    ani = animation.FuncAnimation(fig, draw, ny, blit=False, interval=100, repeat=False)
    ani.save(filename, writer="imagemagick")


def rand_uniform(mins, maxs, num):
    """Zur Erzeugung von Ausgangspartikeln

    @param  ndarray(nx) mins :Jeder Mindestwert des Partikels
    @param  ndarray(nx) maxs :Jeder Maximalwert des Partikels
    @param  int num :Anzahl der Partikel
    @return ndarray(nx,num)  :Anfängliche Partikel
    """
    nx = mins.size
    return (mins.reshape(nx, 1) * xp.ones(num)
        + (maxs - mins).reshape(nx, 1) * xp.random.rand(nx, num))

Bonus

Kalman Filterkörper

kalman.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Python-Implementierung des Kalman-Filters
"""

import numpy as xp
# import cupy as xp


class KalmanFilter(object):

    def __init__(self, F, H, Q, R, x_init, P_init):
        self._x = x_init.reshape(-1,1)
        self._P = P_init

        self._update_x    = lambda x     : F.dot(x)
        self._update_P    = lambda P     : Q + F.dot(P).dot(F.T)
        self._error       = lambda x,y   : y - H.dot(x)
        self._cov_P       = lambda P     : R + H.dot(P).dot(H.T)
        self._kalman_gain = lambda P,S   : P.dot(H.T).dot(xp.linalg.inv(S))
        self._estimate_x  = lambda x,K,e : x + K.dot(e)
        self._estimate_P  = lambda P,K   : (xp.eye(*P.shape)-K.dot(H)).dot(P)

    def update(self, y):
        #Prognose
        x_predicted = self._update_x(self._x)
        P_predicted = self._update_P(self._P)

        #Parameterberechnung
        e = self._error(x_predicted, y.reshape(-1,1))
        S = self._cov_P(P_predicted)
        K = self._kalman_gain(P_predicted, S)

        #aktualisieren
        self._x = self._estimate_x(x_predicted, K, e)
        self._P = self._estimate_P(P_predicted, K)

    def estimate(self):
        return self._x

Anwendungsprogrammbeispiel

test_kalman.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""kalman.py Verwendungsbeispiel
"""

import kalman
import utils
import numpy as xp
# import cupy as xp


#Parametereinstellungen--------------------
var_Q = 0.01
var_R = 1

x_init = xp.zeros(4)
P_init = xp.ones(4)
# ---------------------------------

dataset = utils.load_data("testdata")
# dataset = utils.generate_data("testdata")

#Generierung von Zustandsmodellen(Differenzialmodell für den zweiten Stock)
F = xp.kron(xp.eye(2), xp.array([[2, -1], [1, 0]]))
Q = var_Q * xp.eye(4)

#Generierung eines Beobachtungsmodells(Direktes Beobachtungsmodell)
H = xp.kron(xp.eye(2), xp.array([1, 0]))
R = var_R * xp.eye(2)

#Erste Handlung
lines = utils.init_plot_kalman(dataset)

kf = kalman.KalmanFilter(F, H, Q, R, x_init, P_init)

x_est = xp.zeros(dataset.x.shape)
for i, y in enumerate(dataset.y):
    kf.update(y)
    state = kf.estimate()
    x_est[i,:] = H.dot(state.reshape(-1,1)).T

    #Datenplot
    utils.plot_each_kalman(lines, x_est, i)

# utils.save_as_gif_kalman(dataset, kf, "kalman.gif")

Recommended Posts

Python-Implementierung des Partikelfilters
Python-Implementierung eines selbstorganisierenden Partikelfilters
Implementierung der schnellen Sortierung in Python
Implementierung des Partikelfilters durch Python und Anwendung auf das Zustandsraummodell
Implementierung eines einfachen Partikelfilters
Implementierung von Desktop-Benachrichtigungen mit Python
Python-Implementierung eines nicht rekursiven Segmentbaums
Implementierung von Light CNN (Python Keras)
Implementierung der ursprünglichen Sortierung in Python
Implementierung der Dyxtra-Methode durch Python
Python-Grundlagen ①
Grundlagen von Python ①
Python-Implementierung eines kontinuierlichen Hidden-Markov-Modells
Einführung von Python
Warum die Python-Implementierung von ISUCON 5 Bottle verwendet
TRIE-Baumimplementierung mit Python und LOUDS
[Coding Interview] Implementierung der Enigma-Kryptografiemaschine (Python)
Erläuterung der Bearbeitungsentfernung und Implementierung in Python
Liste der Python-Module
ValueObject-Implementierung in Python
Vereinheitlichung der Python-Umgebung
Kopie der Python-Einstellungen
Grundlagen der Python-Scraping-Grundlagen
[Python] Verhalten von Argmax
Verwendung von Python-Einheimischen ()
der Zen von Python
Implementierung der Fibonacci-Sequenz
# 4 [Python] Grundlagen der Funktionen
Grundkenntnisse in Python
Nüchterne Trivia von Python3
Zusammenfassung der Python-Argumente
Grundlagen von Python: Ausgabe
Installation von matplotlib (Python 3.3.2)
Anwendung von Python 3 vars
SVM-Implementierung in Python
Verschiedene Verarbeitung von Python
[Python] Implementierung von Clustering mit einem gemischten Gaußschen Modell
Implementierungsbeispiel eines einfachen LISP-Verarbeitungssystems (Python-Version)
Höchstwahrscheinlich Schätzungsimplementierung des Themenmodells in Python
Python-Implementierung der Bayes'schen linearen Regressionsklasse
Implementierung der Bayes'schen Varianzschätzung des Themenmodells in Python
Ein Memorandum über die Umsetzung von Empfehlungen in Python
Python-Implementierung des CSS3-Mischmodus und Diskussion über den Farbraum
[Python] Richtige Verwendung der Karte
Schreiben Sie Pandec-Filter in Python
Auf dem Weg zum Ruhestand von Python2
Implementierung von TF-IDF mit Gensim
Implementierung von MathJax auf Sphinx
Zusammenfassung der Python3-Listenoperationen
Python - Schneller Start der Protokollierung
Empfehlung der binpacking Bibliothek von Python
Offline-Echtzeit zum Schreiben eines E14 Python-Implementierungsbeispiels
[Python] Wert des Funktionsobjekts (?)
[Line / Python] Beacon-Implementierungsnotiz
Automatisches Update des Python-Moduls
Python --Überprüfen Sie den Wertetyp
Statische Analyse von Python-Programmen
Über verschiedene Codierungen von Python 3