[PYTHON] Semi-supervised label learning using DBN and Label Spreading

Labeling documents by hand for supervised classification is tedious, so I built a semi-supervised text classifier that can classify documents using only a small number of labels.

Reference material

What I did

procedure

Feature extraction with DBN

Semi-supervised learning with Label Spreading

advantage

Advantages of feature extraction with DBN

Benefits of Label Spreading

Reference code

The code is fairly rough, but I'll paste it below. The DBN code is copied almost verbatim from the Deep Learning Tutorial. The Deep Learning Tutorial site explains the method with mathematical formulas, so please refer to it for the detailed explanation.

DBN code

The script reads data in bag-of-words (BoW) format from a CSV file, extracts features with the DBN, and writes the extracted features out as another CSV file. Please note that it contains some methods that are not used.

python


# coding:utf-8

from __future__ import unicode_literals
import time

import numpy as np
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams


class DBN:
    """Deep Belief Network: stacked RBMs sharing weights with a sigmoid MLP.

    Every hidden layer exists twice: as a sigmoid ``HiddenLayer`` (part of the
    supervised fine-tuning graph) and as an ``RBM`` (used for layer-wise
    pre-training).  Both views are given the same ``W`` and hidden-bias shared
    variables, so pre-training an RBM also updates the corresponding MLP
    layer.  A ``LogisticRegression`` layer is stacked on the last hidden layer.
    """

    def __init__(self, numpy_rng, theano_rng=None, n_ins=784,
                 hidden_layers_sizes=(500, 500), n_outs=10):
        """Build the symbolic graph of the network.

        :param numpy_rng: numpy random generator; passed to every layer and
            used to seed ``theano_rng`` when that is not given
        :param theano_rng: Theano ``RandomStreams``; created from
            ``numpy_rng`` when ``None``
        :param n_ins: dimension of the network input
        :param hidden_layers_sizes: sequence with the size of each hidden
            layer; must contain at least one entry
        :param n_outs: number of outputs of the logistic regression layer
        """
        self.sigmoid_layers = []
        self.rbm_layers = []
        self.params = []
        self.n_layers = len(hidden_layers_sizes)

        assert self.n_layers > 0

        if theano_rng is None:
            theano_rng = RandomStreams(numpy_rng.randint(2**30))

        # allocate symbolic variables for the data
        self.x = T.matrix('x')
        self.y = T.ivector('y')

        for i in range(self.n_layers):
            # The first layer reads the network input; every other layer
            # reads the output of the sigmoid layer below it.
            if i == 0:
                input_size = n_ins
                layer_input = self.x
            else:
                input_size = hidden_layers_sizes[i - 1]
                layer_input = self.sigmoid_layers[-1].output

            sigmoid_layer = HiddenLayer(rng=numpy_rng,
                                        input=layer_input,
                                        n_in=input_size,
                                        n_out=hidden_layers_sizes[i],
                                        activation=T.nnet.sigmoid)
            self.sigmoid_layers.append(sigmoid_layer)
            self.params.extend(sigmoid_layer.params)

            # The RBM shares W and the hidden bias with the sigmoid layer.
            # Its visible bias is not added to self.params, so fine-tuning
            # does not touch it.
            rbm_layer = RBM(numpy_rng=numpy_rng,
                            theano_rng=theano_rng,
                            input=layer_input,
                            n_visible=input_size,
                            n_hidden=hidden_layers_sizes[i],
                            W=sigmoid_layer.W,
                            hbias=sigmoid_layer.b)
            self.rbm_layers.append(rbm_layer)

        # The classifier is built once, after the whole stack exists.
        # Building it inside the loop would add the parameters of discarded
        # logistic layers to self.params, and T.grad in
        # build_finetune_functions would then receive parameters that are
        # not part of the cost graph.
        self.logLayer = LogisticRegression(
            input=self.sigmoid_layers[-1].output,
            n_in=hidden_layers_sizes[-1],
            n_out=n_outs)
        self.params.extend(self.logLayer.params)
        self.finetune_cost = self.logLayer.negative_log_likelihood(self.y)
        self.errors = self.logLayer.errors(self.y)

    def pretraining_functions(self, train_set_x, batch_size, k):
        """Compile one pre-training function per RBM layer.

        :param train_set_x: training inputs; sliced by minibatch and
            substituted for ``self.x``
        :param batch_size: number of rows per minibatch
        :param k: number of Gibbs steps passed to ``RBM.get_cost_updates``
        :return: list of Theano functions, one per layer.  Each takes the
            minibatch ``index`` and an optional ``lr`` (default 0.1), applies
            the RBM updates and returns the monitoring cost.
        """
        index = T.lscalar('index')  # index to a minibatch
        learning_rate = T.scalar('lr')  # learning rate to use

        # beginning and end of a batch, given `index`
        batch_begin = index * batch_size
        batch_end = batch_begin + batch_size

        pretrain_fns = []
        for rbm in self.rbm_layers:
            cost, updates = rbm.get_cost_updates(learning_rate,
                                                 persistent=None, k=k)

            # NOTE(review): theano.Param is kept on purpose; newer Theano
            # releases reportedly replace it with theano.In -- confirm
            # against the installed version before changing it.
            fn = theano.function(
                inputs=[index, theano.Param(learning_rate, default=0.1)],
                outputs=cost,
                updates=updates,
                givens={
                    self.x: train_set_x[batch_begin:batch_end]
                }
            )
            pretrain_fns.append(fn)

        return pretrain_fns

    # Backward-compatible alias for the original, misspelled method name.
    pretrainig_functions = pretraining_functions

    def build_finetune_functions(self, datasets, batch_size, learning_rate):
        """Compile the supervised fine-tuning, validation and test functions.

        :param datasets: sequence of three ``(x, y)`` pairs in the order
            train, validation, test; ``get_value`` is called on the
            validation and test inputs, so those must be shared variables
        :param batch_size: number of rows per minibatch
        :param learning_rate: step size of the gradient-descent updates
        :return: ``(train_fn, valid_score, test_score)``.  ``train_fn(index)``
            performs one update and returns the fine-tuning cost; the other
            two take no argument and return the list of per-minibatch errors.
        """
        (train_set_x, train_set_y) = datasets[0]
        (valid_set_x, valid_set_y) = datasets[1]
        (test_set_x, test_set_y) = datasets[2]

        # Number of whole minibatches.  Floor division keeps the counts
        # integral (required by range) on both Python 2 and Python 3.
        n_valid_batches = (
            valid_set_x.get_value(borrow=True).shape[0] // batch_size)
        n_test_batches = (
            test_set_x.get_value(borrow=True).shape[0] // batch_size)

        index = T.lscalar('index')  # index to a [mini]batch

        # compute the gradients with respect to the model parameters
        gparams = T.grad(self.finetune_cost, self.params)

        # plain gradient-descent updates
        updates = [(param, param - gparam * learning_rate)
                   for param, gparam in zip(self.params, gparams)]

        def batch_givens(set_x, set_y):
            # Substitute the minibatch `index` of (set_x, set_y) for the
            # symbolic inputs of the network.
            rows = slice(index * batch_size, (index + 1) * batch_size)
            return {self.x: set_x[rows], self.y: set_y[rows]}

        train_fn = theano.function(
            inputs=[index],
            outputs=self.finetune_cost,
            updates=updates,
            givens=batch_givens(train_set_x, train_set_y)
        )

        test_score_i = theano.function(
            [index],
            self.errors,
            givens=batch_givens(test_set_x, test_set_y)
        )

        valid_score_i = theano.function(
            [index],
            self.errors,
            givens=batch_givens(valid_set_x, valid_set_y)
        )

        # Create a function that scans the entire validation set
        def valid_score():
            return [valid_score_i(i) for i in range(n_valid_batches)]

        # Create a function that scans the entire test set
        def test_score():
            return [test_score_i(i) for i in range(n_test_batches)]

        return train_fn, valid_score, test_score


class HiddenLayer:
    """Fully connected layer computing ``activation(dot(input, W) + b)``."""

    def __init__(self, rng, input, n_in, n_out, W=None, b=None,
                 activation=T.tanh):
        """Create the layer, allocating ``W`` and ``b`` unless they are given.

        :param rng: numpy random generator used to draw the initial weights
        :param input: symbolic input of the layer
        :param n_in: input dimension
        :param n_out: number of units in the layer
        :param W: optional shared weight variable to reuse
        :param b: optional shared bias variable to reuse
        :param activation: non-linearity applied to the linear output;
            ``None`` leaves the output linear
        """
        self.input = input

        if W is None:
            # Uniform initialisation in [-bound, bound]; the range is made
            # four times wider when the activation is the sigmoid.
            bound = np.sqrt(6. / (n_in+n_out))
            initial_weights = np.asarray(
                rng.uniform(low=-bound, high=bound, size=(n_in, n_out)),
                dtype=theano.config.floatX
            )
            if activation == theano.tensor.nnet.sigmoid:
                initial_weights *= 4
            W = theano.shared(value=initial_weights, name='W', borrow=True)

        if b is None:
            initial_bias = np.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=initial_bias, name='b', borrow=True)

        self.W, self.b = W, b

        pre_activation = T.dot(input, self.W) + self.b
        if activation is None:
            self.output = pre_activation
        else:
            self.output = activation(pre_activation)

        self.params = [self.W, self.b]

class LogisticRegression:
    """Softmax classifier layer with zero-initialised weights and biases."""

    def __init__(self, input, n_in, n_out):
        """Build the symbolic class probabilities and predictions.

        :param input: symbolic input of the layer
        :param n_in: input dimension
        :param n_out: number of classes
        """
        float_type = theano.config.floatX

        # weight matrix of shape (n_in, n_out), initialised with zeros
        self.W = theano.shared(
            value=np.zeros((n_in, n_out), dtype=float_type),
            name='W',
            borrow=True
        )
        # bias vector of n_out zeros
        self.b = theano.shared(
            value=np.zeros((n_out,), dtype=float_type),
            name='b',
            borrow=True
        )

        scores = T.dot(input, self.W) + self.b
        self.p_y_given_x = T.nnet.softmax(scores)
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)
        self.params = [self.W, self.b]

    def negative_log_likelihood(self, y):
        """Return the mean negative log-probability assigned to labels ``y``."""
        log_probs = T.log(self.p_y_given_x)
        rows = T.arange(y.shape[0])
        return -T.mean(log_probs[rows, y])

    def errors(self, y):
        """Return the fraction of predictions that differ from ``y``.

        :raises TypeError: if ``y`` and the predictions differ in ``ndim``
        :raises NotImplementedError: if the dtype of ``y`` is not an integer
            type
        """
        # y must have the same number of dimensions as the predictions
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type)
            )
        # only integer labels are supported
        if not y.dtype.startswith('int'):
            raise NotImplementedError()
        return T.mean(T.neq(self.y_pred, y))


