[PYTHON] Anomaly detection introduction and method summary

I learned about anomaly detection, so I will summarize it.

References

I have greatly referred to the following documents: [1] Lukas Ruff, et al.:A Unifying Review of Deep and Shallow Anomaly Detection [2] [Ide, Sugiyama: Introductory Machine Learning Abnormality Detection-Practical Guide by R](https://www.amazon.co.jp/%E5%85%A5%E9%96%80-%E6%A9 % 9F% E6% A2% B0% E5% AD% A6% E7% BF% 92% E3% 81% AB% E3% 82% 88% E3% 82% 8B% E7% 95% B0% E5% B8% B8 % E6% A4% 9C% E7% 9F% A5% E2% 80% 95R% E3% 81% AB% E3% 82% 88% E3% 82% 8B% E5% AE% 9F% E8% B7% B5% E3 % 82% AC% E3% 82% A4% E3% 83% 89-% E4% BA% 95% E6% 89% 8B-% E5% 89% 9B / dp / 4339024910 / ref = sr_1_2? __ mk_ja_JP =% E3% 82% AB% E3% 82% BF% E3% 82% AB% E3% 83% 8A & dchild = 1 & keywords =% E7% 95% B0% E5% B8% B8% E6% A4% 9C% E7% 9F% A5 & qid = 1605247003 & sr = 8-2) [3] [Ide, Sugiyama: Abnormality detection and change detection (machine learning professional series)](https://www.amazon.co.jp/%E7%95%B0%E5%B8%B8%E6%A4% 9C% E7% 9F% A5% E3% 81% A8% E5% A4% 89% E5% 8C% 96% E6% A4% 9C% E7% 9F% A5-% E6% A9% 9F% E6% A2% B0 % E5% AD% A6% E7% BF% 92% E3% 83% 97% E3% 83% AD% E3% 83% 95% E3% 82% A7% E3% 83% 83% E3% 82% B7% E3 % 83% A7% E3% 83% 8A% E3% 83% AB% E3% 82% B7% E3% 83% AA% E3% 83% BC% E3% 82% BA-% E4% BA% 95% E6% 89% 8B% E5% 89% 9B-ebook / dp / B018K6C99U / ref = sr_1_1? __mk_ja_JP =% E3% 82% AB% E3% 82% BF% E3% 82% AB% E3% 83% 8A & dchild = 1 & keywords =% E7% 95% B0% E5% B8% B8% E6% A4% 9C% E7% 9F% A5 & qid = 1605247003 & sr = 8-1)

** §1 Anomaly detection overview **

Anomaly detection application example

--Medical x anomaly detection Identification of diseased sites from medical images

image.png Source: Thomas, et al. Unsupervised Anomaly Detection with Generative Adversarial Networks to Guide Marker Discovery

--Finance x Anomaly detection Money laundering detection

image.png Source: Mark, et al. Anti-Money Laundering in Bitcoin: Experimenting with Graph Convolutional Networks for Financial Forensics

Abnormalities are diverse

There are a wide variety of anomalies, which can be broadly divided into static anomalies and dynamic anomalies (please point out any strange points because this is a self-made cut). Static anomaly </ b> means that there is no relationship such as order between each data, and the value of the data is a measure of the anomaly.

――For example, if the shade of the relevant part in the medical image is significantly different from that in the healthy case, it is judged to be abnormal (presence of disease). --Such static anomalies are especially called outliers.

Dynamic anomaly </ b> means that there is a relationship such as order between data like time series data, and the behavior of the data is a measure of anomaly.

――Abnormal behavior depends on individual problems, such as a drastic increase or decrease in the observed value in the same section or a significantly smaller change. --The major difference from static anomalies is that dynamic anomalies do not always have anomalous values. ――For example, the central part of the graph below looks abnormal, but the value itself is a possible value. In this case, you will find anomalies in the behavior of "frequency changes."

import numpy as np
import matplotlib.pyplot as plt

x1 = np.concatenate([np.linspace(-10, -3, 50), np.zeros(50), np.linspace(3, 9, 50)])
x2 = np.concatenate([np.zeros(50), np.linspace(-3, 3, 50), np.zeros(50)])
y = np.sin(x1) + np.sin(5*x2)
plt.plot(np.linspace(-10, 9, 150), y)

image.png

Expressed as a formula, each point $ x_j $ is $ p (x) = \ int p (x, t) {\ rm d} t $ in the series data $ \ {x_1, ..., x_t \} $ Even if it is normal in the sense of, the conditional probability of time $ p (x_j \ mid t) = \ int \ cdots \ int p (x_1, ... x_t \ mid t) {\ rm d} t_1 \ cdots { \ rm d} t_ {j-1} {\ rm d} t_ {j + 1} \ cdots {\ rm d} t_t $ can be abnormal.

However, even in this case, the central part may not be abnormal. For example, it is possible that the upper graph is the machine operation signal of the factory and the operation mode is switched only in the central part, so it is not abnormal.

As mentioned above, there are various aspects to anomalies, but reference [1] summarizes the types of anomalies in this way:

image.png出典:[1]

Point Anomaly in the upper left figure looks like single-shot noise data (outliers) due to observation errors, but Group Anomaly looks like data groups generated from a different distribution than outliers. It is assumed that both require different anomaly detection approaches. When you reach Semantic Anomaly in the lower right, it is difficult to deal with with classical anomaly detection methods, and a deep learning approach is required.

As you can see, there is no unified solution for "abnormality", and "abnormality" must be defined on a case-by-case basis.

Anomaly detection algorithms are basically unsupervised learning

Anomaly detection algorithms basically use unsupervised learning rather than supervised learning. There are two reasons:

  1. Mostly because there are few abnormal data
  2. Abnormal data is diverse and difficult to model comprehensively.

is. Hundreds to thousands of anomaly data should not be collected, especially in sites where anomalies are critical (medical or factory). In addition, reason 2 is as explained at the beginning. It is easier and more straightforward to guess "normal / not normal" than to guess "normal / abnormal". The latter does not mention anomalous data and can only be achieved by understanding the structure of normal data. Unsupervised learning is a framework for understanding the structure of data by estimating $ p (x) $ from the sample $ x_i (i = 1, ..., n) $, so anomaly detection is basically unsupervised. You will use learning. However, it must be possible to assume that the sample for unsupervised learning does not contain anomalous data or is overwhelmingly small.

As an aside, reference [2] states the following and is my favorite:

In Tolstoy's novel Anna Karenina, a passage that says, "Happy families are all alike; every unhappy family is unhappy in its own way." Is suggestive.

Anomaly detection algorithms are roughly divided into four types

Then, I will introduce the anomaly detection algorithm concretely.

Algorithms can be broadly classified into four approaches. Classification Approach, Probabilistic Approach, Reconstruction Approach, Distance Approach, respectively. In addition to these four categories, here is a table that classifies whether or not to use deep learning:

image.png出典:[1]

Each word such as PCA or OC-SVM is the name of the anomaly detection algorithm that has been proposed in the past. The details of each algorithm will be introduced later, so here we will only give an overview of each of the four approaches. (I myself do not fully understand the table below, so I would appreciate it if you could give me your opinion.)

approach Overview Pros Cons
Classification An approach that directly estimates the boundary that separates normal and abnormal data There is no need to assume the shape of the distribution. In addition, the method using SVM can be learned with a relatively small amount of data. Normal data may not work well if it has a complicated structure
Probabilistic An approach that estimates the probability distribution of normal data and regards events with a low probability of occurrence as abnormal Very high accuracy achieved if the data follows the assumed model Many assumptions such as distribution model are required. It is also difficult to handle higher dimensions.
Reconstruction Well to reconstruct normal data-The approach that data that trained logic cannot reconstruct well is abnormal Powerful methods such as AE and GAN, which have been actively studied in recent years, can be applied. Since the objective function is designed to improve dimensional compression and reconstruction accuracy, it is not necessarily a method specialized for anomaly detection.
Distance An approach that detects anomalies based on the distance between data. The idea that abnormal data is far from normal data. The idea is simple and the explanation for the output is high. Large amount of calculation (O(n^2)Such)

The above table was created with reference to the following materials.

  1. Hido: Jubatus Casual Talks # 2 Anomaly Detection Introduction
  2. PANG, et al. Deep Learning for Anomaly Detection: A Review

** §2 Algorithm introduction and experiment **

Summary of methods to introduce

  • Classification Approach
    • One-Class SVM
  • Probability Approach --Hotelling's T2 method
    • GMM
    • KDE
  • Reconstruction Approach
    • AE
  • Distance Approach
    • k-NN
    • LOF

Classification Approach One-Class SVM Suppose you are given the data $ D = \ {x_1, ..., x_N \} $. However, assume that $ D $ does not contain anomalous data, or if it does, it is an overwhelmingly small number.

In the previous chapter, we introduced that the Classification Approach estimates the boundary that separates normal and abnormal. ** One-Class SVM is an algorithm that calculates and borders a sphere that completely surrounds normal data. ** (A sphere is not always three-dimensional.) image.png

If you make the radius very large, you can make a sphere that completely contains $ D $, but if it is too large, False Negative will be 1 and I am not happy. Therefore,

--Make the volume of the sphere as small as possible -Allow data that extends beyond the sphere in $ D $

Calculate a general-purpose sphere by allowing two points. Specifically, the optimum sphere radius $ R $ and center $ b $ can be obtained by solving the following optimization problem:

python


\min_{R,b,u} \left\{R^2 + C\sum_{n=1}^{N}u_n \right\} \quad {\rm s.t.} \quad \|x_n - b\|^2 \le R^2 + u_n

However, $ u $ is a penalty term that extends beyond the sphere, and $ C $ is a parameter of how important the penalty is. Non-linear conversion of data by kernel functions is possible with SVM, but of course it is also available with One-Class SVM. Proper use of kernel functions will give you the right sphere.

Let's do a numerical experiment. The data was prepared as follows.

python


import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns

sns.set()
np.random.seed(123)

#Mean and variance
mean1 = np.array([2, 2])
mean2 = np.array([-2,-2])
cov = np.array([[1, 0], [0, 1]])

#Generation of normal data(Generated from two normal distributions)
norm1 = np.random.multivariate_normal(mean1, cov, size=50)
norm2 = np.random.multivariate_normal(mean2, cov, size=50)
norm = np.vstack([norm1, norm2])

#Generation of anomalous data(Generated from uniform distribution)
lower, upper = -10, 10
anom = (upper - lower)*np.random.rand(20,20) + lower

s=8
plt.scatter(norm[:,0], norm[:,1], s=s, color='green', label='normal')
plt.scatter(anom[:,0], anom[:,1], s=s, color='red', label='anomaly')
plt.legend()

image.png

python


from sklearn import svm
xx, yy = np.meshgrid(np.linspace(-10, 10, 500), np.linspace(-10, 10, 500))

# fit the model
clf = svm.OneClassSVM(nu=0.1, kernel="rbf", gamma=0.1)
clf.fit(norm)

def draw_boundary(norm, anom, clf):
    # plot the line, the points, and the nearest vectors to the plane
    Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape(xx.shape)

    plt.title("One-Class SVM")
    plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), Z.max(), 7), cmap=plt.cm.PuBu)
    bd = plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='darkred')
    plt.contourf(xx, yy, Z, levels=[0, Z.max()], colors='palevioletred')

    nm = plt.scatter(norm[:, 0], norm[:, 1], c='green', s=s)
    am = plt.scatter(anom[:, 0], anom[:, 1], c='red', s=s)

    plt.axis('tight')
    plt.xlim((-10, 10))
    plt.ylim((-10, 10))
    plt.legend([bd.collections[0], nm, am],
               ["learned frontier", "normal", "anomaly"],
               loc="upper left",
               prop=matplotlib.font_manager.FontProperties(size=11))
    plt.show()

