J'ai toujours voulu faire quelque chose avec les données Twitter. J'ai donc décidé de créer un classificateur simple en utilisant SQLite à partir de l'endroit où MeCab et cabocha sont insérés. (Je l'ai juste essayé pour le plaisir (ry) Ci-dessous, je vais résumer ce que j'ai fait.
Pour la méthode d'installation, reportez-vous à Présentation de mecab-python sous Windows S'il vous plaît. Ce site est également écrit en gras, mais lors de la réécriture de mecab.h, assurez-vous de l'ouvrir avec les privilèges d'administrateur et d'apporter des modifications. Sinon, il ne sera pas reconnu comme modifié. J'étais aussi accro. .. ..
À la manière de ZAORIKU-san peut également utiliser MeCab lui-même, mais si vous ne pouvez pas utiliser de treillis, vous ne pouvez pas utiliser Cabocha, alors utilisez MeCab 0.996. Je pense qu'il vaut mieux être installé.
Lors de la récupération de données avec l'API de recherche, je pense qu'il est préférable d'utiliser le curseur pour le moment et de vérifier quel genre de choses sont retournées avec doc.
Lorsqu'il est installé avec le programme d'installation de Windows, le fichier csv qui est la source du dictionnaire reste shift-jis même si utf-8 est sélectionné (même si le dictionnaire système est utf-8 ...). Par conséquent, veuillez vous référer à la commande suivante lors de l'ajout au dictionnaire système.
python
mecab-dict-index -f shift-jis -t utf-8
Vous pouvez spécifier le code de caractère du fichier csv avec -f et le code de caractère du dictionnaire système avec -t, de sorte que vous pouvez enregistrer le contenu du fichier csv de shift-jis dans le dictionnaire système de utf-8 avec la commande ci-dessus. (Ou, il peut être plus simple d'unifier tous les fichiers en utf-8.)
Jusqu'à présent, j'ai installé les applications nécessaires, acquis les données à l'aide de l'API Twitter et préparé le dictionnaire nécessaire à l'analyse des tweets. Le reste est le sujet à classer. Ce qui m'est venu à l'esprit, c'est l'émission télévisée «Demain, je n'ai pas de mère». Il semble que les bruits tels que "Demain, faire du shopping avec maman" ou "Demain, je n'ai pas de maman, donc je suis absent" seront inclus dans les résultats de recherche. J'ai donc défini le mot-clé de recherche sur "Maman demain" et j'ai obtenu les données. À propos, veuillez noter que l'API de streaming ne prend pas encore en charge la recherche de langues avec des délimiteurs ambigus tels que le japonais.
Après cela, étiquetez les tweets sur le programme "Demain, je n'ai pas de maman" et ceux qui n'en ont pas, et laissez-les apprendre. Veuillez concevoir la méthode de pré-traitement (normalisation unicode, capitalisation, conversion pleine largeur, etc.), la chose à acquérir (balise Hash, URL, mots, ...) selon votre propre objectif. Donc, cette fois, je vais omettre le code sur la méthode de pré-traitement.
Les mots sont enregistrés dans la DB pour chaque tweet afin que le coefficient d'oubli puisse être utilisé à l'avenir. De plus, en tant que méthode simple de sélection de variables, j'essaie de ne pas utiliser de mots avec une probabilité de spam de 0,4 à 0,6. Après cela, j'utilise le lissage Laplace. Cela permet de réduire le bruit et d'éviter les valeurs 0. Ensuite, le code est décrit ci-dessous.
python
# coding: utf-8
import sqlite3 as sqlite
import pickle
from math import log, exp
from separatewords import MecabTokenize #Je ne mettrai pas le code, mais je corrige également la fluctuation de notation etc. ici
class BF(object):
"""Former et tester le classificateur bayésien
Si la table existe par défaut, elle sera supprimée, donc
Lorsque vous utilisez une base de données existante, créez_table=Ajouter 0 à l'argument
"""
def __init__(self, fname, dbname, use=0):
"""utilisation en formation=0
utilisation en test=1
utiliser dans classifier=2
"""
self.fname = fname # input file name
self.con = sqlite.connect(dbname)
self.con.text_factory = str # utf-Spécifiez str pour utiliser 8
if use==0:
self.createindextables() #Créer une table
self.spam_denominator = 0.0
self.ham_denominator = 0.0
self.ham_weight = 1.0
self.init_pai = 0.4
self.threshold = 0.1
def __del__(self):
self.con.close()
def dbcommit(self):
self.con.commit()
def train(self):
"""Les tweets de moins de 10 caractères sont exclus"""
with open(self.fname,'r', 1000) as trainf:
for line in trainf:
tid, dtime, aid, tweet, y = line.strip().split('\t')
wordlist = self.get_wordlist(tweet)
#Si la phrase contient moins de 10 caractères, mecab sera bogué, alors sautez-le
if wordlist == True: print 'skip: %s' % (tweet); continue
y = int(0) if int(y)<1 else int(1) # spam=1, ham=Unifier à 0
self.addtoindex_tweet(tweet, wordlist, y, dtime)
if y==1: self.addtoindex_class(wordlist,'spam_words')
else: self.addtoindex_class(wordlist,'ham_words')
self.addtoindex_score(wordlist)
self.calc_denominator()
self.calc_word_prob()
self.predict()
def test(self, ifname):
"""Effectuer une validation croisée à l'aide de la base de données formée
Les tweets de moins de 10 caractères sont exclus
"""
with open(ifname, 'r', 1000) as testf:
prior_spam, prior_ham = self.calc_cat_prob() # p(spam), p(ham)
log_prior_spam = log(prior_spam)
log_prior_ham = log(prior_ham)
res = []
ans = [0.0, 0.0, 0.0, 0.0]
for line in testf:
tid, dtime, aid, tweet, y = line.strip().split('\t')
print 'testing:', tweet
wordlist = self.get_wordlist(tweet)
#Si la phrase contient moins de 10 caractères, mecab sera bogué, alors sautez-le
if wordlist == True: print 'skip: %s' % (tweet); continue
y = int(0) if int(y)<1 else int(1) # spam=1, ham=Unifier à 0
spam_score = self.pred_score(wordlist,log_prior_spam,log_prior_ham)
res = 1 if spam_score > 0.5 else 0
#Calcul du tableau des résultats
ans = self.get_ans(ans, y, res)
print ans
def classify(self,clfname,classify_dbname):
"""Les tweets de moins de 10 caractères sont exclus"""
self.clsfdb_con = sqlite.connect(classify_dbname)
self.create_classified_indextables()
self.clsfdb_con.text_factory = str # utf-Spécifiez str pour utiliser 8
with open(clfname, 'r', 1000) as testf:
prior_spam, prior_ham = self.calc_cat_prob() # p(spam), p(ham)
log_prior_spam = log(prior_spam)
log_prior_ham = log(prior_ham)
for line in testf:
tid, dtime, aid, tweet = line.strip().split('\t')
wordlist = self.get_wordlist(tweet)
#Si la phrase contient moins de 10 caractères, mecab sera bogué, alors sautez-le
if wordlist == True: print 'skip: %s' % (tweet); continue
spam_score = self.pred_score(wordlist,log_prior_spam,log_prior_ham)
label = 1 if spam_score > 0.5 else 0
self.addtoindex_classified_table(tweet, wordlist, spam_score, label, dtime)
def pred_score(self,wordlist,log_prior_spam,log_prior_ham):
"""spam_estimation du score"""
m = len(wordlist) - 1
psm = m*log_prior_spam
phm = m*log_prior_ham
denom_prior = phm - psm
denom_score = 0.0
for word in wordlist:
w_score = self.con.execute("select spam_score from words_score where word='%s'" % (word)).fetchone()
if w_score is None: w_score = self.init_pai
else: w_score = w_score[0]
if abs(w_score-0.5) > self.threshold:
denom_score += log(1-w_score) - log(w_score)
denom = exp(denom_prior + denom_score)
denom += 1
prob_spam = float(1.0)/denom
print 'spam_probability:', prob_spam
return prob_spam
# return 1 if prob_spam > 0.5 else 0
def get_wordlist(self, tweet):
#Si la phrase contient moins de 10 caractères, mecab sera bogué, alors sautez-le
if len(tweet.decode('utf-8')) < 10: return True
wordlist = MecabTokenize.tokenize(tweet)
if wordlist is None: return True
else: return wordlist
def get_ans(self,ans,y,res):
if y==1 and res==1: #Vrai positif
ans[0] += 1
elif y==1 and res==0: #Faux négatif
ans[1] += 1
elif y==0 and res==1: #faux positif
ans[2] += 1
else: #Vrai négatif
ans[3] += 1
return ans
def predict(self):
"""Trouvez la probabilité d'affiliation de catégorie du document et déterminez la catégorie à laquelle il appartient
p(category|document)
"""
#Boîte de vérification de la précision
ans = [0.0, 0.0, 0.0, 0.0]
prior_spam, prior_ham = self.calc_cat_prob() # p(spam), p(ham)
log_prior_spam = log(prior_spam)
log_prior_ham = log(prior_ham)
wordlists = self.con.execute("select wordlist from tweet_master")
true_labels = self.con.execute("select label from tweet_master")
res = []
while 1:
tmp = wordlists.fetchone()
if tmp == None: break
wordlist = pickle.loads( tmp[0] )
m = len(wordlist) - 1
psm = m*log_prior_spam
phm = m*log_prior_ham
denom_prior = phm - psm
denom_score = 0.0
for word in wordlist:
w_score = self.con.execute("select spam_score from words_score where word='%s'" % (word)).fetchone()
if w_score is None: w_score = self.init_pai
else: w_score = w_score[0]
if abs(w_score-0.5) > self.threshold:
denom_score += log(1-w_score) - log(w_score)
denom = exp(denom_prior + denom_score)
denom += 1
prob_spam = float(1.0)/denom
print 'spam_probability:', prob_spam
label = 1 if prob_spam > 0.5 else 0
res.append(label)
ans = self.get_ans(ans, true_labels.fetchone()[0], label)
print ans
print res
def calc_word_prob(self):
"""Score des mots dans la catégorie(probabilité)Cherchant
p(word_i|category)
"""
#Utiliser le lissage de Laplace pour le calcul
wordlist = self.con.execute("select word from words_score")
while 1:
word = wordlist.fetchone()
if word == None: break
word = word[0]
w_cnt_spam, w_cnt_ham = self.cnt_word_of_cat(word)
spam_prob = float(w_cnt_spam+1)/self.spam_denominator #Plus 1 pour le lissage Laplace
ham_prob = min(1, self.ham_weight*float(w_cnt_ham+1)/self.ham_denominator)
spam_score = spam_prob/(spam_prob+ham_prob)
self.update_word_score(word, spam_score)
self.dbcommit()
def calc_denominator(self):
"""Score des mots dans la catégorie(probabilité)Trouvez le dénominateur pour le calcul à trouver
"""
#Utiliser le lissage de Laplace pour le calcul
uniq_cnt_spam, uniq_cnt_ham = self.cnt_uniq_word_of_cat()
total_cnt_spam, total_cnt_ham = self.cnt_total_word_of_cat()
self.spam_denominator = total_cnt_spam + uniq_cnt_spam #Compter les nombres uniques pour le lissage Laplace
self.ham_denominator = total_cnt_ham + uniq_cnt_ham
def cnt_word_of_cat(self,word):
"""Comptez le nombre de mots spécifiques dans chaque catégorie
T(cat,word_i)
"""
w_cnt_spam = self.con.execute("select count(*) from spam_words where word ='%s'" % (word)).fetchone()[0]
w_cnt_ham = self.con.execute("select count(*) from ham_words where word ='%s'" % (word)).fetchone()[0]
if w_cnt_spam is None: w_cnt_spam = 0
if w_cnt_ham is None: w_cnt_ham = 0
return w_cnt_spam, w_cnt_ham
def cnt_uniq_word_of_cat(self):
"""Comptez le nombre total de mots dans chaque catégorie
p(word_i|cat)Du dénominateur|V|
"""
uniq_cnt_spam = self.con.execute("select count(distinct word) from spam_words").fetchone()[0]
uniq_cnt_ham = self.con.execute("select count(distinct word) from ham_words").fetchone()[0]
return uniq_cnt_spam, uniq_cnt_ham
def cnt_total_word_of_cat(self):
"""Somme du nombre d'occurrences de tous les mots dans chaque catégorie
ΣT(cat, word')
"""
total_cnt_spam = self.con.execute("select count(*) from spam_words").fetchone()[0]
total_cnt_ham = self.con.execute("select count(*) from ham_words").fetchone()[0]
return total_cnt_spam, total_cnt_ham
def calc_cat_prob(self):
""" p(categories)Calculs de"""
cnt_spam_tweet = self.con.execute("select count(*) from tweet_master where label=1").fetchone()[0]
cnt_total_tweet = self.con.execute("select count(*) from tweet_master").fetchone()[0]
cat_prob_spam = float(cnt_spam_tweet)/cnt_total_tweet
return cat_prob_spam, 1.0-cat_prob_spam
def addtoindex_tweet(self, tweet, wordlist, label, dtime):
"""Enregistrer le tweet"""
# if self.isindexed(tweet): return
print 'Indexing: ' + tweet
#Enregistrer la liste de mots dans DB pour chaque tweet
self.con.execute( "insert into tweet_master values(?,?,?,?)", \
(tweet, pickle.dumps(wordlist), label, dtime) )
self.dbcommit()
def addtoindex_class(self, wordlist, class_table_name):
"""Stockez les mots pour chaque classe"""
# get tweet_id
tweet_id = self.con.execute("select max(rowid) from tweet_master").fetchone()[0]
# tweet_Enregistrer la liste de mots pour chaque identifiant dans DB
for word in wordlist:
self.con.execute( "insert into %s values(?,?)" % (class_table_name), (tweet_id, word) )
self.dbcommit()
def addtoindex_score(self,wordlist):
"""Enregistrer les mots dans le tableau des scores"""
#Enregistrer la liste de mots dans la base de données
for word in wordlist:
if self.isindexed(word): continue
else:
self.con.execute( "insert into words_score values(?,?)", (word, self.init_pai) ) #Mettre une valeur temporaire dans le score
self.dbcommit()
def addtoindex_classified_table(self, tweet, wordlist, spam_score, label, dtime):
"""Catégoriser et stocker les tweets sans étiquette"""
# if self.isindexed(tweet): return
print 'Classifying: ' + tweet
#Enregistrer la liste de mots dans DB pour chaque tweet
self.clsfdb_con.execute( "insert into tweet_master values(?,?,?,?,?)", \
(tweet, pickle.dumps(wordlist), spam_score, label, dtime) )
self.clsfdb_con.commit()
def isindexed(self,word):
"""Renvoie ture si le tweet est déjà indexé"""
u=self.con.execute \
("select word from words_score where word='%s'" % (word)).fetchone()
if u!=None: return True
return False
def update_word_score(self,word, spam_score):
"""Trouvez la probabilité d'appartenir à chaque catégorie pour chaque mot"""
self.con.execute("update words_score set spam_score=%f where word='%s'" % \
(spam_score, word))
def createindextables(self):
"""Créer une table de base de données"""
tnlist = ['tweet_master' ,'spam_words', 'ham_words', 'words_score']
for table_name in tnlist:
sql="SELECT name FROM sqlite_master WHERE type='table' AND name='MYTABLE';" \
.replace('MYTABLE', table_name)
res = self.con.execute(sql).fetchone()
if res is not None: #Confirmation de l'existence de la table
self.con.execute('drop table %s' % (table_name))
self.con.execute('create table tweet_master(tweet, wordlist, label, create_time)') #le spam vaut 1,jambon vaut 0
self.con.execute('create table spam_words(tweet_id, word)')
self.con.execute('create table ham_words(tweet_id, word)')
self.con.execute('create table words_score(word, spam_score)')
self.con.execute('create index tweetidx on tweet_master(tweet)')
self.con.execute('create index spamidx on spam_words(word)')
self.con.execute('create index hamidx on ham_words(word)')
self.con.execute('create index scoreidx on words_score(word)')
self.dbcommit()
def create_classified_indextables(self):
"""Créer une table de base de données"""
table_name = 'tweet_master'
sql="SELECT name FROM sqlite_master WHERE type='table' AND name='MYTABLE';" \
.replace('MYTABLE', table_name)
res = self.clsfdb_con.execute(sql).fetchone()
if res is not None: #Confirmation de l'existence de la table
self.clsfdb_con.execute('drop table %s' % (table_name))
self.clsfdb_con.execute('create table tweet_master(tweet, wordlist, spam_score, label, create_time)') #le spam vaut 1,jambon vaut 0
self.clsfdb_con.execute('create index tweetidx on tweet_master(tweet)')
self.clsfdb_con.commit()
if __name__=='__main__':
trfname = 'training data file name'
dbname = 'asumama_bf.db'
bf = BF(trfname, dbname, use=0)
bf.train()
tefname = 'test data file name'
dbname = 'asumama_bf.db'
bf = BF(tefname, dbname, use=1)
bf.test(tefname)
clfname = 'classify data filename'
trained_dbname = 'asumama_bf.db'
classify_dbname = 'asumama_bf_classify.db'
bf = BF(clfname, trained_dbname, use=2)
bf.classify(clfname, classify_dbname)
Les données utilisées pour l'apprentissage étaient de 1 000 (1 000 car il était difficile de les étiqueter). Il existe 1 200 données de test. La précision était de 96% et le rappel (taux de détection HAM) était de 99%. Il y avait beaucoup de tweets de spam similaires, et les tweets qui n'avaient rien à voir avec la cible étaient des «gâteaux» et des «achats en voiture», et les tweets cibles étaient «Aina-chan», «intéressant» et «Miura». Je pense que c'était un tel résultat parce que les données étaient telles qu'elles pouvaient être classées relativement simplement, comme "Shohei ~".
C'est un commentaire comme un élève du primaire, mais il est intéressant de jouer avec les données Twitter. Le prétraitement est cependant gênant. .. .. À l'avenir, si j'ai le temps, j'aimerais faire quelque chose en utilisant la série chronologique. Suivez les informations touristiques. (Puisqu'il y a une autre histoire que je veux résumer, je veux en quelque sorte la résumer avant le début du travail au nouvel emploi ...)
Nous vous prions de nous excuser pour la gêne occasionnée, mais si vous faites une erreur, nous vous serions reconnaissants de bien vouloir la signaler.
Recommended Posts