[PYTHON] Experimentieren Sie, um Tweets für eine lange Zeit zu sammeln (Programmvorbereitung (5))

Bis zum letzten Mal

Hinweis: Da ich in Echtzeit schreibe, werde ich beim Schreiben entgleist oder stagniere.

Weitere Funktionen hinzugefügt

Grundsätzlich wurde bestätigt, dass derzeit fast kein Problem im Betrieb ist. In diesem Fall müssen Sie als Nächstes "enden, wenn die Frist abgelaufen ist, um das Vergessen während des Laufens zu verhindern" und "um die Situation zu vermeiden, in der Sie das Ende des Fehlers nicht bemerken, wenden Sie sich an DM" * * Zweite Priorität ** Funktionsimplementierung.

Funktion zum Beenden, wenn die Zeit gekommen ist

Wenn ich datetime.now () jedes Mal las, dachte ich, dass es keinen Trick gibt, aber ich konnte mir keine anderen Mittel vorstellen, und es gab kein Geschwindigkeitsproblem, also hielt ich es einfach.

ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 000000) - timedelta(hours=9)

Mit on_status (), das die Zeit so verkürzt und die Verarbeitung zum Zeitpunkt der Erfassung durchführt,

        if(datetime.now() > ExitLimit):
            print("Es ist Zeit zu beenden")
            self.send_message("Endet mit der Zeit")
            self.StopFlg = True
            return False

Hör so auf. Der letzte ist (streng) Überstundenerwerb, aber nein.

Funktion zur Fehlermeldung durch DM

In Stream.py von Tweepy gibt es eine Methode namens API () selbst, aber es scheint, dass das damit erhaltene Objekt nicht für die DM-Übertragung verwendet werden kann. Aus diesem Grund müssen Sie hauptsächlich tweepy.API () aufrufen, um das Objekt abzurufen.

    def send_message(self, message):
        if(message is None) or (self.API is None):
            return
        if(len(DM_DEST) != 0  and len(message) != 0):
            d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
            mes = "[TWEET CRAWLER]\n" + d + "\n" + message
            
            self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
            print('DM-Übertragung:' + message)
        else:
            print('DM nicht gesendet')
        return

Erstellen wir eine Methode wie diese und rufen sie auf, wenn ein Ereignis auftritt.

Andere Bedenken

Was ist, wenn die DB ausfällt?

Ich denke nicht, dass es möglich ist, dass nur MongoDB geschickt fällt, aber nur für den Fall. Fügen Sie den Schreibvorgang zu einer Textdatei hinzu, wenn dies fehlschlägt.

    def insert(self, json_src):
        #Fügen Sie JSON-Daten in Mongo ein.
        try:
            self.Collection.insert(json_src)        #Hier speichern
            self.dbErrCnt = 0
        except pymongo.errors.PyMongoError as exc:
            #mongoDB Mystery Stop
            self.dbErrCnt += 1
            print('Error' + str(self.dbErrCnt) + 'Zeit')
            if(self.dbErrCnt < 3):
                self.send_message('MongoDB-Fehler aufgetreten.')   #Maßnahmen gegen mehrere wiederholte Treffer mit demselben Inhalt
                #dir(exc)

            with open('MongoInsertError.txt','a') as f:
                f.write(str(json_src) + '\n')
                print('Wiederaufbereitung fehlgeschlagen, in Datei geschrieben')
                f.close()

Es ist fast so. Ich habe nach einer Funktion gesucht, um zu überprüfen, ob die Verbindung besteht, aber ich habe mich nicht so gefühlt, also habe ich sie weggelassen. Übrigens, wenn MongoDB wiederhergestellt wird, wird es wieder verbunden, ohne dass eine Neukonfiguration erforderlich ist, und die Datenspeicherung wird wieder aufgenommen. Dies ist wahrscheinlich der Fall (wahrscheinlich nicht).

Wie gebe ich Schlüsselwörter an?

Da der Bereich um das Ereignis (verstecktes Wort) </ sub> von mehreren Schlüsselwörtern aufgerufen wird, muss er durch ODER mehrerer Schlüsselwörter angegeben werden.

