Ich hatte die Möglichkeit, Änderungspunkte bei der Arbeit zu erkennen. Zu dieser Zeit hatte ich nicht viel Zeit, also habe ich es unter Bezugnahme auf Algorithmus mit ARIMA-Modell, den Yokkuns gemacht hat gemacht. Ich tat. Das ARIMA-Modell hatte jedoch verschiedene Probleme, daher habe ich versucht, es mit dem Kalman-Filter neu zu schreiben.
Die Hauptreferenz war "Anomalieerkennung durch Data Mining", die jeder liebt.
Die Berechnungsschritte sind wie folgt: Die Berechnung kann grob in einen Lernschritt und einen Bewertungsberechnungsschritt unterteilt werden.
Hier sind die Schritte zum Berechnen, bevor neue Daten eingehen.
Dies ist der Schritt, um zu berechnen, wann neue Daten eingehen.
Der Umriss der Berechnung der Änderungspunktbewertung ist wie folgt.
Es ist einfach, aber das war's. Dann werde ich den folgenden Code schreiben.
KF.py
# coding: utf8
from numpy.oldnumeric.linear_algebra import inverse
from scipy import linalg
import numpy as np
from math import log
class KalmanFiltering:
limy = 1e20 #Grenze der als fehlend anzusehenden Zahlenwerte
GSIG2 = 1
L = 1
R = np.identity(L)
NSUM = 0.0
SIG2 = 0.0
LDET = 0.0
def __init__(self, k, p, q, term=10, w=10):
self.k = k #Bodenunterschied
self.p = p #Saisonale Auflage
self.q = q #AR-Komponente
self.m, self.F, self.G, \
self.H, self.Q = self.FGHset(0,k,p,q,w)
self.term = term
self.strg_trm = term
self.resid = np.zeros(self.term)
self.pred = 0.0
# matrix for storage predicted value
self.XPS = np.zeros((term,self.m), dtype=np.float)
self.VPS = np.array([np.eye(self.m, dtype=np.float)]*term)
# matrix for storage predicted value
self.XFS = np.zeros((term,self.m), dtype=np.float)
self.VFS = np.array([np.eye(self.m, dtype=np.float)]*term)
# matrix for storage smoothed value
self.XSS = np.zeros((term,self.m), dtype=np.float)
self.VSS = np.array([np.eye(self.m, dtype=np.float)]*term)
def forward_backward(self, new_data, smoothing=0):
self.NSUM += 1
if self.NSUM < self.strg_trm:
self.term = int(self.NSUM)
else:
self.term = self.strg_trm
# forward
self.forward(new_data)
# smoothing
self.SMO()
if smoothing==1:
return np.mean( self.XSS[:self.term,0] )
return self.predict()[0]
def forward(self, y):
XF = self.XFS[self.term-1]
VF = self.VFS[self.term-1]
# 1span predicting
XP, VP = self.forward_predicting(VF, XF)
XF, VF = self.filtering(y, XP, VP)
self.storage_params(XP, XF, VP, VF)
# sig2 = self.SIG2 / self.NSUM
# FF = -0.5 * (self.NSUM * (log(2 * np.pi * sig2) + 1) + self.LDET)
# return {'LLF':FF, 'Ovar':sig2}
def storage_params(self, XP, XF, VP, VF):
if self.NSUM>self.term:
self.XPS[:self.term-1] = self.XPS[1:self.term]
self.XFS[:self.term-1] = self.XFS[1:self.term]
self.VPS[:self.term-1] = self.VPS[1:self.term]
self.VFS[:self.term-1] = self.VFS[1:self.term]
self.normal_storage(XP, XF, VP, VF)
else:
self.normal_storage(XP, XF, VP, VF)
def normal_storage(self, XP, XF, VP, VF):
self.XPS[self.term-1] = XP
self.XFS[self.term-1] = XF
self.VPS[self.term-1] = VP
self.VFS[self.term-1] = VF
def forward_predicting(self, VF, XF):
"""1span predicting"""
XP = np.ndarray.flatten( np.dot(self.F, XF.T) ) #Da es ab der zweiten Woche ein vertikaler Vektor wird, wird es immer in einen horizontalen Vektor konvertiert
VP = self.F.dot(VF).dot(self.F.T) + self.G.dot(self.Q).dot(self.G.T)
return XP, VP
def filtering(self, y, XP, VP):
if y < self.limy:
B = np.dot( np.dot(self.H, VP), self.H.T) + self.R #H ist mathematisch ein horizontaler Vektor
B1 = inverse(B)
K = np.matrix(np.dot(VP, self.H.T)) * np.matrix(B1) #K wird ein vertikaler Vektor(matrix)
e = np.array(y).T - np.dot(self.H, XP.T)
XF = np.array(XP) + np.array( K * np.matrix(e) ).T #Horizontaler Vektor
VF = np.array(VP) - np.array( K* np.matrix(self.H) * VP)
self.SIG2 += np.ndarray.flatten(np.array( np.matrix(e) * np.matrix(B1) * np.matrix(e).T ))[0] #Machen Sie es Matrix, so dass es auch in einer Dimension berechnet werden kann
self.LDET += log(linalg.det(B))
else:
XF = XP; VF = VP
return XF, VF
def SMO(self):
"""fixed-interval smoothing"""
XS1 = self.XFS[self.term-1]
VS1 = self.VFS[self.term-1]
self.XSS[self.term-1] = XS1
self.VSS[self.term-1] = VS1
for n1 in xrange(self.term):
n = (self.term-1) - n1; XP = self.XPS[n]; XF = self.XFS[n-1]
VP = self.VPS[n]; VF = self.VFS[n-1]; VPI = inverse(VP)
A = np.dot( np.dot(VF, self.F.T), VPI)
XS2 = XF + np.dot(A, (XS1 - XP))
VS2 = VF + np.dot( np.dot(A, (VS1 - VP)), A.T )
XS1 = XS2; VS1 = VS2
self.XSS[n-1] = XS1
self.VSS[n-1] = VS1
#Definition der logarithmischen Wahrscheinlichkeitsfunktion von TAU2x
def LogL(self, parm, *args):
y=args[0]
LLF = self.forward(y)
LL = LLF['LLF']
return -LL #Da optimeze eine Minimierungsfunktion ist, wird die logarithmische Wahrscheinlichkeit multipliziert mit minus zurückgegeben.
def predict(self, forward_time=1):
"""pridint average value"""
y = np.zeros(forward_time, dtype=np.float)
XFp=self.XFS[-1] #Holen Sie sich nur die neueste Datenmatrix
#VFp=VF[XF.shape[0]-1,:]
for n in xrange(forward_time):
XP = np.ndarray.flatten( np.dot(self.F, XFp.T) )
#VP = np.dot( np.dot(F, VF), F.T ) + np.dot( np.dot(G, Q), G.T )
y[n] = np.dot(self.H, XP) #Da es den erwarteten Wert annimmt, ist kein Rauschen enthalten
XFp=XP
return y
def FGHset(self, al, k, p, q, w=10):
"""Matrixeinstellung der Zustandsraumdarstellung des saisonalen Anpassungsmodells
al: α-Vektor des AR-Modells
k,p,q: Differenz, saisonaler Zyklus, Anzahl der AR-Parameter (k bei Vorhersage)>=2)
w:Streuung des Systemfehlers (Zur Erkennung von Änderungspunkten sollte dieser klein und fest eingestellt werden.)
"""
m = k + p + q -1
if q>0: G = np.zeros((m,3), dtype=np.float) #Wenn das Zustandsmodell drei enthält, Trend, Saison und AR
elif p>0: G = np.zeros((m,2), dtype=np.float) #Wenn keine AR-Komponente enthalten ist(q=0)
else: m=k; G = np.zeros((m,1), dtype=np.float)
F = np.zeros((m,m), dtype=np.float)
H = np.zeros((1,m), dtype=np.float)
ns = 0; ls =0
#Konstruktion der Blockmatrix des Trendmodells
if k>0:
ns +=1
G[0,0] = 1; H[0,0] = 1
if k==1: F[0,0] = 1
if k==2: F[0,0] = 2; F[0,1] = -1; F[1,0] = 1
if k==3: F[0,0] = 3; F[0,1] = -3; F[0,2] = 1; F[1,0] = 1; F[2,1] = 1
ls += k
#Konstruktion einer saisonalen Komponentenblockmatrix
if p>0:
ns +=1
G[ls, ns-1] = 1
H[0,ls] = 1
for i in xrange(p-1): F[ls, ls+i] = -1
for i in xrange(p-2): F[ls+i+1, ls+i] = 1
ls +=p-1
#Konstruktion der AR-Komponentenblockmatrix
if q>0:
ns +=1
G[ls, ns-1] = 1
H[0,ls] = 1
for i in xrange(q): F[ls, ls+i-1] = al[i]
if q>1:
for i in xrange(q-1): F[ls+i, ls+i-1] = 1
#Berechnung des Rahmens der Dispersions-Co-Verteilungsmatrix Q des Systemmodells
Q = np.eye(ns,dtype=np.float)*w
return m, F, G, H, Q
KF_AnomalyDetection.py
# coding: utf-8
from math import log, ceil
import numpy as np
from scipy.stats import norm, t
import matplotlib.pyplot as plt
import KF
class KFAnomalyDetection:
datalist = []
outlier_score_list = []
change_score_list = []
outlier_score_smooth = []
change_score_smooth = []
outlier_resid = None
change_resid = None
outlier_pred = None
change_pred = None
def __init__(self, term, smooth, k=2, p=0, q=0, w=10):
self.kf_outlier_score = KF.KalmanFiltering(k,p,q,term=term, w=w)
self.kf_first_smooth_score = KF.KalmanFiltering(k,p,q,term=smooth, w=w)
self.kf_change_score = KF.KalmanFiltering(k,p,q,term=term, w=w)
self.kf_second_smooth_score = KF.KalmanFiltering(k,p,q,term=smooth, w=w)
self.term = term
def forward_step(self, new_data):
# add new_data to datalist
if len(self.datalist)>=self.term:
self.datalist.pop(0)
self.datalist.append(new_data)
else:
self.datalist.append(new_data)
# compute score
if self.outlier_pred is None:
self.first_step(new_data)
else:
self.calculate_score_step(new_data)
self.learn_step(new_data)
def conversion_score(self, train, var):
"""convert score to log loss"""
m = np.mean(train)
s = np.std(train)
try:
if s < 1: s=1
px = norm.pdf(var, m, s) if norm.pdf(var, m, s)!=0.0 else 1e-308
res = -log(px)
return res
except:
return 0
def first_step(self, new_data):
# learn outlier model
self.outlier_resid, self.outlier_pred = \
self.learn_KF(self.kf_outlier_score, new_data)
# calculate outlier score
self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
self.outlier_pred, new_data, self.outlier_score_list,
self.outlier_score_smooth)
# learn cnage model
self.change_resid, self.change_pred = \
self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
# calculate change score
self.calculate_score(self.kf_second_smooth_score, self.change_resid,
self.change_pred, self.outlier_score_smooth[-1],
self.change_score_list, self.change_score_smooth)
def learn_step(self, data):
self.outlier_resid, self.outlier_pred = \
self.learn_KF(self.kf_outlier_score, data)
self.change_resid, self.change_pred = \
self.learn_KF(self.kf_change_score, self.outlier_score_smooth[-1])
def learn_KF(self, func, data):
"""leaning KF from new data"""
pred = func.forward_backward(data)
resid = np.abs( func.XSS[:func.term,0] - np.array(self.datalist) ) # residuals
return resid, pred
def calculate_score_step(self, new_data):
# calculate outlier score
self.calculate_score(self.kf_first_smooth_score, self.outlier_resid,
self.outlier_pred, new_data, self.outlier_score_list,
self.outlier_score_smooth)
# calculate change score
self.calculate_score(self.kf_second_smooth_score, self.change_resid,
self.change_pred, self.outlier_score_smooth[-1],
self.change_score_list, self.change_score_smooth)
def calculate_score(self, func, resid, pred, new_data, storage_score_list, storage_smooth_list):
score = self.conversion_score( resid, abs(float(pred) - float(new_data)) )
print 'got score', score
storage_score_list.append(score)
print 'smoothing score'
storage_smooth_list.append( func.forward_backward(score, smoothing=1) )
if __name__=='__main__':
fname = 'test'
term = 3 # time window of training
smooth = 1
kfad = KFAnomalyDetection(term,smooth,2,0,0,20)
datalist = []
of = open('score_out.txt','w')
dlist = np.hstack( (np.random.normal(0,1,100),np.random.normal(10,0.2,20),np.random.normal(0,1,100)) )
for data in dlist:
kfad.forward_step(data)
of.write( str(kfad.change_score_smooth[-1])+'\n' )
of.close()
rng = range( len(dlist.tolist()) )
plt.plot(rng,dlist,label=u"data")
plt.show()
plt.plot(rng,kfad.change_score_smooth,label=u"score")
plt.show()
Entschuldigung für den schmutzigen Code wie gewohnt. Ich habe den Code für den Kalman-Filter vor über einem Jahr geschrieben, aber wenn ich ihn mir jetzt ansehe, denke ich: "Wer hat diesen Uncode geschrieben?" .. .. Nun, ich werde positiv denken, dass es vor mehr als einem Jahr wächst w
Wenn bei der Berechnung der Punktzahl der Standardabweichungswert klein ist, werden viele Punkte als abnormale Werte beurteilt, sodass der minimale Standardabweichungswert auf 1 gesetzt wird. (Ich frage mich, ob Zähldaten akzeptabel sind. Ich verwende sie so, als ob sie es wären.) Wenn ein zu abnormaler Wert eingegeben wird, wird die Wahrscheinlichkeit als Null betrachtet (der Wert Null wird zurückgegeben), und Protokolloperationen können nicht ausgeführt werden. Daher wird sie durch die Mindestreihenfolge des Gleitkommas ersetzt. Ich denke, es besteht die Möglichkeit, dass dieser Bereich durch Verwendung einer Verteilung mit einem dicken Saum gelöst werden kann.
Fügen Sie das Ergebnis der Ausführung mit dem obigen Code ein.
Sie können die Punktzahl ziemlich gut erkennen. Es ist natürlich, solche eindeutigen Daten erkennen zu können. .. ..
Bei den Parametern muss fast nur der Begriff (Zeitfenster) geändert werden. Es gibt kein Problem mit 1 für das glatte Fenster. Mit anderen Worten, der glatte Teil des obigen Codes ist nur eine verschwendete Berechnung ... orz Es gibt auch kein Problem mit der Dispersion, und es ist nicht notwendig, auf größere Änderungen zu reagieren. Daher denke ich, dass 10 bis 20 je nach Maßstab in Ordnung sind.
Ich dachte, dass der Kalman-Filter mehr Vorteile als das ARIMA-Modell haben würde, aber das Ergebnis war nicht so gut wie ich erwartet hatte. .. Ich werde weiterhin nach Orten suchen, die verbessert werden können.
Wir entschuldigen uns für die Unannehmlichkeiten, aber wenn Sie einen Fehler machen, würden wir uns freuen, wenn Sie darauf hinweisen könnten.
Recommended Posts