Poursuivant la précédente méthode de détection d'anomalies (MSPC), nous avons cette fois organisé la méthode de détection d'anomalies par apprentissage de structure clairsemée.
La méthode $ T ^ 2 $ de Hotelling et $ MSPC $, qui sont des méthodes de détection d'anomalies bien connues, sont des méthodes utilisées pour surveiller des données dont la valeur moyenne ne change pas. D'autre part, sur le site de fabrication, non seulement de telles données, mais aussi des données d'équipement et d'exploitation avec une structure de données dont les valeurs changent tout en conservant une certaine relation entre les variables. A cette époque, l'une des méthodes de détection d'anomalies se concentrant sur les relations entre les variables (corrélation, etc.) est la méthode de détection d'anomalies par apprentissage de structure clairsemée.
Aperçu de la méthode
Pour plus d'informations sur la méthode, voir ([Détection d'anomalies et détection de point de changement](https://www.amazon.co.jp/%E7%95%B0%E5%B8%B8%E6%A4%9C%E7%9F%A5] % E3% 81% A8% E5% A4% 89% E5% 8C% 96% E6% A4% 9C% E7% 9F% A5-% E6% A9% 9F% E6% A2% B0% E5% AD% A6% E7% BF% 92% E3% 83% 97% E3% 83% AD% E3% 83% 95% E3% 82% A7% E3% 83% 83% E3% 82% B7% E3% 83% A7% E3% 83% 8A% E3% 83% AB% E3% 82% B7% E3% 83% AA% E3% 83% BC% E3% 82% BA-% E4% BA% 95% E6% 89% 8B-% E5% Si vous lisez 89% 9B / dp / 4061529080)), vous approfondirez votre compréhension. J'ai également lu ce livre, je l'ai étudié et je l'ai mis en pratique dans mon travail réel.
Cette fois, j'ai utilisé un ensemble de données appelé Water Treatment Plant du référentiel UCI Machine Learning pour l'échantillon. Nous avons utilisé 100 données normales et les 279 autres comme données d'évaluation pour voir à quel point elles s'écartaient de l'état normal.
Le code python est ci-dessous.
#Importer les bibliothèques requises
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
%matplotlib inline
from scipy import stats
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors.kde import KernelDensity
from sklearn.covariance import GraphicalLassoCV
df = pd.read_csv('watertreatment_mod.csv', encoding='shift_jis', header=0, index_col=0)
df.head()
window_size = 10
cr = 0.99
Les données d'entrée ressemblent à celles ci-dessus.
Ensuite, nous diviserons les données d'entraînement et les données d'évaluation et les standardiserons.
#Point de partage
split_point = 100
#Divisé en données d'entraînement (train) et données d'évaluation (test)
train_df = df.iloc[:(split_point-1),]
test_df = df.iloc[split_point:,]
#Standardiser les données
sc = StandardScaler()
sc.fit(train_df)
train_df_std = sc.transform(test_df)
test_df_std = sc.transform(test_df)
Ensuite, nous estimons la covariance.
#Estimation de la covariance
n_samples, n_features = train_df_std.shape
emp_cov = np.dot(train_df_std.T, train_df_std) / n_samples
model = GraphicalLassoCV()
model.fit(train_df_std)
cov_ = model.covariance_
prec_ = model.precision_
Ensuite, définissez une fonction pour trouver la distance KL.
def Calc_KL(cov_, prec_, xtest):
"""Calculer la distance KL
Parameters
----------
cov_ : np.array
Matrice de covariance des données d'entraînement
prec_ : np.array
Matrice de précision des données d'entraînement
df : pd.DataFrame
base de données
Returns
-------
d_ab : pd.DataFrame
Distance KL
"""
n_samples, n_features = xtest.shape
d_abp=np.zeros(n_features)
d_abm=np.zeros(n_features)
d_ab=np.zeros(n_features)
model_test = GraphicalLassoCV()
try:
model_test.fit(xtest)
except FloatingPointError:
print("floating error")
return d_ab
cov__test = model_test.covariance_
prec__test = model_test.precision_
#Calculer l'ampleur de l'effondrement de la corrélation pour chaque variable
for i in range(n_features):
temp_prec_a = np.r_[prec_[i:n_features,:], prec_[0:i,:]]
temp_prec_a = np.c_[temp_prec_a[:,i:n_features], temp_prec_a[:,0:i]]
temp_prec_b = np.r_[prec__test[i:n_features,:], prec__test[0:i,:]]
temp_prec_b = np.c_[temp_prec_b[:,i:n_features], temp_prec_b[:,0:i]]
temp_cov_a = np.r_[cov_[i:n_features,:], cov_[0:i,:]]
temp_cov_a = np.c_[temp_cov_a[:,i:n_features], temp_cov_a[:,0:i]]
temp_cov_b = np.r_[cov__test[i:n_features,:], cov__test[0:i,:]]
temp_cov_b = np.c_[temp_cov_b[:,i:n_features], temp_cov_b[:,0:i]]
La = temp_prec_a[:-1, :-1]
la = temp_prec_a[:-1, -1]
lama = temp_prec_a[-1, -1]
Wa = temp_cov_a[:-1, :-1]
wa = temp_cov_a[:-1, -1]
sigmaa = temp_cov_a[-1, -1]
Lb = temp_prec_b[:-1, :-1]
lb = temp_prec_b[:-1, -1]
lamb = temp_prec_b[-1, -1]
Wb = temp_cov_b[:-1, :-1]
wb = temp_cov_b[:-1, -1]
sigmab = temp_cov_b[-1, -1]
d_abp[i] = np.dot(wa, la)+0.5*(np.dot(np.dot(lb, Wb), lb)-np.dot(np.dot(la, Wa), la))+0.5*(np.log(lama/lamb)+sigmaa-sigmab)
d_abm[i] = np.dot(wb, lb)+0.5*(np.dot(np.dot(la, Wa), la)-np.dot(np.dot(lb, Wb), lb))+0.5*(np.log(lamb/lama)+sigmab-sigmaa)
d_ab[i] = max(-d_abp[i], -d_abm[i])
return d_ab
Ensuite, préparez une fonction qui calcule la limite de gestion par estimation de la densité du noyau.
def cl_limit(x, cr=0.99):
"""Calculer les limites de contrôle
Parameters
----------
x : np.array
Distance KL
cr : float
Limites de contrôle des limites
Returns
-------
cl : float
Limite de gestion
"""
X = x.reshape(np.shape(x)[0],1)
bw= (np.max(X)-np.min(X))/100
kde = KernelDensity(kernel='gaussian', bandwidth=bw).fit(X)
X_plot = np.linspace(np.min(X), np.max(X), 1000)[:, np.newaxis]
log_dens = kde.score_samples(X_plot)
prob = np.exp(log_dens) / np.sum(np.exp(log_dens))
calprob = np.zeros(np.shape(prob)[0])
calprob[0] = prob[0]
for i in range(1,np.shape(prob)[0]):
calprob[i]=calprob[i-1]+prob[i]
cl = X_plot[np.min(np.where(calprob>cr))]
return cl
Faites une contre-validation des données d'entraînement pour calculer la limite de gestion.
K = 5
cv_data_size = np.int(np.shape(train_df_std)[0]/5)
n_train_samples = np.shape(train_df_std)[0]
counter = 0
for i in range(K):
cv_train_data=np.r_[train_df_std[0:i*cv_data_size,], train_df_std[(i+1)*cv_data_size:,]]
if i < K-1:
cv_test_data=train_df_std[i*cv_data_size:(i+1)*cv_data_size,]
else:
cv_test_data=train_df_std[i*cv_data_size:,]
model_cv = GraphicalLassoCV()
model_cv.fit(cv_train_data)
cov__cv = model.covariance_
prec__cv = model.precision_
for n in range(window_size, np.shape(cv_test_data)[0]):
count = i*cv_data_size + n
tempX = cv_test_data[n-window_size:n,:]
d_ab_temp = Calc_KL(cov__cv, prec__cv, tempX)
if 0 == counter:
d_ab = d_ab_temp.reshape(1,n_features)
TimeIndices2 = TimeIndices[count]
else:
d_ab = np.r_[d_ab,d_ab_temp.reshape(1,n_features)]
#Ici erreur
TimeIndices2 = np.vstack((TimeIndices2,TimeIndices[count]))
counter = counter + 1
split_point = np.shape(d_ab)[0]
d_ab_cv = d_ab[np.sum(d_ab,axis=1)!=0,:]
cl = np.zeros([n_features])
for i in range(n_features):
cl[i] = cl_limit(d_ab_cv[:,i],cr)
Enfin, la covariance des données d'évaluation est estimée et visualisée.
#Estimer la covariance pour les données d'évaluation
n_test_samples = np.shape(test_df_std)[0]
for n in range(window_size, n_test_samples):
tempX = test_df_std[n-window_size:n,:]
d_ab_temp = Calc_KL(cov_, prec_, tempX)
d_ab = np.r_[d_ab,d_ab_temp.reshape(1,n_features)]
TimeIndices2 = np.vstack((TimeIndices2,TimeIndices[n+n_train_samples]))
x2 = [0, np.shape(d_ab)[0]]
x3 = [split_point, split_point]
x = range(0, np.shape(TimeIndices2)[0],20)
NewTimeIndices = np.array(TimeIndices2[x])
for i in range(38):
plt.figure(figsize=(200, 3))
plt.subplot(1, 38, i+1)
plt.title('%s Contribution' % (i))
plt.plot(d_ab[:, i], marker="o")
plt.xlabel("Time")
plt.ylabel("Contribution")
plt.xticks(x,NewTimeIndices,rotation='vertical')
y2 = [cl[i],cl[i]]
plt.plot(x2,y2,ls="-", color = "r")
y3 = [0, np.nanmax(d_ab[:,i])]
plt.plot(x3,y3,ls="--", color = "b")
plt.show()
Merci d'avoir lu jusqu'au bout. Cette fois, nous avons implémenté la détection des anomalies par apprentissage de structure clairsemée.
Si vous avez une demande de correction, nous vous serions reconnaissants de bien vouloir nous contacter.
Recommended Posts