Wenn Sie den Bereich festlegen, können Sie ihn abdecken. Einige haben sowohl Hash-Tags als auch Tags ohne Tags. Laut der Erklärung der Twitter Streaming API werden japanische Keywords ohne Tags nur dann getroffen, wenn sie durch Leerzeichen getrennt sind. Es ist also in Ordnung, keine Keywords ohne Tags zu haben ...

TRACKWORD = ['# D51, # Lokomotive, # Dampflokomotive, # Bergdampf, # Lokomotive, Lokomotive, Dampflokomotive'] Geben Sie es wie folgt an: (Schlüsselwörter werden entsprechend manipuliert) </ sub>

stream.filter(track=TRACKWORD)

Der Anruf wird so behoben. Wenn Sie zu viele Keywords eingeben, werden Sie über Spam-Posts besorgt sein. In der Tat, wenn Sie mit den Schlüsselwörtern für die Produktion überprüfen, gibt es viele Auktions-Bots, und es gibt auch Dinge wie "Auch wenn Sie sagen, alles speichern, ist das ärgerlich" (weil es jetzt die ruhigste Zeit ist) ) Ich verstehe.

Da der Bot im Allgemeinen einen bestimmten Client verwendet, scheint es möglich zu sein, auf den Schlüssel "souce" im erworbenen json zu verweisen und alles auf einmal zu löschen. Nun, "die Anzahl der Beiträge von Bot" scheint eine Geschichte zu sein, also werde ich sie weglassen, weil ich sie später löschen werde.

Produktionsquelle

Hier ist die fertige Version, die Python de Amateur in einem Monat erstellt hat (ungefähr eine Woche Arbeit).

TweetCrawler.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-

import tweepy
from tweepy.api import API

import pymongo
from pymongo import MongoClient
import json

from datetime import datetime, timedelta

import sys

#Variablen für den Twitter-Zugriff
CK = ''                            # Consumer Key
CS = ''   # Consumer Secret
AT = ''   # Access Token
AS = ''        # Accesss Token Secert

TRACKWORD = ['#(Versteckte Sprache)']  #Schlüsselwörter beim Betrieb des Public Stream-Filters

DM_DEST = ''      #DM-Ziel

#MongoDB verbindungsbezogene Variablen
HOST = 'mongo'      #Gastgeber
PORT = 27017            #Hafen(Standard:27017)
DB_NAME = 'TwitterDB'   #DB-Name
COL_NAME= 'Twitter'    #Sammlungsname

ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 000000) - timedelta(hours=9)

class Listener(tweepy.StreamListener):
    def __init__(self, api=None):   #Konstrukteur
        tweepy.StreamListener.__init__(self)    #Übergeordneter Klassenkonstruktor
        print('Konstrukteur')

        self.StopFlg = False          #Stopp-Flagge.
        self.mongo_init()

        self.API = None
        
        self.dbErrCnt = 0
        
        print(ExitLimit)

        return

    def mongo_init(self):           #Initialisierung von MongoDB
        try:
            Client = MongoClient(HOST, PORT)
            db = Client[DB_NAME]
            self.Collection = db[COL_NAME]
            print('DB bereit')
        except pymongo.errors.PyMongoError as exc:
            #Verbindungsfehler
            print('DB-Verbindungsfehler')
        return

    def on_status(self, status):
        #print('Tweet(' + str(self.TweetCnt) + ')')
        
        self.insert(status._json)
        
        if(datetime.now() > ExitLimit):
            print("Es ist Zeit zu beenden")
            self.send_message("Endet mit der Zeit")
            self.StopFlg = True
            return False

        return True

    def on_error(self, status_code):
        print('Ein Fehler ist aufgetreten: ' + str(status_code))
        self.send_message("ERR:" + str(status_code))
        return True

    def on_connect(self):
        print('In Verbindung gebracht')

        self.send_message('In Verbindung gebracht')
        return

    def on_disconnect(self, notice):
        print('Getrennt:' + str(notice.code))
        self.send_message("DISCCONECT:" + str(notice.code))
        return

    def on_limit(self, track):
        print('Empfangslimit ist aufgetreten:' + str(track))
        self.send_message("RCV_LIMIT:" + str(track))
        return

    def on_timeout(self):
        print('Auszeit')
        self.send_message("TIMEOUT")
        return True

    def on_warning(self, notice):
        print('Warnmeldung:' + str(notice.message))
        self.send_message("WARN:" + str(notice.message))
        return

    def on_exception(self, exception):
        print('Ausnahmefehler:' + str(exception))
        self.send_message("EXCEPTION:" + str(exception))
        return
        
    def send_message(self, message):
        #Methode zum Senden von DM
        if(message is None) or (self.API is None):
            return
        if(len(DM_DEST) != 0  and len(message) != 0):
            d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
            mes = "[TWEET CRAWLER]\n" + d + "\n" + message
            
            self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
            print('DM-Übertragung:' + message)
        else:
            print('DM nicht gesendet')
        return

    def insert(self, json_src):
        #JSON-Eingabe in Mongo.
        try:
            self.Collection.insert(json_src)        #Hier speichern
            self.dbErrCnt = 0
        except pymongo.errors.PyMongoError as exc:
            #Ein Fehler ist aufgetreten
            self.dbErrCnt += 1
            print('Error' + str(self.dbErrCnt) + 'Zeit')
            if(self.dbErrCnt < 3):
                self.send_message('MongoDB-Fehler aufgetreten.')   #Verhindern Sie mehrere Benachrichtigungen mit demselben Inhalt

            #In eine Datei exportieren
            with open('MongoInsertError.txt','a') as f:
                f.write(str(json_src) + '\n')
                print('Wiederaufbereitung fehlgeschlagen, in Datei geschrieben')
                f.close()

