Ich wollte schon immer etwas mit Twitter-Daten machen. Daher habe ich beschlossen, einen einfachen Klassifikator mit SQLite an der Stelle zu erstellen, an der MeCab und Cabocha eingefügt werden. (Ich habe es nur zum Spaß versucht (ry) Im Folgenden werde ich zusammenfassen, was ich getan habe.
Informationen zur Installationsmethode finden Sie unter Einführung in Mecab-Python unter Windows. Bitte. Diese Site ist ebenfalls fett gedruckt. Wenn Sie jedoch mecab.h neu schreiben, müssen Sie sie mit Administratorrechten öffnen und Änderungen vornehmen. Andernfalls wird es nicht als geändert erkannt. Ich war auch süchtig danach. .. ..
ZAORIKU-san's way kann auch MeCab selbst verwenden. Wenn Sie jedoch kein Gitter verwenden können, können Sie Cabocha nicht verwenden. Verwenden Sie daher MeCab 0.996. Ich denke, es ist besser installiert zu werden.
Beim Abrufen von Daten mit der Such-API ist es meines Erachtens vorerst besser, den Cursor zu verwenden und zu überprüfen, welche Art von Dingen mit doc zurückgegeben werden.
Wenn Sie es mit dem Windows-Installationsprogramm installieren, ist die CSV-Datei, aus der das Wörterbuch stammt, auch bei Auswahl von utf-8 immer noch shift-jis (obwohl das Systemwörterbuch utf-8 ist ...). Lesen Sie daher beim Hinzufügen zum Systemwörterbuch den folgenden Befehl.
python
mecab-dict-index -f shift-jis -t utf-8
Sie können den Zeichencode der CSV-Datei mit -f und den Zeichencode des Systemwörterbuchs mit -t angeben, um den Inhalt der CSV-Datei von shift-jis mit dem obigen Befehl im Systemwörterbuch von utf-8 zu registrieren. (Oder es ist am einfachsten, alle Dateien mit utf-8 zu vereinheitlichen.)
Bisher habe ich die erforderlichen Apps installiert, die Daten über die Twitter-API erfasst und das für die Analyse von Tweets erforderliche Wörterbuch vorbereitet. Der Rest ist das zu klassifizierende Thema. Was mir in den Sinn kam, war die Fernsehsendung "Morgen habe ich keine Mutter." Es scheint, dass Geräusche wie "Morgen, Einkaufen mit Mutter" oder "Morgen, ich habe keine Mutter, also bin ich weg" in den Suchergebnissen enthalten sind. Also habe ich das Suchwort auf "Mama morgen" gesetzt und die Daten erhalten. Beachten Sie übrigens, dass die Streaming-API die Suche nach Sprachen mit mehrdeutigen Trennzeichen wie Japanisch noch nicht unterstützt.
Beschriften Sie danach Tweets über das Programm "Morgen habe ich keine Mutter" und diejenigen, die dies nicht tun, und lassen Sie sie lernen. Bitte entwickeln Sie die Vorverarbeitungsmethode (Unicode-Normalisierung, Großschreibung, Konvertierung in voller Breite usw.), die zu erwerbende Sache (Hash-Tag, URL, Wörter, ...) für Ihren eigenen Zweck. Deshalb werde ich dieses Mal den Code über die Vorverarbeitungsmethode weglassen.
Für jeden Tweet werden Wörter in der Datenbank gespeichert, damit der Vergesslichkeitskoeffizient in Zukunft verwendet werden kann. Als einfache Methode zur Variablenauswahl versuche ich auch, keine Wörter mit einer Spam-Wahrscheinlichkeit von 0,4 bis 0,6 zu verwenden. Danach verwende ich Laplace-Glättung. Dies dient zur Rauschunterdrückung und zu Gegenmaßnahmen mit 0 Werten. Dann wird der Code unten beschrieben.
python
# coding: utf-8
import sqlite3 as sqlite
import pickle
from math import log, exp
from separatewords import MecabTokenize #Ich werde den Code nicht einfügen, aber ich korrigiere hier auch die Notationsschwankung usw.
class BF(object):
"""Trainiere und teste den Bayes'schen Klassifikator
Wenn die Tabelle standardmäßig vorhanden ist, wird sie gelöscht
Erstellen Sie bei Verwendung einer vorhandenen Datenbank_table=Fügen Sie dem Argument 0 hinzu
"""
def __init__(self, fname, dbname, use=0):
"""Verwendung im Training=0
im Test verwenden=1
Verwendung in klassifizieren=2
"""
self.fname = fname # input file name
self.con = sqlite.connect(dbname)
self.con.text_factory = str # utf-Geben Sie str an, um 8 zu verwenden
if use==0:
self.createindextables() #Eine Tabelle erstellen
self.spam_denominator = 0.0
self.ham_denominator = 0.0
self.ham_weight = 1.0
self.init_pai = 0.4
self.threshold = 0.1
def __del__(self):
self.con.close()
def dbcommit(self):
self.con.commit()
def train(self):
"""Tweets mit weniger als 10 Zeichen sind ausgeschlossen"""
with open(self.fname,'r', 1000) as trainf:
for line in trainf:
tid, dtime, aid, tweet, y = line.strip().split('\t')
wordlist = self.get_wordlist(tweet)
#Wenn der Satz weniger als 10 Zeichen enthält, ist der Mecab fehlerhaft. Überspringen Sie ihn daher
if wordlist == True: print 'skip: %s' % (tweet); continue
y = int(0) if int(y)<1 else int(1) # spam=1, ham=Vereinige dich auf 0
self.addtoindex_tweet(tweet, wordlist, y, dtime)
if y==1: self.addtoindex_class(wordlist,'spam_words')
else: self.addtoindex_class(wordlist,'ham_words')
self.addtoindex_score(wordlist)
self.calc_denominator()
self.calc_word_prob()
self.predict()
def test(self, ifname):
"""Führen Sie eine Kreuzvalidierung mit einer geschulten Datenbank durch
Tweets mit weniger als 10 Zeichen sind ausgeschlossen
"""
with open(ifname, 'r', 1000) as testf:
prior_spam, prior_ham = self.calc_cat_prob() # p(spam), p(ham)
log_prior_spam = log(prior_spam)
log_prior_ham = log(prior_ham)
res = []
ans = [0.0, 0.0, 0.0, 0.0]
for line in testf:
tid, dtime, aid, tweet, y = line.strip().split('\t')
print 'testing:', tweet
wordlist = self.get_wordlist(tweet)
#Wenn der Satz weniger als 10 Zeichen enthält, ist der Mecab fehlerhaft. Überspringen Sie ihn daher
if wordlist == True: print 'skip: %s' % (tweet); continue
y = int(0) if int(y)<1 else int(1) # spam=1, ham=Vereinige dich auf 0
spam_score = self.pred_score(wordlist,log_prior_spam,log_prior_ham)
res = 1 if spam_score > 0.5 else 0
#Berechnung der Ergebnistabelle
ans = self.get_ans(ans, y, res)
print ans
def classify(self,clfname,classify_dbname):
"""Tweets mit weniger als 10 Zeichen sind ausgeschlossen"""
self.clsfdb_con = sqlite.connect(classify_dbname)
self.create_classified_indextables()
self.clsfdb_con.text_factory = str # utf-Geben Sie str an, um 8 zu verwenden
with open(clfname, 'r', 1000) as testf:
prior_spam, prior_ham = self.calc_cat_prob() # p(spam), p(ham)
log_prior_spam = log(prior_spam)
log_prior_ham = log(prior_ham)
for line in testf:
tid, dtime, aid, tweet = line.strip().split('\t')
wordlist = self.get_wordlist(tweet)
#Wenn der Satz weniger als 10 Zeichen enthält, ist der Mecab fehlerhaft. Überspringen Sie ihn daher
if wordlist == True: print 'skip: %s' % (tweet); continue
spam_score = self.pred_score(wordlist,log_prior_spam,log_prior_ham)
label = 1 if spam_score > 0.5 else 0
self.addtoindex_classified_table(tweet, wordlist, spam_score, label, dtime)
def pred_score(self,wordlist,log_prior_spam,log_prior_ham):
"""spam_Punktzahl schätzen"""
m = len(wordlist) - 1
psm = m*log_prior_spam
phm = m*log_prior_ham
denom_prior = phm - psm
denom_score = 0.0
for word in wordlist:
w_score = self.con.execute("select spam_score from words_score where word='%s'" % (word)).fetchone()
if w_score is None: w_score = self.init_pai
else: w_score = w_score[0]
if abs(w_score-0.5) > self.threshold:
denom_score += log(1-w_score) - log(w_score)
denom = exp(denom_prior + denom_score)
denom += 1
prob_spam = float(1.0)/denom
print 'spam_probability:', prob_spam
return prob_spam
# return 1 if prob_spam > 0.5 else 0
def get_wordlist(self, tweet):
#Wenn der Satz weniger als 10 Zeichen enthält, ist der Mecab fehlerhaft. Überspringen Sie ihn daher
if len(tweet.decode('utf-8')) < 10: return True
wordlist = MecabTokenize.tokenize(tweet)
if wordlist is None: return True
else: return wordlist
def get_ans(self,ans,y,res):
if y==1 and res==1: #Richtig positiv
ans[0] += 1
elif y==1 and res==0: #Falsch negativ
ans[1] += 1
elif y==0 and res==1: #falsch positiv
ans[2] += 1
else: #Richtig negativ
ans[3] += 1
return ans
def predict(self):
"""Finden Sie die Wahrscheinlichkeit der Kategoriezugehörigkeit des Dokuments und bestimmen Sie die Kategorie, zu der es gehört
p(category|document)
"""
#Box zur Überprüfung der Genauigkeit
ans = [0.0, 0.0, 0.0, 0.0]
prior_spam, prior_ham = self.calc_cat_prob() # p(spam), p(ham)
log_prior_spam = log(prior_spam)
log_prior_ham = log(prior_ham)
wordlists = self.con.execute("select wordlist from tweet_master")
true_labels = self.con.execute("select label from tweet_master")
res = []
while 1:
tmp = wordlists.fetchone()
if tmp == None: break
wordlist = pickle.loads( tmp[0] )
m = len(wordlist) - 1
psm = m*log_prior_spam
phm = m*log_prior_ham
denom_prior = phm - psm
denom_score = 0.0
for word in wordlist:
w_score = self.con.execute("select spam_score from words_score where word='%s'" % (word)).fetchone()
if w_score is None: w_score = self.init_pai
else: w_score = w_score[0]
if abs(w_score-0.5) > self.threshold:
denom_score += log(1-w_score) - log(w_score)
denom = exp(denom_prior + denom_score)
denom += 1
prob_spam = float(1.0)/denom
print 'spam_probability:', prob_spam
label = 1 if prob_spam > 0.5 else 0
res.append(label)
ans = self.get_ans(ans, true_labels.fetchone()[0], label)
print ans
print res
def calc_word_prob(self):
"""Punktzahl der Wörter in der Kategorie(Wahrscheinlichkeit)Ich suche
p(word_i|category)
"""
#Verwenden Sie zur Berechnung die Laplace-Glättung
wordlist = self.con.execute("select word from words_score")
while 1:
word = wordlist.fetchone()
if word == None: break
word = word[0]
w_cnt_spam, w_cnt_ham = self.cnt_word_of_cat(word)
spam_prob = float(w_cnt_spam+1)/self.spam_denominator #Plus 1 für die Laplace-Glättung
ham_prob = min(1, self.ham_weight*float(w_cnt_ham+1)/self.ham_denominator)
spam_score = spam_prob/(spam_prob+ham_prob)
self.update_word_score(word, spam_score)
self.dbcommit()
def calc_denominator(self):
"""Punktzahl der Wörter in der Kategorie(Wahrscheinlichkeit)Finden Sie den Nenner, den die Berechnung finden soll
"""
#Verwenden Sie zur Berechnung die Laplace-Glättung
uniq_cnt_spam, uniq_cnt_ham = self.cnt_uniq_word_of_cat()
total_cnt_spam, total_cnt_ham = self.cnt_total_word_of_cat()
self.spam_denominator = total_cnt_spam + uniq_cnt_spam #Zählen Sie eindeutige Zahlen für die Laplace-Glättung
self.ham_denominator = total_cnt_ham + uniq_cnt_ham
def cnt_word_of_cat(self,word):
"""Zählen Sie die Anzahl bestimmter Wörter in jeder Kategorie
T(cat,word_i)
"""
w_cnt_spam = self.con.execute("select count(*) from spam_words where word ='%s'" % (word)).fetchone()[0]
w_cnt_ham = self.con.execute("select count(*) from ham_words where word ='%s'" % (word)).fetchone()[0]
if w_cnt_spam is None: w_cnt_spam = 0
if w_cnt_ham is None: w_cnt_ham = 0
return w_cnt_spam, w_cnt_ham
def cnt_uniq_word_of_cat(self):
"""Zählen Sie die Gesamtzahl der Wörter in jeder Kategorie
p(word_i|cat)Vom Nenner|V|
"""
uniq_cnt_spam = self.con.execute("select count(distinct word) from spam_words").fetchone()[0]
uniq_cnt_ham = self.con.execute("select count(distinct word) from ham_words").fetchone()[0]
return uniq_cnt_spam, uniq_cnt_ham
def cnt_total_word_of_cat(self):
"""Summe der Anzahl der Vorkommen aller Wörter in jeder Kategorie
ΣT(cat, word')
"""
total_cnt_spam = self.con.execute("select count(*) from spam_words").fetchone()[0]
total_cnt_ham = self.con.execute("select count(*) from ham_words").fetchone()[0]
return total_cnt_spam, total_cnt_ham
def calc_cat_prob(self):
""" p(categories)Berechnung von"""
cnt_spam_tweet = self.con.execute("select count(*) from tweet_master where label=1").fetchone()[0]
cnt_total_tweet = self.con.execute("select count(*) from tweet_master").fetchone()[0]
cat_prob_spam = float(cnt_spam_tweet)/cnt_total_tweet
return cat_prob_spam, 1.0-cat_prob_spam
def addtoindex_tweet(self, tweet, wordlist, label, dtime):
"""Tweet speichern"""
# if self.isindexed(tweet): return
print 'Indexing: ' + tweet
#Speichern Sie die Wortliste für jeden Tweet in der Datenbank
self.con.execute( "insert into tweet_master values(?,?,?,?)", \
(tweet, pickle.dumps(wordlist), label, dtime) )
self.dbcommit()
def addtoindex_class(self, wordlist, class_table_name):
"""Speichern Sie Wörter für jede Klasse"""
# get tweet_id
tweet_id = self.con.execute("select max(rowid) from tweet_master").fetchone()[0]
# tweet_Speichern Sie die Wortliste für jede ID in der Datenbank
for word in wordlist:
self.con.execute( "insert into %s values(?,?)" % (class_table_name), (tweet_id, word) )
self.dbcommit()
def addtoindex_score(self,wordlist):
"""Speichern Sie die Wörter in der Punktetabelle"""
#Wortliste in DB speichern
for word in wordlist:
if self.isindexed(word): continue
else:
self.con.execute( "insert into words_score values(?,?)", (word, self.init_pai) ) #Geben Sie einen temporären Wert in die Punktzahl ein
self.dbcommit()
def addtoindex_classified_table(self, tweet, wordlist, spam_score, label, dtime):
"""Kategorisieren und speichern Sie unbeschriftete Tweets"""
# if self.isindexed(tweet): return
print 'Classifying: ' + tweet
#Speichern Sie die Wortliste für jeden Tweet in der Datenbank
self.clsfdb_con.execute( "insert into tweet_master values(?,?,?,?,?)", \
(tweet, pickle.dumps(wordlist), spam_score, label, dtime) )
self.clsfdb_con.commit()
def isindexed(self,word):
"""Gibt ture zurück, wenn der Tweet bereits indiziert ist"""
u=self.con.execute \
("select word from words_score where word='%s'" % (word)).fetchone()
if u!=None: return True
return False
def update_word_score(self,word, spam_score):
"""Finden Sie die Wahrscheinlichkeit, zu jeder Kategorie für jedes Wort zu gehören"""
self.con.execute("update words_score set spam_score=%f where word='%s'" % \
(spam_score, word))
def createindextables(self):
"""Erstellen Sie eine Datenbanktabelle"""
tnlist = ['tweet_master' ,'spam_words', 'ham_words', 'words_score']
for table_name in tnlist:
sql="SELECT name FROM sqlite_master WHERE type='table' AND name='MYTABLE';" \
.replace('MYTABLE', table_name)
res = self.con.execute(sql).fetchone()
if res is not None: #Bestätigung der Existenz der Tabelle
self.con.execute('drop table %s' % (table_name))
self.con.execute('create table tweet_master(tweet, wordlist, label, create_time)') #Spam ist 1,Schinken ist 0
self.con.execute('create table spam_words(tweet_id, word)')
self.con.execute('create table ham_words(tweet_id, word)')
self.con.execute('create table words_score(word, spam_score)')
self.con.execute('create index tweetidx on tweet_master(tweet)')
self.con.execute('create index spamidx on spam_words(word)')
self.con.execute('create index hamidx on ham_words(word)')
self.con.execute('create index scoreidx on words_score(word)')
self.dbcommit()
def create_classified_indextables(self):
"""Erstellen Sie eine Datenbanktabelle"""
table_name = 'tweet_master'
sql="SELECT name FROM sqlite_master WHERE type='table' AND name='MYTABLE';" \
.replace('MYTABLE', table_name)
res = self.clsfdb_con.execute(sql).fetchone()
if res is not None: #Bestätigung der Existenz der Tabelle
self.clsfdb_con.execute('drop table %s' % (table_name))
self.clsfdb_con.execute('create table tweet_master(tweet, wordlist, spam_score, label, create_time)') #Spam ist 1,Schinken ist 0
self.clsfdb_con.execute('create index tweetidx on tweet_master(tweet)')
self.clsfdb_con.commit()
if __name__=='__main__':
trfname = 'training data file name'
dbname = 'asumama_bf.db'
bf = BF(trfname, dbname, use=0)
bf.train()
tefname = 'test data file name'
dbname = 'asumama_bf.db'
bf = BF(tefname, dbname, use=1)
bf.test(tefname)
clfname = 'classify data filename'
trained_dbname = 'asumama_bf.db'
classify_dbname = 'asumama_bf_classify.db'
bf = BF(clfname, trained_dbname, use=2)
bf.classify(clfname, classify_dbname)
Die zum Lernen verwendeten Daten waren 1.000 (1.000, weil es schwierig war, sie zu kennzeichnen). Es gibt 1.200 Testdaten. Die Genauigkeit betrug 96% und der Rückruf (HAM-Erkennungsrate) betrug 99%. Es gab viele ähnliche Spam-Tweets, und Tweets, die nichts mit dem Ziel zu tun hatten, waren "Kuchen" und "Einkaufen mit dem Auto", und die Ziel-Tweets waren "Aina-chan", "interessant" und "Miura". Ich denke, es war zufällig ein solches Ergebnis, weil die Daten so waren, dass sie relativ einfach klassifiziert werden konnten, wie zum Beispiel "Shohei ~".
Es ist ein Kommentar wie bei einem Grundschüler, aber es ist interessant, mit Twitter-Daten zu spielen. Die Vorverarbeitung ist jedoch mühsam. .. .. Wenn ich in Zukunft Zeit habe, möchte ich etwas mit der Zeitreihe machen. Folgen Sie den touristischen Informationen. (Es gibt noch eine andere Geschichte, die ich zusammenfassen möchte, also möchte ich sie irgendwie zusammenfassen, bevor ich mit der Arbeit am neuen Job beginne ...)
Wir entschuldigen uns für die Unannehmlichkeiten, aber wenn Sie einen Fehler machen, würden wir uns freuen, wenn Sie darauf hinweisen könnten.
Recommended Posts