[PYTHON] Détection de point de changement avec filtre de Kalman

Motivation

J'ai eu l'opportunité de détecter des points de changement au travail. À ce moment-là, je n'avais pas beaucoup de temps, donc je l'ai fait en me référant à Algorithme utilisant le modèle ARIMA que faisait yokkuns. J'ai fait. Cependant, le modèle ARIMA avait divers problèmes, j'ai donc essayé de le réécrire avec le filtre kalman.

Problèmes avec le modèle ARIMA

Les références

La principale référence était "la détection d'anomalies par data mining" que tout le monde aime.

Aperçu

Les étapes de calcul sont les suivantes: Le calcul peut être globalement divisé en une étape d'apprentissage et une étape de calcul de score.

Étapes d'apprentissage

Voici les étapes à suivre pour calculer avant l'arrivée de nouvelles données.

Étape de calcul du score

C'est l'étape à suivre pour calculer l'arrivée de nouvelles données.

Les grandes lignes du calcul du score des points de changement sont les suivantes.

C'est facile, mais c'est tout. Ensuite, j'écrirai le code ci-dessous.

Code de référence

Code du filtre Kalman

KF.py


# coding: utf8

from numpy.oldnumeric.linear_algebra import inverse
from scipy import linalg
import numpy as np
from math import log

class KalmanFiltering:
    limy = 1e20 #Limite des valeurs numériques à considérer comme manquantes
    GSIG2 = 1
    L = 1
    R = np.identity(L)
    NSUM = 0.0
    SIG2 = 0.0
    LDET = 0.0
    
    def __init__(self, k, p, q, term=10, w=10):
        self.k = k #Différence de sol
        self.p = p #Circulation saisonnière
        self.q = q #Composant AR
        self.m, self.F, self.G, \
            self.H, self.Q = self.FGHset(0,k,p,q,w)
        self.term = term
        self.strg_trm = term
        
        self.resid = np.zeros(self.term)
        self.pred = 0.0
        
        # matrix for storage predicted value
        self.XPS = np.zeros((term,self.m), dtype=np.float)
        self.VPS = np.array([np.eye(self.m, dtype=np.float)]*term)
        # matrix for storage predicted value
        self.XFS = np.zeros((term,self.m), dtype=np.float)
        self.VFS = np.array([np.eye(self.m, dtype=np.float)]*term)
        # matrix for storage smoothed value
        self.XSS = np.zeros((term,self.m), dtype=np.float)
        self.VSS = np.array([np.eye(self.m, dtype=np.float)]*term)

    def forward_backward(self, new_data, smoothing=0):
        self.NSUM += 1 
        if self.NSUM < self.strg_trm:
            self.term = int(self.NSUM)
        else:
            self.term = self.strg_trm
        # forward
        self.forward(new_data)
        # smoothing
        self.SMO()
        if smoothing==1:
            return np.mean( self.XSS[:self.term,0] )
                    
        return self.predict()[0]
    
    def forward(self, y):
        XF = self.XFS[self.term-1]
        VF = self.VFS[self.term-1]     

        # 1span predicting
        XP, VP = self.forward_predicting(VF, XF)
        XF, VF = self.filtering(y, XP, VP)
        self.storage_params(XP, XF, VP, VF)
