[PYTHON] Änderungspunkterkennung mit Kalman-Filter

Motivation

Ich hatte die Möglichkeit, Änderungspunkte bei der Arbeit zu erkennen. Zu dieser Zeit hatte ich nicht viel Zeit, also habe ich es unter Bezugnahme auf Algorithmus mit ARIMA-Modell, den Yokkuns gemacht hat gemacht. Ich tat. Das ARIMA-Modell hatte jedoch verschiedene Probleme, daher habe ich versucht, es mit dem Kalman-Filter neu zu schreiben.

Probleme mit dem ARIMA-Modell

Verweise

Die Hauptreferenz war "Anomalieerkennung durch Data Mining", die jeder liebt.

Überblick

Die Berechnungsschritte sind wie folgt: Die Berechnung kann grob in einen Lernschritt und einen Bewertungsberechnungsschritt unterteilt werden.

Lernschritte

Hier sind die Schritte zum Berechnen, bevor neue Daten eingehen.

Schritt zur Berechnung der Punktzahl

Dies ist der Schritt, um zu berechnen, wann neue Daten eingehen.

Der Umriss der Berechnung der Änderungspunktbewertung ist wie folgt.

Es ist einfach, aber das war's. Dann werde ich den folgenden Code schreiben.

Referenzcode

Code für Kalman-Filter

KF.py


# coding: utf8

from numpy.oldnumeric.linear_algebra import inverse
from scipy import linalg
import numpy as np
from math import log

class KalmanFiltering:
    limy = 1e20 #Grenze der als fehlend anzusehenden Zahlenwerte
    GSIG2 = 1
    L = 1
    R = np.identity(L)
    NSUM = 0.0
    SIG2 = 0.0
    LDET = 0.0
    
    def __init__(self, k, p, q, term=10, w=10):
        self.k = k #Bodenunterschied
        self.p = p #Saisonale Auflage
        self.q = q #AR-Komponente
        self.m, self.F, self.G, \
            self.H, self.Q = self.FGHset(0,k,p,q,w)
        self.term = term
        self.strg_trm = term
        
        self.resid = np.zeros(self.term)
        self.pred = 0.0
        
        # matrix for storage predicted value
        self.XPS = np.zeros((term,self.m), dtype=np.float)
        self.VPS = np.array([np.eye(self.m, dtype=np.float)]*term)
        # matrix for storage predicted value
        self.XFS = np.zeros((term,self.m), dtype=np.float)
        self.VFS = np.array([np.eye(self.m, dtype=np.float)]*term)
        # matrix for storage smoothed value
        self.XSS = np.zeros((term,self.m), dtype=np.float)
        self.VSS = np.array([np.eye(self.m, dtype=np.float)]*term)

    def forward_backward(self, new_data, smoothing=0):
        self.NSUM += 1 
        if self.NSUM < self.strg_trm:
            self.term = int(self.NSUM)
        else:
            self.term = self.strg_trm
        # forward
        self.forward(new_data)
        # smoothing
        self.SMO()
        if smoothing==1:
            return np.mean( self.XSS[:self.term,0] )
                    
        return self.predict()[0]
    
    def forward(self, y):
        XF = self.XFS[self.term-1]
        VF = self.VFS[self.term-1]     

        # 1span predicting
        XP, VP = self.forward_predicting(VF, XF)
        XF, VF = self.filtering(y, XP, VP)
        self.storage_params(XP, XF, VP, VF)