class RBM(object):
    """Restricted Boltzmann Machine (RBM).

    Holds the weights, hidden bias and visible bias as Theano shared
    variables and builds the symbolic expressions for Gibbs sampling, the
    free energy and the CD-k / PCD-k parameter updates.
    """

    def __init__(
        self,
        input=None,
        n_visible=784,
        n_hidden=500,
        W=None,
        hbias=None,
        vbias=None,
        numpy_rng=None,
        theano_rng=None
    ):
        """Create the RBM, allocating any parameter that is not supplied.

        :param input: symbolic input; a fresh ``T.matrix('input')`` is
            created when ``None`` (standalone RBM)
        :param n_visible: number of visible units
        :param n_hidden: number of hidden units
        :param W: optional shared weight matrix to reuse; when created here
            it has shape ``(n_visible, n_hidden)``
        :param hbias: optional shared hidden-bias vector to reuse
        :param vbias: optional shared visible-bias vector to reuse
        :param numpy_rng: numpy random generator; defaults to
            ``RandomState(1234)``
        :param theano_rng: Theano ``RandomStreams``; seeded from
            ``numpy_rng`` when ``None``
        """
        self.n_visible = n_visible
        self.n_hidden = n_hidden

        if numpy_rng is None:
            # create a number generator
            numpy_rng = np.random.RandomState(1234)

        if theano_rng is None:
            theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))

        if W is None:
            # uniform initialisation in +/- 4 * sqrt(6 / (n_hidden + n_visible))
            bound = 4 * np.sqrt(6. / (n_hidden + n_visible))
            initial_W = np.asarray(
                numpy_rng.uniform(
                    low=-bound,
                    high=bound,
                    size=(n_visible, n_hidden)
                ),
                dtype=theano.config.floatX
            )
            # theano shared variables for weights and biases
            W = theano.shared(value=initial_W, name='W', borrow=True)

        if hbias is None:
            # create shared variable for hidden units bias
            hbias = theano.shared(
                value=np.zeros(
                    n_hidden,
                    dtype=theano.config.floatX
                ),
                name='hbias',
                borrow=True
            )

        if vbias is None:
            # create shared variable for visible units bias
            vbias = theano.shared(
                value=np.zeros(
                    n_visible,
                    dtype=theano.config.floatX
                ),
                name='vbias',
                borrow=True
            )

        # initialize input layer for standalone RBM or layer0 of DBN.
        # An explicit None test avoids relying on the truth value of a
        # symbolic variable.
        self.input = input
        if input is None:
            self.input = T.matrix('input')

        self.W = W
        self.hbias = hbias
        self.vbias = vbias
        self.theano_rng = theano_rng
        self.params = [self.W, self.hbias, self.vbias]

    def propup(self, vis):
        """Propagate visible activations up to the hidden units.

        :return: ``[pre_sigmoid_activation, sigmoid(pre_sigmoid_activation)]``
        """
        pre_sigmoid_activation = T.dot(vis, self.W) + self.hbias
        return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]

    def sample_h_given_v(self, v0_sample):
        """Infer the state of the hidden units given the visible units.

        :return: ``[pre_sigmoid_h1, h1_mean, h1_sample]`` where ``h1_sample``
            is drawn from a binomial with ``n=1`` and ``p=h1_mean``
        """
        pre_sigmoid_h1, h1_mean = self.propup(v0_sample)
        h1_sample = self.theano_rng.binomial(size=h1_mean.shape,
                                             n=1, p=h1_mean,
                                             dtype=theano.config.floatX)
        return [pre_sigmoid_h1, h1_mean, h1_sample]

    def propdown(self, hid):
        """Propagate hidden activations down to the visible units.

        :return: ``[pre_sigmoid_activation, sigmoid(pre_sigmoid_activation)]``
        """
        pre_sigmoid_activation = T.dot(hid, self.W.T) + self.vbias
        return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]

    def sample_v_given_h(self, h0_sample):
        """Infer the state of the visible units given the hidden units.

        :return: ``[pre_sigmoid_v1, v1_mean, v1_sample]`` where ``v1_sample``
            is drawn from a binomial with ``n=1`` and ``p=v1_mean``
        """
        # compute the activation of the visible given the hidden sample
        pre_sigmoid_v1, v1_mean = self.propdown(h0_sample)
        v1_sample = self.theano_rng.binomial(size=v1_mean.shape,
                                             n=1, p=v1_mean,
                                             dtype=theano.config.floatX)
        return [pre_sigmoid_v1, v1_mean, v1_sample]

    def gibbs_hvh(self, h0_sample):
        """Run one Gibbs step starting from the hidden state.

        :return: the three outputs of ``sample_v_given_h`` followed by the
            three outputs of ``sample_h_given_v``
        """
        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h0_sample)
        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v1_sample)
        return [pre_sigmoid_v1, v1_mean, v1_sample,
                pre_sigmoid_h1, h1_mean, h1_sample]

    def gibbs_vhv(self, v0_sample):
        """Run one Gibbs step starting from the visible state.

        :return: the three outputs of ``sample_h_given_v`` followed by the
            three outputs of ``sample_v_given_h``
        """
        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v0_sample)
        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h1_sample)
        return [pre_sigmoid_h1, h1_mean, h1_sample,
                pre_sigmoid_v1, v1_mean, v1_sample]

    def free_energy(self, v_sample):
        """Compute the free energy of each row of ``v_sample``."""
        wx_b = T.dot(v_sample, self.W) + self.hbias
        vbias_term = T.dot(v_sample, self.vbias)
        hidden_term = T.sum(T.log(1 + T.exp(wx_b)), axis=1)
        return -hidden_term - vbias_term

    def get_cost_updates(self, lr=0.1, persistent=None, k=1):
        """Build the CD-k or PCD-k updates and a monitoring cost.

        :param lr: learning rate applied to the parameter updates
        :param persistent: ``None`` for contrastive divergence (the chain
            starts from the hidden sample of the input); otherwise the old
            chain state for persistent CD, which must be a shared variable
        :param k: number of Gibbs steps
        :return: ``(monitoring_cost, updates)`` where ``updates`` holds the
            scan updates plus the parameter (and, for PCD, chain) updates
        """
        # compute positive phase
        pre_sigmoid_ph, ph_mean, ph_sample = self.sample_h_given_v(self.input)

        # decide how to initialize the chain
        if persistent is None:
            chain_start = ph_sample
        else:
            chain_start = persistent

        # negative phase: run k Gibbs steps; only the hidden sample is fed
        # back into the next step
        (
            [
                pre_sigmoid_nvs,
                nv_means,
                nv_samples,
                pre_sigmoid_nhs,
                nh_means,
                nh_samples
            ],
            updates
        ) = theano.scan(
            self.gibbs_hvh,
            outputs_info=[None, None, None, None, None, chain_start],
            n_steps=k
        )

        # the visible sample at the end of the chain
        chain_end = nv_samples[-1]
        cost = T.mean(self.free_energy(self.input)) - T.mean(
            self.free_energy(chain_end))
        # We must not compute the gradient through the gibbs sampling
        gparams = T.grad(cost, self.params, consider_constant=[chain_end])

        for gparam, param in zip(gparams, self.params):
            # make sure that the learning rate is of the right dtype
            updates[param] = param - gparam * T.cast(
                lr,
                dtype=theano.config.floatX
            )

        # Same explicit None test as for chain_start above.
        if persistent is not None:
            # Note that this works only if persistent is a shared variable
            updates[persistent] = nh_samples[-1]
            # pseudo-likelihood is a better proxy for PCD
            monitoring_cost = self.get_pseudo_likelihood_cost(updates)
        else:
            # reconstruction cross-entropy is a better proxy for CD
            monitoring_cost = self.get_reconstruction_cost(updates,
                                                           pre_sigmoid_nvs[-1])

        return monitoring_cost, updates

    def get_pseudo_likelihood_cost(self, updates):
        """Stochastic approximation to the pseudo-likelihood.

        Adds to ``updates`` an entry that advances the index of the bit
        flipped on the next call.
        """
        # index of bit i in expression p(x_i | x_{\i})
        bit_i_idx = theano.shared(value=0, name='bit_i_idx')

        # binarize the input image by rounding to nearest integer
        xi = T.round(self.input)

        # calculate free energy for the given bit configuration
        fe_xi = self.free_energy(xi)

        # flip bit x_i of matrix xi and preserve all other bits x_{\i}
        # Equivalent to xi[:,bit_i_idx] = 1-xi[:, bit_i_idx], but assigns
        # the result to xi_flip, instead of working in place on xi.
        xi_flip = T.set_subtensor(xi[:, bit_i_idx], 1 - xi[:, bit_i_idx])

        # calculate free energy with bit flipped
        fe_xi_flip = self.free_energy(xi_flip)

        # equivalent to e^(-FE(x_i)) / (e^(-FE(x_i)) + e^(-FE(x_{\i})))
        cost = T.mean(self.n_visible * T.log(T.nnet.sigmoid(fe_xi_flip -
                                                            fe_xi)))

        # increment bit_i_idx % number as part of updates
        updates[bit_i_idx] = (bit_i_idx + 1) % self.n_visible

        return cost

    def get_reconstruction_cost(self, updates, pre_sigmoid_nv):
        """Return the mean reconstruction cross-entropy of the input.

        :param updates: not used; kept for interface compatibility
        :param pre_sigmoid_nv: pre-sigmoid activation of the reconstructed
            visible units
        """
        cross_entropy = T.mean(
            T.sum(
                self.input * T.log(T.nnet.sigmoid(pre_sigmoid_nv)) +
                (1 - self.input) * T.log(1 - T.nnet.sigmoid(pre_sigmoid_nv)),
                axis=1
            )
        )

        return cross_entropy