#         sig2 = self.SIG2 / self.NSUM
#         FF = -0.5 * (self.NSUM * (log(2 * np.pi * sig2) + 1) + self.LDET)
#         return {'LLF':FF, 'Ovar':sig2}
    
    def storage_params(self, XP, XF, VP, VF):
        if self.NSUM>self.term:
            self.XPS[:self.term-1] = self.XPS[1:self.term] 
            self.XFS[:self.term-1] = self.XFS[1:self.term]
            self.VPS[:self.term-1] = self.VPS[1:self.term]
            self.VFS[:self.term-1] = self.VFS[1:self.term]
            self.normal_storage(XP, XF, VP, VF)
        else:
            self.normal_storage(XP, XF, VP, VF)
                
    def normal_storage(self, XP, XF, VP, VF):
        self.XPS[self.term-1] = XP 
        self.XFS[self.term-1] = XF
        self.VPS[self.term-1] = VP
        self.VFS[self.term-1] = VF
    
    def forward_predicting(self, VF, XF):
        """1span predicting"""
        XP = np.ndarray.flatten( np.dot(self.F, XF.T) ) #Puisqu'il devient un vecteur vertical à partir de la deuxième semaine, il est toujours converti en vecteur horizontal
        VP = self.F.dot(VF).dot(self.F.T) +  self.G.dot(self.Q).dot(self.G.T)
        return XP, VP
    
    def filtering(self, y, XP, VP):
        if y < self.limy: 
            B = np.dot( np.dot(self.H, VP), self.H.T)  + self.R  #H est mathématiquement un vecteur horizontal
            B1 = inverse(B)
            K = np.matrix(np.dot(VP, self.H.T)) * np.matrix(B1) #K devient un vecteur vertical(matrix)
            e = np.array(y).T - np.dot(self.H, XP.T)            
            XF = np.array(XP) + np.array( K * np.matrix(e) ).T #Vecteur horizontal
            VF = np.array(VP) - np.array( K* np.matrix(self.H) * VP)           
            self.SIG2 += np.ndarray.flatten(np.array( np.matrix(e) * np.matrix(B1) * np.matrix(e).T ))[0] #Faites-en une matrice pour qu'elle puisse être calculée même dans une dimension
            self.LDET += log(linalg.det(B))
        else:
            XF = XP; VF = VP
        return XF, VF
        
    def SMO(self):
        """fixed-interval smoothing"""
        XS1 = self.XFS[self.term-1]
        VS1 = self.VFS[self.term-1]
        self.XSS[self.term-1] = XS1
        self.VSS[self.term-1] = VS1
        for n1 in xrange(self.term):        
            n = (self.term-1) - n1; XP = self.XPS[n]; XF = self.XFS[n-1]
            VP = self.VPS[n]; VF = self.VFS[n-1]; VPI = inverse(VP)
            A = np.dot( np.dot(VF, self.F.T), VPI)
            XS2 = XF + np.dot(A, (XS1 - XP))
            VS2 = VF + np.dot( np.dot(A, (VS1 - VP)), A.T )
            XS1 = XS2; VS1 = VS2
            self.XSS[n-1] = XS1
            self.VSS[n-1] = VS1
                
    #Définition de la fonction de vraisemblance logarithmique de TAU2x
    def LogL(self, parm, *args):
        y=args[0]
        LLF = self.forward(y)
        LL = LLF['LLF']
        return -LL #Puisque l'optimeze est une fonction de minimisation, elle renvoie la vraisemblance logarithmique multipliée par moins.
    
    def predict(self, forward_time=1):
        """pridint average value"""
        y = np.zeros(forward_time, dtype=np.float)
        XFp=self.XFS[-1] #Obtenez uniquement la matrice de données la plus récente
        #VFp=VF[XF.shape[0]-1,:]
        
        for n in xrange(forward_time):
            XP = np.ndarray.flatten( np.dot(self.F, XFp.T) )
            #VP = np.dot( np.dot(F, VF), F.T ) +  np.dot( np.dot(G, Q), G.T )
            y[n] = np.dot(self.H, XP) #Puisqu'il prend la valeur attendue, aucun bruit n'est inclus
            XFp=XP
        return y

    def FGHset(self, al, k, p, q, w=10):
        """Définition de la matrice de la représentation de l'espace d'états du modèle de désaisonnalisation
al: vecteur α du modèle AR
        k,p,q: Différence, cycle saisonnier, nombre de paramètres AR (k lors de la prédiction)>=2)
        w:Dispersion de l'erreur système (pour la détection du point de changement, il doit être réglé petit et fixe)
        """
        m = k + p + q -1
    
        if q>0: G = np.zeros((m,3), dtype=np.float) #Lorsque le modèle d'état comprend trois, tendance, saison et RA
        elif p>0: G = np.zeros((m,2), dtype=np.float) #Lorsqu'il ne contient pas de composant AR(q=0)
        else: m=k; G = np.zeros((m,1), dtype=np.float)
        F = np.zeros((m,m), dtype=np.float)
        H = np.zeros((1,m), dtype=np.float)
      
        ns = 0; ls =0
        #Construction de la matrice de blocs du modèle de tendance
        if k>0:
            ns +=1
            G[0,0] = 1; H[0,0] = 1
            if k==1: F[0,0] = 1
            if k==2: F[0,0] = 2; F[0,1] = -1; F[1,0] = 1
            if k==3: F[0,0] = 3; F[0,1] = -3; F[0,2] = 1; F[1,0] = 1; F[2,1] = 1
            ls += k
      
        #Construction d'une matrice de blocs de composants saisonniers
        if p>0:
            ns +=1
            G[ls, ns-1] = 1
            H[0,ls] = 1
            for i in xrange(p-1): F[ls, ls+i] = -1
            for i in xrange(p-2): F[ls+i+1, ls+i] = 1
            ls +=p-1
      
        #Construction d'une matrice de blocs de composants AR
        if q>0:
            ns +=1
            G[ls, ns-1] = 1
            H[0,ls] = 1
            for i in xrange(q): F[ls, ls+i-1] = al[i]
            if q>1:
                for i in xrange(q-1): F[ls+i, ls+i-1] = 1
      
        #Calcul de la trame de la matrice de co-distribution de dispersion Q du modèle système
        Q = np.eye(ns,dtype=np.float)*w
      
        return m, F, G, H, Q