draw_boundary(norm, anom, clf)

image.png

By the way, if you make the parameter $ C $ smaller, that is, make the penalty that sticks out of the sphere lighter, the boundary changes like this. (In sklearn, it is represented by the parameter $ \ nu $, but it seems that it is in the denominator, so it seems that $ \ nu $ is made smaller and $ = C $ is made larger.)

python


# fit the model
clf = svm.OneClassSVM(nu=0.5, kernel="rbf", gamma=0.1)
clf.fit(norm)
draw_boundary(norm, anom, clf)

image.png

Other proposals include One-Class NN. In One-Class SVM, I was searching for a suitable nonlinear transformation by adjusting the kernel function, but the idea is to substitute & automate it with NN. The implementation seems to be annoying, so I haven't done it.

Probability Approach

Hotelling's T2 method

Hoteling's T2 method is a classical method and has been theoretically analyzed. In conclusion, ** By assuming a normal distribution, it becomes an algorithm ** that utilizes the property that the anomaly score follows the $ \ chi ^ 2 $ distribution.

For the sake of simplicity, we will use one-dimensional data here. As usual, given the data $ D $. Here each data $ x $ is normally distributed

python


p(x\mid \mu, \sigma) = \frac{1}{\sqrt{2\pi\sigma^2}}\exp\left\{-\frac{1}{2\sigma^2}(x-\mu)^2\right\}

