[PYTHON] Change point detection with Kalman filter

Motivation

I had the opportunity to detect change points at work. At that time, I didn't have much time, so I made it by referring to Algorithm using ARIMA model that yokkuns was doing. I did. However, the ARIMA model had various troubles, so I tried to rewrite it with the kalman filter.

Problems with the ARIMA model

References

The main reference was "anomaly detection by data mining," which everyone loves.

Overview

The calculation steps are as follows: The calculation can be broadly divided into a learning step and a score calculation step.

Learning steps

Here are the steps to calculate before new data comes in.

Score calculation step

This is the step to calculate when new data comes in.

The outline of the change point score calculation is as follows.

It's easy, but that's it. Then, I will write the code below.

Reference code

Code for kalman filter

KF.py


# coding: utf8

from numpy.oldnumeric.linear_algebra import inverse
from scipy import linalg
import numpy as np
from math import log

class KalmanFiltering:
    limy = 1e20 #Boundary of numerical values to be regarded as missing
    GSIG2 = 1
    L = 1
    R = np.identity(L)
    NSUM = 0.0
    SIG2 = 0.0
    LDET = 0.0
    
    def __init__(self, k, p, q, term=10, w=10):
        self.k = k #Floor difference
        self.p = p #Seasonal circulation
        self.q = q #AR component
        self.m, self.F, self.G, \
            self.H, self.Q = self.FGHset(0,k,p,q,w)
        self.term = term
        self.strg_trm = term
        
        self.resid = np.zeros(self.term)
        self.pred = 0.0
        
        # matrix for storage predicted value
        self.XPS = np.zeros((term,self.m), dtype=np.float)
        self.VPS = np.array([np.eye(self.m, dtype=np.float)]*term)
        # matrix for storage predicted value
        self.XFS = np.zeros((term,self.m), dtype=np.float)
        self.VFS = np.array([np.eye(self.m, dtype=np.float)]*term)
        # matrix for storage smoothed value
        self.XSS = np.zeros((term,self.m), dtype=np.float)
        self.VSS = np.array([np.eye(self.m, dtype=np.float)]*term)

    def forward_backward(self, new_data, smoothing=0):
        self.NSUM += 1 
        if self.NSUM < self.strg_trm:
            self.term = int(self.NSUM)
        else:
            self.term = self.strg_trm
        # forward
        self.forward(new_data)
        # smoothing
        self.SMO()
        if smoothing==1:
            return np.mean( self.XSS[:self.term,0] )
                    
        return self.predict()[0]
    
    def forward(self, y):
        XF = self.XFS[self.term-1]
        VF = self.VFS[self.term-1]     

        # 1span predicting
        XP, VP = self.forward_predicting(VF, XF)
        XF, VF = self.filtering(y, XP, VP)
        self.storage_params(XP, XF, VP, VF)