#         sig2 = self.SIG2 / self.NSUM
#         FF = -0.5 * (self.NSUM * (log(2 * np.pi * sig2) + 1) + self.LDET)
#         return {'LLF':FF, 'Ovar':sig2}
    
    def storage_params(self, XP, XF, VP, VF):
        if self.NSUM>self.term:
            self.XPS[:self.term-1] = self.XPS[1:self.term] 
            self.XFS[:self.term-1] = self.XFS[1:self.term]
            self.VPS[:self.term-1] = self.VPS[1:self.term]
            self.VFS[:self.term-1] = self.VFS[1:self.term]
            self.normal_storage(XP, XF, VP, VF)
        else:
            self.normal_storage(XP, XF, VP, VF)
                
    def normal_storage(self, XP, XF, VP, VF):
        self.XPS[self.term-1] = XP 
        self.XFS[self.term-1] = XF
        self.VPS[self.term-1] = VP
        self.VFS[self.term-1] = VF
    
    def forward_predicting(self, VF, XF):
        """1span predicting"""
        XP = np.ndarray.flatten( np.dot(self.F, XF.T) ) #Da es ab der zweiten Woche ein vertikaler Vektor wird, wird es immer in einen horizontalen Vektor konvertiert
        VP = self.F.dot(VF).dot(self.F.T) +  self.G.dot(self.Q).dot(self.G.T)
        return XP, VP
    
    def filtering(self, y, XP, VP):
        if y < self.limy: 
            B = np.dot( np.dot(self.H, VP), self.H.T)  + self.R  #H ist mathematisch ein horizontaler Vektor
            B1 = inverse(B)
            K = np.matrix(np.dot(VP, self.H.T)) * np.matrix(B1) #K wird ein vertikaler Vektor(matrix)
            e = np.array(y).T - np.dot(self.H, XP.T)            
            XF = np.array(XP) + np.array( K * np.matrix(e) ).T #Horizontaler Vektor
            VF = np.array(VP) - np.array( K* np.matrix(self.H) * VP)           
            self.SIG2 += np.ndarray.flatten(np.array( np.matrix(e) * np.matrix(B1) * np.matrix(e).T ))[0] #Machen Sie es Matrix, so dass es auch in einer Dimension berechnet werden kann
            self.LDET += log(linalg.det(B))
        else:
            XF = XP; VF = VP
        return XF, VF
        
    def SMO(self):
        """fixed-interval smoothing"""
        XS1 = self.XFS[self.term-1]
        VS1 = self.VFS[self.term-1]
        self.XSS[self.term-1] = XS1
        self.VSS[self.term-1] = VS1
        for n1 in xrange(self.term):        
            n = (self.term-1) - n1; XP = self.XPS[n]; XF = self.XFS[n-1]
            VP = self.VPS[n]; VF = self.VFS[n-1]; VPI = inverse(VP)
            A = np.dot( np.dot(VF, self.F.T), VPI)
            XS2 = XF + np.dot(A, (XS1 - XP))
            VS2 = VF + np.dot( np.dot(A, (VS1 - VP)), A.T )
            XS1 = XS2; VS1 = VS2
            self.XSS[n-1] = XS1
            self.VSS[n-1] = VS1
                
    #Definition der logarithmischen Wahrscheinlichkeitsfunktion von TAU2x
    def LogL(self, parm, *args):
        y=args[0]
        LLF = self.forward(y)
        LL = LLF['LLF']
        return -LL #Da optimeze eine Minimierungsfunktion ist, wird die logarithmische Wahrscheinlichkeit multipliziert mit minus zurückgegeben.
    
    def predict(self, forward_time=1):
        """pridint average value"""
        y = np.zeros(forward_time, dtype=np.float)
        XFp=self.XFS[-1] #Holen Sie sich nur die neueste Datenmatrix
        #VFp=VF[XF.shape[0]-1,:]
        
        for n in xrange(forward_time):
            XP = np.ndarray.flatten( np.dot(self.F, XFp.T) )
            #VP = np.dot( np.dot(F, VF), F.T ) +  np.dot( np.dot(G, Q), G.T )
            y[n] = np.dot(self.H, XP) #Da es den erwarteten Wert annimmt, ist kein Rauschen enthalten
            XFp=XP
        return y

    def FGHset(self, al, k, p, q, w=10):
        """Matrixeinstellung der Zustandsraumdarstellung des saisonalen Anpassungsmodells
al: α-Vektor des AR-Modells
        k,p,q: Differenz, saisonaler Zyklus, Anzahl der AR-Parameter (k bei Vorhersage)>=2)
        w:Streuung des Systemfehlers (Zur Erkennung von Änderungspunkten sollte dieser klein und fest eingestellt werden.)
        """
        m = k + p + q -1
    
        if q>0: G = np.zeros((m,3), dtype=np.float) #Wenn das Zustandsmodell drei enthält, Trend, Saison und AR
        elif p>0: G = np.zeros((m,2), dtype=np.float) #Wenn keine AR-Komponente enthalten ist(q=0)
        else: m=k; G = np.zeros((m,1), dtype=np.float)
        F = np.zeros((m,m), dtype=np.float)
        H = np.zeros((1,m), dtype=np.float)
      
        ns = 0; ls =0
        #Konstruktion der Blockmatrix des Trendmodells
        if k>0:
            ns +=1
            G[0,0] = 1; H[0,0] = 1
            if k==1: F[0,0] = 1
            if k==2: F[0,0] = 2; F[0,1] = -1; F[1,0] = 1
            if k==3: F[0,0] = 3; F[0,1] = -3; F[0,2] = 1; F[1,0] = 1; F[2,1] = 1
            ls += k
      
        #Konstruktion einer saisonalen Komponentenblockmatrix
        if p>0:
            ns +=1
            G[ls, ns-1] = 1
            H[0,ls] = 1
            for i in xrange(p-1): F[ls, ls+i] = -1
            for i in xrange(p-2): F[ls+i+1, ls+i] = 1
            ls +=p-1
      
        #Konstruktion der AR-Komponentenblockmatrix
        if q>0:
            ns +=1
            G[ls, ns-1] = 1
            H[0,ls] = 1
            for i in xrange(q): F[ls, ls+i-1] = al[i]
            if q>1:
                for i in xrange(q-1): F[ls+i, ls+i-1] = 1
      
        #Berechnung des Rahmens der Dispersions-Co-Verteilungsmatrix Q des Systemmodells
        Q = np.eye(ns,dtype=np.float)*w
      
        return m, F, G, H, Q