It is assumed that it follows **. By fitting this normal distribution to the data by the maximum likelihood estimation method

python


\begin{align*}
    \hat{\mu} &= \frac{1}{N}\sum_{n=1}^{N}x_n \\
    \hat{\sigma}^2 &= \frac{1}{N}\sum_{n=1}^{N}(x_n - \hat{\mu})^2
\end{align*}

And each parameter is estimated. In other words, we estimated that the data $ D $ follows the normal distribution $ p (x \ mid \ hat {\ mu}, \ hat {\ sigma} ^ 2) $.

Now, let's introduce the Anomaly Score. Abnormality is a statistic that literally indicates the degree of anomaly, and negative log-likelihood is often used.

python


prob = np.linspace(0.01, 1, 100)
neg_log_likh = [-np.log(p) for p in prob]

plt.plot(prob, neg_log_likh)
plt.title('Negative Log Likelihood')
plt.ylabel('Anomaly Score')
plt.xlabel('Prob.')

image.png

As shown in the graph, the negative log-likelihood has a shape that takes a large value where the probability is small, and it seems to function as an anomaly. If you calculate the anomaly of the normal distribution you just estimated

python


-\log p(x\mid \hat{\mu}, \hat{\sigma}^2) = \frac{1}{2\hat{\sigma}^2}(x-\hat{\mu})^2 + \frac{1}{2}\log(2\pi\hat{\sigma}^2)

It will be. Since the anomaly is a quantity defined for the data $ x $, the anomaly $ a (x) $ is, excluding the terms unrelated to $ x $.

python


a(x) = \left( \frac{x - \hat{\mu}}{\hat{\sigma}} \right)^2

Is calculated. The numerator of anomaly indicates that "the farther the data is from the mean, the higher the anomaly". The denominator expresses "how much data is scattered?", And the feeling that scattered data is tolerated even if it is far from the average, and conversely, dense data does not allow any deviation. There is.

In fact, it is known that this anomaly $ a (x) $ follows a chi-square distribution $ \ chi ^ 2_1 $ with $ 1 $ degrees of freedom when the sample size $ N $ is large enough.

python


from scipy.stats import chi2
x = np.linspace(0, 8, 1000)
for df in [1,2,3,4,5]:
    plt.plot(x, chi2.pdf(x ,df = df), label='deg. of freedom:'+str(df))
plt.legend()
plt.ylim(0, 0.5)
plt.title('Chi-Square Dist.')

image.png

After that, anomaly detection is possible by determining the threshold $ a_ {th} $ at the significance level of $ 5 $%. Summary,

  1. Suppose the data follow a normal distribution.
  2. Estimate the parameters $ \ mu $, $ \ sigma $ of the normal distribution by the maximum likelihood estimation method.
  3. From the chi-square distribution, determine the point $ a_ {th} $ that separates normal and abnormal.
  4. If the new data $ x_ {new} $ is $ a (x_ {new})> a_ {th} $, it is considered abnormal.

Is the T2 method of hoteling. This time, I explained in one dimension, but the degree of anomaly follows the chi-square distribution in multiple dimensions as well. However, degrees of freedom = number of dimensions.