#         sig2 = self.SIG2 / self.NSUM
#         FF = -0.5 * (self.NSUM * (log(2 * np.pi * sig2) + 1) + self.LDET)
#         return {'LLF':FF, 'Ovar':sig2}
    
    def storage_params(self, XP, XF, VP, VF):
        if self.NSUM>self.term:
            self.XPS[:self.term-1] = self.XPS[1:self.term] 
            self.XFS[:self.term-1] = self.XFS[1:self.term]
            self.VPS[:self.term-1] = self.VPS[1:self.term]
            self.VFS[:self.term-1] = self.VFS[1:self.term]
            self.normal_storage(XP, XF, VP, VF)
        else:
            self.normal_storage(XP, XF, VP, VF)
                
    def normal_storage(self, XP, XF, VP, VF):
        self.XPS[self.term-1] = XP 
        self.XFS[self.term-1] = XF
        self.VPS[self.term-1] = VP
        self.VFS[self.term-1] = VF
    
    def forward_predicting(self, VF, XF):
        """1span predicting"""
        XP = np.ndarray.flatten( np.dot(self.F, XF.T) ) #Since it becomes a vertical vector from the second week, it is always converted to a horizontal vector
        VP = self.F.dot(VF).dot(self.F.T) +  self.G.dot(self.Q).dot(self.G.T)
        return XP, VP
    
    def filtering(self, y, XP, VP):
        if y < self.limy: 
            B = np.dot( np.dot(self.H, VP), self.H.T)  + self.R  #H is mathematically a horizontal vector
            B1 = inverse(B)
            K = np.matrix(np.dot(VP, self.H.T)) * np.matrix(B1) #K becomes a vertical vector(matrix)
            e = np.array(y).T - np.dot(self.H, XP.T)            
            XF = np.array(XP) + np.array( K * np.matrix(e) ).T #Horizontal vector
            VF = np.array(VP) - np.array( K* np.matrix(self.H) * VP)           
            self.SIG2 += np.ndarray.flatten(np.array( np.matrix(e) * np.matrix(B1) * np.matrix(e).T ))[0] #Make matrix so that it can be calculated even in one dimension
            self.LDET += log(linalg.det(B))
        else:
            XF = XP; VF = VP
        return XF, VF
        
    def SMO(self):
        """fixed-interval smoothing"""
        XS1 = self.XFS[self.term-1]
        VS1 = self.VFS[self.term-1]
        self.XSS[self.term-1] = XS1
        self.VSS[self.term-1] = VS1
        for n1 in xrange(self.term):        
            n = (self.term-1) - n1; XP = self.XPS[n]; XF = self.XFS[n-1]
            VP = self.VPS[n]; VF = self.VFS[n-1]; VPI = inverse(VP)
            A = np.dot( np.dot(VF, self.F.T), VPI)
            XS2 = XF + np.dot(A, (XS1 - XP))
            VS2 = VF + np.dot( np.dot(A, (VS1 - VP)), A.T )
            XS1 = XS2; VS1 = VS2
            self.XSS[n-1] = XS1
            self.VSS[n-1] = VS1
                
    #Definition of TAU2x log-likelihood function
    def LogL(self, parm, *args):
        y=args[0]
        LLF = self.forward(y)
        LL = LLF['LLF']
        return -LL #Since optimeze is a minimization function, it returns the log-likelihood multiplied by minus.
    
    def predict(self, forward_time=1):
        """pridint average value"""
        y = np.zeros(forward_time, dtype=np.float)
        XFp=self.XFS[-1] #Get only the most recent data matrix
        #VFp=VF[XF.shape[0]-1,:]
        
        for n in xrange(forward_time):
            XP = np.ndarray.flatten( np.dot(self.F, XFp.T) )
            #VP = np.dot( np.dot(F, VF), F.T ) +  np.dot( np.dot(G, Q), G.T )
            y[n] = np.dot(self.H, XP) #Since it takes the expected value, no noise is added.
            XFp=XP
        return y

    def FGHset(self, al, k, p, q, w=10):
        """Matrix setting of state-space representation of seasonally adjusted model
al: α vector of AR model
        k,p,q: Difference, seasonal cycle, number of AR parameters (k when predicting)>=2)
        w:Dispersion of system error (For change point detection, it should be set small and fixed)
        """
        m = k + p + q -1
    
        if q>0: G = np.zeros((m,3), dtype=np.float) #When the state model includes trend, season, and AR
        elif p>0: G = np.zeros((m,2), dtype=np.float) #When it does not contain AR component(q=0)
        else: m=k; G = np.zeros((m,1), dtype=np.float)
        F = np.zeros((m,m), dtype=np.float)
        H = np.zeros((1,m), dtype=np.float)
      
        ns = 0; ls =0
        #Construction of block matrix of trend model
        if k>0:
            ns +=1
            G[0,0] = 1; H[0,0] = 1
            if k==1: F[0,0] = 1
            if k==2: F[0,0] = 2; F[0,1] = -1; F[1,0] = 1
            if k==3: F[0,0] = 3; F[0,1] = -3; F[0,2] = 1; F[1,0] = 1; F[2,1] = 1
            ls += k
      
        #Construction of seasonally adjusted component block matrix
        if p>0:
            ns +=1
            G[ls, ns-1] = 1
            H[0,ls] = 1
            for i in xrange(p-1): F[ls, ls+i] = -1
            for i in xrange(p-2): F[ls+i+1, ls+i] = 1
            ls +=p-1
      
        #Construction of AR component block matrix
        if q>0:
            ns +=1
            G[ls, ns-1] = 1
            H[0,ls] = 1
            for i in xrange(q): F[ls, ls+i-1] = al[i]
            if q>1:
                for i in xrange(q-1): F[ls+i, ls+i-1] = 1
      
        #Calculation of the frame of the variance-covariance matrix Q of the system model
        Q = np.eye(ns,dtype=np.float)*w
      
        return m, F, G, H, Q

Code for change point detection

KF_AnomalyDetection.py


# coding: utf-8
from math import log, ceil
import numpy as np
from scipy.stats import norm, t
import matplotlib.pyplot as plt
import KF