Code pour la détection du point de changement

KF_AnomalyDetection.py


# coding: utf-8
from math import log, ceil
import numpy as np
from scipy.stats import norm, t
import matplotlib.pyplot as plt
import KF

class KFAnomalyDetection:
    datalist = []
    outlier_score_list = []
    change_score_list = []
    outlier_score_smooth = []
    change_score_smooth = []
    outlier_resid = None
    change_resid = None
    outlier_pred = None
    change_pred = None
    
    def __init__(self, term, smooth, k=2, p=0, q=0, w=10):
        self.kf_outlier_score = KF.KalmanFiltering(k,p,q,term=term, w=w)
        self.kf_first_smooth_score = KF.KalmanFiltering(k,p,q,term=smooth, w=w)
        self.kf_change_score = KF.KalmanFiltering(k,p,q,term=term, w=w)
        self.kf_second_smooth_score = KF.KalmanFiltering(k,p,q,term=smooth, w=w)
        self.term = term
    
    def forward_step(self, new_data):
        # add new_data to datalist
        if len(self.datalist)>=self.term:
            self.datalist.pop(0)
            self.datalist.append(new_data)
        else:
            self.datalist.append(new_data)

        # compute score
        if self.outlier_pred is None:
            self.first_step(new_data)
        else:
            self.calculate_score_step(new_data)
            self.learn_step(new_data)
    
    def conversion_score(self, train, var):
        """convert score to log loss"""
        m = np.mean(train)
        s = np.std(train)
        try:
            if s < 1: s=1
            px = norm.pdf(var, m, s) if norm.pdf(var, m, s)!=0.0 else 1e-308
            res = -log(px)
            return res
        except:
            return 0

    def first_step(self, new_data):
        # learn outlier model
        self.outlier_resid, self.outlier_pred = \
                self.learn_KF(self.kf_outlier_score, new_data)
        # calculate outlier score
        self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
                    self.outlier_pred, new_data, self.outlier_score_list,
                    self.outlier_score_smooth)
        # learn cnage model
        self.change_resid, self.change_pred = \
                self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
        # calculate change score
        self.calculate_score(self.kf_second_smooth_score, self.change_resid,
                    self.change_pred, self.outlier_score_smooth[-1],
                    self.change_score_list, self.change_score_smooth)

    def learn_step(self, data):
        self.outlier_resid, self.outlier_pred = \
                self.learn_KF(self.kf_outlier_score, data)
        self.change_resid, self.change_pred = \
                self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
    
    def learn_KF(self, func, data):
        """leaning KF from new data"""
        pred = func.forward_backward(data)
        resid = np.abs( func.XSS[:func.term,0] - np.array(self.datalist) ) # residuals
        return resid, pred
         
    def calculate_score_step(self, new_data):
        # calculate outlier score
        self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
                    self.outlier_pred, new_data, self.outlier_score_list,
                    self.outlier_score_smooth)
        # calculate change score
        self.calculate_score(self.kf_second_smooth_score, self.change_resid,
                    self.change_pred, self.outlier_score_smooth[-1],
                    self.change_score_list, self.change_score_smooth)

    def calculate_score(self, func, resid, pred, new_data, storage_score_list, storage_smooth_list):
        score = self.conversion_score( resid, abs(float(pred) - float(new_data)) )
        print 'got score', score
        storage_score_list.append(score)
        print 'smoothing score'
        storage_smooth_list.append( func.forward_backward(score, smoothing=1) )
         