Let's experiment. The data was created as follows.

python


norm = np.random.normal(0, 1, 20) #Normal data
anom = np.array([-3, 3]) #Abnormal data
plt.scatter(norm, [0]*len(norm), color='green', label='normal')
plt.scatter(anom, [0]*len(anom), color='red', label='anomaly')

image.png

python


#Maximum likelihood estimation
N = len(norm)
mu_hat = sum(norm)/N
s_hat = sum([(x-mu_hat)**2 for x in norm])/N

#Use the quantile of the chi-square distribution as the threshold
a_th_5 = chi2.ppf(q=0.95, df=1)
a_th_30 = chi2.ppf(q=0.7, df=1)

#Calculate the degree of anomaly of each data
data = np.hstack([norm, anom])
anom_score = []
for d in data:
    score = ((d - mu_hat)/s_hat)**2
    anom_score.append(score)

#drawing
norm_pred_5 = [d for d,a in zip(data, anom_score) if a<=a_th_5]
anom_pred_5 = [d for d,a in zip(data, anom_score) if a>a_th_5]
norm_pred_30 = [d for d,a in zip(data, anom_score) if a<=a_th_30]
anom_pred_30 = [d for d,a in zip(data, anom_score) if a>a_th_30]

fig, axes = plt.subplots(1,3, figsize=(15,5))

axes[0].scatter(norm, [0]*len(norm), color='green', label='normal')
axes[0].scatter(anom, [0]*len(anom), color='red', label='anomaly')
axes[0].set_title('Truth')
axes[1].scatter(norm_pred_5, [0]*len(norm_pred_5), color='green', label='normal pred')
axes[1].scatter(anom_pred_5, [0]*len(anom_pred_5), color='red', label='anomaly pred')
axes[1].set_title('Threshold:5%')
axes[2].scatter(norm_pred_30, [0]*len(norm_pred_30), color='green', label='normal pred')
axes[2].scatter(anom_pred_30, [0]*len(anom_pred_30), color='red', label='anomaly pred')
axes[2].set_title('Threshold:30%')
plt.legend()

image.png

It can be seen that if the threshold of the degree of abnormality is reduced, the area considered to be abnormal will expand. The threshold should be determined according to the problem. For example, the threshold should be set small when overlooking abnormalities is critical, such as in the medical field, and may be set a little higher in the case of data cleansing.

GMM(Gaussian Mixture Model) The Hoteling T2 method assumed a normal distribution for the data, but in some cases this may be inappropriate. For example, if the data is concentrated in multiple places (multiple peaks instead of single peaks) as shown below, fitting with a normal distribution

python


lower, upper = -3, -1
data1 = (upper - lower)*np.random.rand(20) + lower
lower, upper = 1, 3
data2 = (upper - lower)*np.random.rand(30) + lower
data = np.hstack([data1, data2])

#Maximum likelihood estimation
N = len(data)
mu_hat = sum(data)/N
s_hat = sum([(x-mu_hat)**2 for x in data])/N

#plot
from scipy.stats import norm
X = np.linspace(-7, 7, 1000)
Y = norm.pdf(X, mu_hat, s_hat)

plt.scatter(data, [0]*len(data))
plt.plot(X, Y)

image.png

Like, it is estimated that the peak of the normal distribution comes to the place where no data is observed. In such cases, it is better to set a ** slightly more complicated distribution instead of the normal distribution. This time, as an example, we will introduce anomaly detection ** by GMM (Gaussian Mixture Model).

GMM is a distribution that expresses a multimodal type by adding several normal distributions. The number to add is often expressed in $ K $ and is called the number of components. The GMM formulas and visuals are as follows (one-dimensional).

python


p(x\mid \pi, \mu, \sigma) = \sum_{k=1}^{K}\pi_k N(x\mid \mu_k, \sigma_k)

However,

python


0 \le\pi_k\le 1,\quad \sum_{k=1}^{K}\pi_k=1,\quad N(x\mid\mu,\sigma)={\text normal distribution}

is.

python


#plot
from scipy.stats import norm
X = np.linspace(-7, 7, 1000)

Y1 = norm.pdf(X, 1, 1)
Y2 = norm.pdf(X, -3, 2)
Y = 0.5*Y1 + 0.5*Y2 

plt.plot(X, Y1, label='Component1')
plt.plot(X, Y2, label='Component2')
plt.plot(X, Y,  label='GMM')
plt.legend()

image.png

The mean ($\ mu ) / variance ( \ sigma $) and mixing ratio $ \ pi $ of each normal distribution are parameters learned from the data. However, the number of components $ K $ needs to be fixed. Parameters that are decided by the analyst side in this way are called hyperparameters.

Let's experiment with anomaly detection by GMM. Let's also observe how the shape of the estimated distribution changes depending on the number of components $ K $. The data uses the same as the One-Class SVM: image.png

python


def draw_GMM(norm, anom, clf, K):
    # plot the line, the points, and the nearest vectors to the plane
    xx, yy = np.meshgrid(np.linspace(-10, 10, 1000), np.linspace(-10, 10, 1000))
    Z = np.exp(clf.score_samples(np.c_[xx.ravel(), yy.ravel()]))
    Z = Z.reshape(xx.shape)

    plt.title("GMM:K="+str(K))
    plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), Z.max(), 7), cmap=plt.cm.PuBu)
    
    nm = plt.scatter(norm[:, 0], norm[:, 1], c='green', s=s)
    am = plt.scatter(anom[:, 0], anom[:, 1], c='red', s=s)

    plt.axis('tight')
    plt.xlim((-10, 10))
    plt.ylim((-10, 10))
    plt.legend([nm, am],
               ["normal", "anomaly"],
               loc="upper left",
               prop=matplotlib.font_manager.FontProperties(size=11))
    plt.show()

