[PYTHON] Expérience pour collecter des tweets pendant une longue période (préparation du programme (5))

Jusqu'à la dernière fois

Remarque: comme j'écris en temps réel, je déraille ou stagne en écrivant.

Autres fonctions ajoutées

Fondamentalement, il a été confirmé qu'il n'y a pratiquement aucun problème de fonctionnement à l'heure actuelle. Dans ce cas, la prochaine chose à faire est de "terminer lorsque la date limite arrive pour éviter d'oublier en cours d'exécution" et "d'éviter la situation où vous ne remarquez pas la fin de l'erreur, contactez par DM" * * Deuxième priorité ** Mise en œuvre de la fonction.

Fonction à terminer le moment venu

Quand je lisais datetime.now () à chaque fois, je pensais qu'il n'y avait pas de truc, mais je ne pouvais pas penser à d'autres moyens, et il n'y avait pas de problème de vitesse, donc je suis resté simple.

ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 000000) - timedelta(hours=9)

Avec on_status (), qui coupe le temps comme ça et effectue le traitement au moment de l'acquisition,

        if(datetime.now() > ExitLimit):
            print("Il est temps de finir")
            self.send_message("Se termine avec le temps")
            self.StopFlg = True
            return False

Arrêtez comme ça. Le dernier est (strictement) l'acquisition d'heures supplémentaires, mais non.

Fonction de notification d'erreur par DM