def output(input_data, w, b):
    """Return the sigmoid activations ``sigmoid(input_data . w + b)``.

    :param input_data: 2-D array with one sample per row
    :param w: weight matrix whose row count matches the columns of
        ``input_data``
    :param b: bias vector with one entry per column of ``w``
    :return: array with one row of activations per input row
    """
    # NumPy broadcasting adds the bias to every row; there is no need to
    # materialise a tiled copy of it with np.kron.
    x = np.dot(input_data, w) + b
    return 1 / (1 + np.exp(-x))

if __name__=='__main__':
    # Pre-train a DBN on the bag-of-words CSV and save the activations of
    # the top hidden layer as features.
    numpy_rng = np.random.RandomState(123)
    print('... building the model')

    ifname = 'bow_data.csv'
    # ndmin=2 keeps a file holding a single sample two-dimensional.
    data = np.loadtxt(ifname, delimiter=',', ndmin=2)
    # Store the data as floatX so that it matches the dtype of the symbolic
    # input matrix it is substituted for.
    train_set_x = theano.shared(np.asarray(data, dtype=theano.config.floatX))

    dbn = DBN(numpy_rng=numpy_rng, n_ins=data.shape[1],
              hidden_layers_sizes=[2000, 1000, 100],
              n_outs=10)
    #########################
    # PRETRAINING THE MODEL #
    #########################
    print('... getting the pretraining functions')
    batch_size = 10
    k = 5
    pretraining_fns = dbn.pretrainig_functions(train_set_x=train_set_x,
                                                batch_size=batch_size,
                                                k=k)

    print('... pre-training the model')
    pretraining_epochs = 100
    # Derive the number of minibatches from the data instead of hard-coding
    # it, so that every row is visited once per epoch (the last batch may be
    # smaller than batch_size).
    n_train_batches = (data.shape[0] + batch_size - 1) // batch_size
    pretrain_lr = 0.1
    ## Pre-train layer-wise
    for i in range(dbn.n_layers):
        # go through pretraining epochs
        for epoch in range(pretraining_epochs):
            # go through the training set
            c = [pretraining_fns[i](index=batch_index, lr=pretrain_lr)
                 for batch_index in range(n_train_batches)]
            print('Pre-training layer %i, epoch %d, cost  %s'
                  % (i, epoch, np.mean(c)))

    # Propagate the data through the pre-trained layers; only the output of
    # the top layer is kept.
    features = train_set_x.get_value()
    for rbm in dbn.rbm_layers:
        features = output(features, rbm.W.get_value(), rbm.hbias.get_value())
    print(features)
    np.savetxt('DBN_features.csv', features, delimiter=',')

