Ich habe die Idee, dass SNS-Informationen wie Twitter zur Überwachung des Risikos des Auftretens neuer Corona-Virus-Infektionscluster verwendet werden können, und ich möchte Tweets per Stichwortsuche wie "Heutige Trinkparty" sammeln. Da die kostenlose Such-API Tweets erst vor einer Woche verfolgen kann, werden wir einen Mechanismus entwickeln, mit dem Daten jeden Tag automatisch erfasst werden, wobei die Möglichkeit in Betracht gezogen wird, sie für zukünftige Forschungen zu verwenden.
Wenn Sie eine gute Vorstellung von einem Suchwort haben, mit dem das Risiko der Entwicklung eines neuen mit Corona-Viren infizierten Clusters bewertet werden kann, kommentieren Sie dies bitte! </ b>
Urheber https://developer.twitter.com/en/docs/twitter-api Sehr leicht verständliche Kommentarseite https://gaaaon.jp/blog/twitterapi Es ist traurig, dass dieser Artikel nicht einmal den "versuchten autarken Code" im obigen Link erreicht hat. In einigen Fällen kann der Artikel daher privat gehalten werden.
Führen Sie die folgende nomikai_tweets.py
aus.
# coding: utf-8
# nomikai_tweets.py
import pandas as pd
import json
import schedule
from time import sleep
from requests_oauthlib import OAuth1Session
import datetime
from datetime import date, timedelta
import pytz
def convert_to_datetime(datetime_str):
"""
Konvertierung des Datums- und Uhrzeitformats
"""
tweet_datetime = datetime.datetime.strptime(datetime_str,'%a %b %d %H:%M:%S %z %Y')
return(tweet_datetime)
def job():
"""
Programm, das sich in main wiederholt
"""
#Suchbegriff Ausgenommen Retweets
keyword = "Trinkparty heute ausschließen:retweets"
#Verzeichnis speichern Zuerst erstellen
DIR = 'nomikai/'
#Informationen, die durch API-Kommunikation und Entwicklerregistrierung erhalten wurden
Consumer_key = 'bT*****************'
Consumer_secret = 've*****************'
Access_token = '25*****************'
Access_secret = 'NT*****************'
url = "https://api.twitter.com/1.1/search/tweets.json"
twitter = OAuth1Session(Consumer_key, Consumer_secret, Access_token, Access_secret)
#Für die Erfassung verwendete Parameter
max_id = -1
count = 100
params = {'q' : keyword, 'count' : count, 'max_id' : max_id, 'lang' : 'ja', 'tweet_mode' : 'extended'}
#Vorbereitung zum Vergleichen von Datumsverarbeitung utc und jst in japanischer Zeit
today =datetime.datetime.now(pytz.timezone('Asia/Tokyo'))
today_beggining_of_day = today.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday_beggining_of_day = today_beggining_of_day - timedelta(days=1)
yesterday_str = datetime.datetime.strftime(yesterday_beggining_of_day, '%Y-%m-%d')
#Entspricht der Aufzeichnung in der DF while-Anweisung, in der Tweet-Informationen gespeichert sind
columns = ['time', 'user.id', 'user.location', 'full_text', 'user.followers_count', 'user.friends_count', 'user.description', 'id']
df = pd.DataFrame(index=[], columns=columns)
while(True):
if max_id != -1: #Gehen Sie zurück zu dem Tweet, in dem die Tweet-ID bereits gespeichert ist
params['max_id'] = max_id - 1
req = twitter.get(url, params = params)
if req.status_code == 200: #Wenn Sie es normal bekommen können
search_timeline = json.loads(req.text)
if search_timeline['statuses'] == []: #Nachdem Sie alle Tweets genommen haben
break
for tweet in search_timeline['statuses']:
#Tweet Zeit utc
tweet_datetime = convert_to_datetime(tweet['created_at'])
#Überspringen, wenn nicht der gestrige Tweet
in_jst_yesterday = today_beggining_of_day > tweet_datetime >= yesterday_beggining_of_day
if not in_jst_yesterday: #Überspringen, wenn nicht der gestrige Tweet
continue
#In DF speichern
record = pd.Series([tweet_datetime,
tweet['user']['id'],
tweet['user']['location'],
tweet['full_text'],
tweet['user']['followers_count'],
tweet['user']['friends_count'],
tweet['user']['description'],
tweet['id'],
],index=df.columns)
df = df.append(record, ignore_index=True)
max_id = search_timeline['statuses'][-1]['id']
else: #Warten Sie 15 Minuten, wenn Sie in den Einschränkungen der Zugriffshäufigkeit stecken bleiben
print("Total", df.shape[0], "tweets were extracted", sep=" ")
print('wainting for 15 min ...')
sleep(15*60)
#sparen
df = df.set_index("time")
df.index = df.index.tz_convert('Asia/Tokyo')
df.to_pickle(DIR + yesterday_str + keyword +".pkl")
df.to_csv(DIR + yesterday_str + keyword +".csv")
print(today, "Total", df.shape[0], "tweets were extracted!\nnext start at 01:00 tommorow")
def main():
print("start at 01:00 tommorow")
#Laufen Sie jeden Tag um 01:00 Uhr
schedule.every().day.at("01:00").do(job)
while True:
schedule.run_pending()
sleep(1)
if __name__ == '__main__':
main()
Ich möchte analysieren, sobald die Daten gesammelt sind. Dies liegt daran, dass es nicht möglich ist, auch nur die periodischen Änderungen je nach Tag zu bewerten, indem nur die vergangenen Daten für eine Woche verwendet werden.
Ich habe gelernt, dass ich vorsichtig mit der japanischen Zeit und der Standardzeit umgehen muss, um jeden Tag Daten zu sammeln.
Beachten Sie, dass "datetime.datetime.now ()" von der Umgebung abhängt, in der das Programm ausgeführt wird. Daher funktioniert das Ausführen dieser Quelle auf einem Computer in einem anderen Land nicht ordnungsgemäß. Gleiches gilt für schedule.every (). Day.at (" 01: 00 "). Do (job)
.
Von den Tweets, die die Vergangenheit "heute" und "Trinkparty" enthielten, die extrahiert werden konnten, war "online" in etwa 10% enthalten. Außerdem mögen viele Twitterer keine Firmen-Trinkpartys.
Recommended Posts