python


#Fitting by GMM
from sklearn.mixture import GaussianMixture

for K in [1,2,3,4]:
    clf = GaussianMixture(n_components=K, covariance_type='full')
    clf.fit(norm)
    draw_GMM(norm, anom, clf, K)

image.png image.png image.png image.png By increasing the number of components in this way, you can see that even the trends in the details of the data are captured. However, it is necessary to be careful because it does not capture the details = generalization performance is high. The number of components can be determined using information criteria such as WAIC (I tried using WAIC for the time being: GitHub)

KDE(Kernel Density Estimation) Until now, we have assumed a normal distribution or a mixed normal distribution and estimated the internal parameters through fitting to the data. Such an approach is called a parametric approach. But what if the parametric distribution doesn't work?

A non-parametric approach is effective in such cases, and ** KDE is a typical non-parametric estimation method **. KDE is a method that estimates the probability distribution by overlaying a kernel (usually Gaussian) on each data point **. The visualization is as follows.

python


from scipy.stats import norm
np.random.seed(123)
lower, upper = -1, 5
data = (upper - lower)*np.random.rand(3) + lower
for i, d in enumerate(data):
    X = np.linspace(lower,upper,500)
    Y = norm.pdf(X, loc=d, scale=1)
    knl = plt.plot(X,Y,color='green', label='kernel'+str(i+1))

# KDE
n = 3
h = 0.24
Y_pdf = []
for x in X:
    Y = 0
    for d in data:
        Y += norm.pdf((x-d)/h, loc=0, scale=1)
    Y /= n*h
    Y_pdf.append(Y)
kde = plt.plot(X,Y_pdf, label='KDE')
plt.scatter(data, [0]*len(data), marker='x')
plt.ylim([-0.05, 1])
plt.legend()

image.png

In this way, the probability distribution is estimated by superimposing the kernels. If you use a normal distribution for your kernel, you need to set the variance (called bandwidth). In subsequent experiments, let's also observe the change in the estimated distribution when this bandwidth is changed.

The details of KDE's theory and its strengths and weaknesses are [Suzuki. Data Analysis 10th "Nonparametric Density Estimate Method"](http://ibis.tu-tokyo.ac.jp/suzuki/lecture/2015/dataanalysis/L9. pdf) was easy to understand:

** Estimated distribution **

python


p(x) = \frac{1}{Nh}\sum_{n=1}^{N}K\left(\frac{x-X_n}{h}\right)

However, $ h> 0 $ is the bandwidth, $ K () $ is the kernel function, and $ X_n $ is each data point. ** Advantages / Disadvantages ** image.png Source: Suzuki. Data analysis 10th "Nonparametric density estimation method"

Let's experiment with anomaly detection by KDE. Data is as usual image.png Use the. The estimated distribution when the bandwidth is changed is as follows.

python


def draw_KDE(norm, anom, clf, K):
    # plot the line, the points, and the nearest vectors to the plane
    xx, yy = np.meshgrid(np.linspace(-10, 10, 1000), np.linspace(-10, 10, 1000))
    Z = np.exp(clf.score_samples(np.c_[xx.ravel(), yy.ravel()]))
    Z = Z.reshape(xx.shape)

    plt.title("KDE:BandWidth="+str(bw))
    plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), Z.max(), 7), cmap=plt.cm.PuBu)
    
    nm = plt.scatter(norm[:, 0], norm[:, 1], c='green', s=s)
    am = plt.scatter(anom[:, 0], anom[:, 1], c='red', s=s)

    plt.axis('tight')
    plt.xlim((-10, 10))
    plt.ylim((-10, 10))
    plt.legend([nm, am],
               ["normal", "anomaly"],
               loc="upper left",
               prop=matplotlib.font_manager.FontProperties(size=11))
    plt.show()

python


from sklearn.neighbors import KernelDensity
for bw in [0.2,0.5,1.0,2.0]:
    clf = KernelDensity(bandwidth=bw, kernel='gaussian')
    clf.fit(norm)
    draw_KDE(norm, anom, clf, bw)

image.png image.png image.png image.png As you can see, the estimated distribution becomes smoother as the bandwidth increases. Also, since there are other kernel functions besides the normal distribution, it is necessary to set both "bandwidth and kernel function" appropriately.

Reconstruction Approach Since the Reconstruction Approach is often used for image data, we will use image data here as well. Let the normal data be MNIST and the abnormal data be Fashion-MNIST.

python


#MNIST load
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

#Reading training data
train_mnist = datasets.MNIST(
    root="./data", 
    train=True, 
    transform=transforms.Compose([
               transforms.ToTensor(),
               transforms.Normalize((0.5,), (0.5,)) #section[-1,1]Mean, standard deviation within= 0.5, 0.Normalized to 5
               ]), 
    download=True
)

train_mnist_loader = torch.utils.data.DataLoader(
    train_mnist, batch_size=64, shuffle=True
)

#Read test data
test_mnist = datasets.MNIST(
    root="./data", 
    train=False, 
    transform=transforms.Compose([
               transforms.ToTensor(),
               transforms.Normalize((0.5,), (0.5,)) #section[-1,1]Mean, standard deviation within= 0.5, 0.Normalized to 5
               ]), 
    download=True
)