Label Spreading code

Each record in original_data.csv is a single line of the form `0\t1 0 1 0 0 0 0\txxxx`, i.e. three tab-separated fields. From the left, they are: a flag indicating whether the data spans multiple classes, the flag of each class (separated by spaces), and the text. In the code below, data with only one label is used as teacher (labeled) data as much as possible during training. I have also added a function that runs on the iris data, so please use it if you want to try Label Spreading but do not have suitable data at hand.

python


# coding: utf-8

from sklearn import datasets
from sklearn.semi_supervised import LabelSpreading
import numpy as np
from numpy.random import seed
seed(555)
from collections import defaultdict

def iris():
    """Demonstrate Label Spreading on the iris data set.

    A random subset of the labels is hidden (set to -1), a ``LabelSpreading``
    model is fitted, and for every sample the class probabilities, the
    predicted label, the label used for training and the true label are
    printed.
    """
    dataset = datasets.load_iris()
    # randint(0, 2) draws a 0/1 flag per sample and replaces the deprecated
    # random_integers(0, 1); samples with a nonzero flag lose their label.
    random_unlabeled_points = np.where(
        np.random.randint(0, 2, size=len(dataset.target)))
    labels = np.copy(dataset.target)
    labels[random_unlabeled_points] = -1

    label_prop_model = LabelSpreading()
    label_prop_model.fit(dataset.data, labels)  # unlabeled as -1
    pred_prop = label_prop_model.predict_proba(dataset.data)
    pred_label = label_prop_model.predict(dataset.data)

    for pp, pl, label, trgt in zip(pred_prop, pred_label, labels,
                                   dataset.target):
        print('%s %s %s %s' % (pp, pl, label, trgt))

