LSTM erkennt Anomalien in Zeitreihendaten.
Implementiert mit Keras unter Bezugnahme auf den Inhalt von ReNom Tutorial in Referenz [1]. ..
Der allgemeine Fluss ist wie folgt.
Step1 Erstellen Sie ein Modell, das zukünftige Werte aus vergangenen Daten unter Verwendung normaler Daten vorhersagt.
Step2 Mit Testdaten anhand des erstellten Modells vorhersagen und den Fehlervektor abtasten. Dann wird eine Normalverteilung an den Fehlervektor angepasst.
Step3 Berechnen Sie den Fehlervektor des Teils, in dem die Abnormalität aufgetreten zu sein scheint, und wenn sich am Ende der in Schritt 2 geschätzten Normalverteilung ein Fehlervektor befindet, wird beurteilt, dass die Abnormalität aufgetreten ist.
%matplotlib inline
import keras
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
Verwenden Sie EKG-Daten mit der Bezeichnung qtdb / sel102 EKG-Datensatz [2].
Die Daten erhalten Sie unter:
http://www.cs.ucr.edu/~eamonn/discords/qtdbsel102.txt
df = pd.read_csv('qtdbsel102.txt', header=None, delimiter='\t')
print(df.shape)
(45000, 3)
Dieses Mal konzentrieren wir uns auf die Daten in der dritten Spalte, um Anomalien zu erkennen.
ecg = df.iloc[:,2].values
ecg = ecg.reshape(len(ecg), -1)
print("length of ECG data:", len(ecg))
length of ECG data: 45000
Setzen Sie als Datenvorverarbeitung den Mittelwert auf 0 und die Varianz auf 1.
scaler = StandardScaler()
std_ecg = scaler.fit_transform(ecg)
Als nächstes werden die Daten dargestellt, um die Tendenz zu bestätigen.
plt.figure()
plt.style.use('ggplot')
plt.figure(figsize=(15,5))
plt.xlabel('time')
plt.ylabel('ECG\'s value')
plt.plot(np.arange(45000), std_ecg[:45000], color='b')
plt.legend()
plt.show()
Obwohl es insgesamt periodisch ist, kann man sehen, dass die Form vor 5000 Schritten an einer Stelle gebrochen ist.
Erweitern wir also das Diagramm von 0 Schritten auf 5000 Schritte.
plt.figure()
plt.style.use('ggplot')
plt.figure(figsize=(15,5))
plt.xlabel('time')
plt.ylabel('ECG\'s value')
plt.plot(np.arange(5000), std_ecg[:5000], color='b')
plt.ylim(-3, 3)
x = np.arange(4200,4400)
y1 = [-3]*len(x)
y2 = [3]*len(x)
plt.fill_between(x, y1, y2, facecolor='g', alpha=.3)
plt.legend()
plt.show()
Es ist zu sehen, dass die Periodizität an dem grün gestrichenen Teil gebrochen ist.
Die spezifischen Ziele dieser Zeit sind wie folgt.
Um nur mit normalen Daten zu lernen, werden Daten nach 5000 Schritten herausgenommen.
normal_cycle = std_ecg[5000:]
Bereiten Sie die folgenden Generatoren vor, um die vorhandenen Daten in Trainingsdaten, Verifizierungsdaten und Testdaten zu unterteilen.
Hier werden die folgenden 3 Daten aus den 10 Daten vorhergesagt.
Hinweis) Verzögerung und Schritt werden diesmal nicht verwendet. Dies bedeutet "Daten nach Verzögerung vorhersagen" bzw. "Daten bei jedem Schritt verwenden".
def generator(data, lookback, delay, pred_length, min_index, max_index, shuffle=False,
batch_size=100, step=1):
if max_index is None:
max_index = len(data) - delay - pred_length - 1
i = min_index + lookback
while 1:
if shuffle:
rows = np.random.randint(min_index + lookback, max_index,
size=batch_size)
else:
if i + batch_size >= max_index:
i = min_index + lookback
rows = np.arange(i, min(i + batch_size, max_index))
i += len(rows)
samples = np.zeros((len(rows),
lookback//step,
data.shape[-1]))
targets = np.zeros((len(rows), pred_length))
for j, row in enumerate(rows):
indices = range(rows[j] - lookback, rows[j], step)
samples[j] = data[indices]
targets[j] = data[rows[j] + delay : rows[j] + delay + pred_length].flatten()
yield samples, targets
lookback = 10
pred_length = 3
step = 1
delay = 1
batch_size = 100
#Trainingsgenerator
train_gen = generator(normal_cycle,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=0,
max_index=20000,
shuffle=True,
step=step,
batch_size=batch_size)
val_gen = generator(normal_cycle,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=20001,
max_index=30000,
step=step,
batch_size=batch_size)
#Val, um den gesamten Validierungsdatensatz zu untersuchen_Anzahl der aus gen extrahierten Zeitinkremente
val_steps = (30001 - 20001 -lookback) // batch_size
#Andere sind Testdaten
Hier wird ein Modell verwendet, in dem zwei LSTMs gestapelt sind.
LSTM ist ein neuronales Netz, das für Zeitreihendaten geeignet ist.
Es werden nur die letzten 5 Schritte veröffentlicht.
import keras
from keras.models import Sequential
from keras import layers
from keras.optimizers import RMSprop
model = Sequential()
model.add(layers.LSTM(35, return_sequences = True, input_shape=(None,normal_cycle.shape[-1])))
model.add(layers.LSTM(35))
model.add(layers.Dense(pred_length))
model.compile(optimizer=RMSprop(), loss="mse")
history = model.fit_generator(train_gen,
steps_per_epoch=200,
epochs=60,
validation_data=val_gen,
validation_steps=val_steps)
Epoch 56/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0037 - val_loss: 0.0043
Epoch 57/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0039 - val_loss: 0.0045
Epoch 58/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0036 - val_loss: 0.0047
Epoch 59/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0036 - val_loss: 0.0043
Epoch 60/60
200/200 [==============================] - 3s 17ms/step - loss: 0.0036 - val_loss: 0.0038
Der Übergang des Verlustwerts in den Trainingsdaten und den Verifizierungsdaten ist unten gezeigt.
loss = history.history["loss"]
val_loss = history.history["val_loss"]
epochs = range(len(loss))
plt.figure(figsize=(10,5))
plt.plot(epochs, loss, "b", label="Training loss")
plt.plot(epochs, val_loss, "r", label="Validatioin loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.title("Training and validation loss")
plt.legend(fontsize=20)
plt.show()
Bereiten Sie den vorhergesagten Wert und den Zielwert in den Testdaten vor.
test_gen_pred = generator(normal_cycle,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=30001,
max_index=None,
step=step,
batch_size=batch_size)
test_steps = (len(normal_cycle) - 30001 - lookback) // batch_size
test_pred = model.predict_generator(test_gen_pred, steps=test_steps)
test_gen_target = generator(normal_cycle,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=30001,
max_index=None,
step=step,
batch_size=batch_size)
test_target = np.zeros((test_steps * batch_size , pred_length))
for i in range(test_steps):
test_target[i*batch_size:(i+1)*batch_size] = next(test_gen_target)[1]
Die mse wird unten ausgewertet. Es kann bestätigt werden, dass das Modell geeignet ist.
from sklearn.metrics import mean_squared_error
print(mean_squared_error(test_pred, test_target))
0.0032039127006786993
Überprüfen Sie für alle Fälle auch die Grafik.
plt.figure(figsize=(15,5))
plt.plot(range(len(test_pred[0:2000,0])), test_pred[0:2000,0], "r", label="Prediction")
plt.plot(range(len(test_target[0:2000,0])), test_target[0:2000,0], "b", label="Target")
plt.xlabel("time")
plt.ylabel('ECG\'s value')
plt.legend(fontsize=10)
plt.show()
plt.figure(figsize=(15,5))
plt.plot(range(len(test_pred[:,0])), test_pred[:,0], "r", label="Prediction")
plt.plot(range(len(test_target[:,0])), test_target[:,0], "b", label="Target")
plt.xlabel("time")
plt.ylabel('ECG\'s value')
plt.legend(fontsize=10)
plt.show()
Angenommen, der Fehler $ e $ zwischen dem vorhergesagten Wert und dem Zielwert folgt einer Normalverteilung. Da wir jedoch drei Daten vorhergesagt haben, ist $ e $ ein dreidimensionaler Vektor, und wir müssen eine dreidimensionale Normalverteilung berücksichtigen.
\begin{align*}
p(e) = N(e| \mu, \Sigma)
\end{align*}
Dabei ist $ \ mu $ der mittlere Vektor und $ \ Sigma $ die verteilte, gemeinsam verteilte Matrix. Im Allgemeinen wird eine mehrdimensionale Normalverteilung durch den mittleren Vektor und die Varianz-Kovarianz-Matrix bestimmt.
Die Parameterschätzung wird aus den abgetasteten Daten durchgeführt. Derzeit kann der Fehler in den Testdaten für 10.000 Schritte erfasst werden.
\begin{align*}
\{ e_1, e_2, ..., e_{10000} \}
\end{align*}
Außerdem ist jedes Element
\begin{align*}
e_i = (e_i ^{(1)}, e_i ^{(2)}, e_i ^{(3)} )
\end{align*}
Ist.
Hier wird die wahrscheinlichste Methode als Methode zur Schätzung der Parameter angewendet. Die wahrscheinlichsten Schätzungen der Normalverteilung sind wie folgt.
\begin{align*}
& \hat{\mu} = \frac{1}{N}\sum_{n=1}^{N}e_n \\
& \hat{\Sigma} = \frac{1}{N}\sum_{n=1}^{N}(e_n-\hat{\mu})(e_n-\hat{\mu})^{\top}
\end{align*}
Es ist zu beachten, dass die Schätzung der Varianz-Co-Verteilungsmatrix durch das wahrscheinlichste Verfahren eher eine Probendispersions-Co-Verteilungsmatrix als eine invariante Varianz-Co-Verteilungsmatrix ist.
Mit diesen Daten wird jeder geschätzte Betrag berechnet.
error = test_pred - test_target
mean = np.mean(error, axis=0)
print(mean)
cov = np.cov(error, rowvar=False, bias=True)
print(cov)
[0.00562191 0.00312449 0.00928306]
[[0.00149367 0.0016201 0.0013881 ]
[0.0016201 0.00302257 0.00310156]
[0.0013881 0.00310156 0.00496796]]
Der Grund, warum rowbar = False ist, besteht darin, die Richtung der Summe vertikal zu machen. Außerdem bedeutet Bias = True, die Kovarianzmatrix der Stichprobe zu finden. Standardmäßig ist Bias = False. In diesem Fall kann die invariante Verteilungs-Co-Verteilungsmatrix erhalten werden.
Die Maharanobis-Entfernung wird unten unter Verwendung der vorherigen Schätzungen definiert.
\begin{align*}
a(e) = (e-\hat{\mu})^\top \hat{\Sigma}^{-1}(e-\hat{\mu})
\end{align*}
Es ist schwer zu verstehen, ob es mehrdimensional ist, aber wenn es eindimensional ist, wird es wie folgt sein. $ {\ Hat {\ sigma} ^ 2} $ ist jedoch die Stichprobenverteilung.
\begin{align*}
a(e) = \frac{(e-\hat{\mu})^2}{\hat{\sigma}^2}
\end{align*}
Mit anderen Worten, $ a (e) $ gibt an, wie weit die Probendaten vom Mittelwert entfernt sind, und in der Normalverteilung ist, wenn dieser Wert groß ist, ein Ereignis, das wahrscheinlich nicht auftritt, mit anderen Worten, ein abnormales Ereignis ist aufgetreten. in der Lage sein. Dadurch kann ein abnormaler Wert erkannt werden.
def Mahalanobis_dist(x, mean, cov):
d = np.dot(x-mean, np.linalg.inv(cov))
d = np.dot(d, (x-mean).T)
return d
detection_gen_pred = generator(std_ecg,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=0,
max_index=5000,
step=step,
batch_size=batch_size)
detection_steps = (5000 -lookback) // batch_size
detection_pred = model.predict_generator(detection_gen_pred, steps=detection_steps)
detection_gen_target = generator(std_ecg,
lookback=lookback,
pred_length=pred_length,
delay=delay,
min_index=0,
max_index=5000,
step=step,
batch_size=batch_size)
detection_target = np.zeros((detection_steps * batch_size , pred_length))
for i in range(detection_steps):
detection_target[i*batch_size:(i+1)*batch_size] = next(detection_gen_target)[1]
Bereiten Sie einen Fehlervektor vor und ermitteln Sie die Maharanobis-Entfernung.
error_detection = detection_pred - detection_target
m_dist = []
for e in error_detection:
m_dist.append(Mahalanobis_dist(e, mean, cov))
Das Ergebnis ist wie folgt.
fig, axes = plt.subplots(nrows=2, figsize=(15,10))
axes[0].plot(std_ecg[:5000],color='b',label='original data')
axes[0].set_xlabel('time')
axes[0].set_ylabel('ECG\'s value' )
axes[0].set_xlim(-250, 5250)
axes[0].set_ylim(-3, 3)
x = np.arange(4200,4400)
y1 = [-3]*len(x)
y2 = [3]*len(x)
axes[0].fill_between(x, y1, y2, facecolor='g', alpha=.3)
axes[1].plot(m_dist, color='r',label='Mahalanobis Distance')
axes[1].set_xlabel('time')
axes[1].set_ylabel('Mahalanobis Distance')
axes[1].set_xlim(-250, 5250)
axes[1].set_ylim(0, 1000)
y1 = [0]*len(x)
y2 = [1000]*len(x)
axes[1].fill_between(x, y1, y2, facecolor='g', alpha=.3)
plt.legend(fontsize=15)
plt.show()
Es ist zu sehen, dass der Maharanobis-Abstand in den Grünflächen zunimmt. Mit anderen Worten könnte die Abnormalität (sinnlich) erkannt werden.
Oben wird der abnormale Wert durch qualitative Beurteilung erfasst, es ist jedoch erforderlich, einen bestimmten Schwellenwert einzustellen, um den abnormalen Wert automatisch zu erfassen.
Die folgenden Tatsachen sind bezüglich der Maharanobis-Entfernung bekannt.
Weitere Informationen finden Sie in Referenz [2].
Glauben Sie an diese Tatsache und setzen Sie damit den Schwellenwert. In diesem Fall ist $ M = 3 $, $ N = 10000 $, daher kann man es sich sicher als $ N \ gg M $ vorstellen.
Im Folgenden wird angenommen, dass $ a (e) $ einer Chi-Quadrat-Verteilung mit 3 Freiheitsgraden folgt. Überprüfen Sie zunächst die Chi-Quadrat-Verteilung mit 3 Freiheitsgraden.
from scipy import stats
x = np.linspace(0, 50, 2000)
plt.figure(figsize=(15,5))
plt.style.use('ggplot')
plt.plot(x, stats.chi2.pdf(x, 3), "b", label="chi2, 3")
plt.legend(fontsize=15)
plt.show()
Dann wird der obere 1% -Punkt berechnet.
a_99 = stats.chi2.ppf(0.99, 3)
print(a_99)
11.344866730144373
Dies wird als Schwellenwert übernommen, und wenn dieser Wert überschritten wird, wird er als abnormaler Wert beurteilt.
plt.figure(figsize=(15,5))
plt.style.use('ggplot')
plt.plot(m_dist, color='r',label='Mahalanobis Distance')
plt.xlabel('time')
plt.ylabel('Mahalanobis Distance')
plt.xlim(-250, 5250)
plt.ylim(0, 300)
plt.plot(np.linspace(-250, 5250), [a_99] * len(np.linspace(-250, 5250)), 'b')
plt.legend(fontsize=15)
plt.show()
Es ist irgendwie subtil. Es klingt so, ist aber unflexibel.
Also habe ich beschlossen, eine andere Methode auszuprobieren. Die nächste Methode besteht darin, den optimalen Schwellenwert gemäß den Daten zu finden.
Es wird davon ausgegangen, dass die gespeicherten Daten als normal oder abnormal gekennzeichnet werden können oder dass dies bereits geschehen ist.
Der folgende Inhalt basiert auf Referenz [3].
Zur Vorbereitung werden wir mit diesen Daten beschriften. Hier werden Daten von 0 bis 5000 als Ziel festgelegt und 4200 bis 4400 als abnormaler Wert festgelegt.
m_dist_th = np.zeros(4910)
m_dist_th[10:4910] = m_dist
TF_label = np.zeros(4910)
TF_label[4200:4400] = 1
Durch Einstellen des Schwellenwerts werden die folgenden Werte erhalten. Die rechte Seite zeigt jedoch die Anzahl der Daten, die die Bedingungen erfüllen.
\begin{align*}
& TP = (\, Label:positive, \quad Prediction:positive \, )\\
& FP = (\, Label:negative, \quad Prediction:positive \, )\\
& FN = (\, Label:positive, \quad Prediction:negative \, )\\
& TN = (\, Label:negative, \quad Prediction:negative \, )
\end{align*}
Darüber hinaus werden die folgenden Indikatoren eingeführt.
\begin{align*}
& precision = \frac{TP}{TP + FP} \\
& recall = \frac{TP}{TP + FN} \\
& F_{\beta} = (1+\beta^2) \frac{precision \cdot recal}{(\beta^2 \cdot precision) + recall}
\end{align*}
Finden Sie den Schwellenwert, der $ F_ {\ beta} $ maximiert. Definieren Sie zunächst eine Funktion, die $ F_ {\ beta} $ zurückgibt.
def f_score(th, TF_label, m_dist_th, beta):
TF_pred = np.where(m_dist_th > th, 1, 0)
TF_pred = 2 * TF_pred
PN = TF_label + TF_pred
TN = np.count_nonzero(PN == 0)
FN = np.count_nonzero(PN == 1)
FP = np.count_nonzero(PN == 2)
TP = np.count_nonzero(PN == 3)
precision = TP/(TP + FP)
recall = TP/(TP+FN)
return (1+beta**2)*precision*recall/(beta**2*precision + recall)
Im Folgenden ist $ \ beta = 0,1 $. Abbildung von $ F_ {0.1} $ für die Schwellenwerte 0-1000.
th_line = np.linspace(0, 1000, 5000)
dx = 0.2
f_score_graph = []
for i in range(5000):
f_score_graph.append(f_score(dx * i, TF_label, m_dist_th, 0.1))
plt.figure(figsize=(15,5))
plt.style.use('ggplot')
plt.plot(th_line, f_score_graph, color='g', label = "f_score")
plt.legend(fontsize=15)
plt.show()
Finden Sie den Schwellenwert, bei dem $ F_ {0.1} $ das Maximum ist.
from scipy import optimize
def minus_f_score(th, TF_label, m_dist_th, beta):
return -f_score(th, TF_label, m_dist_th, beta)
ave_m_dist_th = np.average(m_dist_th)
opt_th = optimize.fmin_powell(minus_f_score, ave_m_dist_th, (TF_label, m_dist_th, 0.1))
print(opt_th)
Optimization terminated successfully.
Current function value: -0.875333
Iterations: 2
Function evaluations: 86
309.19475611029304
plt.figure(figsize=(15,5))
plt.style.use('ggplot')
plt.plot(m_dist_th, color='r',label='Mahalanobis Distance')
plt.xlabel('time')
plt.ylabel('Mahalanobis Distance')
plt.xlim(-250, 5250)
plt.ylim(0, 1000)
plt.plot(np.linspace(-250, 5250), [opt_th] * len(np.linspace(-250, 5250)), 'b')
plt.legend(fontsize=15)
plt.show()
Gutes Gefühl.
[1] ReNom-Tutorial, Erkennung von Abnormalitäten von Zeitreihendaten durch LSTM, https://www.renom.jp/ja/notebooks/tutorial/time_series/lstm-anomalydetection/notebook.html.
[2] Tsuyoshi Ide, Einführung in die Erkennung von Anomalien durch maschinelles Lernen - Praktischer Leitfaden von R-, Corona, 2015.
[3] P.Malhotra, L.Vig, G.Shroff and P.Agarwal, Long short term memory networks for anomaly detection in time series, Proceedings. Presses universitaires de Louvain, 2015.
Recommended Posts