Some people were tired of the labeling work that supervised document classification requires, so I tried semi-supervised text classification, which lets you classify documents with only a small number of labels.
Apologies for the rather messy code, which I paste below. The DBN code is copied almost verbatim from the Deep Learning Tutorial; that site explains everything with the mathematical formulas, so please refer to it for the detailed explanations.
The script reads CSV data in bag-of-words (BoW) format, extracts features, and writes them back out as a CSV. Note that it also contains some methods that are not used.
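For context, here is a minimal sketch of how a BoW matrix such as `bow_data.csv` could be produced. The file name comes from the script below, but the use of scikit-learn's `CountVectorizer` and the placeholder documents are my own assumptions, not part of the original pipeline.

```python
# Minimal sketch: build a bag-of-words matrix and save it as bow_data.csv.
# The example documents and the choice of CountVectorizer are assumptions.
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer

documents = ["this is the first example document",
             "this document is the second example"]  # placeholder texts
vectorizer = CountVectorizer()
bow = vectorizer.fit_transform(documents).toarray()  # shape: (n_docs, vocab_size)
np.savetxt('bow_data.csv', bow, delimiter=',')
```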
```python
# coding:utf-8
from __future__ import unicode_literals
import time
import numpy as np
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams
class DBN:
    def __init__(self, numpy_rng, theano_rng=None, n_ins=784,
                 hidden_layers_sizes=[500, 500], n_outs=10):
        self.sigmoid_layers = []
        self.rbm_layers = []
        self.params = []
        self.n_layers = len(hidden_layers_sizes)
        assert self.n_layers > 0
        if not theano_rng:
            theano_rng = RandomStreams(numpy_rng.randint(2**30))
        # allocate symbolic variables for the data
        self.x = T.matrix('x')
        self.y = T.ivector('y')
        for i in xrange(self.n_layers):
            if i == 0:
                input_size = n_ins
                layer_input = self.x
            else:
                input_size = hidden_layers_sizes[i - 1]
                layer_input = self.sigmoid_layers[-1].output
            sigmoid_layer = HiddenLayer(rng=numpy_rng,
                                        input=layer_input,
                                        n_in=input_size,
                                        n_out=hidden_layers_sizes[i],
                                        activation=T.nnet.sigmoid)
            self.sigmoid_layers.append(sigmoid_layer)
            self.params.extend(sigmoid_layer.params)
            rbm_layer = RBM(numpy_rng=numpy_rng,
                            theano_rng=theano_rng,
                            input=layer_input,
                            n_visible=input_size,
                            n_hidden=hidden_layers_sizes[i],
                            W=sigmoid_layer.W,
                            hbias=sigmoid_layer.b)
            self.rbm_layers.append(rbm_layer)
        self.logLayer = LogisticRegression(
            input=self.sigmoid_layers[-1].output,
            n_in=hidden_layers_sizes[-1],
            n_out=n_outs)
        self.params.extend(self.logLayer.params)
        self.finetune_cost = self.logLayer.negative_log_likelihood(self.y)
        self.errors = self.logLayer.errors(self.y)
    def pretraining_functions(self, train_set_x, batch_size, k):
        index = T.lscalar('index')
        learning_rate = T.scalar('lr')  # learning rate to use
        # number of batches
        n_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
        # beginning of a batch, given `index`
        batch_begin = index * batch_size
        # ending of a batch given `index`
        batch_end = batch_begin + batch_size
        pretrain_fns = []
        for rbm in self.rbm_layers:
            cost, updates = rbm.get_cost_updates(learning_rate,
                                                 persistent=None, k=k)
            # compile the theano function
            fn = theano.function(
                inputs=[index, theano.Param(learning_rate, default=0.1)],
                outputs=cost,
                updates=updates,
                givens={
                    self.x: train_set_x[batch_begin:batch_end]
                }
            )
            # append `fn` to the list of functions
            pretrain_fns.append(fn)
        return pretrain_fns
    def build_finetune_functions(self, datasets, batch_size, learning_rate):
        (train_set_x, train_set_y) = datasets[0]
        (valid_set_x, valid_set_y) = datasets[1]
        (test_set_x, test_set_y) = datasets[2]
        # compute number of minibatches for training, validation and testing
        n_valid_batches = valid_set_x.get_value(borrow=True).shape[0]
        n_valid_batches /= batch_size
        n_test_batches = test_set_x.get_value(borrow=True).shape[0]
        n_test_batches /= batch_size
        index = T.lscalar('index')  # index to a [mini]batch
        # compute the gradients with respect to the model parameters
        gparams = T.grad(self.finetune_cost, self.params)
        # compute list of fine-tuning updates
        updates = []
        for param, gparam in zip(self.params, gparams):
            updates.append((param, param - gparam * learning_rate))
        train_fn = theano.function(
            inputs=[index],
            outputs=self.finetune_cost,
            updates=updates,
            givens={
                self.x: train_set_x[
                    index * batch_size: (index + 1) * batch_size
                ],
                self.y: train_set_y[
                    index * batch_size: (index + 1) * batch_size
                ]
            }
        )
        test_score_i = theano.function(
            [index],
            self.errors,
            givens={
                self.x: test_set_x[
                    index * batch_size: (index + 1) * batch_size
                ],
                self.y: test_set_y[
                    index * batch_size: (index + 1) * batch_size
                ]
            }
        )
        valid_score_i = theano.function(
            [index],
            self.errors,
            givens={
                self.x: valid_set_x[
                    index * batch_size: (index + 1) * batch_size
                ],
                self.y: valid_set_y[
                    index * batch_size: (index + 1) * batch_size
                ]
            }
        )
        # Create a function that scans the entire validation set
        def valid_score():
            return [valid_score_i(i) for i in xrange(n_valid_batches)]
        # Create a function that scans the entire test set
        def test_score():
            return [test_score_i(i) for i in xrange(n_test_batches)]
        return train_fn, valid_score, test_score
class HiddenLayer:
    def __init__(self, rng, input, n_in, n_out, W=None, b=None,
                 activation=T.tanh):
        self.input = input
        if W is None:
            W_values = np.asarray(
                rng.uniform(
                    low=-np.sqrt(6. / (n_in + n_out)),
                    high=np.sqrt(6. / (n_in + n_out)),
                    size=(n_in, n_out)
                ),
                dtype=theano.config.floatX
            )
            if activation == theano.tensor.nnet.sigmoid:
                W_values *= 4
            W = theano.shared(value=W_values, name='W', borrow=True)
        if b is None:
            b_values = np.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b', borrow=True)
        self.W = W
        self.b = b
        lin_output = T.dot(input, self.W) + self.b
        self.output = (
            lin_output if activation is None
            else activation(lin_output)
        )
        self.params = [self.W, self.b]
class LogisticRegression:
    def __init__(self, input, n_in, n_out):
        self.W = theano.shared(
            value=np.zeros(
                (n_in, n_out),
                dtype=theano.config.floatX
            ),
            name='W',
            borrow=True
        )
        # initialize the biases b as a vector of n_out 0s
        self.b = theano.shared(
            value=np.zeros(
                (n_out,),
                dtype=theano.config.floatX
            ),
            name='b',
            borrow=True
        )
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)
        self.params = [self.W, self.b]

    def negative_log_likelihood(self, y):
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])

    def errors(self, y):
        # check if y has same dimension of y_pred
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type)
            )
        # check if y is of the correct datatype
        if y.dtype.startswith('int'):
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()
class RBM(object):
    """Restricted Boltzmann Machine (RBM) """
    def __init__(
        self,
        input=None,
        n_visible=784,
        n_hidden=500,
        W=None,
        hbias=None,
        vbias=None,
        numpy_rng=None,
        theano_rng=None
    ):
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        if numpy_rng is None:
            # create a number generator
            numpy_rng = np.random.RandomState(1234)
        if theano_rng is None:
            theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))
        if W is None:
            initial_W = np.asarray(
                numpy_rng.uniform(
                    low=-4 * np.sqrt(6. / (n_hidden + n_visible)),
                    high=4 * np.sqrt(6. / (n_hidden + n_visible)),
                    size=(n_visible, n_hidden)
                ),
                dtype=theano.config.floatX
            )
            # theano shared variables for weights and biases
            W = theano.shared(value=initial_W, name='W', borrow=True)
        if hbias is None:
            # create shared variable for hidden units bias
            hbias = theano.shared(
                value=np.zeros(
                    n_hidden,
                    dtype=theano.config.floatX
                ),
                name='hbias',
                borrow=True
            )
        if vbias is None:
            # create shared variable for visible units bias
            vbias = theano.shared(
                value=np.zeros(
                    n_visible,
                    dtype=theano.config.floatX
                ),
                name='vbias',
                borrow=True
            )
        # initialize input layer for standalone RBM or layer0 of DBN
        self.input = input
        if not input:
            self.input = T.matrix('input')
        self.W = W
        self.hbias = hbias
        self.vbias = vbias
        self.theano_rng = theano_rng
        self.params = [self.W, self.hbias, self.vbias]
    def propup(self, vis):
        pre_sigmoid_activation = T.dot(vis, self.W) + self.hbias
        return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]

    def sample_h_given_v(self, v0_sample):
        ''' This function infers state of hidden units given visible units '''
        pre_sigmoid_h1, h1_mean = self.propup(v0_sample)
        h1_sample = self.theano_rng.binomial(size=h1_mean.shape,
                                             n=1, p=h1_mean,
                                             dtype=theano.config.floatX)
        return [pre_sigmoid_h1, h1_mean, h1_sample]

    def propdown(self, hid):
        pre_sigmoid_activation = T.dot(hid, self.W.T) + self.vbias
        return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]

    def sample_v_given_h(self, h0_sample):
        ''' This function infers state of visible units given hidden units '''
        # compute the activation of the visible given the hidden sample
        pre_sigmoid_v1, v1_mean = self.propdown(h0_sample)
        v1_sample = self.theano_rng.binomial(size=v1_mean.shape,
                                             n=1, p=v1_mean,
                                             dtype=theano.config.floatX)
        return [pre_sigmoid_v1, v1_mean, v1_sample]

    def gibbs_hvh(self, h0_sample):
        ''' This function implements one step of Gibbs sampling,
            starting from the hidden state'''
        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h0_sample)
        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v1_sample)
        return [pre_sigmoid_v1, v1_mean, v1_sample,
                pre_sigmoid_h1, h1_mean, h1_sample]

    def gibbs_vhv(self, v0_sample):
        ''' This function implements one step of Gibbs sampling,
            starting from the visible state'''
        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v0_sample)
        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h1_sample)
        return [pre_sigmoid_h1, h1_mean, h1_sample,
                pre_sigmoid_v1, v1_mean, v1_sample]

    def free_energy(self, v_sample):
        ''' Function to compute the free energy '''
        wx_b = T.dot(v_sample, self.W) + self.hbias
        vbias_term = T.dot(v_sample, self.vbias)
        hidden_term = T.sum(T.log(1 + T.exp(wx_b)), axis=1)
        return -hidden_term - vbias_term
    def get_cost_updates(self, lr=0.1, persistent=None, k=1):
        # compute positive phase
        pre_sigmoid_ph, ph_mean, ph_sample = self.sample_h_given_v(self.input)
        if persistent is None:
            chain_start = ph_sample
        else:
            chain_start = persistent
        (
            [
                pre_sigmoid_nvs,
                nv_means,
                nv_samples,
                pre_sigmoid_nhs,
                nh_means,
                nh_samples
            ],
            updates
        ) = theano.scan(
            self.gibbs_hvh,
            outputs_info=[None, None, None, None, None, chain_start],
            n_steps=k
        )
        chain_end = nv_samples[-1]
        cost = T.mean(self.free_energy(self.input)) - T.mean(
            self.free_energy(chain_end))
        # We must not compute the gradient through the gibbs sampling
        gparams = T.grad(cost, self.params, consider_constant=[chain_end])
        for gparam, param in zip(gparams, self.params):
            # make sure that the learning rate is of the right dtype
            updates[param] = param - gparam * T.cast(
                lr,
                dtype=theano.config.floatX
            )
        if persistent:
            # Note that this works only if persistent is a shared variable
            updates[persistent] = nh_samples[-1]
            # pseudo-likelihood is a better proxy for PCD
            monitoring_cost = self.get_pseudo_likelihood_cost(updates)
        else:
            # reconstruction cross-entropy is a better proxy for CD
            monitoring_cost = self.get_reconstruction_cost(updates,
                                                           pre_sigmoid_nvs[-1])
        return monitoring_cost, updates
    def get_pseudo_likelihood_cost(self, updates):
        """Stochastic approximation to the pseudo-likelihood"""
        # index of bit i in expression p(x_i | x_{\i})
        bit_i_idx = theano.shared(value=0, name='bit_i_idx')
        # binarize the input image by rounding to nearest integer
        xi = T.round(self.input)
        # calculate free energy for the given bit configuration
        fe_xi = self.free_energy(xi)
        # flip bit x_i of matrix xi and preserve all other bits x_{\i}
        # Equivalent to xi[:,bit_i_idx] = 1-xi[:, bit_i_idx], but assigns
        # the result to xi_flip, instead of working in place on xi.
        xi_flip = T.set_subtensor(xi[:, bit_i_idx], 1 - xi[:, bit_i_idx])
        # calculate free energy with bit flipped
        fe_xi_flip = self.free_energy(xi_flip)
        # equivalent to e^(-FE(x_i)) / (e^(-FE(x_i)) + e^(-FE(x_{\i})))
        cost = T.mean(self.n_visible * T.log(T.nnet.sigmoid(fe_xi_flip -
                                                            fe_xi)))
        # increment bit_i_idx % number as part of updates
        updates[bit_i_idx] = (bit_i_idx + 1) % self.n_visible
        return cost

    def get_reconstruction_cost(self, updates, pre_sigmoid_nv):
        cross_entropy = T.mean(
            T.sum(
                self.input * T.log(T.nnet.sigmoid(pre_sigmoid_nv)) +
                (1 - self.input) * T.log(1 - T.nnet.sigmoid(pre_sigmoid_nv)),
                axis=1
            )
        )
        return cross_entropy
def output(input_data, w, b):
    x = np.dot(input_data, w) + np.kron(np.ones((input_data.shape[0], 1)), b)
    return 1 / (1 + np.exp(-x))
if __name__ == '__main__':
    numpy_rng = np.random.RandomState(123)
    print '... building the model'
    ifname = 'bow_data.csv'
    data = np.loadtxt(ifname, delimiter=',')
    # use floatX so the shared data matches the dtype of the symbolic variables
    train_set_x = theano.shared(np.asarray(data, dtype=theano.config.floatX))
    dbn = DBN(numpy_rng=numpy_rng, n_ins=data.shape[1],
              hidden_layers_sizes=[2000, 1000, 100],
              n_outs=10)
    #########################
    # PRETRAINING THE MODEL #
    #########################
    print '... getting the pretraining functions'
    batch_size = 10
    k = 5
    pretraining_fns = dbn.pretraining_functions(train_set_x=train_set_x,
                                                batch_size=batch_size,
                                                k=k)
    print '... pre-training the model'
    pretraining_epochs = 100
    n_train_batches = 10
    pretrain_lr = 0.1
    ## Pre-train layer-wise
    for i in xrange(dbn.n_layers):
        # go through pretraining epochs
        for epoch in xrange(pretraining_epochs):
            # go through the training set
            c = []
            for batch_index in xrange(n_train_batches):
                c.append(pretraining_fns[i](index=batch_index,
                                            lr=pretrain_lr))
            print 'Pre-training layer %i, epoch %d, cost ' % (i, epoch),
            print np.mean(c)
    layer_output = []
    for i in xrange(dbn.n_layers):
        w = dbn.rbm_layers[i].W.get_value()
        hbias = dbn.rbm_layers[i].hbias.get_value()
        if i == 0:
            layer_output.append(train_set_x.get_value())
            layer_output.append(output(layer_output[-1], w, hbias))
        else:
            layer_output.append(output(layer_output[-1], w, hbias))
    print layer_output[-1]
    np.savetxt('DBN_features.csv', layer_output[-1], delimiter=',')
```
Each line of original_data.csv has the form `0\t1 0 1 0 0 0 0\txxxx`: from left to right, a flag indicating whether the record belongs to multiple classes, a flag for each class, and the text, separated by tabs. In the code below, documents that carry only a single label are used as labeled (teacher) data during training, up to a fixed number per class. I also added a function that lets you check the approach with the iris data, so use it if you want to try Label Spreading but don't have suitable data at hand.
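To make the expected layout concrete, here is a hypothetical illustration of the original_data.csv format; the texts, the six-class flag vectors, and the specific values are invented for illustration only.

```python
# Hypothetical example of the original_data.csv layout (tab-separated fields:
# multi-label flag, space-separated per-class flags, raw text). The texts and
# flag values below are made up; six classes match the call to main() below.
sample_lines = [
    "0\t1 0 0 0 0 0\tsome single-label document text",
    "1\t0 1 0 1 0 0\tsome multi-label document text",
]
# get_labels_info() only looks at the first two tab-separated fields:
for line in sample_lines:
    flag, class_flags, text = line.strip().split('\t')
    assert flag in ('0', '1') and len(class_flags.split(' ')) == 6
```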
```python
# coding: utf-8
from sklearn import datasets
from sklearn.semi_supervised import LabelSpreading
import numpy as np
from numpy.random import seed
seed(555)
from collections import defaultdict
def iris():
    iris = datasets.load_iris()
    random_unlabeled_points = np.where(np.random.random_integers(0, 1, size=len(iris.target)))
    labels = np.copy(iris.target)
    labels[random_unlabeled_points] = -1
    label_prop_model = LabelSpreading()
    label_prop_model.fit(iris.data, labels)  # unlabeled as -1
    pred_prop = label_prop_model.predict_proba(iris.data)
    pred_label = label_prop_model.predict(iris.data)
    for pp, pl, label, trgt in zip(pred_prop, pred_label, labels, iris.target):
        print pp, pl, label, trgt
def main(X, labels_info, min_number=20, label_num=6, n_neighbors=7, alpha=0.3, typ='knn', threshold=0.5):
    target = get_target(labels_info)
    random_unlabeled_points = np.where(np.random.random_integers(0, 1, size=len(target)))[0]
    cnt_dict = defaultdict(int)
    for i, t in enumerate(target):
        if len(t) == 1 and (i in random_unlabeled_points):
            target[i] = -1
            cnt_dict[-1] += 1
        elif len(t) >= 2:
            target[i] = -1
            cnt_dict[-1] += 1
        elif cnt_dict[target[i][0]] < min_number:
            target[i] = target[i][0]
            cnt_dict[target[i]] += 1
        elif cnt_dict[target[i][0]] >= min_number:
            target[i] = -1
            cnt_dict[target[i]] += 1
    print cnt_dict
    if typ == 'knn':
        label_prop_model = LabelSpreading(kernel=typ, n_neighbors=n_neighbors)
    else:
        label_prop_model = LabelSpreading(kernel=typ, alpha=alpha)
    label_prop_model.fit(X, target)  # unlabeled as -1
    pred_prop = label_prop_model.predict_proba(X)
    pred_label = label_prop_model.predict(X)
    res_dict = defaultdict(dict)  # stores the TP, FP, FN and TN counts
    for label in ('TP', 'FP', 'FN', 'TN'):
        res_dict[label] = defaultdict(int)
    label_dict = defaultdict(int)
    for pp, pl, labels, trgt in zip(pred_prop, pred_label, get_target(labels_info), target):
        # `labels` holds the ground-truth labels for this document
        print pp, np.where(pp >= threshold)[0] + 1, labels, trgt
        # labels produced by the prediction:
        # renormalize the probabilities ("softmax") and apply the threshold
        predicted_labels = np.where(pp / np.sum(pp) >= threshold)[0] + 1
        # predicted_labels = [int(pl)]
        # labels that are not part of the ground truth
        F_labels = set([l + 1 for l in xrange(label_num)]).difference(labels)
        # labels that are not part of the prediction
        predicted_F_labels = \
            set([l + 1 for l in xrange(label_num)]).difference(predicted_labels)
        # store TP in the dictionary
        print 'TP labels:'
        print set(labels).intersection(predicted_labels)
        for tp_l in set(labels).intersection(predicted_labels):
            res_dict['TP'][tp_l] += 1
        # store FP in the dictionary
        print 'FP labels:'
        print set(predicted_labels).difference(labels)
        for fp_l in set(predicted_labels).difference(labels):
            res_dict['FP'][fp_l] += 1
        # store FN in the dictionary
        print 'FN labels'
        print set(labels).difference(predicted_labels)
        for fn_l in set(labels).difference(predicted_labels):
            res_dict['FN'][fn_l] += 1
        # store TN in the dictionary
        print 'TN labels'
        print set(F_labels).intersection(predicted_F_labels)
        for tn_l in set(F_labels).intersection(predicted_F_labels):
            res_dict['TN'][tn_l] += 1
        # count the number of occurrences of each ground-truth label
        for l in labels:
            label_dict[l] += 1
    for i_label in xrange(label_num):
        print "label=", i_label + 1
        print 'TP:', res_dict['TP'][i_label + 1], 'FP:', res_dict['FP'][i_label + 1], 'FN:', res_dict['FN'][i_label + 1], 'TN:', res_dict['TN'][i_label + 1]
        print float(res_dict['TP'][i_label + 1]) / label_dict[i_label + 1], float(res_dict['FP'][i_label + 1]) / label_dict[i_label + 1], float(res_dict['FN'][i_label + 1]) / label_dict[i_label + 1], float(res_dict['TN'][i_label + 1]) / label_dict[i_label + 1]
        accuracy = float(res_dict['TP'][i_label + 1] + res_dict['TN'][i_label + 1]) / (res_dict['TP'][i_label + 1] + res_dict['FP'][i_label + 1] + res_dict['FN'][i_label + 1] + res_dict['TN'][i_label + 1])
        precision = float(res_dict['TP'][i_label + 1]) / (res_dict['TP'][i_label + 1] + res_dict['FP'][i_label + 1])
        recall = float(res_dict['TP'][i_label + 1]) / (res_dict['TP'][i_label + 1] + res_dict['FN'][i_label + 1])
        f_measure = (2 * recall * precision) / (recall + precision)
        print 'Accuracy:', accuracy, 'Precision:', precision, 'Recall:', recall, 'F-measure:', f_measure
# map the ground-truth labels to integers from 1 to n
def get_target(labels_info):
    result = []
    raw_target = labels_info[:, 1:]
    for line in raw_target:
        result.append(np.where(line == 1)[0] + 1)
    return result

def get_labels():
    pass

def get_labels_info(label_fname):
    label_flag = []
    label_flag_apd = label_flag.append
    labels_info = []
    labels_info_apd = labels_info.append
    with open(label_fname, 'r') as f:
        for line in f:
            data = line.strip().split('\t')
            label_flag_apd(int(data[0]))
            labels_info_apd(
                np.array(data[1].strip().split(' '), dtype=np.int32)
            )
    return np.hstack((np.array(label_flag).reshape((len(label_flag), 1)), np.array(labels_info)))
if __name__ == '__main__':
    ifname = 'DBN_features.csv'
    label_fname = 'original_data.csv'
    X = np.loadtxt(ifname, delimiter=',')
    labels_info = get_labels_info(label_fname)
    # choose typ from {'knn', 'rbf'}
    main(X, labels_info, 50, label_num=6, n_neighbors=7, alpha=0.2, typ='knn', threshold=0.5)
```
Apologies for any rough edges; I would appreciate it if you could point out any mistakes.