I've always wanted to do something with Twitter data, so I built a simple classifier, working from installing MeCab and CaboCha all the way through storing results in SQLite. (I just tried it for fun.) Below is a summary of what I did.
For the installation procedure, please refer to "Install mecab-python on Windows". As that page also stresses in bold, when you rewrite mecab.h, be sure to open it with administrator privileges before making changes; otherwise your edits will not be saved. I got stuck on this point myself.
ZAORIKU-san's method also gets MeCab itself working, but without lattice support you cannot use CaboCha, so I recommend installing MeCab 0.996.
When retrieving data with the search API, I recommend using a cursor for now and checking the docs to see what kind of objects come back.
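For example, here is a minimal sketch using tweepy's Cursor (this assumes tweepy's classic API; the credentials, query, and item count are placeholders, not values from this post):

```python
# coding: utf-8
import tweepy

# placeholder credentials; substitute your own OAuth keys
auth = tweepy.OAuthHandler('CONSUMER_KEY', 'CONSUMER_SECRET')
auth.set_access_token('ACCESS_TOKEN', 'ACCESS_TOKEN_SECRET')
api = tweepy.API(auth)

# Cursor transparently handles paging; print each status to inspect its fields
for status in tweepy.Cursor(api.search, q=u'your search keyword', lang='ja').items(100):
    print status.id, status.created_at, status.author.id, status.text
```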
If you installed MeCab with the Windows installer, the CSV files from which the dictionary is built remain shift-jis even if you selected utf-8 during installation (even though the system dictionary itself is utf-8...). So when adding entries to the system dictionary, refer to the following command.
```bash
mecab-dict-index -f shift-jis -t utf-8
```
-f specifies the character code of the CSV files and -t the character code of the system dictionary, so with the command above you can register the contents of shift-jis CSV files into a utf-8 system dictionary. (Alternatively, it may be simplest to just convert all the files to utf-8.)
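As a concrete illustration, here is a sketch of registering one word as a user dictionary instead of rebuilding the system dictionary; the dictionary path, the CSV entry, and the cost are my own assumptions, so adjust them to your installation:

```bash
# user.csv (shift-jis), one entry in ipadic format:
#   surface,left-id,right-id,cost,POS fields...,base form,reading,pronunciation
# leaving the context IDs empty lets mecab-dict-index estimate them
#   明日ママ,,,1000,名詞,固有名詞,一般,*,*,*,明日ママ,アスママ,アスママ

# build user.dic against the installed ipadic (path is hypothetical)
mecab-dict-index -d "C:\Program Files\MeCab\dic\ipadic" \
                 -u user.dic -f shift-jis -t utf-8 user.csv
```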
So far we have installed the necessary software, collected data with the Twitter API, and prepared the dictionary needed to analyze tweets. What remains is something to classify. What came to mind was the TV drama "Ashita, Mama ga Inai" ("Tomorrow, Mom Won't Be Here"). Search results were bound to include noise such as "shopping with mom tomorrow" or "mom won't be here tomorrow, so I'm going out", so I set the search keyword to "asu mama" ("mom tomorrow") and collected the data. By the way, note that the streaming API does not yet support keyword search for languages without clear word delimiters, such as Japanese.
After that, label the tweets as being about the drama or not, and train on them. Tailor the preprocessing (Unicode normalization, case folding, full-width/half-width conversion, etc.) and the features you extract (hashtags, URLs, words, ...) to your own purpose. For that reason I omit the preprocessing code this time; a minimal sketch of the idea follows.
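As a rough illustration of the normalization steps mentioned above (this uses only the standard library and is not the separatewords module used later):

```python
# coding: utf-8
import unicodedata

def normalize(text):
    """Toy preprocessing: unify full-/half-width forms and case."""
    if isinstance(text, str):  # Python 2: decode utf-8 bytes first
        text = text.decode('utf-8')
    text = unicodedata.normalize('NFKC', text)  # e.g. u'ＡＢＣ' -> u'ABC'
    return text.lower()

print normalize('ＡＢＣ　Ｄｅｆ')  # -> abc def
```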
Words are stored in the DB per tweet so that a forgetting factor can be introduced later. As a simple form of variable selection, words whose spam probability lies between 0.4 and 0.6 are not used. On top of that, Laplace smoothing is applied; this reduces noise and prevents zero probabilities. A small worked example of the smoothing comes first, followed by the full code.
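To make the smoothing concrete, here is a worked sketch of the word-score formula used in calc_word_prob below (the counts are made up):

```python
# Laplace-smoothed per-class word probability:
#   p(word|cat) = (count(word, cat) + 1) / (total_words(cat) + |V(cat)|)
w_cnt_spam, w_cnt_ham = 30, 5  # made-up occurrence counts for one word
spam_denom = 2000 + 800        # total spam words + unique spam words |V|
ham_denom = 3000 + 1200        # total ham words + unique ham words |V|

spam_prob = float(w_cnt_spam + 1) / spam_denom   # ~0.0111
ham_prob = float(w_cnt_ham + 1) / ham_denom      # ~0.0014
spam_score = spam_prob / (spam_prob + ham_prob)  # ~0.89, an informative word
print spam_score
```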
```python
# coding: utf-8
import sqlite3 as sqlite
import pickle
from math import log, exp

from separatewords import MecabTokenize  # tokenizer module (code not shown); it also normalizes orthographic variants


class BF(object):
    """Train and test a naive Bayes classifier.

    With use=0 any existing tables are dropped and recreated,
    so pass use=1 or use=2 to keep working with an existing DB.
    """

    def __init__(self, fname, dbname, use=0):
        """use=0: training
        use=1: testing
        use=2: classification
        """
        self.fname = fname  # input file name
        self.con = sqlite.connect(dbname)
        self.con.text_factory = str  # use str so that utf-8 byte strings round-trip
        if use == 0:
            self.createindextables()  # create the tables
        self.spam_denominator = 0.0
        self.ham_denominator = 0.0
        self.ham_weight = 1.0
        self.init_pai = 0.4
        self.threshold = 0.1

    def __del__(self):
        self.con.close()

    def dbcommit(self):
        self.con.commit()

    def train(self):
        """Train the classifier; tweets shorter than 10 characters are skipped."""
        with open(self.fname, 'r', 1000) as trainf:
            for line in trainf:
                tid, dtime, aid, tweet, y = line.strip().split('\t')
                wordlist = self.get_wordlist(tweet)
                # MeCab can misbehave on sentences shorter than 10 characters, so skip them
                if wordlist is True:
                    print 'skip: %s' % (tweet)
                    continue
                y = 0 if int(y) < 1 else 1  # normalize labels: spam=1, ham=0
                self.addtoindex_tweet(tweet, wordlist, y, dtime)
                if y == 1:
                    self.addtoindex_class(wordlist, 'spam_words')
                else:
                    self.addtoindex_class(wordlist, 'ham_words')
                self.addtoindex_score(wordlist)
        self.calc_denominator()
        self.calc_word_prob()
        self.predict()

    def test(self, ifname):
        """Cross-validate against a trained DB.
        Tweets shorter than 10 characters are skipped.
        """
        with open(ifname, 'r', 1000) as testf:
            prior_spam, prior_ham = self.calc_cat_prob()  # p(spam), p(ham)
            log_prior_spam = log(prior_spam)
            log_prior_ham = log(prior_ham)
            ans = [0.0, 0.0, 0.0, 0.0]
            for line in testf:
                tid, dtime, aid, tweet, y = line.strip().split('\t')
                print 'testing:', tweet
                wordlist = self.get_wordlist(tweet)
                # MeCab can misbehave on sentences shorter than 10 characters, so skip them
                if wordlist is True:
                    print 'skip: %s' % (tweet)
                    continue
                y = 0 if int(y) < 1 else 1  # normalize labels: spam=1, ham=0
                spam_score = self.pred_score(wordlist, log_prior_spam, log_prior_ham)
                res = 1 if spam_score > 0.5 else 0
                # accumulate the confusion-matrix counts
                ans = self.get_ans(ans, y, res)
            print ans

    def classify(self, clfname, classify_dbname):
        """Classify unlabeled tweets; tweets shorter than 10 characters are skipped."""
        self.clsfdb_con = sqlite.connect(classify_dbname)
        self.create_classified_indextables()
        self.clsfdb_con.text_factory = str  # use str so that utf-8 byte strings round-trip
        with open(clfname, 'r', 1000) as testf:
            prior_spam, prior_ham = self.calc_cat_prob()  # p(spam), p(ham)
            log_prior_spam = log(prior_spam)
            log_prior_ham = log(prior_ham)
            for line in testf:
                tid, dtime, aid, tweet = line.strip().split('\t')
                wordlist = self.get_wordlist(tweet)
                # MeCab can misbehave on sentences shorter than 10 characters, so skip them
                if wordlist is True:
                    print 'skip: %s' % (tweet)
                    continue
                spam_score = self.pred_score(wordlist, log_prior_spam, log_prior_ham)
                label = 1 if spam_score > 0.5 else 0
                self.addtoindex_classified_table(tweet, wordlist, spam_score, label, dtime)

    def pred_score(self, wordlist, log_prior_spam, log_prior_ham):
        """Estimate the spam probability of a tweet:
        P(spam|words) = 1 / (1 + exp(m*(log p(ham) - log p(spam))
                                     + sum_w [log(1 - p_w) - log(p_w)]))
        Words whose score lies within `threshold` of 0.5 carry little
        information and are ignored (simple variable selection).
        """
        m = len(wordlist) - 1
        psm = m * log_prior_spam
        phm = m * log_prior_ham
        denom_prior = phm - psm
        denom_score = 0.0
        for word in wordlist:
            w_score = self.con.execute(
                "select spam_score from words_score where word=?", (word,)).fetchone()
            if w_score is None:
                w_score = self.init_pai
            else:
                w_score = w_score[0]
            if abs(w_score - 0.5) > self.threshold:
                denom_score += log(1 - w_score) - log(w_score)
        denom = exp(denom_prior + denom_score) + 1
        prob_spam = 1.0 / denom
        print 'spam_probability:', prob_spam
        return prob_spam

    def get_wordlist(self, tweet):
        # MeCab can misbehave on sentences shorter than 10 characters,
        # so return True as a sentinel meaning "skip this tweet"
        if len(tweet.decode('utf-8')) < 10:
            return True
        wordlist = MecabTokenize.tokenize(tweet)
        if wordlist is None:
            return True
        return wordlist

    def get_ans(self, ans, y, res):
        """Accumulate confusion-matrix counts as [TP, FN, FP, TN]."""
        if y == 1 and res == 1:    # true positive
            ans[0] += 1
        elif y == 1 and res == 0:  # false negative
            ans[1] += 1
        elif y == 0 and res == 1:  # false positive
            ans[2] += 1
        else:                      # true negative
            ans[3] += 1
        return ans

    def predict(self):
        """Compute each document's category-membership probability and
        assign it a category: p(category|document)
        """
        # confusion-matrix counts for a quick accuracy check
        ans = [0.0, 0.0, 0.0, 0.0]
        prior_spam, prior_ham = self.calc_cat_prob()  # p(spam), p(ham)
        log_prior_spam = log(prior_spam)
        log_prior_ham = log(prior_ham)
        wordlists = self.con.execute("select wordlist from tweet_master").fetchall()
        true_labels = self.con.execute("select label from tweet_master").fetchall()
        res = []
        for row, true_label in zip(wordlists, true_labels):
            wordlist = pickle.loads(row[0])
            prob_spam = self.pred_score(wordlist, log_prior_spam, log_prior_ham)
            label = 1 if prob_spam > 0.5 else 0
            res.append(label)
            ans = self.get_ans(ans, true_label[0], label)
        print ans
        print res

    def calc_word_prob(self):
        """Compute each word's score (probability) within a category:
        p(word_i|category)
        """
        # materialize the list first: updating a table while a SELECT
        # cursor is still open on it is unreliable in SQLite
        words = [row[0] for row in self.con.execute("select word from words_score")]
        for word in words:
            w_cnt_spam, w_cnt_ham = self.cnt_word_of_cat(word)
            spam_prob = float(w_cnt_spam + 1) / self.spam_denominator  # +1 is the Laplace smoothing
            ham_prob = min(1, self.ham_weight * float(w_cnt_ham + 1) / self.ham_denominator)
            spam_score = spam_prob / (spam_prob + ham_prob)
            self.update_word_score(word, spam_score)
        self.dbcommit()

    def calc_denominator(self):
        """Compute the denominators used when scoring words:
        total word count plus vocabulary size |V| (Laplace smoothing)
        """
        uniq_cnt_spam, uniq_cnt_ham = self.cnt_uniq_word_of_cat()
        total_cnt_spam, total_cnt_ham = self.cnt_total_word_of_cat()
        self.spam_denominator = total_cnt_spam + uniq_cnt_spam
        self.ham_denominator = total_cnt_ham + uniq_cnt_ham

    def cnt_word_of_cat(self, word):
        """Count the occurrences of a specific word in each category:
        T(cat, word_i)
        """
        w_cnt_spam = self.con.execute(
            "select count(*) from spam_words where word=?", (word,)).fetchone()[0]
        w_cnt_ham = self.con.execute(
            "select count(*) from ham_words where word=?", (word,)).fetchone()[0]
        if w_cnt_spam is None: w_cnt_spam = 0
        if w_cnt_ham is None: w_cnt_ham = 0
        return w_cnt_spam, w_cnt_ham

    def cnt_uniq_word_of_cat(self):
        """Count the unique words in each category:
        the |V| term in the denominator of p(word_i|cat)
        """
        uniq_cnt_spam = self.con.execute("select count(distinct word) from spam_words").fetchone()[0]
        uniq_cnt_ham = self.con.execute("select count(distinct word) from ham_words").fetchone()[0]
        return uniq_cnt_spam, uniq_cnt_ham

    def cnt_total_word_of_cat(self):
        """Total number of word occurrences in each category:
        sum_w' T(cat, word')
        """
        total_cnt_spam = self.con.execute("select count(*) from spam_words").fetchone()[0]
        total_cnt_ham = self.con.execute("select count(*) from ham_words").fetchone()[0]
        return total_cnt_spam, total_cnt_ham

    def calc_cat_prob(self):
        """Compute the class priors p(category)."""
        cnt_spam_tweet = self.con.execute("select count(*) from tweet_master where label=1").fetchone()[0]
        cnt_total_tweet = self.con.execute("select count(*) from tweet_master").fetchone()[0]
        cat_prob_spam = float(cnt_spam_tweet) / cnt_total_tweet
        return cat_prob_spam, 1.0 - cat_prob_spam

    def addtoindex_tweet(self, tweet, wordlist, label, dtime):
        """Store a tweet together with its pickled word list."""
        print 'Indexing: ' + tweet
        self.con.execute("insert into tweet_master values(?,?,?,?)",
                         (tweet, pickle.dumps(wordlist), label, dtime))
        self.dbcommit()

    def addtoindex_class(self, wordlist, class_table_name):
        """Store the words of the tweet just indexed in its class table."""
        # rowid of the tweet inserted by addtoindex_tweet
        tweet_id = self.con.execute("select max(rowid) from tweet_master").fetchone()[0]
        for word in wordlist:
            self.con.execute("insert into %s values(?,?)" % (class_table_name), (tweet_id, word))
        self.dbcommit()

    def addtoindex_score(self, wordlist):
        """Register new words in the score table."""
        for word in wordlist:
            if self.isindexed(word):
                continue
            self.con.execute("insert into words_score values(?,?)",
                             (word, self.init_pai))  # placeholder score for now
        self.dbcommit()

    def addtoindex_classified_table(self, tweet, wordlist, spam_score, label, dtime):
        """Store an unlabeled tweet together with its predicted class."""
        print 'Classifying: ' + tweet
        self.clsfdb_con.execute("insert into tweet_master values(?,?,?,?,?)",
                                (tweet, pickle.dumps(wordlist), spam_score, label, dtime))
        self.clsfdb_con.commit()

    def isindexed(self, word):
        """Return True if the word is already in the score table."""
        u = self.con.execute(
            "select word from words_score where word=?", (word,)).fetchone()
        return u is not None

    def update_word_score(self, word, spam_score):
        """Store the per-word spam probability."""
        self.con.execute("update words_score set spam_score=? where word=?",
                         (spam_score, word))

    def createindextables(self):
        """Create the database tables, dropping any that already exist."""
        tnlist = ['tweet_master', 'spam_words', 'ham_words', 'words_score']
        for table_name in tnlist:
            sql = "SELECT name FROM sqlite_master WHERE type='table' AND name='MYTABLE';" \
                  .replace('MYTABLE', table_name)
            res = self.con.execute(sql).fetchone()
            if res is not None:  # the table already exists
                self.con.execute('drop table %s' % (table_name))
        self.con.execute('create table tweet_master(tweet, wordlist, label, create_time)')  # spam=1, ham=0
        self.con.execute('create table spam_words(tweet_id, word)')
        self.con.execute('create table ham_words(tweet_id, word)')
        self.con.execute('create table words_score(word, spam_score)')
        self.con.execute('create index tweetidx on tweet_master(tweet)')
        self.con.execute('create index spamidx on spam_words(word)')
        self.con.execute('create index hamidx on ham_words(word)')
        self.con.execute('create index scoreidx on words_score(word)')
        self.dbcommit()

    def create_classified_indextables(self):
        """Create the table for classification results, dropping any existing one."""
        table_name = 'tweet_master'
        sql = "SELECT name FROM sqlite_master WHERE type='table' AND name='MYTABLE';" \
              .replace('MYTABLE', table_name)
        res = self.clsfdb_con.execute(sql).fetchone()
        if res is not None:  # the table already exists
            self.clsfdb_con.execute('drop table %s' % (table_name))
        self.clsfdb_con.execute('create table tweet_master(tweet, wordlist, spam_score, label, create_time)')  # spam=1, ham=0
        self.clsfdb_con.execute('create index tweetidx on tweet_master(tweet)')
        self.clsfdb_con.commit()


if __name__ == '__main__':
    trfname = 'training data file name'
    dbname = 'asumama_bf.db'
    bf = BF(trfname, dbname, use=0)
    bf.train()

    tefname = 'test data file name'
    dbname = 'asumama_bf.db'
    bf = BF(tefname, dbname, use=1)
    bf.test(tefname)

    clfname = 'classify data filename'
    trained_dbname = 'asumama_bf.db'
    classify_dbname = 'asumama_bf_classify.db'
    bf = BF(clfname, trained_dbname, use=2)
    bf.classify(clfname, classify_dbname)
```
I used 1,000 tweets for training (1,000 because labeling was tedious) and 1,200 for testing. Accuracy was 96% and recall (the HAM detection rate) was 99%. There were many near-identical spam tweets; the words characteristic of unrelated tweets were things like "cake" and "shopping by car", while those characteristic of on-topic tweets were things like "Aina-chan", "interesting", "Miura", and "Shohei". I suspect the result came out this well simply because the data happened to be relatively easy to separate.
These are elementary-school-level impressions, but playing with Twitter data is fun, even if the preprocessing is a chore... In the future, given the time, I would like to try something using time series, such as following tourist information. (There is one more topic I want to write up, and I hope to manage it before work starts at my new job...)
If you spot any mistakes, I would appreciate it if you could point them out.