J'ai eu l'opportunité de détecter des points de changement au travail. À ce moment-là, je n'avais pas beaucoup de temps, donc je l'ai fait en me référant à Algorithme utilisant le modèle ARIMA que faisait yokkuns. J'ai fait. Cependant, le modèle ARIMA avait divers problèmes, j'ai donc essayé de le réécrire avec le filtre kalman.
La principale référence était "la détection d'anomalies par data mining" que tout le monde aime.
Les étapes de calcul sont les suivantes: Le calcul peut être globalement divisé en une étape d'apprentissage et une étape de calcul de score.
Voici les étapes à suivre pour calculer avant l'arrivée de nouvelles données.
C'est l'étape à suivre pour calculer l'arrivée de nouvelles données.
Les grandes lignes du calcul du score des points de changement sont les suivantes.
C'est facile, mais c'est tout. Ensuite, j'écrirai le code ci-dessous.
KF.py
# coding: utf8
from numpy.oldnumeric.linear_algebra import inverse
from scipy import linalg
import numpy as np
from math import log
class KalmanFiltering:
limy = 1e20 #Limite des valeurs numériques à considérer comme manquantes
GSIG2 = 1
L = 1
R = np.identity(L)
NSUM = 0.0
SIG2 = 0.0
LDET = 0.0
def __init__(self, k, p, q, term=10, w=10):
self.k = k #Différence de sol
self.p = p #Circulation saisonnière
self.q = q #Composant AR
self.m, self.F, self.G, \
self.H, self.Q = self.FGHset(0,k,p,q,w)
self.term = term
self.strg_trm = term
self.resid = np.zeros(self.term)
self.pred = 0.0
# matrix for storage predicted value
self.XPS = np.zeros((term,self.m), dtype=np.float)
self.VPS = np.array([np.eye(self.m, dtype=np.float)]*term)
# matrix for storage predicted value
self.XFS = np.zeros((term,self.m), dtype=np.float)
self.VFS = np.array([np.eye(self.m, dtype=np.float)]*term)
# matrix for storage smoothed value
self.XSS = np.zeros((term,self.m), dtype=np.float)
self.VSS = np.array([np.eye(self.m, dtype=np.float)]*term)
def forward_backward(self, new_data, smoothing=0):
self.NSUM += 1
if self.NSUM < self.strg_trm:
self.term = int(self.NSUM)
else:
self.term = self.strg_trm
# forward
self.forward(new_data)
# smoothing
self.SMO()
if smoothing==1:
return np.mean( self.XSS[:self.term,0] )
return self.predict()[0]
def forward(self, y):
XF = self.XFS[self.term-1]
VF = self.VFS[self.term-1]
# 1span predicting
XP, VP = self.forward_predicting(VF, XF)
XF, VF = self.filtering(y, XP, VP)
self.storage_params(XP, XF, VP, VF)
# sig2 = self.SIG2 / self.NSUM
# FF = -0.5 * (self.NSUM * (log(2 * np.pi * sig2) + 1) + self.LDET)
# return {'LLF':FF, 'Ovar':sig2}
def storage_params(self, XP, XF, VP, VF):
if self.NSUM>self.term:
self.XPS[:self.term-1] = self.XPS[1:self.term]
self.XFS[:self.term-1] = self.XFS[1:self.term]
self.VPS[:self.term-1] = self.VPS[1:self.term]
self.VFS[:self.term-1] = self.VFS[1:self.term]
self.normal_storage(XP, XF, VP, VF)
else:
self.normal_storage(XP, XF, VP, VF)
def normal_storage(self, XP, XF, VP, VF):
self.XPS[self.term-1] = XP
self.XFS[self.term-1] = XF
self.VPS[self.term-1] = VP
self.VFS[self.term-1] = VF
def forward_predicting(self, VF, XF):
"""1span predicting"""
XP = np.ndarray.flatten( np.dot(self.F, XF.T) ) #Puisqu'il devient un vecteur vertical à partir de la deuxième semaine, il est toujours converti en vecteur horizontal
VP = self.F.dot(VF).dot(self.F.T) + self.G.dot(self.Q).dot(self.G.T)
return XP, VP
def filtering(self, y, XP, VP):
if y < self.limy:
B = np.dot( np.dot(self.H, VP), self.H.T) + self.R #H est mathématiquement un vecteur horizontal
B1 = inverse(B)
K = np.matrix(np.dot(VP, self.H.T)) * np.matrix(B1) #K devient un vecteur vertical(matrix)
e = np.array(y).T - np.dot(self.H, XP.T)
XF = np.array(XP) + np.array( K * np.matrix(e) ).T #Vecteur horizontal
VF = np.array(VP) - np.array( K* np.matrix(self.H) * VP)
self.SIG2 += np.ndarray.flatten(np.array( np.matrix(e) * np.matrix(B1) * np.matrix(e).T ))[0] #Faites-en une matrice pour qu'elle puisse être calculée même dans une dimension
self.LDET += log(linalg.det(B))
else:
XF = XP; VF = VP
return XF, VF
def SMO(self):
"""fixed-interval smoothing"""
XS1 = self.XFS[self.term-1]
VS1 = self.VFS[self.term-1]
self.XSS[self.term-1] = XS1
self.VSS[self.term-1] = VS1
for n1 in xrange(self.term):
n = (self.term-1) - n1; XP = self.XPS[n]; XF = self.XFS[n-1]
VP = self.VPS[n]; VF = self.VFS[n-1]; VPI = inverse(VP)
A = np.dot( np.dot(VF, self.F.T), VPI)
XS2 = XF + np.dot(A, (XS1 - XP))
VS2 = VF + np.dot( np.dot(A, (VS1 - VP)), A.T )
XS1 = XS2; VS1 = VS2
self.XSS[n-1] = XS1
self.VSS[n-1] = VS1
#Définition de la fonction de vraisemblance logarithmique de TAU2x
def LogL(self, parm, *args):
y=args[0]
LLF = self.forward(y)
LL = LLF['LLF']
return -LL #Puisque l'optimeze est une fonction de minimisation, elle renvoie la vraisemblance logarithmique multipliée par moins.
def predict(self, forward_time=1):
"""pridint average value"""
y = np.zeros(forward_time, dtype=np.float)
XFp=self.XFS[-1] #Obtenez uniquement la matrice de données la plus récente
#VFp=VF[XF.shape[0]-1,:]
for n in xrange(forward_time):
XP = np.ndarray.flatten( np.dot(self.F, XFp.T) )
#VP = np.dot( np.dot(F, VF), F.T ) + np.dot( np.dot(G, Q), G.T )
y[n] = np.dot(self.H, XP) #Puisqu'il prend la valeur attendue, aucun bruit n'est inclus
XFp=XP
return y
def FGHset(self, al, k, p, q, w=10):
"""Définition de la matrice de la représentation de l'espace d'états du modèle de désaisonnalisation
al: vecteur α du modèle AR
k,p,q: Différence, cycle saisonnier, nombre de paramètres AR (k lors de la prédiction)>=2)
w:Dispersion de l'erreur système (pour la détection du point de changement, il doit être réglé petit et fixe)
"""
m = k + p + q -1
if q>0: G = np.zeros((m,3), dtype=np.float) #Lorsque le modèle d'état comprend trois, tendance, saison et RA
elif p>0: G = np.zeros((m,2), dtype=np.float) #Lorsqu'il ne contient pas de composant AR(q=0)
else: m=k; G = np.zeros((m,1), dtype=np.float)
F = np.zeros((m,m), dtype=np.float)
H = np.zeros((1,m), dtype=np.float)
ns = 0; ls =0
#Construction de la matrice de blocs du modèle de tendance
if k>0:
ns +=1
G[0,0] = 1; H[0,0] = 1
if k==1: F[0,0] = 1
if k==2: F[0,0] = 2; F[0,1] = -1; F[1,0] = 1
if k==3: F[0,0] = 3; F[0,1] = -3; F[0,2] = 1; F[1,0] = 1; F[2,1] = 1
ls += k
#Construction d'une matrice de blocs de composants saisonniers
if p>0:
ns +=1
G[ls, ns-1] = 1
H[0,ls] = 1
for i in xrange(p-1): F[ls, ls+i] = -1
for i in xrange(p-2): F[ls+i+1, ls+i] = 1
ls +=p-1
#Construction d'une matrice de blocs de composants AR
if q>0:
ns +=1
G[ls, ns-1] = 1
H[0,ls] = 1
for i in xrange(q): F[ls, ls+i-1] = al[i]
if q>1:
for i in xrange(q-1): F[ls+i, ls+i-1] = 1
#Calcul de la trame de la matrice de co-distribution de dispersion Q du modèle système
Q = np.eye(ns,dtype=np.float)*w
return m, F, G, H, Q
KF_AnomalyDetection.py
# coding: utf-8
from math import log, ceil
import numpy as np
from scipy.stats import norm, t
import matplotlib.pyplot as plt
import KF
class KFAnomalyDetection:
datalist = []
outlier_score_list = []
change_score_list = []
outlier_score_smooth = []
change_score_smooth = []
outlier_resid = None
change_resid = None
outlier_pred = None
change_pred = None
def __init__(self, term, smooth, k=2, p=0, q=0, w=10):
self.kf_outlier_score = KF.KalmanFiltering(k,p,q,term=term, w=w)
self.kf_first_smooth_score = KF.KalmanFiltering(k,p,q,term=smooth, w=w)
self.kf_change_score = KF.KalmanFiltering(k,p,q,term=term, w=w)
self.kf_second_smooth_score = KF.KalmanFiltering(k,p,q,term=smooth, w=w)
self.term = term
def forward_step(self, new_data):
# add new_data to datalist
if len(self.datalist)>=self.term:
self.datalist.pop(0)
self.datalist.append(new_data)
else:
self.datalist.append(new_data)
# compute score
if self.outlier_pred is None:
self.first_step(new_data)
else:
self.calculate_score_step(new_data)
self.learn_step(new_data)
def conversion_score(self, train, var):
"""convert score to log loss"""
m = np.mean(train)
s = np.std(train)
try:
if s < 1: s=1
px = norm.pdf(var, m, s) if norm.pdf(var, m, s)!=0.0 else 1e-308
res = -log(px)
return res
except:
return 0
def first_step(self, new_data):
# learn outlier model
self.outlier_resid, self.outlier_pred = \
self.learn_KF(self.kf_outlier_score, new_data)
# calculate outlier score
self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
self.outlier_pred, new_data, self.outlier_score_list,
self.outlier_score_smooth)
# learn cnage model
self.change_resid, self.change_pred = \
self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
# calculate change score
self.calculate_score(self.kf_second_smooth_score, self.change_resid,
self.change_pred, self.outlier_score_smooth[-1],
self.change_score_list, self.change_score_smooth)
def learn_step(self, data):
self.outlier_resid, self.outlier_pred = \
self.learn_KF(self.kf_outlier_score, data)
self.change_resid, self.change_pred = \
self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
def learn_KF(self, func, data):
"""leaning KF from new data"""
pred = func.forward_backward(data)
resid = np.abs( func.XSS[:func.term,0] - np.array(self.datalist) ) # residuals
return resid, pred
def calculate_score_step(self, new_data):
# calculate outlier score
self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
self.outlier_pred, new_data, self.outlier_score_list,
self.outlier_score_smooth)
# calculate change score
self.calculate_score(self.kf_second_smooth_score, self.change_resid,
self.change_pred, self.outlier_score_smooth[-1],
self.change_score_list, self.change_score_smooth)
def calculate_score(self, func, resid, pred, new_data, storage_score_list, storage_smooth_list):
score = self.conversion_score( resid, abs(float(pred) - float(new_data)) )
print 'got score', score
storage_score_list.append(score)
print 'smoothing score'
storage_smooth_list.append( func.forward_backward(score, smoothing=1) )
if __name__=='__main__':
fname = 'test'
term = 3 # time window of training
smooth = 1
kfad = KFAnomalyDetection(term,smooth,2,0,0,20)
datalist = []
of = open('score_out.txt','w')
dlist = np.hstack( (np.random.normal(0,1,100),np.random.normal(10,0.2,20),np.random.normal(0,1,100)) )
for data in dlist:
kfad.forward_step(data)
of.write( str(kfad.change_score_smooth[-1])+'\n' )
of.close()
rng = range( len(dlist.tolist()) )
plt.plot(rng,dlist,label=u"data")
plt.show()
plt.plot(rng,kfad.change_score_smooth,label=u"score")
plt.show()
Désolé pour le code sale comme d'habitude. J'ai écrit le code du filtre Kalman il y a plus d'un an, mais quand je le regarde maintenant, je pense: "Qui a écrit ce uncode?" .. .. Eh bien, je pense positivement qu'il se développe il y a plus d'un an w
Lors du calcul du score, si la valeur de l'écart type est petite, de nombreux points seront jugés comme des valeurs anormales, de sorte que la valeur d'écart standard minimum est définie sur 1. (Je me demande si les données de comptage sont acceptables, je les utilise comme si elles l'étaient.) En outre, si une valeur trop anormale est entrée, la probabilité est considérée comme égale à zéro (la valeur zéro est renvoyée) et les opérations de journalisation ne peuvent pas être effectuées, elle est donc remplacée par l'ordre minimum de float. Je pense qu'il est possible que cette zone puisse être résolue en utilisant une distribution avec un ourlet épais.
Collez le résultat de l'exécution avec le code ci-dessus ci-dessous.
Vous pouvez assez bien détecter le score. Il est naturel de pouvoir détecter des données aussi claires. .. ..
Quant aux paramètres, il est presque nécessaire de changer uniquement le terme (fenêtre temporelle). Il n'y a aucun problème avec 1 pour la fenêtre lisse. En d'autres termes, la partie lisse du code ci-dessus n'est que du calcul gaspillé ... orz De plus, il n'y a pas de problème de dispersion et il n'est pas nécessaire de répondre à des changements majeurs, donc je pense qu'environ 10 à 20, c'est bien, selon l'échelle.
Je pensais que le filtre Kalman aurait plus d'avantages que le modèle ARIMA, mais le résultat n'était pas aussi bon que prévu. .. Je vais continuer à chercher des endroits qui peuvent être améliorés.
Nous vous prions de nous excuser pour la gêne occasionnée, mais si vous faites une erreur, veuillez nous en informer.
Recommended Posts