Code zur Änderungspunkterkennung

KF_AnomalyDetection.py


# coding: utf-8
from math import log, ceil
import numpy as np
from scipy.stats import norm, t
import matplotlib.pyplot as plt
import KF

class KFAnomalyDetection:
    datalist = []
    outlier_score_list = []
    change_score_list = []
    outlier_score_smooth = []
    change_score_smooth = []
    outlier_resid = None
    change_resid = None
    outlier_pred = None
    change_pred = None
    
    def __init__(self, term, smooth, k=2, p=0, q=0, w=10):
        self.kf_outlier_score = KF.KalmanFiltering(k,p,q,term=term, w=w)
        self.kf_first_smooth_score = KF.KalmanFiltering(k,p,q,term=smooth, w=w)
        self.kf_change_score = KF.KalmanFiltering(k,p,q,term=term, w=w)
        self.kf_second_smooth_score = KF.KalmanFiltering(k,p,q,term=smooth, w=w)
        self.term = term
    
    def forward_step(self, new_data):
        # add new_data to datalist
        if len(self.datalist)>=self.term:
            self.datalist.pop(0)
            self.datalist.append(new_data)
        else:
            self.datalist.append(new_data)

        # compute score
        if self.outlier_pred is None:
            self.first_step(new_data)
        else:
            self.calculate_score_step(new_data)
            self.learn_step(new_data)
    
    def conversion_score(self, train, var):
        """convert score to log loss"""
        m = np.mean(train)
        s = np.std(train)
        try:
            if s < 1: s=1
            px = norm.pdf(var, m, s) if norm.pdf(var, m, s)!=0.0 else 1e-308
            res = -log(px)
            return res
        except:
            return 0

    def first_step(self, new_data):
        # learn outlier model
        self.outlier_resid, self.outlier_pred = \
                self.learn_KF(self.kf_outlier_score, new_data)
        # calculate outlier score
        self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
                    self.outlier_pred, new_data, self.outlier_score_list,
                    self.outlier_score_smooth)
        # learn cnage model
        self.change_resid, self.change_pred = \
                self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
        # calculate change score
        self.calculate_score(self.kf_second_smooth_score, self.change_resid,
                    self.change_pred, self.outlier_score_smooth[-1],
                    self.change_score_list, self.change_score_smooth)

    def learn_step(self, data):
        self.outlier_resid, self.outlier_pred = \
                self.learn_KF(self.kf_outlier_score, data)
        self.change_resid, self.change_pred = \
                self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
    
    def learn_KF(self, func, data):
        """leaning KF from new data"""
        pred = func.forward_backward(data)
        resid = np.abs( func.XSS[:func.term,0] - np.array(self.datalist) ) # residuals
        return resid, pred
         
    def calculate_score_step(self, new_data):
        # calculate outlier score
        self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
                    self.outlier_pred, new_data, self.outlier_score_list,
                    self.outlier_score_smooth)
        # calculate change score
        self.calculate_score(self.kf_second_smooth_score, self.change_resid,
                    self.change_pred, self.outlier_score_smooth[-1],
                    self.change_score_list, self.change_score_smooth)

    def calculate_score(self, func, resid, pred, new_data, storage_score_list, storage_smooth_list):
        score = self.conversion_score( resid, abs(float(pred) - float(new_data)) )
        print 'got score', score
        storage_score_list.append(score)
        print 'smoothing score'
        storage_smooth_list.append( func.forward_backward(score, smoothing=1) )
         