#Hauptverarbeitung von hier
auth = tweepy.OAuthHandler(CK, CS)
auth.set_access_token(AT, AS)

ExitCode = 255

while (True):     #Endlosschleife
    try:
        listener = Listener()
        stream = tweepy.Stream(auth, listener)
        listener.API = tweepy.API(auth)

        #Wählen Sie eine aus und kommentieren Sie sie aus.
        print(TRACKWORD)
        stream.filter(track=TRACKWORD)
        #stream.sample()
        #stream.userstream()

        #Stoppt das Urteil mit der Stoppflagge
        if(listener.StopFlg == True):
            ExitCode = 0
            break

    except KeyboardInterrupt:
        # CTRL+C
        print('CTRL +Beenden Sie mit C.')
        ExitCode = 0
        break
    except:
        print('Fehler beenden')
        pass    #Ignoriere alle Ausnahmen und Schleife

sys.exit(ExitCode)

Es ist eine Quelle, die selbst Amateuren nicht helfen kann, aber es ist beängstigend, weil sie für einige Minuten und einige Tage in (Unschärfe) </ sub> verschoben werden muss und kein Fehler auftritt, selbst wenn sie für längere Zeit verschoben wird. Ich werde es mit bewegen.

$ nohup python TweetCrawler.py >normal.log 2>error.log &

Es wird ein bisschen so sein.

Dringende Rekrutierung

** Die Highlights und die Punkte, die überarbeitet werden müssen. ** ** **

(Fortsetzen.)

Recommended Posts

Experimentieren Sie, um Tweets für eine lange Zeit zu sammeln (Programmvorbereitung (3))
Experimentieren Sie, um Tweets für eine lange Zeit zu sammeln (Programmvorbereitung (1))
Experimentieren Sie, um Tweets für eine lange Zeit zu sammeln (Programmvorbereitung (2))
Experimentieren Sie, um Tweets für eine lange Zeit zu sammeln (Programmvorbereitung (5))
Experimentieren Sie, um Tweets über einen längeren Zeitraum zu sammeln (Aggregation und Bestätigung des Inhalts).
Eine Lernmethode für Anfänger zum Erlernen der Zeitreihenanalyse
Ich möchte vorerst eine Docker-Datei erstellen.
[Profil] Identifizieren Sie, wo das Programm lange dauert (google-perftool)
[Python] Es war sehr praktisch, die Python-Klasse für das ROS-Programm zu verwenden.
So stoppen Sie das Programm bis zu einem bestimmten Datum und einer bestimmten Uhrzeit in Python
Ich habe versucht, einen Linebot zu erstellen (Vorbereitung)
Einführung in discord.py (1. Tag) -Preparation for discord.py-
Eine einfache Problemumgehung für Bots, um zu versuchen, Tweets mit demselben Inhalt zu veröffentlichen
Das Herunterfahren von CentOS 7 mit LVM-Konfiguration dauert lange.