Le réseau de neurones convolutifs est généralement utilisé dans le traitement d'images, mais cette fois, je l'ai essayé avec un vecteur unidimensionnel comme on le voit dans les données des capteurs. Le point est le même 3D (RVB, X, Y) que l'image en convertissant la structure de données à l'aide de remodeler. Je pense que la convolution est efficace pour la détection d'anomalies car elle permet d'extraire des fonctionnalités même s'il y a peu de paramètres d'apprentissage.
Créez une onde Sin pour un cycle en utilisant numpy, Ajouter un pseudo bruit avec np.random.rand () Créez 100 éléments avec une certaine variation. 99 ont été utilisés pour l'apprentissage et le 1 restant a été utilisé pour la vérification.
data=[]
for i in range(100):
data.append([np.sin(np.pi * n /50)*(1+np.random.rand())for n in range(100)])
Créez un modèle d'entraînement à l'aide de la Convolution 2D de Chainer. La structure est telle que les données alambiquées sont restaurées aux données d'entrée d'origine dans la dernière couche. Cela effectuera l'extraction des fonctionnalités en tant qu'encodeur automatique.
~~ La fonction d'activation est réglée sur Tanh car lorsque ReLU est utilisé, les données au milieu de Convlution sont affichées. Je pensais que cela aurait l'air mal parce qu'il n'y avait pas de côté négatif lors de la visualisation. ~~ (Addition) Les données de visualisation sont retirées sans passer par la fonction d'activation. Je pensais que c'était plus correct.
Même avec ReLU, les données de l'onde Sin n'ont pas affecté les résultats de l'entraînement.
class MyChain(chainer.Chain):
def __init__(self,n_out):
super(MyChain, self).__init__()
with self.init_scope():
self.l1 = L.Convolution2D(None,2, ksize=(1,4),stride=(1,1))
self.l2 = L.Convolution2D(None,2, ksize=(1,4),stride=(1,1))
self.l3 = L.Convolution2D(None,2, ksize=(1,4),stride=(1,1))
self.l4 = L.Linear(None, n_out)
def __Call__(self,x,y):
return F.mean_squared_error(self.fwd(x),y)
def fwd(self, x):
h1 = F.tanh(F.max_pooling_2d(self.l1(x),2))
h2 = F.tanh(F.max_pooling_2d(self.l2(h1),2))
h3 = F.tanh(F.max_pooling_2d(self.l3(h2),2))
h3 = h3.reshape(h3.shape[0],-1)
return self.l4(h3)
Lors de l'utilisation de CNN, les données vectorielles unidimensionnelles ne peuvent pas être lues. Par conséquent, les données d'entraînement sont converties à l'aide de Reshape. De plus, celles qui deviennent les données de l'enseignant sont les données vectorielles originales.
TrainData = np.array(data,dtype=np.float32).reshape(100,1,1,100)
x=chainer.Variable(TrainData[:99])
for epoch in range(201):
model.zerograds()
loss=model(x,x.reshape(99,100))
loss.backward()
optimizer.update()
Tout d'abord, le résultat de la restauration à partir de la forme d'onde d'entrée. Il peut être restauré normalement.
Pour référence, jetons un coup d'œil à la forme d'onde du processus de pliage. J'essaye de faire correspondre la taille des données d'une manière pseudo.
Eh bien, honnêtement, je ne comprends pas vraiment. On peut dire que la couche 3 est une forme d'onde avec des caractéristiques réduites. Quelles sont les caractéristiques de l'onde Sin elle-même? Capturez-vous la forme d'une montagne? J'ai essayé d'apprendre plusieurs fois, mais à chaque fois la forme d'onde est différente. Je pense que c'est intéressant.
J'ai vu la détection d'anomalies à l'aide d'AutoEncoder dans le passé, C'est une méthode pour exprimer le degré d'anomalie en utilisant la différence entre l'entrée et la sortie restaurée. De même, j'ai créé des données anormales et les ai vérifiées.
Le premier est le déphasage J'ai essayé de décaler la valeur d'entrée de 5 points (cycle 5/100).
La forme d'onde de prédiction est proche de la phase d'origine et la forme de la ligne est irrégulière. Il semble que cela puisse être facilement détecté comme une anomalie.
Ensuite, quand un point comme une pointe fait saillie
C'est aussi irrégulier, n'est-ce pas? S'il est dentelé comme celui-ci, il peut être utilisé pour la détection d'anomalies à partir de fonctionnalités telles que des différences.
Ceci est mon premier message posté. Grâce à ce site, je lis vos articles et étudie. J'ai pensé que je donnerais en retour, alors j'ai décidé de l'afficher. J'espère que cet article vous sera utile.
Environnement Python 3.6.1 Anaconda 4.4.0 (64-bit) Chainer 2.0.2
import chainer
import chainer.functions as F
import chainer.links as L
import chainer.optimizers
import numpy as np
import matplotlib.pyplot as plt
class MyChain(chainer.Chain):
def __init__(self,n_out):
super(MyChain, self).__init__()
with self.init_scope():
self.l1 = L.Convolution2D(None,2, ksize=(1,4),stride=(1,1))
self.l2 = L.Convolution2D(None,2, ksize=(1,4),stride=(1,1))
self.l3 = L.Convolution2D(None,2, ksize=(1,4),stride=(1,1))
self.l4 = L.Linear(None, n_out)
def __call__(self,x,y):
return F.mean_squared_error(self.fwd(x),y)
def fwd(self, x):
h1 = F.tanh(F.max_pooling_2d(self.l1(x),2))
h2 = F.tanh(F.max_pooling_2d(self.l2(h1),2))
h3 = F.tanh(F.max_pooling_2d(self.l3(h2),2))
h3 = h3.reshape(h3.shape[0],-1)
return self.l4(h3)
def Layaer1(self, x):
return F.max_pooling_2d(self.l1(x),2)
def Layaer2(self, x):
h1=F.tanh(F.max_pooling_2d(self.l1(x),2))
return F.max_pooling_2d(self.l2(h1),2)
def Layaer3(self, x):
h1 = F.tanh(F.max_pooling_2d(self.l1(x),2))
h2 = F.tanh(F.max_pooling_2d(self.l2(h1),2))
return F.max_pooling_2d(self.l3(h2),2)
def CreatePlotData(arr,n1):
Buf1,Buf2=[],[]
for j in range(n1):
Buf1.append(0)
Buf2.append(0)
for i in range(arr.shape[1]):
for j in range(n1):
Buf1.append(arr[0][i].real)
Buf2.append(arr[1][i].real)
return np.array(Buf1,dtype=np.float32),np.array(Buf2,dtype=np.float32)
data=[]
for i in range(100):
data.append([np.sin(np.pi * n /50)*(1+np.random.rand())for n in range(100)])
model = MyChain(100)
optimizer = chainer.optimizers.Adam()
optimizer.setup(model)
TrainData = np.array(data,dtype=np.float32).reshape(100,1,1,100)
x=chainer.Variable(TrainData[:99])
ValidationData=TrainData[99].reshape(1,1,1,100)
PlotInput = ValidationData.reshape(100)
for epoch in range(201):
model.zerograds()
loss=model(x,x.reshape(99,100))
loss.backward()
optimizer.update()
if epoch%20==0:
Layer1Arr = np.array(model.Layaer1(ValidationData).data).reshape(2,-1)
Layer1Arr1,Layer1Arr2 = CreatePlotData(Layer1Arr,2)
Layer2Arr = np.array(model.Layaer2(ValidationData).data).reshape(2,-1)
Layer2Arr1,Layer2Arr2 = CreatePlotData(Layer2Arr,4)
Layer3Arr = np.array(model.Layaer3(ValidationData).data).reshape(2,-1)
Layer3Arr1,Layer3Arr2 = CreatePlotData(Layer3Arr,8)
plt.plot(PlotInput,label='Input')
plt.plot(Layer1Arr1,label='Lalyer1-1')
plt.plot(Layer1Arr2,label='Lalyer1-2')
plt.plot(Layer2Arr1,label='Lalyer2-1')
plt.plot(Layer2Arr2,label='Lalyer2-2')
plt.plot(Layer3Arr1,label='Lalyer3-1')
plt.plot(Layer3Arr2,label='Lalyer3-2')
plt.legend()
plt.savefig(str(epoch)+'-Epoch Convolution Graph.png')
plt.close()
predict = model.fwd(ValidationData)
predict=np.array(predict.data).reshape(100)
plt.plot(predict,label='Predict')
plt.plot(PlotInput,label='Input')
plt.legend()
plt.savefig(str(epoch)+'-Epoch Validation Graph.png')
plt.close()
ErrorPlot = [PlotInput[i+5]for i in range(len(PlotInput)-5)]
for i in range(5):
ErrorPlot.append(PlotInput[i])
predict = model.fwd(chainer.Variable(np.array(ErrorPlot,dtype=np.float32)).reshape(1,1,1,100))
predict=np.array(predict.data).reshape(100)
plt.plot(predict,label='Predict')
plt.plot(ErrorPlot,label='Error Input')
plt.legend()
plt.savefig('Shift Error Input Graph.png')
plt.close()
Rnd = np.random.randint(0,99)
ErrorPlot2=np.array(PlotInput)
ErrorPlot2[Rnd]=ErrorPlot2[Rnd]+3
predict = model.fwd(chainer.Variable(np.array(ErrorPlot2,dtype=np.float32)).reshape(1,1,1,100))
predict=np.array(predict.data).reshape(100)
plt.plot(predict,label='Predict')
plt.plot(ErrorPlot2,label='Error Input')
plt.legend()
plt.savefig('Spike Error Input Graph.png')
plt.close()
Recommended Posts