def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator`` as a float, or 0.0 if the denominator is zero."""
    if denominator == 0:
        return 0.0
    return float(numerator) / denominator


def main(X, labels_info, min_number=20, label_num=6, n_neighbors=7, alpha=0.3, typ='knn', threshold=0.5):
    """Fit Label Spreading on partially labeled data and print per-label metrics.

    :param X: feature matrix passed to ``LabelSpreading.fit`` and
        ``predict_proba``; one row per sample
    :param labels_info: array passed to ``get_target`` to obtain the true
        labels of every sample (label numbers start at 1)
    :param min_number: maximum number of samples kept as labeled data per
        class
    :param label_num: number of classes; labels are ``1 .. label_num``
    :param n_neighbors: ``n_neighbors`` of the model when ``typ == 'knn'``
    :param alpha: ``alpha`` of the model when ``typ`` is not ``'knn'``
    :param typ: kernel name handed to ``LabelSpreading``
    :param threshold: a label is predicted when its normalised probability
        is at least this value
    """
    true_labels = get_target(labels_info)

    # Randomly hide roughly half of the samples.  A set gives O(1)
    # membership tests in the loop below.
    coin_flips = np.random.randint(0, 2, size=len(true_labels))
    random_unlabeled_points = set(np.where(coin_flips)[0].tolist())

    # Build the training target: a sample keeps its label only if it has
    # exactly one label, was not randomly hidden, and its class has fewer
    # than min_number labeled samples so far.  Everything else (including
    # samples without any label) is marked unlabeled with -1.
    target = []
    cnt_dict = defaultdict(int)
    for i, t in enumerate(true_labels):
        if len(t) != 1 or i in random_unlabeled_points:
            assigned = -1
        else:
            single_label = int(t[0])
            if cnt_dict[single_label] < min_number:
                assigned = single_label
            else:
                assigned = -1
        cnt_dict[assigned] += 1
        target.append(assigned)
    print(cnt_dict)

    if typ == 'knn':
        label_prop_model = LabelSpreading(kernel=typ, n_neighbors=n_neighbors)
    else:
        label_prop_model = LabelSpreading(kernel=typ, alpha=alpha)
    label_prop_model.fit(X, target)  # unlabeled as -1
    pred_prop = label_prop_model.predict_proba(X)
    # Map probability columns to label values through classes_ instead of
    # assuming that column j always stands for label j + 1; the two differ
    # when a class has no labeled sample.
    classes = label_prop_model.classes_

    # TP, FP, FN and TN counts per label
    res_dict = dict((key, defaultdict(int)) for key in ('TP', 'FP', 'FN', 'TN'))
    # number of samples carrying each true label
    label_dict = defaultdict(int)
    all_labels = set(range(1, label_num + 1))

    for pp, labels, trgt in zip(pred_prop, true_labels, target):
        # probabilities, labels above the raw threshold, true labels and
        # the label used for training
        print('%s %s %s %s' % (pp, classes[pp >= threshold], labels, trgt))

        truth = set(int(l) for l in labels)
        # labels whose normalised probability reaches the threshold
        predicted = set(int(l) for l in classes[pp / np.sum(pp) >= threshold])

        outcomes = (
            ('TP', 'TP labels:', truth & predicted),
            ('FP', 'FP labels:', predicted - truth),
            ('FN', 'FN labels', truth - predicted),
            # labels that are neither true nor predicted
            ('TN', 'TN labels', (all_labels - truth) & (all_labels - predicted)),
        )
        for key, header, members in outcomes:
            print(header)
            print(members)
            for l in members:
                res_dict[key][l] += 1

        # count the number of each correct label
        for l in truth:
            label_dict[l] += 1

    for label in range(1, label_num + 1):
        tp = res_dict['TP'][label]
        fp = res_dict['FP'][label]
        fn = res_dict['FN'][label]
        tn = res_dict['TN'][label]
        support = label_dict[label]

        print('label= %s' % label)
        print('TP: %s FP: %s FN: %s TN: %s' % (tp, fp, fn, tn))
        print('%s %s %s %s' % (_safe_ratio(tp, support),
                               _safe_ratio(fp, support),
                               _safe_ratio(fn, support),
                               _safe_ratio(tn, support)))
        # Ratios with an empty denominator are reported as 0.0 instead of
        # raising ZeroDivisionError.
        accuracy = _safe_ratio(tp + tn, tp + fp + fn + tn)
        precision = _safe_ratio(tp, tp + fp)
        recall = _safe_ratio(tp, tp + fn)
        f_measure = _safe_ratio(2 * recall * precision, recall + precision)
        print('Accuracy: %s Precision: %s Recall: %s F-measure: %s'
              % (accuracy, precision, recall, f_measure))

# Convert the per-class flags of the correct labels into label numbers from 1 to n
def get_target(labels_info):
    """Return, for every row, the 1-based positions of the class flags set to 1.

    The first column of ``labels_info`` is skipped; the remaining columns
    are treated as per-class flags.  The result is a list with one integer
    array per row.
    """
    class_flags = labels_info[:, 1:]
    return [np.where(row == 1)[0] + 1 for row in class_flags]

def get_labels():
    """Unimplemented stub: takes no arguments, does nothing and returns None."""
    return None

def get_labels_info(label_fname):
    """Read the label file and return one integer row per record.

    Each non-blank line is split on tabs: the first field is parsed as an
    integer flag and the second as whitespace-separated integer class flags.
    Any further fields are ignored.

    :param label_fname: path of the label file
    :return: 2-D integer array whose first column is the flag and whose
        remaining columns are the class flags
    :raises ValueError: if the file contains no records
    """
    label_flag = []
    labels_info = []
    with open(label_fname, 'r') as f:
        for line in f:
            stripped = line.strip()
            if not stripped:
                # tolerate blank lines, e.g. at the end of the file
                continue
            data = stripped.split('\t')
            label_flag.append(int(data[0]))
            # split() without an argument also copes with repeated spaces
            labels_info.append(np.array(data[1].split(), dtype=np.int32))

    if not label_flag:
        raise ValueError('no label records found in %s' % label_fname)

    flag_column = np.array(label_flag).reshape((len(label_flag), 1))
    return np.hstack((flag_column, np.array(labels_info)))

if __name__=='__main__':
    # Input files: DBN features (one sample per row) and the label file.
    ifname = 'DBN_features.csv'
    label_fname = 'original_data.csv'

    X = np.loadtxt(ifname, delimiter=',')
    labels_info = get_labels_info(label_fname)

    # typ selects the kernel: either 'knn' or 'rbf'
    main(X, labels_info, min_number=50, label_num=6, n_neighbors=7,
         alpha=0.2, typ='knn', threshold=0.5)

Sorry for the trouble, but I would appreciate it if you could point out any mistakes.

Recommended Posts

Semi-supervised label learning using DBN and Label Spreading
Collection and automation of erotic images using deep learning
Examination of Forecasting Method Using Deep Learning and Wavelet Transform-Part 2-
What I learned about AI and machine learning using Python (4)