class KFAnomalyDetection:
    datalist = []
    outlier_score_list = []
    change_score_list = []
    outlier_score_smooth = []
    change_score_smooth = []
    outlier_resid = None
    change_resid = None
    outlier_pred = None
    change_pred = None
    
    def __init__(self, term, smooth, k=2, p=0, q=0, w=10):
        self.kf_outlier_score = KF.KalmanFiltering(k,p,q,term=term, w=w)
        self.kf_first_smooth_score = KF.KalmanFiltering(k,p,q,term=smooth, w=w)
        self.kf_change_score = KF.KalmanFiltering(k,p,q,term=term, w=w)
        self.kf_second_smooth_score = KF.KalmanFiltering(k,p,q,term=smooth, w=w)
        self.term = term
    
    def forward_step(self, new_data):
        # add new_data to datalist
        if len(self.datalist)>=self.term:
            self.datalist.pop(0)
            self.datalist.append(new_data)
        else:
            self.datalist.append(new_data)

        # compute score
        if self.outlier_pred is None:
            self.first_step(new_data)
        else:
            self.calculate_score_step(new_data)
            self.learn_step(new_data)
    
    def conversion_score(self, train, var):
        """convert score to log loss"""
        m = np.mean(train)
        s = np.std(train)
        try:
            if s < 1: s=1
            px = norm.pdf(var, m, s) if norm.pdf(var, m, s)!=0.0 else 1e-308
            res = -log(px)
            return res
        except:
            return 0

    def first_step(self, new_data):
        # learn outlier model
        self.outlier_resid, self.outlier_pred = \
                self.learn_KF(self.kf_outlier_score, new_data)
        # calculate outlier score
        self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
                    self.outlier_pred, new_data, self.outlier_score_list,
                    self.outlier_score_smooth)
        # learn cnage model
        self.change_resid, self.change_pred = \
                self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
        # calculate change score
        self.calculate_score(self.kf_second_smooth_score, self.change_resid,
                    self.change_pred, self.outlier_score_smooth[-1],
                    self.change_score_list, self.change_score_smooth)

    def learn_step(self, data):
        self.outlier_resid, self.outlier_pred = \
                self.learn_KF(self.kf_outlier_score, data)
        self.change_resid, self.change_pred = \
                self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
    
    def learn_KF(self, func, data):
        """leaning KF from new data"""
        pred = func.forward_backward(data)
        resid = np.abs( func.XSS[:func.term,0] - np.array(self.datalist) ) # residuals
        return resid, pred
         
    def calculate_score_step(self, new_data):
        # calculate outlier score
        self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
                    self.outlier_pred, new_data, self.outlier_score_list,
                    self.outlier_score_smooth)
        # calculate change score
        self.calculate_score(self.kf_second_smooth_score, self.change_resid,
                    self.change_pred, self.outlier_score_smooth[-1],
                    self.change_score_list, self.change_score_smooth)

    def calculate_score(self, func, resid, pred, new_data, storage_score_list, storage_smooth_list):
        score = self.conversion_score( resid, abs(float(pred) - float(new_data)) )
        print 'got score', score
        storage_score_list.append(score)
        print 'smoothing score'
        storage_smooth_list.append( func.forward_backward(score, smoothing=1) )
         
if __name__=='__main__':
    fname = 'test'
    term = 3 # time window of training
    smooth = 1
    kfad = KFAnomalyDetection(term,smooth,2,0,0,20)
    datalist = []
    of = open('score_out.txt','w')    
    dlist = np.hstack( (np.random.normal(0,1,100),np.random.normal(10,0.2,20),np.random.normal(0,1,100)) )
    for data in dlist:
        kfad.forward_step(data)
        of.write( str(kfad.change_score_smooth[-1])+'\n' )
    of.close()
    
    rng = range( len(dlist.tolist()) )
    plt.plot(rng,dlist,label=u"data")
    plt.show()
    plt.plot(rng,kfad.change_score_smooth,label=u"score")
    plt.show()

Sorry for the dirty code as usual. I wrote the Kalman filter code over a year ago, but when I look at it now, I think "Who wrote this uncode?" .. .. Well, I will positively think that it is growing more than a year ago w

When calculating the score, if the standard deviation value is small, many points will be judged as abnormal values, so the minimum standard deviation value is set to 1. (I wonder if count data is acceptable, I use it as if it were.) Also, if a value that is too abnormal is input, the probability is considered to be zero (zero value is returned) and log calculation cannot be performed, so it is replaced with the minimum order of float. I think there is a possibility that this area can be solved by using a distribution with a thick tail.

Experimental result

Paste the result of running with the above code below.

raw data

data.png

Change point score data

score.png

comment

You can detect the score fairly well. It is natural to be able to detect such clear data. .. ..

As for the parameters, it is almost necessary to change only the term (time window). There is no problem with 1 for the smooth window. In other words, the smooth part of the above code is just wasted calculation ... orz Also, there is no problem with dispersion, and it is not necessary to respond to major changes, so I think that about 10 to 20 is fine, depending on the scale.

Summary

I thought that the Kalman filter would have more advantages than the ARIMA model, but the result was not as good as I expected. .. I will continue to look for places that can be improved.

We apologize for the inconvenience, but if you make a mistake, we would appreciate it if you could point it out.

Recommended Posts

Change point detection with Kalman filter
Anomaly detection introduction 3 Change point detection
Parameter estimation with Kalman filter
Parameter estimation with Kalman filter
Kalman filter that you can understand
Change point detection with Kalman filter
I implemented ChangeFinder (change point detection)
Point Cloud with Pepper
Try edge detection with OpenCV
[Python] Filter spreadsheets with gspread
Face detection with Python + dlib
[Python] Change dtype with pandas
Change retry settings with boto3
Real-time edge detection with OpenCV
Face detection with Python + OpenCV
Blockchain tampering detection with Python
Face detection with Haar Cascades
Anime face detection with OpenCV
Aim to improve prediction accuracy with Kaggle / MNIST (2. Change filter size)