test_mnist_loader = torch.utils.data.DataLoader(
    test_mnist, batch_size=64, shuffle=True
)

python


# Fashion-MNIST load
#Reading training data
train_fashion = datasets.FashionMNIST(
    root="./data", 
    train=True, 
    transform=transforms.Compose([
               transforms.ToTensor(),
               transforms.Normalize((0.5,), (0.5,)) #section[-1,1]Mean, standard deviation within= 0.5, 0.Normalized to 5
               ]), 
    download=True
)

train_fashion_loader = torch.utils.data.DataLoader(
    train_fashion, batch_size=64, shuffle=True
)

#Read test data
test_fashion = datasets.FashionMNIST(
    root="./data", 
    train=False, 
    transform=transforms.Compose([
               transforms.ToTensor(),
               transforms.Normalize((0.5,), (0.5,)) #section[-1,1]Mean, standard deviation within= 0.5, 0.Normalized to 5
               ]), 
    download=True
)

test_fashion_loader = torch.utils.data.DataLoader(
    test_fashion, batch_size=64, shuffle=True
)

python


import matplotlib.pyplot as plt

#A function that visualizes images side by side at once
def imshow(image_set, nrows=2, ncols=10):
    plot_num = nrows * ncols
    fig, ax = plt.subplots(ncols=ncols, nrows=nrows, figsize=(15, 15*nrows/ncols))
    ax = ax.ravel()
    for i in range(plot_num):
        ax[i].imshow(image_set[i].reshape(28, 28), cmap='gray')
        ax[i].set_xticks([])
        ax[i].set_yticks([])

python


#MNIST sample visualization
imshow(train_mnist.data)

image.png

python


# Fashion-MNIST sample visualization
imshow(train_fashion.data)

image.png

AE(Auto Encoder) AE (Auto Encoder) is a type of NN (Neural Network), and is an NN that learns so that the input and output are equal. Since a simple identity function is boring, it usually sandwiches an intermediate layer with dimensions smaller than the input dimension.

image.png

What this is doing is "summarizing" the input data in the middle layer. Mathematically, it means that the input data is mapped to a low-dimensional space while retaining enough information to reconstruct it.

** Anomaly detection using AE is based on the idea that "AEs trained to reconstruct normal data may not be able to reconstruct abnormal data well." ** **

Now, let's experiment with anomaly detection by AE. After training AE with MNIST, we will try to detect the reconstruction accuracy of MNIST and Fashion-MNIST as the degree of anomaly.

python


import torch.nn as nn
import torch.nn.functional as F

#Definition of AE
class AE(nn.Module):
    def __init__(self):
        super(AE, self).__init__()
        self.fc1 = nn.Linear(28*28, 256)
        self.fc2 = nn.Linear(256, 28*28)
        
    def forward(self, x):
        x = torch.tanh(self.fc1(x))
        x = torch.tanh(self.fc2(x))
        return x
ae = AE()

#Definition of loss function and optimization method
import torch.optim as optim

criterion = nn.MSELoss()
optimizer = optim.Adam(ae.parameters(), lr=0.0001)

python


#Learning loop
N_train = train_mnist.data.shape[0]
N_test = test_mnist.data.shape[0]

train_loss = []
test_loss = []
for epoch in range(10):  
    # Train step
    running_train_loss = 0.0
    for data in train_mnist_loader:
        inputs, _ = data
        inputs = inputs.view(-1, 28*28)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = ae(inputs)
        loss = criterion(outputs, inputs)
        loss.backward()
        optimizer.step()

        running_train_loss += loss.item()
    running_train_loss /= N_train
    train_loss.append(running_train_loss)
    
    # Test step
    with torch.no_grad():
        running_test_loss = 0
        for data in test_mnist_loader:
            inputs, _ = data
            inputs = inputs.view(-1, 28*28)
            outputs = ae(inputs)
            running_test_loss += criterion(outputs, inputs).item()
        running_test_loss /= N_test
        test_loss.append(running_test_loss)
    if (epoch+1)%1==0:    # print every 1 epoch
        print('epoch:{}, train_loss:{}, test_loss:{}'.format(epoch + 1, running_train_loss, running_test_loss))
        
plt.plot(train_loss, label='Train loss')
plt.plot(test_loss, label='Test loss')
plt.ylabel('Loss')
plt.xlabel('epoch')
plt.legend()

image.png

python


#Let's visualize the state of reconstruction
with torch.no_grad():
    for data in test_mnist_loader:
        inputs, _ = data
        inputs = inputs.view(-1, 28*28)
        outputs = ae(inputs)

n = 5 # num to viz
fig, axes = plt.subplots(ncols=n, nrows=2, figsize=(15, 15*2/n))
for i in range(n):
    axes[0, i].imshow(inputs[i].reshape(28, 28), cmap='gray')
    axes[1, i].imshow(outputs[i].reshape(28, 28), cmap='gray')
    axes[0, i].set_title('Input')
    axes[1, i].set_title('Reconstruct')
    axes[0, i].set_xticks([])
    axes[0, i].set_yticks([])
    axes[1, i].set_xticks([])
    axes[1, i].set_yticks([])

image.png

It seems that it has been reconstructed neatly.

Now, let's draw the distribution of the reconstruction accuracy (square error this time) of MNIST and Fashion-MNIST. If AE anomaly detection is successful, Fashion-MNIST should be less accurate.