if __name__=='__main__':
    fname = 'test'
    term = 3 # time window of training
    smooth = 1
    kfad = KFAnomalyDetection(term,smooth,2,0,0,20)
    datalist = []
    of = open('score_out.txt','w')    
    dlist = np.hstack( (np.random.normal(0,1,100),np.random.normal(10,0.2,20),np.random.normal(0,1,100)) )
    for data in dlist:
        kfad.forward_step(data)
        of.write( str(kfad.change_score_smooth[-1])+'\n' )
    of.close()
    
    rng = range( len(dlist.tolist()) )
    plt.plot(rng,dlist,label=u"data")
    plt.show()
    plt.plot(rng,kfad.change_score_smooth,label=u"score")
    plt.show()

Désolé pour le code sale comme d'habitude. J'ai écrit le code du filtre Kalman il y a plus d'un an, mais quand je le regarde maintenant, je pense: "Qui a écrit ce uncode?" .. .. Eh bien, je pense positivement qu'il se développe il y a plus d'un an w

Lors du calcul du score, si la valeur de l'écart type est petite, de nombreux points seront jugés comme des valeurs anormales, de sorte que la valeur d'écart standard minimum est définie sur 1. (Je me demande si les données de comptage sont acceptables, je les utilise comme si elles l'étaient.) En outre, si une valeur trop anormale est entrée, la probabilité est considérée comme égale à zéro (la valeur zéro est renvoyée) et les opérations de journalisation ne peuvent pas être effectuées, elle est donc remplacée par l'ordre minimum de float. Je pense qu'il est possible que cette zone puisse être résolue en utilisant une distribution avec un ourlet épais.

Résultat expérimental

Collez le résultat de l'exécution avec le code ci-dessus ci-dessous.

données brutes

data.png

Modifier les données de score de points

score.png

commentaire

Vous pouvez assez bien détecter le score. Il est naturel de pouvoir détecter des données aussi claires. .. ..

Quant aux paramètres, il est presque nécessaire de changer uniquement le terme (fenêtre temporelle). Il n'y a aucun problème avec 1 pour la fenêtre lisse. En d'autres termes, la partie lisse du code ci-dessus n'est que du calcul gaspillé ... orz De plus, il n'y a pas de problème de dispersion et il n'est pas nécessaire de répondre à des changements majeurs, donc je pense qu'environ 10 à 20, c'est bien, selon l'échelle.

Résumé

Je pensais que le filtre Kalman aurait plus d'avantages que le modèle ARIMA, mais le résultat n'était pas aussi bon que prévu. .. Je vais continuer à chercher des endroits qui peuvent être améliorés.

Nous vous prions de nous excuser pour la gêne occasionnée, mais si vous faites une erreur, veuillez nous en informer.

Recommended Posts

Détection de point de changement avec filtre de Kalman
Introduction à la détection des anomalies 3 Détection des points de changement
Estimation des paramètres avec le filtre de Kalman
Estimation des paramètres avec le filtre de Kalman
Filtre de Kalman que vous pouvez comprendre
Détection de point de changement avec filtre de Kalman
Nuage de points avec du poivre
Essayez la détection des bords avec OpenCV
[Python] Filtrer les feuilles de calcul avec gspread
Détection de visage avec Python + dlib
[Python] Changer de type avec les pandas
Modifier les paramètres de nouvelle tentative avec boto3
Détection des bords en temps réel avec OpenCV
Détection de visage avec Python + OpenCV
Détection de falsification de la blockchain avec Python
Détection de visage avec Haar Cascades
Détection de visage d'anime avec OpenCV