I implemented Normalizing Flow, one of the variational inference methods, so I am leaving this here as a memorandum. We start with Bayesian linear regression to cover the whole flow of ideas, and then explain step by step why variational inference is needed and how Normalizing Flow performs it. Although Normalizing Flow is presented here for variational inference, it can also be used as a generative model. GAN and VAE are the famous generative models, but Normalizing Flow belongs to a different family, the flow-based generative models. The paper on Normalizing Flow is here. The implemented code is posted on GitHub.
Let's start with Bayesian linear regression. As a recap, in ordinary linear regression the predicted value $\boldsymbol y$ is computed from the observed data $\boldsymbol x$ through a linear model such as $\boldsymbol y = \boldsymbol\theta^{\mathrm T}\boldsymbol x$. Here $\boldsymbol\theta$ is a parameter and corresponds to the slope of the line in the case of simple regression.
Finally, the marginal likelihood $p(\boldsymbol y^{(train)} \mid \boldsymbol X^{(train)})$ appears in this computation, and in general it cannot be evaluated analytically.
Instead of computing the marginal likelihood exactly, we try to directly approximate the posterior distribution $p(\boldsymbol z \mid \boldsymbol x)$ that we want to find by a probability distribution $q(\boldsymbol z)$. This idea is variational inference. (To match the notation of the paper, the parameter that has appeared so far as $\boldsymbol\theta$ is written as $\boldsymbol z$ from here on.)
A typical example of variational inference is the mean-field approximation, which assumes independence between the components of the approximate posterior distribution and decomposes it as $q(\boldsymbol z)=\prod_{i} q_i(z_i)$ (Eq. (5)).
\begin{eqnarray}
\ln p(\boldsymbol x)=\ln \int p(\boldsymbol x, \boldsymbol z)d\boldsymbol z\\
=\ln \int q(\boldsymbol z)\frac{p(\boldsymbol x, \boldsymbol z)}{q(\boldsymbol z)}d\boldsymbol z\\
\geq \int q(\boldsymbol z)\ln \frac{p(\boldsymbol x, \boldsymbol z)}{q(\boldsymbol z)}d\boldsymbol z \\
= \mathcal{L}(\boldsymbol{x})
\end{eqnarray}
The inequality in the third line follows from Jensen's inequality. As this formula shows, the ELBO $\mathcal{L}(\boldsymbol{x})$ is a lower bound of $\ln p(\boldsymbol x)$. Since maximizing the ELBO is equivalent to minimizing the KL divergence between the true posterior distribution and the approximate posterior distribution, the task reduces to ELBO maximization.
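The equivalence follows from the standard decomposition of the log marginal likelihood,
\begin{eqnarray}
\ln p(\boldsymbol x) = \mathcal{L}(\boldsymbol x) + \mathrm{KL}\left(q(\boldsymbol z)\,\|\,p(\boldsymbol z \mid \boldsymbol x)\right)
\end{eqnarray}
and since $\ln p(\boldsymbol x)$ does not depend on $q$, raising the ELBO necessarily lowers the KL divergence.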
Returning to the story, suppose we obtain the approximate posterior distribution $q(\boldsymbol z)$ by solving this optimization problem. However, the assumption of Eq. (5) is itself a problem: by assuming independence, the correlations that exist between the variables cannot be captured, so the approximating distribution becomes too weakly expressive to represent the true posterior distribution. Normalizing Flow is one of the variational inference methods that addresses this lack of expressiveness.
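As a classic illustration, suppose the true posterior is a correlated two-dimensional Gaussian, $p(\boldsymbol z \mid \boldsymbol x)=\mathcal{N}(\boldsymbol z; \boldsymbol\mu, \Sigma)$ with non-zero off-diagonal elements in $\Sigma$. Any factorized approximation $q(\boldsymbol z)=q_1(z_1)q_2(z_2)$ can only represent a distribution with independent components, so the correlation between $z_1$ and $z_2$ is lost no matter how the individual factors are chosen.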
Normalizing Flow
Normalizing Flow is the idea of obtaining a complex distribution $q_k(\boldsymbol z_k)$ by repeatedly applying a nonlinear transformation $f$ to a random variable $\boldsymbol z$ that follows a simple probability distribution $q(\boldsymbol z)$ (a Gaussian distribution, for example). <img src="https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/302831/4cfcd80e-18ee-f050-df70-5f59a8e3e8fd.png" width="100%"> Source: L. Weng, ["Flow-based Deep Generative Models"](https://lilianweng.github.io/lil-log/2018/10/13/flow-based-deep-generative-models.html)
Let's take a closer look.
Equations (7) and (8) show the effect of applying the nonlinear transformation $f$ to a random variable $\boldsymbol z$ that follows the probability distribution $q(\boldsymbol z)$: the transformed variable and its density follow from the change-of-variables formula,
\begin{eqnarray}
\boldsymbol z' = f(\boldsymbol z) \tag{7}\\
q'(\boldsymbol z') = q(\boldsymbol z)\left|\det \frac{\partial f}{\partial \boldsymbol z}\right|^{-1} \tag{8}
\end{eqnarray}
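Stacking $K$ such transformations $f_1,\dots,f_K$ gives $\boldsymbol z_k = f_K\circ\cdots\circ f_1(\boldsymbol z_0)$, and taking the logarithm of the density update at each step yields
\begin{eqnarray}
\ln q_k(\boldsymbol z_k) = \ln q_0(\boldsymbol z_0) - \sum_{i=1}^{K}\ln\left|\det \frac{\partial f_i}{\partial \boldsymbol z_{i-1}}\right|
\end{eqnarray}
which is the quantity that the implementation below accumulates step by step.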
The planar flow is the following function, where $\boldsymbol u$, $\boldsymbol w$, and $b$ are parameters whose values are updated by training, and $h$ is an activation function (tanh is used here):
\begin{eqnarray}
f(\boldsymbol z) = \boldsymbol z + \boldsymbol u\, h(\boldsymbol w^{\mathrm T}\boldsymbol z + b)
\end{eqnarray}
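Writing $\psi(\boldsymbol z) = h'(\boldsymbol w^{\mathrm T}\boldsymbol z + b)\,\boldsymbol w$, the Jacobian determinant of the planar flow becomes $|1 + \boldsymbol u^{\mathrm T}\psi(\boldsymbol z)|$, so for $K$ stacked planar flows the log-density of the final variable is (this is the relation referred to as Eq. (13) below)
\begin{eqnarray}
\ln q_k(\boldsymbol z_k) = \ln q_0(\boldsymbol z_0) - \sum_{i=1}^{K}\ln\left|1 + \boldsymbol u_i^{\mathrm T}\psi_i(\boldsymbol z_{i-1})\right|
\tag{13}
\end{eqnarray}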
Next, we optimize so that the probability distribution obtained by Eq. (13) approaches the true posterior distribution. As mentioned earlier, variational inference optimizes by maximizing the ELBO, so let us write out the ELBO for Normalizing Flow.
\begin{eqnarray}
\mathcal{L}(\boldsymbol{x})=\int q(\boldsymbol z)\ln \frac{p(\boldsymbol x, \boldsymbol z)}{q(\boldsymbol z)}d\boldsymbol z\\
=\mathbb{E}_{q(\boldsymbol z)}[\ln p(\boldsymbol x, \boldsymbol z) - \ln q(\boldsymbol z)]\\
=\mathbb{E}_{q_k(\boldsymbol z_k)}[\ln p(\boldsymbol x, \boldsymbol z_k) - \ln q_k(\boldsymbol z_k)]\\
\approx \frac{1}{L}\sum_{l=1}^{L}[\ln p(\boldsymbol{x}, \boldsymbol{z}_k^{(l)})-\ln q_k(\boldsymbol{z}_k^{(l)})]
\tag{14}
\end{eqnarray}
The sum at the end of the formula is a Monte Carlo estimate over the $L$ samples contained in the mini-batch (the superscript $l$ denotes the $l$-th sample).
When implementing, minimizing is more convenient than maximizing, so we minimize the negative ELBO,
\begin{eqnarray}
-\mathcal{L}(\boldsymbol{x}) \approx \frac{1}{L}\sum_{l=1}^{L}[\ln q_k(\boldsymbol{z}_k^{(l)}) - \ln p(\boldsymbol{x}, \boldsymbol{z}_k^{(l)})]
\tag{15}
\end{eqnarray}
which is used as the loss function in the implementation.
The environment is Python 3.7.5 and TensorFlow 1.15.0.
The implementation uses TensorFlow. As in the paper, the problem setting starts from a Gaussian distribution as the initial distribution and reproduces a target distribution by stacking the nonlinear planar-flow transformations (in the corresponding figure, the left is the initial Gaussian distribution, and the center and right are the target distributions).
First, we implement the planar flow. Since $\ln q_k(\boldsymbol z_k)$ is needed when computing the loss function, it is also computed here.
normalizing_flow.py
class PlanarFlow:
    def __init__(self, dim):
        self.dim = dim
        self.h = lambda x: tf.tanh(x)
        self.h_prime = lambda x: 1 - tf.tanh(x)**2
        self.w = tf.Variable(tf.random.truncated_normal(shape=(1, self.dim)))
        self.b = tf.Variable(tf.zeros(shape=(1)))
        self.u = tf.Variable(tf.random.truncated_normal(shape=(1, self.dim)))

    def __call__(self, z, log_q):
        # Pre-activation w^T z + b, evaluated at the *input* z (shape: [batch, 1]).
        affine = tf.expand_dims(tf.reduce_sum(z*self.w, -1), -1) + self.b
        # Planar transformation f(z) = z + u h(w^T z + b).
        z_new = z + self.u*self.h(affine)
        # psi(z) = h'(w^T z + b) w, giving |det df/dz| = |1 + u^T psi(z)|.
        psi = self.h_prime(affine)*self.w
        det_jacob = tf.abs(1 + tf.reduce_sum(psi*self.u, -1))
        # log q_k(z_k) = log q_{k-1}(z_{k-1}) - log|det df/dz|.
        log_q = log_q - tf.log(1e-7 + det_jacob)
        return z_new, log_q
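Note that the paper additionally reparameterizes $\boldsymbol u$ so that $\boldsymbol w^{\mathrm T}\boldsymbol u \geq -1$, which guarantees that the planar flow is invertible; the class above omits this. A minimal sketch of how the constraint could be added (assuming TensorFlow 1.x imported as tf, as in normalizing_flow.py):

def constrain_u(u, w):
    # u_hat = u + (m(w^T u) - w^T u) * w / ||w||^2 with m(x) = -1 + softplus(x),
    # so that w^T u_hat >= -1 and the planar flow stays invertible.
    wu = tf.reduce_sum(w * u, -1, keepdims=True)
    m_wu = -1.0 + tf.nn.softplus(wu)
    return u + (m_wu - wu) * w / tf.reduce_sum(w**2, -1, keepdims=True)

Inside __call__, self.u would then be replaced by constrain_u(self.u, self.w).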
Next, a Normalizing Flow is constructed by stacking K planar flows.
normalizing_flow.py
class NormalizingFlow:
    def __init__(self, K, dim):
        self.K = K
        self.dim = dim
        self.planar_flow = [PlanarFlow(self.dim) for i in range(self.K)]

    def __call__(self, z_0, log_q_0):
        z, log_q = self.planar_flow[0](z_0, log_q_0)
        for pf in self.planar_flow[1:]:
            z, log_q = pf(z, log_q)
        return z, log_q
Then, the loss function (15) is computed as follows, where target_density evaluates the target probability distribution. The optimizer is also defined here: Adam is used to minimize the loss function. We also define a function that returns the placeholders.
normalizing_flow.py
def calc_loss(z_k, log_q_k, target_density):
    # Negative ELBO of Eq. (15), averaged over the mini-batch.
    log_p = tf.log(target_density.calc_prob_tf(z_k) + 1e-7)
    loss = tf.reduce_mean(log_q_k - log_p, -1)
    return loss

def get_train(loss):
    return tf.train.AdamOptimizer().minimize(loss)

def get_placeholder():
    z_0 = tf.placeholder(tf.float32, shape=[None, 2])
    log_q_0 = tf.placeholder(tf.float32, shape=[None])
    return z_0, log_q_0
Build a computational graph using the above classes and functions.
main.py
normalizing_flow = NormalizingFlow(K=16, dim=2)
z_0, log_q_0 = get_placeholder()
z_k, log_q_k = normalizing_flow(z_0, log_q_0)
loss = calc_loss(z_k, log_q_k, target_density)
train = get_train(loss)
Training is performed with stochastic gradient descent (using the Adam optimizer). Each mini-batch contains 1000 samples and training runs for 100,000 iterations.
main.py
with tf.Session() as sess:
    invisible_axis = True
    sess.run(tf.global_variables_initializer())
    for iteration in range(100000+1):
        z_0_batch = normal_distribution.sample(1000)
        log_q_0_batch = np.log(normal_distribution.calc_prob(z_0_batch))
        _, loss_value = sess.run([train, loss], {z_0: z_0_batch, log_q_0: log_q_0_batch})
The results are shown below. Starting from the Gaussian distribution, you can see that the complex target distributions are sampled correctly. <img src="https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/302831/4ab8c588-1d49-e110-c3c9-deb72f6e8eac.png" width="33%"> <img src="https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/302831/4d21dd7d-6aa1-fd70-57c0-e689f7525111.png" width="33%"> <img src="https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/302831/4dc89cef-d8ac-a2b6-4897-4224d1de1fae.png" width="33%">
Finally, the full implementation is listed below. There are three files: main.py, normalizing_flow.py, and distribution.py. main.py builds the computational graph and runs training, normalizing_flow.py defines the model and the loss function used to build the graph, and distribution.py provides functions for sampling from and evaluating the probability distributions. The files are also posted on GitHub.
main.py
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from distribution import *
from normalizing_flow import *

normal_distribution = NormalDistribution2D()
target_density = TargetDistribution1()

normalizing_flow = NormalizingFlow(K=16, dim=2)
z_0, log_q_0 = get_placeholder()
z_k, log_q_k = normalizing_flow(z_0, log_q_0)
loss = calc_loss(z_k, log_q_k, target_density)
train = get_train(loss)

with tf.Session() as sess:
    invisible_axis = True
    sess.run(tf.global_variables_initializer())
    for iteration in range(100000+1):
        z_0_batch = normal_distribution.sample(1000)
        log_q_0_batch = np.log(normal_distribution.calc_prob(z_0_batch))
        _, loss_value = sess.run([train, loss], {z_0: z_0_batch, log_q_0: log_q_0_batch})
        if iteration % 100 == 0:
            print('Iteration : {} Loss : {}'.format(iteration, loss_value))
        if iteration % 10000 == 0:
            z_k_value = sess.run(z_k, {z_0: z_0_batch, log_q_0: log_q_0_batch})
            plt.figure(figsize=(6, 6))
            plt.scatter(z_k_value[:, 0], z_k_value[:, 1], alpha=0.7)
            if invisible_axis:
                plt.tick_params(bottom=False, left=False, right=False, top=False)
                plt.tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False)
            plt.show()
normalizing_flow.py
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt


def get_placeholder():
    z_0 = tf.placeholder(tf.float32, shape=[None, 2])
    log_q_0 = tf.placeholder(tf.float32, shape=[None])
    return z_0, log_q_0


def calc_loss(z_k, log_q_k, target_density):
    # Negative ELBO of Eq. (15), averaged over the mini-batch.
    log_p = tf.log(target_density.calc_prob_tf(z_k) + 1e-7)
    loss = tf.reduce_mean(log_q_k - log_p, -1)
    return loss


def get_train(loss):
    return tf.train.AdamOptimizer().minimize(loss)


class PlanarFlow:
    def __init__(self, dim):
        self.dim = dim
        self.h = lambda x: tf.tanh(x)
        self.h_prime = lambda x: 1 - tf.tanh(x)**2
        self.w = tf.Variable(tf.random.truncated_normal(shape=(1, self.dim)))
        self.b = tf.Variable(tf.zeros(shape=(1)))
        self.u = tf.Variable(tf.random.truncated_normal(shape=(1, self.dim)))

    def __call__(self, z, log_q):
        # Pre-activation w^T z + b, evaluated at the *input* z.
        affine = tf.expand_dims(tf.reduce_sum(z*self.w, -1), -1) + self.b
        # Planar transformation f(z) = z + u h(w^T z + b).
        z_new = z + self.u*self.h(affine)
        # psi(z) = h'(w^T z + b) w, giving |det df/dz| = |1 + u^T psi(z)|.
        psi = self.h_prime(affine)*self.w
        det_jacob = tf.abs(1 + tf.reduce_sum(psi*self.u, -1))
        log_q = log_q - tf.log(1e-7 + det_jacob)
        return z_new, log_q


class NormalizingFlow:
    def __init__(self, K, dim):
        self.K = K
        self.dim = dim
        self.planar_flow = [PlanarFlow(self.dim) for i in range(self.K)]

    def __call__(self, z_0, log_q_0):
        z, log_q = self.planar_flow[0](z_0, log_q_0)
        for pf in self.planar_flow[1:]:
            z, log_q = pf(z, log_q)
        return z, log_q
distribution.py
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt


class Distribution:
    def calc_prob(self, z):
        p = np.zeros(z.shape[0])
        return p

    def plot(self, size=5):
        side = np.linspace(-size, size, 1000)
        z1, z2 = np.meshgrid(side, side)
        shape = z1.shape
        z1 = z1.ravel()
        z2 = z2.ravel()
        z = np.c_[z1, z2]
        probability = self.calc_prob(z).reshape(shape)
        plt.figure(figsize=(6, 6))
        plt.imshow(probability)
        plt.tick_params(bottom=False, left=False, right=False, top=False)
        plt.tick_params(labelbottom=False, labelleft=False, labelright=False, labeltop=False)
        plt.show()


class NormalDistribution2D(Distribution):
    def sample(self, sample_num):
        z = np.random.randn(sample_num, 2)
        return z

    def sample_tf(self, sample_num):
        z = tf.random_normal([sample_num, 2])
        return z

    def calc_prob(self, z):
        p = np.exp(-(z[:, 0]**2 + z[:, 1]**2)/2)/(2*np.pi)
        return p

    def calc_prob_tf(self, z):
        p = tf.exp(-(z[:, 0]**2 + z[:, 1]**2)/2)/(2*np.pi)
        return p


class TargetDistribution1(Distribution):
    def calc_prob(self, z):
        z1, z2 = z[:, 0], z[:, 1]
        norm = np.sqrt(z1**2 + z2**2)
        exp1 = np.exp(-0.5*((z1 - 2)/0.6)**2)
        exp2 = np.exp(-0.5*((z1 + 2)/0.6)**2)
        p = 0.5*((norm - 2)/0.4)**2 - np.log(exp1 + exp2)
        return np.exp(-p)

    def calc_prob_tf(self, z):
        z1, z2 = z[:, 0], z[:, 1]
        norm = tf.sqrt(z1**2 + z2**2)
        exp1 = tf.exp(-0.5*((z1 - 2)/0.6)**2)
        exp2 = tf.exp(-0.5*((z1 + 2)/0.6)**2)
        p = 0.5*((norm - 2)/0.4)**2 - tf.log(exp1 + exp2)
        return tf.exp(-p)


class TargetDistribution2(Distribution):
    def calc_prob(self, z):
        z1, z2 = z[:, 0], z[:, 1]
        w1 = np.sin(0.5*np.pi*z1)
        p = 0.5*((z2 - w1)/0.4)**2
        return np.exp(-p)

    def calc_prob_tf(self, z):
        z1, z2 = z[:, 0], z[:, 1]
        w1 = tf.sin(0.5*np.pi*z1)
        p = 0.5*((z2 - w1)/0.4)**2
        return tf.exp(-p)
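As a quick check (a usage sketch, not part of the original files), the target densities defined above can be visualized with the plot method inherited from Distribution:

# Assumes distribution.py is on the import path.
from distribution import TargetDistribution1, TargetDistribution2

TargetDistribution1().plot()  # ring of radius 2, with mass concentrated near z1 = +/-2
TargetDistribution2().plot()  # ridge along the curve z2 = sin(0.5*pi*z1)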