python


#Calculation of reconstruction accuracy
mnist_loss = []
with torch.no_grad():
    for data in test_mnist_loader:
        inputs, _ = data
        for data_i in inputs:
            data_i = data_i.view(-1, 28*28)
            outputs = ae(data_i)
            loss = criterion(outputs, data_i)
            mnist_loss.append(loss.item())

fashion_loss = []
with torch.no_grad():
    for data in test_fashion_loader:
        inputs, _ = data
        for data_i in inputs:
            data_i = data_i.view(-1, 28*28)
            outputs = ae(data_i)
            loss = criterion(outputs, data_i)
            fashion_loss.append(loss.item())

python


plt.hist([mnist_loss, fashion_loss], bins=25, label=['MNIST', 'Fashion-MNIST'])
plt.xlabel('Loss')
plt.ylabel('data count')
plt.legend()
plt.show()

image.png

Certainly, the reconstruction of Fashion-MNIST does not seem to work well, so it seems that anomaly detection based on the reconstruction accuracy will be established.

Finally, I will pick up and draw the ones with low Loss (= the ones that are difficult to judge as abnormal) and the ones with high Loss (= the ones that can be easily judged as abnormal) among Fashion-MNIST.

python


lower_loss, upper_loss = 0.1, 0.8
data_low = [] #Stores data with low Loss
data_up = []  #Stores data with high Loss

with torch.no_grad():
    for data in test_fashion_loader:
        inputs, _ = data
        for data_i in inputs:
            data_i = data_i.view(-1, 28*28)
            outputs = ae(data_i)
            loss = criterion(outputs, data_i).item()
            if loss<lower_loss:
                data_low.append([data_i, outputs, loss])
            if loss>upper_loss:
                data_up.append([data_i, outputs, loss])

python


#Index for drawing randomly
np.random.seed(0)

n = 3 #Number of drawing data
lower_indices = np.random.choice(np.arange(len(data_low)), n)
upper_indices = np.random.choice(np.arange(len(data_up)), n)
indices = np.hstack([lower_indices, upper_indices])

fig, axes = plt.subplots(ncols=n*2, nrows=2, figsize=(15, 15*2/(n*2)))

#Drawing data with small Loss
for i in range(n):
    inputs = data_low[indices[i]][0]
    outputs = data_low[indices[i]][1]
    loss = data_low[indices[i]][2]
    axes[0, i].imshow(inputs.reshape(28, 28), cmap='gray')
    axes[1, i].imshow(outputs.reshape(28, 28), cmap='gray')
    axes[0, i].set_title('Input')
    axes[1, i].set_title('Small Loss={:.2f}'.format(loss))
    axes[0, i].set_xticks([])
    axes[0, i].set_yticks([])
    axes[1, i].set_xticks([])
    axes[1, i].set_yticks([])

#Drawing data with large loss
for i in range(n, 2*n):
    inputs = data_up[indices[i]][0]
    outputs = data_up[indices[i]][1]
    loss = data_up[indices[i]][2]
    axes[0, i].imshow(inputs.reshape(28, 28), cmap='gray')
    axes[1, i].imshow(outputs.reshape(28, 28), cmap='gray')
    axes[0, i].set_title('Input')
    axes[1, i].set_title('Large Loss={:.2f}'.format(loss))
    axes[0, i].set_xticks([])
    axes[0, i].set_yticks([])
    axes[1, i].set_xticks([])
    axes[1, i].set_yticks([])
plt.tight_layout()

image.png

Images with low Loss tend to have a large black area, while images with a high Loss tend to have a large white area. Since MNIST is basically an image with a large black area, if the white area is large, it will be "foreign" for AE and it seems that the reconstruction is not working well.

Distance Approach k-NN(k-Nearest Neighbor) k-NN is a method to measure the degree of anomaly based on the distance between data. In k-NN, consider a circle containing k neighboring data points. The figure below is for $ k = 5 $.

image.png

As shown in the figure, k-NN defines the radius of the circle as the degree of anomaly based on the idea that the circle drawn by the anomalous data should be larger than the normal data. However, the threshold of the radius that separates normal / abnormal from the number of neighboring points $ k $ must be tuned appropriately.

Let's do a numerical experiment. The threshold is the 90% quantile in the radius value distribution of normal data.

python


#Data creation
np.random.seed(123)

#Mean and variance
mean = np.array([2, 2])
cov = np.array([[3, 0], [0, 3]])

#Generation of normal data
norm = np.random.multivariate_normal(mean1, cov, size=50)

#Generation of anomalous data(Generated from uniform distribution)
lower, upper = -10, 10
anom = (upper - lower)*np.random.rand(20,20) + lower

s=8
plt.scatter(norm[:,0], norm[:,1], s=s, color='green', label='normal')
plt.scatter(anom[:,0], anom[:,1], s=s, color='red', label='anomaly')
plt.legend()

image.png

I didn't know how to complete the anomaly detection by k-NN with the library, so I made it myself. .. ..

python


#A function that calculates the radius of the k-nearest neighbor circle of all data
def kNN(data, k=5):
    """
    input: data:list, k:int
    output:Radius of the k-nearest neighbor circle of all data:list
    
The distance adopts the Euclidean distance.
    """
    output = []
    for d in data:
        distances = []
        for p in data:
            distances.append(np.linalg.norm(d - p))
        output.append(sorted(distances)[k])
    return output