En regardant Stream.py de Tweepy, il existe une méthode appelée API () elle-même, mais il semble que l'objet obtenu avec cela ne puisse pas être utilisé pour la transmission DM. C'est pourquoi le processus principal consiste à appeler tweepy.API () pour obtenir l'objet.

    def send_message(self, message):
        if(message is None) or (self.API is None):
            return
        if(len(DM_DEST) != 0  and len(message) != 0):
            d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
            mes = "[TWEET CRAWLER]\n" + d + "\n" + message
            
            self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
            print('Transmission DM:' + message)
        else:
            print('N'a pas envoyé de DM')
        return

Créons une méthode comme celle-ci et appelons-la lorsqu'un événement se produit.

Autres préoccupations

Et si la base de données tombe en panne?

Je ne pense pas qu'il soit possible que seul MongoDB tombe adroitement, mais juste au cas où. Ajoutez le processus d'écriture dans un fichier texte en cas d'échec.

    def insert(self, json_src):
        #Insérez des données JSON dans Mongo.
        try:
            self.Collection.insert(json_src)        #Stocker ici
            self.dbErrCnt = 0
        except pymongo.errors.PyMongoError as exc:
            #arrêt mystère mongoDB
            self.dbErrCnt += 1
            print('Erreur' + str(self.dbErrCnt) + 'Temps')
            if(self.dbErrCnt < 3):
                self.send_message('Une erreur MongoDB s'est produite.')   #Mesures contre plusieurs hits répétés de même contenu
                #dir(exc)

            with open('MongoInsertError.txt','a') as f:
                f.write(str(json_src) + '\n')
                print('Le retraitement a échoué, écrit dans le fichier')
                f.close()

C'est presque comme ça. J'ai cherché une fonction pour vérifier si la connexion était active, mais je ne me sentais pas comme ça, alors je l'ai omise. En passant, si MongoDB est restauré, il sera reconnecté sans aucune reconfiguration et le stockage des données reprendra, donc c'est probablement le cas (probablement pas)

Comment spécifier des mots clés?

Puisque la zone autour de l'événement (mot caché) </ sub> est appelée par plusieurs mots-clés, il est nécessaire de la spécifier par OU de plusieurs mots-clés.

Si vous définissez la zone, vous pourrez la couvrir. Certains ont à la fois des balises de hachage et non balisées. Selon l'explication de l'API Twitter Streaming, les mots clés japonais non marqués ne seront pas touchés à moins qu'ils ne soient séparés par des espaces, il est donc normal de ne pas avoir de mots clés non marqués ...

TRACKWORD = ['# D51, # locomotive, # locomotive à vapeur, #hill steam, # locomotive, locomotive, steam locomotive'] Spécifiez-le comme ceci (les mots clés sont falsifiés de manière appropriée) </ sub>

stream.filter(track=TRACKWORD)

L'appel est fixé comme ça. Si vous entrez trop de mots-clés, vous serez préoccupé par les messages de spam. En fait, si vous vérifiez avec les mots-clés pour la production, il y a beaucoup de robots d'enchères, et il y a aussi des choses comme "Même si vous dites tout sauvegarder, c'est ennuyeux" (car c'est le moment le plus calme maintenant ) Je comprends.

Puisque le bot utilise généralement un client spécifique, il semble qu'il soit possible de faire référence à la clé "souce" dans le json acquis et de la supprimer en une seule fois. Eh bien, "le nombre de messages du bot" semble être une histoire, je vais donc l'omettre car je le supprimerai plus tard.

Source de production

Voici la version complète que Python de amateur a réalisée en un mois (environ une semaine de travail réel).

TweetCrawler.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-

import tweepy
from tweepy.api import API

import pymongo
from pymongo import MongoClient
import json

from datetime import datetime, timedelta

import sys

#Variables liées à l'accès Twitter
CK = ''                            # Consumer Key
CS = ''   # Consumer Secret
AT = ''   # Access Token
AS = ''        # Accesss Token Secert

TRACKWORD = ['#(Langue cachée)']  #Mots clés lors de l'utilisation du filtre de flux public

DM_DEST = ''      #Destination DM

#Variables liées à la connexion MongoDB
HOST = 'mongo'      #hôte
PORT = 27017            #Port(Défaut:27017)
DB_NAME = 'TwitterDB'   #Nom de la base de données
COL_NAME= 'Twitter'    #Nom de la collection

ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 000000) - timedelta(hours=9)

class Listener(tweepy.StreamListener):
    def __init__(self, api=None):   #constructeur
        tweepy.StreamListener.__init__(self)    #Constructeur de classe parent
        print('constructeur')

        self.StopFlg = False          #Drapeau d'arrêt.
        self.mongo_init()

        self.API = None
        
        self.dbErrCnt = 0
        
        print(ExitLimit)

        return

    def mongo_init(self):           #Initialisation de MongoDB
        try:
            Client = MongoClient(HOST, PORT)
            db = Client[DB_NAME]
            self.Collection = db[COL_NAME]
            print('DB prêt')
        except pymongo.errors.PyMongoError as exc:
            #Erreur de connexion
            print('Erreur de connexion à la base de données')
        return

    def on_status(self, status):
        #print('Tweet(' + str(self.TweetCnt) + ')')
        
        self.insert(status._json)
        
        if(datetime.now() > ExitLimit):
            print("Il est temps de finir")
            self.send_message("Se termine avec le temps")
            self.StopFlg = True
            return False

        return True

    def on_error(self, status_code):
        print('Erreur est survenue: ' + str(status_code))
        self.send_message("ERR:" + str(status_code))
        return True

    def on_connect(self):
        print('Connecté')

        self.send_message('Connecté')
        return

    def on_disconnect(self, notice):
        print('Débranché:' + str(notice.code))
        self.send_message("DISCCONECT:" + str(notice.code))
        return

    def on_limit(self, track):
        print('La limite de réception s'est produite:' + str(track))
        self.send_message("RCV_LIMIT:" + str(track))
        return

    def on_timeout(self):
        print('temps libre')
        self.send_message("TIMEOUT")
        return True

    def on_warning(self, notice):
        print('Message d'alerte:' + str(notice.message))
        self.send_message("WARN:" + str(notice.message))
        return

    def on_exception(self, exception):
        print('Erreur d'exception:' + str(exception))
        self.send_message("EXCEPTION:" + str(exception))
        return
        
    def send_message(self, message):
        #Méthode pour envoyer des DM
        if(message is None) or (self.API is None):
            return
        if(len(DM_DEST) != 0  and len(message) != 0):
            d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
            mes = "[TWEET CRAWLER]\n" + d + "\n" + message
            
            self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
            print('Transmission DM:' + message)
        else:
            print('N'a pas envoyé de DM')
        return

    def insert(self, json_src):
        #Entrée JSON dans Mongo.
        try:
            self.Collection.insert(json_src)        #Stocker ici
            self.dbErrCnt = 0
        except pymongo.errors.PyMongoError as exc:
            #Erreur est survenue
            self.dbErrCnt += 1
            print('Erreur' + str(self.dbErrCnt) + 'Temps')
            if(self.dbErrCnt < 3):
                self.send_message('Une erreur MongoDB s'est produite.')   #Empêcher plusieurs notifications avec le même contenu

            #Exporter vers un fichier
            with open('MongoInsertError.txt','a') as f:
                f.write(str(json_src) + '\n')
                print('Le retraitement a échoué, écrit dans le fichier')
                f.close()

#Traitement principal d'ici
auth = tweepy.OAuthHandler(CK, CS)
auth.set_access_token(AT, AS)

ExitCode = 255

while (True):     #boucle infinie
    try:
        listener = Listener()
        stream = tweepy.Stream(auth, listener)
        listener.API = tweepy.API(auth)

        #Sélectionnez-en un et décommentez-le.
        print(TRACKWORD)
        stream.filter(track=TRACKWORD)
        #stream.sample()
        #stream.userstream()

        #Arrêter le jugement par le drapeau d'arrêt
        if(listener.StopFlg == True):
            ExitCode = 0
            break

    except KeyboardInterrupt:
        # CTRL+C
        print('CTRL +Terminez par C.')
        ExitCode = 0
        break
    except:
        print('Erreur de sortie')
        pass    #Ignorer toutes les exceptions et boucle

sys.exit(ExitCode)

C'est une source réservée aux amateurs, mais elle doit être déplacée dans (flou) </ sub> pendant quelques minutes et quelques jours, et même si elle est déplacée pendant un certain temps, aucune erreur ne se produira, donc c'est effrayant, mais cela Je vais le déplacer avec.

$ nohup python TweetCrawler.py >normal.log 2>error.log &

Ça va être un peu comme ça.

Recrutement urgent

** Les points forts et les points à retravailler. ** **

(Continuer.)

Recommended Posts

Expérience de collecte de tweets pendant une longue période (préparation du programme (3))
Expérience pour collecter des tweets pendant une longue période (préparation du programme (1))
Expérience pour collecter des tweets pendant une longue période (préparation du programme (2))
Expérience pour collecter des tweets pendant une longue période (préparation du programme (5))
Expérimentez pour collecter des tweets pendant une longue période (agrégation et confirmation du contenu)
Une méthode d'étude pour les débutants pour apprendre l'analyse des séries chronologiques
Je veux créer un Dockerfile pour le moment.
[Profile] Identifiez les domaines où le programme prend beaucoup de temps (google-perftool)
[Python] Il était très pratique d'utiliser la classe Python pour le programme ROS.
Comment arrêter le programme jusqu'à une date et une heure spécifiques en python
J'ai essayé de créer un linebot (préparation)
Introduction à discord.py (1er jour) -Préparation pour discord.py-
Une solution de contournement simple pour que les robots essaient de publier des tweets avec le même contenu
CentOS 7 avec la configuration LVM prend beaucoup de temps à s'arrêter.