if __name__=='__main__':
    fname = 'test'
    term = 3 # time window of training
    smooth = 1
    kfad = KFAnomalyDetection(term,smooth,2,0,0,20)
    datalist = []
    of = open('score_out.txt','w')    
    dlist = np.hstack( (np.random.normal(0,1,100),np.random.normal(10,0.2,20),np.random.normal(0,1,100)) )
    for data in dlist:
        kfad.forward_step(data)
        of.write( str(kfad.change_score_smooth[-1])+'\n' )
    of.close()
    
    rng = range( len(dlist.tolist()) )
    plt.plot(rng,dlist,label=u"data")
    plt.show()
    plt.plot(rng,kfad.change_score_smooth,label=u"score")
    plt.show()

Entschuldigung für den schmutzigen Code wie gewohnt. Ich habe den Code für den Kalman-Filter vor über einem Jahr geschrieben, aber wenn ich ihn mir jetzt ansehe, denke ich: "Wer hat diesen Uncode geschrieben?" .. .. Nun, ich werde positiv denken, dass es vor mehr als einem Jahr wächst w

Wenn bei der Berechnung der Punktzahl der Standardabweichungswert klein ist, werden viele Punkte als abnormale Werte beurteilt, sodass der minimale Standardabweichungswert auf 1 gesetzt wird. (Ich frage mich, ob Zähldaten akzeptabel sind. Ich verwende sie so, als ob sie es wären.) Wenn ein zu abnormaler Wert eingegeben wird, wird die Wahrscheinlichkeit als Null betrachtet (der Wert Null wird zurückgegeben), und Protokolloperationen können nicht ausgeführt werden. Daher wird sie durch die Mindestreihenfolge des Gleitkommas ersetzt. Ich denke, es besteht die Möglichkeit, dass dieser Bereich durch Verwendung einer Verteilung mit einem dicken Saum gelöst werden kann.

Versuchsergebnis

Fügen Sie das Ergebnis der Ausführung mit dem obigen Code ein.

Originale Daten

data.png

Punktedaten ändern

score.png

Kommentar

Sie können die Punktzahl ziemlich gut erkennen. Es ist natürlich, solche eindeutigen Daten erkennen zu können. .. ..

Bei den Parametern muss fast nur der Begriff (Zeitfenster) geändert werden. Es gibt kein Problem mit 1 für das glatte Fenster. Mit anderen Worten, der glatte Teil des obigen Codes ist nur eine verschwendete Berechnung ... orz Es gibt auch kein Problem mit der Dispersion, und es ist nicht notwendig, auf größere Änderungen zu reagieren. Daher denke ich, dass 10 bis 20 je nach Maßstab in Ordnung sind.

Zusammenfassung

Ich dachte, dass der Kalman-Filter mehr Vorteile als das ARIMA-Modell haben würde, aber das Ergebnis war nicht so gut wie ich erwartet hatte. .. Ich werde weiterhin nach Orten suchen, die verbessert werden können.

Wir entschuldigen uns für die Unannehmlichkeiten, aber wenn Sie einen Fehler machen, würden wir uns freuen, wenn Sie darauf hinweisen könnten.

Recommended Posts

Änderungspunkterkennung mit Kalman-Filter
Einführung in die Anomalieerkennung 3 Änderungspunkterkennung
Parameterschätzung mit Kalman-Filter
Parameterschätzung mit Kalman-Filter
Kalman Filter, den Sie verstehen können
Änderungspunkterkennung mit Kalman-Filter
Punktwolke mit Pfeffer
Versuchen Sie die Kantenerkennung mit OpenCV
[Python] Filtern Sie Tabellenkalkulationen mit gspread
Gesichtserkennung mit Python + dlib
[Python] Ändere den Typ mit Pandas
Ändern Sie die Wiederholungseinstellungen mit boto3
Echtzeit-Kantenerkennung mit OpenCV
Gesichtserkennung mit Python + OpenCV
Erkennung von Blockchain-Manipulationen mit Python
Gesichtserkennung mit Haar Cascades
Anime-Gesichtserkennung mit OpenCV