#1 if greater than the threshold,A function that returns 0 if it is small
def kNN_scores(point, data, thres, K):
    distances = []
    for p in data:
        distances.append(np.linalg.norm(point - p))
    dist = sorted(distances)[K]
    if dist > thres:
        return 1
    else:
        return 0

python


def draw_KNN(norm, anom, thres, K, only_norm=False):
    # plot the line, the points, and the nearest vectors to the plane
    xx, yy = np.meshgrid(np.linspace(-10, 10, 100), np.linspace(-10, 10, 100))
    Z = [kNN_scores(point, norm, thres, K) for point in np.c_[xx.ravel(), yy.ravel()]]
    Z = np.array(Z).reshape(xx.shape)

    plt.title("KNN:K="+str(K))
    plt.contourf(xx, yy, Z)
    
    if not only_norm:
        nm = plt.scatter(norm[:, 0], norm[:, 1], c='green', s=s)
        am = plt.scatter(anom[:, 0], anom[:, 1], c='red', s=s)

        plt.axis('tight')
        plt.xlim((-10, 10))
        plt.ylim((-10, 10))
        plt.legend([nm, am],
                   ["normal", "anomaly"],
                   loc="upper left",
                   prop=matplotlib.font_manager.FontProperties(size=11))

    if only_norm:
        nm = plt.scatter(norm[:, 0], norm[:, 1], c='green', s=s)

        plt.axis('tight')
        plt.xlim((-10, 10))
        plt.ylim((-10, 10))
        plt.legend([nm],
                   ["normal"],
                   loc="upper left",
                   prop=matplotlib.font_manager.FontProperties(size=11))
        
    plt.show()

python


for k in [1,5,30]:
    anomaly_scores = kNN(norm, k=k)
    thres = pd.Series(anomaly_scores).quantile(0.90)
    draw_KNN(norm, anom, thres, k)

image.png image.png image.png

You can see that the normal / abnormal boundary becomes smoother as k increases.

Now, let's experiment with a slightly messy example. Suppose ** normal data is generated from two normal distributions with different densities ** as shown below.

python


np.random.seed(123)

#Mean and variance
mean1 = np.array([3, 3])
mean2 = np.array([-5,-5])
cov1 = np.array([[0.5, 0], [0, 0.5]])
cov2 = np.array([[3, 0], [0, 3]])

#Generation of normal data(Generated from two normal distributions)
norm1 = np.random.multivariate_normal(mean1, cov1, size=50)
norm2 = np.random.multivariate_normal(mean2, cov2, size=10)
norm = np.vstack([norm1, norm2])

s=8
plt.scatter(norm[:,0], norm[:,1], s=s, color='green', label='normal')
plt.legend()

image.png

python


for k in [1,5,30]:
    anomaly_scores = kNN(norm, k=k)
    thres = pd.Series(anomaly_scores).quantile(0.90)
    draw_KNN(norm, [], thres, k, only_norm=True)

image.png image.png image.png

Focusing especially on the case of k = 1, we can see that kNN is not working well when normal data originates from different clusters with different densities. ** You can imagine that it can be dealt with by adding the process of "important the deviation from the dense cluster and tolerate the deviation from the sparse cluster" **.

In fact, that is the method called LOF introduced in the next section.

LOF (Local Outlier Factor)

The case where k-NN introduced in the previous section does not work well is shown in the figure.

image.png

It will be. Of course, I would like to increase the degree of anomaly on the left side, but LOF introduces the concept of ** data density ** and realizes it. I will omit the mathematical details, but I am doing the following.

image.png

The degree of anomaly is defined by ** the ratio of the degree of squishyness of oneself to the degree of squishyness of nearby points **.

Now let's experiment with LOF.

python


def draw_LOF(norm, anom, K, only_norm=False):
    # plot the line, the points, and the nearest vectors to the plane
    xx, yy = np.meshgrid(np.linspace(-10, 10, 100), np.linspace(-10, 10, 100))
    Z = - model.predict(np.c_[xx.ravel(), yy.ravel()])
    Z = np.array(Z).reshape(xx.shape)

    plt.title("LOF:K="+str(K))
    plt.contourf(xx, yy, Z)
    
    if not only_norm:
        nm = plt.scatter(norm[:, 0], norm[:, 1], c='green', s=s)
        am = plt.scatter(anom[:, 0], anom[:, 1], c='red', s=s)

        plt.axis('tight')
        plt.xlim((-10, 10))
        plt.ylim((-10, 10))
        plt.legend([nm, am],
                   ["normal", "anomaly"],
                   loc="upper left",
                   prop=matplotlib.font_manager.FontProperties(size=11))

    if only_norm:
        nm = plt.scatter(norm[:, 0], norm[:, 1], c='green', s=s)

        plt.axis('tight')
        plt.xlim((-10, 10))
        plt.ylim((-10, 10))
        plt.legend([nm],
                   ["normal"],
                   loc="upper left",
                   prop=matplotlib.font_manager.FontProperties(size=11))
        
    plt.show()

python


from sklearn.neighbors import LocalOutlierFactor

for k in [1, 5, 30]:
    model = LocalOutlierFactor(n_neighbors=k, novelty=True)
    model.fit(norm)
    draw_LOF(norm, [], k, only_norm=True)

image.png image.png image.png

In this way, it seems that anomaly detection can be performed after considering the density of the cluster.

Evolving anomaly detection problem

As I study anomaly detection in the future, I would like to study the following topics:

--Abnormality detection of time series data --Abnormality detection of graph data --Can be used for money laundering detection

Recommended Posts