Some people were tired of the labeling work needed to classify documents with supervised learning, so I built a semi-supervised text classification pipeline that can classify with only a small number of labels.
Apologies for the fairly messy code, which I paste below. The DBN code is copied almost verbatim from the Deep Learning Tutorial; that site explains it with the mathematical formulas, so I will leave the detailed explanations to it.
The flow is: read the CSV data in bag-of-words (BoW) format, extract features with the DBN, and write the features back out as a CSV. Please note that the code contains some methods that are not used.
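The script assumes that bow_data.csv (one row per document, one comma-separated count per vocabulary term) already exists. How that file was produced is not shown, but a minimal sketch using scikit-learn's CountVectorizer might look like the following; the placeholder corpus, the binary=True choice, and the default tokenizer (which you would have to replace for Japanese text) are all assumptions on my part.

```python
# coding: utf-8
# Hypothetical preprocessing step (not part of the scripts below):
# build a bag-of-words matrix from raw documents and save it as bow_data.csv.
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer

documents = [u'example document one', u'example document two']  # replace with your corpus
# binary=True yields 0/1 features, which suit the sigmoid (binary) visible units of the RBM
vectorizer = CountVectorizer(binary=True)
bow = vectorizer.fit_transform(documents).toarray()
np.savetxt('bow_data.csv', bow, delimiter=',')
```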
```python
# coding:utf-8
from __future__ import unicode_literals
import time
import numpy as np
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams
class DBN:
def __init__(self, numpy_rng, theano_rng=None, n_ins=784,
hidden_layers_sizes=[500, 500], n_outs=10):
self.sigmoid_layers = []
self.rbm_layers = []
self.params = []
self.n_layers = len(hidden_layers_sizes)
assert self.n_layers > 0
if not theano_rng:
theano_rng = RandomStreams(numpy_rng.randint(2**30))
# allocate symbolic variables for the data
self.x = T.matrix('x')
self.y = T.ivector('y')
for i in xrange(self.n_layers):
if i==0:
input_size = n_ins
layer_input = self.x
else:
input_size = hidden_layers_sizes[i - 1]
layer_input = self.sigmoid_layers[-1].output
sigmoid_layer = HiddenLayer(rng=numpy_rng,
input=layer_input,
n_in=input_size,
n_out=hidden_layers_sizes[i],
activation=T.nnet.sigmoid)
self.sigmoid_layers.append( sigmoid_layer )
self.params.extend(sigmoid_layer.params)
rbm_layer = RBM(numpy_rng=numpy_rng,
theano_rng=theano_rng,
input=layer_input,
n_visible=input_size,
n_hidden=hidden_layers_sizes[i],
W=sigmoid_layer.W,
hbias=sigmoid_layer.b)
self.rbm_layers.append(rbm_layer)
self.logLayer = LogisticRegression(
input=self.sigmoid_layers[-1].output,
n_in=hidden_layers_sizes[-1],
n_out=n_outs)
self.params.extend(self.logLayer.params)
self.finetune_cost = self.logLayer.negative_log_likelihood(self.y)
self.errors = self.logLayer.errors(self.y)
    def pretraining_functions(self, train_set_x, batch_size, k):
index = T.lscalar('index')
learning_rate = T.scalar('lr') # learning rate to use
# number of batches
n_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
# begining of a batch, given `index`
batch_begin = index * batch_size
# ending of a batch given `index`
batch_end = batch_begin + batch_size
pretrain_fns = []
for rbm in self.rbm_layers:
cost, updates = rbm.get_cost_updates(learning_rate,
persistent=None, k=k)
# compile the theano function
fn = theano.function(
inputs=[index, theano.Param(learning_rate, default=0.1)],
outputs=cost,
updates=updates,
givens={
self.x: train_set_x[batch_begin:batch_end]
}
)
# append `fn` to the list of functions
pretrain_fns.append(fn)
return pretrain_fns
def build_finetune_functions(self, datasets, batch_size, learning_rate):
(train_set_x, train_set_y) = datasets[0]
(valid_set_x, valid_set_y) = datasets[1]
(test_set_x, test_set_y) = datasets[2]
# compute number of minibatches for training, validation and testing
n_valid_batches = valid_set_x.get_value(borrow=True).shape[0]
n_valid_batches /= batch_size
n_test_batches = test_set_x.get_value(borrow=True).shape[0]
n_test_batches /= batch_size
index = T.lscalar('index') # index to a [mini]batch
# compute the gradients with respect to the model parameters
gparams = T.grad(self.finetune_cost, self.params)
# compute list of fine-tuning updates
updates = []
for param, gparam in zip(self.params, gparams):
updates.append((param, param - gparam * learning_rate))
train_fn = theano.function(
inputs=[index],
outputs=self.finetune_cost,
updates=updates,
givens={
self.x: train_set_x[
index * batch_size: (index + 1) * batch_size
],
self.y: train_set_y[
index * batch_size: (index + 1) * batch_size
]
}
)
test_score_i = theano.function(
[index],
self.errors,
givens={
self.x: test_set_x[
index * batch_size: (index + 1) * batch_size
],
self.y: test_set_y[
index * batch_size: (index + 1) * batch_size
]
}
)
valid_score_i = theano.function(
[index],
self.errors,
givens={
self.x: valid_set_x[
index * batch_size: (index + 1) * batch_size
],
self.y: valid_set_y[
index * batch_size: (index + 1) * batch_size
]
}
)
# Create a function that scans the entire validation set
def valid_score():
return [valid_score_i(i) for i in xrange(n_valid_batches)]
# Create a function that scans the entire test set
def test_score():
return [test_score_i(i) for i in xrange(n_test_batches)]
return train_fn, valid_score, test_score
class HiddenLayer:
def __init__(self, rng, input, n_in, n_out, W=None, b=None,
activation=T.tanh):
self.input = input
if W is None:
W_values = np.asarray(
rng.uniform(
low=-np.sqrt(6. / (n_in+n_out)),
high=np.sqrt(6. / (n_in+n_out)),
size=(n_in, n_out)
),
dtype=theano.config.floatX
)
if activation == theano.tensor.nnet.sigmoid:
W_values *=4
W = theano.shared(value=W_values, name='W', borrow=True)
if b is None:
b_values = np.zeros((n_out,), dtype=theano.config.floatX)
b = theano.shared(value=b_values, name='b', borrow=True)
self.W = W
self.b = b
lin_output = T.dot(input, self.W) + self.b
self.output = (
lin_output if activation is None
else activation(lin_output)
)
self.params = [self.W, self.b]
class LogisticRegression:
def __init__(self, input, n_in, n_out):
self.W = theano.shared(
value=np.zeros(
(n_in, n_out),
dtype=theano.config.floatX
),
name='W',
borrow=True
)
# initialize the baises b as a vector of n_out 0s
self.b = theano.shared(
value=np.zeros(
(n_out,),
dtype=theano.config.floatX
),
name='b',
borrow=True
)
self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
self.y_pred = T.argmax(self.p_y_given_x, axis=1)
self.params = [self.W, self.b]
def negative_log_likelihood(self, y):
return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])
def errors(self, y):
# check if y has same dimension of y_pred
if y.ndim != self.y_pred.ndim:
raise TypeError(
'y should have the same shape as self.y_pred',
('y', y.type, 'y_pred', self.y_pred.type)
)
# check if y is of the correct datatype
if y.dtype.startswith('int'):
return T.mean(T.neq(self.y_pred, y))
else:
raise NotImplementedError()
class RBM(object):
"""Restricted Boltzmann Machine (RBM) """
def __init__(
self,
input=None,
n_visible=784,
n_hidden=500,
W=None,
hbias=None,
vbias=None,
numpy_rng=None,
theano_rng=None
):
self.n_visible = n_visible
self.n_hidden = n_hidden
if numpy_rng is None:
# create a number generator
numpy_rng = np.random.RandomState(1234)
if theano_rng is None:
theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))
if W is None:
initial_W = np.asarray(
numpy_rng.uniform(
low=-4 * np.sqrt(6. / (n_hidden + n_visible)),
high=4 * np.sqrt(6. / (n_hidden + n_visible)),
size=(n_visible, n_hidden)
),
dtype=theano.config.floatX
)
# theano shared variables for weights and biases
W = theano.shared(value=initial_W, name='W', borrow=True)
if hbias is None:
# create shared variable for hidden units bias
hbias = theano.shared(
value=np.zeros(
n_hidden,
dtype=theano.config.floatX
),
name='hbias',
borrow=True
)
if vbias is None:
# create shared variable for visible units bias
vbias = theano.shared(
value=np.zeros(
n_visible,
dtype=theano.config.floatX
),
name='vbias',
borrow=True
)
# initialize input layer for standalone RBM or layer0 of DBN
self.input = input
if not input:
self.input = T.matrix('input')
self.W = W
self.hbias = hbias
self.vbias = vbias
self.theano_rng = theano_rng
self.params = [self.W, self.hbias, self.vbias]
def propup(self, vis):
pre_sigmoid_activation = T.dot(vis, self.W) + self.hbias
return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]
def sample_h_given_v(self, v0_sample):
''' This function infers state of hidden units given visible units '''
pre_sigmoid_h1, h1_mean = self.propup(v0_sample)
h1_sample = self.theano_rng.binomial(size=h1_mean.shape,
n=1, p=h1_mean,
dtype=theano.config.floatX)
return [pre_sigmoid_h1, h1_mean, h1_sample]
def propdown(self, hid):
pre_sigmoid_activation = T.dot(hid, self.W.T) + self.vbias
return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]
def sample_v_given_h(self, h0_sample):
''' This function infers state of visible units given hidden units '''
# compute the activation of the visible given the hidden sample
pre_sigmoid_v1, v1_mean = self.propdown(h0_sample)
v1_sample = self.theano_rng.binomial(size=v1_mean.shape,
n=1, p=v1_mean,
dtype=theano.config.floatX)
return [pre_sigmoid_v1, v1_mean, v1_sample]
def gibbs_hvh(self, h0_sample):
''' This function implements one step of Gibbs sampling,
starting from the hidden state'''
pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h0_sample)
pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v1_sample)
return [pre_sigmoid_v1, v1_mean, v1_sample,
pre_sigmoid_h1, h1_mean, h1_sample]
def gibbs_vhv(self, v0_sample):
''' This function implements one step of Gibbs sampling,
starting from the visible state'''
pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v0_sample)
pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h1_sample)
return [pre_sigmoid_h1, h1_mean, h1_sample,
pre_sigmoid_v1, v1_mean, v1_sample]
def free_energy(self, v_sample):
''' Function to compute the free energy '''
wx_b = T.dot(v_sample, self.W) + self.hbias
vbias_term = T.dot(v_sample, self.vbias)
hidden_term = T.sum(T.log(1 + T.exp(wx_b)), axis=1)
return -hidden_term - vbias_term
def get_cost_updates(self, lr=0.1, persistent=None, k=1):
# compute positive phase
pre_sigmoid_ph, ph_mean, ph_sample = self.sample_h_given_v(self.input)
if persistent is None:
chain_start = ph_sample
else:
chain_start = persistent
(
[
pre_sigmoid_nvs,
nv_means,
nv_samples,
pre_sigmoid_nhs,
nh_means,
nh_samples
],
updates
) = theano.scan(
self.gibbs_hvh,
outputs_info=[None, None, None, None, None, chain_start],
n_steps=k
)
chain_end = nv_samples[-1]
cost = T.mean(self.free_energy(self.input)) - T.mean(
self.free_energy(chain_end))
# We must not compute the gradient through the gibbs sampling
gparams = T.grad(cost, self.params, consider_constant=[chain_end])
for gparam, param in zip(gparams, self.params):
# make sure that the learning rate is of the right dtype
updates[param] = param - gparam * T.cast(
lr,
dtype=theano.config.floatX
)
if persistent:
# Note that this works only if persistent is a shared variable
updates[persistent] = nh_samples[-1]
# pseudo-likelihood is a better proxy for PCD
monitoring_cost = self.get_pseudo_likelihood_cost(updates)
else:
# reconstruction cross-entropy is a better proxy for CD
monitoring_cost = self.get_reconstruction_cost(updates,
pre_sigmoid_nvs[-1])
return monitoring_cost, updates
def get_pseudo_likelihood_cost(self, updates):
"""Stochastic approximation to the pseudo-likelihood"""
# index of bit i in expression p(x_i | x_{\i})
bit_i_idx = theano.shared(value=0, name='bit_i_idx')
# binarize the input image by rounding to nearest integer
xi = T.round(self.input)
# calculate free energy for the given bit configuration
fe_xi = self.free_energy(xi)
# flip bit x_i of matrix xi and preserve all other bits x_{\i}
# Equivalent to xi[:,bit_i_idx] = 1-xi[:, bit_i_idx], but assigns
# the result to xi_flip, instead of working in place on xi.
xi_flip = T.set_subtensor(xi[:, bit_i_idx], 1 - xi[:, bit_i_idx])
# calculate free energy with bit flipped
fe_xi_flip = self.free_energy(xi_flip)
# equivalent to e^(-FE(x_i)) / (e^(-FE(x_i)) + e^(-FE(x_{\i})))
cost = T.mean(self.n_visible * T.log(T.nnet.sigmoid(fe_xi_flip -
fe_xi)))
# increment bit_i_idx % number as part of updates
updates[bit_i_idx] = (bit_i_idx + 1) % self.n_visible
return cost
def get_reconstruction_cost(self, updates, pre_sigmoid_nv):
cross_entropy = T.mean(
T.sum(
self.input * T.log(T.nnet.sigmoid(pre_sigmoid_nv)) +
(1 - self.input) * T.log(1 - T.nnet.sigmoid(pre_sigmoid_nv)),
axis=1
)
)
return cross_entropy
def output(input_data, w, b):
    """Propagate data through one trained RBM layer with NumPy: sigmoid(x.W + b)."""
    x = np.dot(input_data, w) + b  # the bias b is broadcast across the rows
    return 1 / (1 + np.exp(-x))
if __name__=='__main__':
numpy_rng = np.random.RandomState(123)
print '... building the model'
ifname = 'bow_data.csv'
data = np.loadtxt(ifname, delimiter=',')
    train_set_x = theano.shared(np.asarray(data, dtype=theano.config.floatX), borrow=True)
dbn = DBN(numpy_rng=numpy_rng, n_ins=data.shape[1],
hidden_layers_sizes=[2000, 1000, 100],
n_outs=10)
#########################
# PRETRAINING THE MODEL #
#########################
print '... getting the pretraining functions'
batch_size=10
k = 5
    pretraining_fns = dbn.pretraining_functions(train_set_x=train_set_x,
                                                batch_size=batch_size,
                                                k=k)
print '... pre-training the model'
pretraining_epochs = 100
    n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size  # cover the whole training set
pretrain_lr = 0.1
## Pre-train layer-wise
for i in xrange(dbn.n_layers):
# go through pretraining epochs
for epoch in xrange(pretraining_epochs):
# go through the training set
c = []
for batch_index in xrange(n_train_batches):
c.append(pretraining_fns[i](index=batch_index,
lr=pretrain_lr))
print 'Pre-training layer %i, epoch %d, cost ' % (i, epoch),
print np.mean(c)
    # propagate the whole data set through the trained RBM layers with NumPy
    layer_output = []
for i in xrange(dbn.n_layers):
w = dbn.rbm_layers[i].W.get_value()
hbias = dbn.rbm_layers[i].hbias.get_value()
if i==0:
layer_output.append( train_set_x.get_value() )
layer_output.append( output(layer_output[-1],w, hbias) )
else:
layer_output.append( output(layer_output[-1],w, hbias) )
print layer_output[-1]
np.savetxt('DBN_features.csv',layer_output[-1], delimiter=',')
```

Each line of original_data.csv looks like `0\t1 0 1 0 0 0 0\txxxx`, one line per document. From left to right, the fields are: a flag indicating whether the document spans multiple classes, the flags for each class, and the text. In the script below, documents that carry only one label are used as teacher data as far as possible during training. I have also added a function that lets you check the behavior with the iris data, so please use it if you want to try Label Spreading but do not have suitable data at hand.
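If you do not have data in this exact format, a toy original_data.csv (contents entirely made up here) can be written as below just to exercise the label-handling code; its number of lines must match the number of rows in DBN_features.csv.

```python
# coding: utf-8
# Toy example of the original_data.csv format (made-up contents):
#   <multi-label flag>\t<space-separated class flags>\t<text>
rows = [
    '0\t1 0 0 0 0 0\tdocument about class 1',
    '1\t0 1 1 0 0 0\tdocument spanning classes 2 and 3',
    '0\t0 0 0 0 1 0\tdocument about class 5',
]
with open('original_data.csv', 'w') as f:
    f.write('\n'.join(rows) + '\n')
```

Note that the LabelSpreading script only reads the first two tab-separated fields; the text itself is not used.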
```python
# coding: utf-8
from sklearn import datasets
from sklearn.semi_supervised import LabelSpreading
import numpy as np
from numpy.random import seed
seed(555)
from collections import defaultdict
def iris():
iris = datasets.load_iris()
random_unlabeled_points = np.where(np.random.random_integers(0, 1, size=len(iris.target)))
labels = np.copy(iris.target)
labels[random_unlabeled_points] = -1
label_prop_model = LabelSpreading()
label_prop_model.fit(iris.data, labels) # unlabeled as -1
pred_prop = label_prop_model.predict_proba(iris.data)
pred_label = label_prop_model.predict(iris.data)
for pp, pl, label, trgt in zip(pred_prop,pred_label,labels,iris.target):
print pp, pl, label, trgt
def main(X, labels_info, min_number=20, label_num=6, n_neighbors=7, alpha=0.3, typ='knn', threshold=0.5):
target = get_target(labels_info)
random_unlabeled_points = np.where(np.random.random_integers(0, 1, size=len(target)))[0]
cnt_dict = defaultdict(int)
for i, t in enumerate(target):
if len(t)==1 and (i in random_unlabeled_points):
target[i] = -1
cnt_dict[-1] += 1
elif len(t)>=2:
target[i] = -1
cnt_dict[-1] += 1
elif cnt_dict[target[i][0]]<min_number:
target[i] = target[i][0]
cnt_dict[target[i]] += 1
elif cnt_dict[target[i][0]]>=min_number:
target[i] = -1
cnt_dict[target[i]] += 1
print cnt_dict
if typ=='knn':
label_prop_model = LabelSpreading(kernel=typ, n_neighbors=n_neighbors)
else:
label_prop_model = LabelSpreading(kernel=typ, alpha=alpha)
label_prop_model.fit(X, target) # unlabeled as -1
pred_prop = label_prop_model.predict_proba(X)
pred_label = label_prop_model.predict(X)
    res_dict = defaultdict(dict)  # per-label counts of TP, FP, FN and TN
for label in ('TP', 'FP', 'FN', 'TN'):
res_dict[label] = defaultdict(int)
label_dict = defaultdict(int)
    for pp, pl, labels, trgt in zip(pred_prop, pred_label, get_target(labels_info), target):
        # labels is the set of correct labels for this document
        print pp, np.where(pp>=threshold)[0]+1, labels, trgt
        # labels that came out in the prediction:
        # normalize the class probabilities and keep every class above the threshold
        predicted_labels = np.where(pp/np.sum(pp)>=threshold)[0]+1
        # predicted_labels = [int(pl)]
        # labels not included in the correct answer
        F_labels = set([l+1 for l in xrange(label_num)]).difference(labels)
        # labels not included in the prediction
        predicted_F_labels = \
            set([l+1 for l in xrange(label_num)]).difference(predicted_labels)
#Store TP in dictionary
print 'TP labels:'
print set(labels).intersection(predicted_labels)
for tp_l in set(labels).intersection(predicted_labels):
res_dict['TP'][tp_l] += 1
#Store FP in dictionary
print 'FP labels:'
print set(predicted_labels).difference(labels)
for fp_l in set(predicted_labels).difference(labels):
res_dict['FP'][fp_l] += 1
#Store FN in dictionary
print 'FN labels'
print set(labels).difference(predicted_labels)
for fn_l in set(labels).difference(predicted_labels):
res_dict['FN'][fn_l] += 1
#Store TN in dictionary
print 'TN labels'
print set(F_labels).intersection(predicted_F_labels)
for tn_l in set(F_labels).intersection(predicted_F_labels):
res_dict['TN'][tn_l] += 1
#Count the number of each correct label
for l in labels:
label_dict[l] += 1
for i_label in xrange(label_num):
print "label=",i_label+1
print 'TP:', res_dict['TP'][i_label+1], 'FP:',res_dict['FP'][i_label+1], 'FN:', res_dict['FN'][i_label+1], 'TN:',res_dict['TN'][i_label+1]
print float(res_dict['TP'][i_label+1])/label_dict[i_label+1], float(res_dict['FP'][i_label+1])/label_dict[i_label+1], float(res_dict['FN'][i_label+1])/label_dict[i_label+1], float(res_dict['TN'][i_label+1])/label_dict[i_label+1]
accuracy = float(res_dict['TP'][i_label+1]+res_dict['TN'][i_label+1])/(res_dict['TP'][i_label+1]+res_dict['FP'][i_label+1]+res_dict['FN'][i_label+1]+res_dict['TN'][i_label+1])
precision = float(res_dict['TP'][i_label+1])/(res_dict['TP'][i_label+1]+res_dict['FP'][i_label+1])
recall = float(res_dict['TP'][i_label+1])/(res_dict['TP'][i_label+1]+res_dict['FN'][i_label+1])
f_measure = (2*recall*precision)/(recall+precision)
print 'Accuracy:', accuracy, 'Precision:', precision, 'Recall:', recall, 'F-measure:', f_measure
# Convert the per-class correct-answer flags into label numbers from 1 to n
def get_target(labels_info):
result = []
raw_target = labels_info[:,1:]
for line in raw_target:
result.append( np.where(line==1)[0]+1 )
return result
def get_labels():
pass
def get_labels_info(label_fname):
label_flag = []
label_flag_apd = label_flag.append
labels_info = []
labels_info_apd = labels_info.append
with open(label_fname, 'r') as f:
for line in f:
data = line.strip().split('\t')
label_flag_apd(int(data[0]))
labels_info_apd(
np.array(data[1].strip().split(' '), dtype=np.int32 )
)
return np.hstack( (np.array(label_flag).reshape((len(label_flag), 1)), np.array(labels_info)) )
if __name__=='__main__':
ifname = 'DBN_features.csv'
label_fname = 'original_data.csv'
X =np.loadtxt(ifname, delimiter=',')
labels_info = get_labels_info(label_fname)
    ## typ: choose 'knn' or 'rbf'
main(X, labels_info, 50, label_num=6, n_neighbors=7, alpha=0.2, typ='knn', threshold=0.5)
```

If you notice any mistakes, I would appreciate it if you could point them out.