Continuously retrieve tweets containing specific keywords using the Streaming API in Python

Background

The Twitter API is rate limited: each endpoint can only be called a fixed number of times within a given window. In the following article, tweets are retrieved with the search/tweets API, but a single request returns at most 100 tweets, and the endpoint can only be called 180 times per 15 minutes.

**Search Twitter using Python** http://qiita.com/mima_ita/items/ba59a18440790b12d97e

This is fine when there is little data, but it is not suitable for use cases like "continuously collecting tweets with hashtags such as #general-election during an election broadcast".
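Concretely, the REST search ceiling is easy to work out: 180 requests per 15-minute window at up to 100 tweets per request caps the throughput, no matter how fast matching tweets are actually being posted.

```python
# search/tweets ceiling: 180 requests per 15-minute window,
# each returning at most 100 tweets.
requests_per_window = 180
tweets_per_request = 100
max_tweets_per_15_min = requests_per_window * tweets_per_request
print(max_tweets_per_15_min)  # 18000 tweets at best per 15 minutes
```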

Therefore, Twitter provides the Streaming API as a way to acquire data continuously.

The Streaming APIs https://dev.twitter.com/streaming/overview

Streaming API

The Streaming API lets developers receive Twitter data continuously with little delay.

There are three main types of Streaming API.

Name            Description
Public streams  Public Twitter data. Can be filtered by keyword or location using the filter endpoint.
User streams    Data and events for a particular authenticated user.
Site streams    Data for multiple users. Currently in beta; access is restricted to whitelisted accounts.

Python sample

Purpose

Continuously capture tweets matching specific keywords and store them in a database with Python.

Prerequisite library

python_twitter https://code.google.com/p/python-twitter/ — a library for accessing Twitter from Python.

peewee https://github.com/coleifer/peewee — an ORM that supports SQLite, PostgreSQL, and MySQL.

Sample code

# -*- coding: utf-8 -*-
# easy_install python_twitter
import twitter
import sys
import codecs
import dateutil.parser
import datetime
import time
from peewee import *


db = SqliteDatabase('twitter_stream.sqlite')


class Twitte(Model):
    createAt = DateTimeField(index=True)
    idStr = CharField(index=True)
    contents = CharField()

    class Meta:
        database = db


# If the version installed with easy_install does not provide GetStreamFilter, add the following code
# https://github.com/bear/python-twitter/blob/master/twitter/api.py
def GetStreamFilter(api,
                    follow=None,
                    track=None,
                    locations=None,
                    delimited=None,
                    stall_warnings=None):
    '''Returns a filtered view of public statuses.

    Args:
      follow:
        A list of user IDs to track. [Optional]
      track:
        A list of expressions to track. [Optional]
      locations:
        A list of Latitude,Longitude pairs (as strings) specifying
        bounding boxes for the tweets' origin. [Optional]
      delimited:
        Specifies a message length. [Optional]
      stall_warnings:
        Set to True to have Twitter deliver stall warnings. [Optional]

    Returns:
      A twitter stream
    '''
    if all((follow is None, track is None, locations is None)):
        raise ValueError({'message': "No filter parameters specified."})
    url = '%s/statuses/filter.json' % api.stream_url
    data = {}
    if follow is not None:
        data['follow'] = ','.join(follow)
    if track is not None:
        data['track'] = ','.join(track)
    if locations is not None:
        data['locations'] = ','.join(locations)
    if delimited is not None:
        data['delimited'] = str(delimited)
    if stall_warnings is not None:
        data['stall_warnings'] = str(stall_warnings)

    json = api._RequestStream(url, 'POST', data=data)
    for line in json.iter_lines():
        if line:
            data = api._ParseAndCheckTwitter(line)
            yield data


def main(argvs, argc):
    if argc != 6:
        print ("Usage #python %s consumer_key consumer_secret access_token_key access_token_secret #tag1,#tag2 " % argvs[0])
        return 1
    consumer_key = argvs[1]
    consumer_secret = argvs[2]
    access_token_key = argvs[3]
    access_token_secret = argvs[4]
    # Decode to unicode using the terminal's character code
    # (cp932 for a Japanese Windows console).
    track = argvs[5].decode('cp932').split(',')

    db.create_tables([Twitte], True)

    api = twitter.Api(base_url="https://api.twitter.com/1.1",
                      consumer_key=consumer_key,
                      consumer_secret=consumer_secret,
                      access_token_key=access_token_key,
                      access_token_secret=access_token_secret)
    for item in GetStreamFilter(api, track=track):
        print ('---------------------')
        if 'text' in item:
            print (item['id_str'])
            print (dateutil.parser.parse(item['created_at']))
            print (item['text'])
            print (item['place'])
            row = Twitte(createAt=dateutil.parser.parse(item['created_at']),
                         idStr=item['id_str'],
                         contents=item['text'])
            row.save()
            row = None

if __name__ == '__main__':
    sys.stdout = codecs.getwriter(sys.stdout.encoding)(sys.stdout, errors='backslashreplace')
    argvs = sys.argv
    argc = len(argvs)
    sys.exit(main(argvs, argc))

How to use

python twitter_stream.py consumer_key consumer_secret access_token_key access_token_secret  #election,#House of Representatives election,election

If you specify the access token information and keywords, a SQLite database called twitter_stream.sqlite is created in the current directory and matching tweets are appended to it.
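Once data has accumulated, it can be read back with peewee or with plain sqlite3. The sketch below uses only the standard library and builds an in-memory database with the same shape as the table peewee generates for the Twitte model (named "twitte" by default); point sqlite3.connect at twitter_stream.sqlite to query the real file.

```python
import sqlite3

# In-memory stand-in for twitter_stream.sqlite; the "twitte" table has
# the columns that peewee creates for the Twitte model.
conn = sqlite3.connect(':memory:')  # use 'twitter_stream.sqlite' for the real DB
conn.execute('CREATE TABLE twitte ('
             'id INTEGER PRIMARY KEY, createAt TEXT, idStr TEXT, contents TEXT)')
conn.execute("INSERT INTO twitte (createAt, idStr, contents) "
             "VALUES ('2014-12-14 10:00:00', '1', 'sample tweet')")

# Fetch the ten most recently saved tweets.
rows = conn.execute('SELECT createAt, contents FROM twitte '
                    'ORDER BY createAt DESC LIMIT 10').fetchall()
for created_at, contents in rows:
    print(created_at, contents)
```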

Description

-The latest source provides a GetStreamFilter method, but it is missing from the version installable with easy_install for Python 2.7, so the same code is reimplemented here.

-Because this sample runs on Windows, the track argument is decoded with cp932; match this to the character code of your terminal.

-When a large volume of matching tweets is flowing this is not noticeable, but otherwise filtered tweets can arrive several minutes after they were posted.

-Basically, the streaming loop should do little more than register tweets in the database; time-consuming processing is better done in a separate process.
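A common shape for that split is a producer/consumer queue: the streaming loop only enqueues tweets, and a worker drains the queue and does the slow work, so the reader never falls behind the stream. A minimal sketch (Python 3 module names, using a thread here; the same pattern applies with multiprocessing for a true separate process):

```python
import queue
import threading

def worker(in_q, results):
    # Drain the queue until the None sentinel arrives.
    while True:
        item = in_q.get()
        if item is None:
            break
        results.append(item.upper())  # stand-in for time-consuming processing

in_q = queue.Queue()
results = []
t = threading.Thread(target=worker, args=(in_q, results))
t.start()

for tweet in ['tweet 1', 'tweet 2']:  # stand-in for the streaming loop
    in_q.put(tweet)
in_q.put(None)  # tell the worker to stop
t.join()
print(results)
```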

-Past data cannot be obtained, so the script needs to run as a resident (always-on) process.

-created_at is in UTC, which is nine hours behind Japan time; add nine hours to the stored time to get the time in Japan.
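For example, converting a stored created_at value to Japan time is a fixed nine-hour offset (JST has no daylight saving):

```python
import datetime

# created_at values are UTC; JST is UTC+9 with no daylight saving.
created_at_utc = datetime.datetime(2014, 12, 14, 12, 0, 0)
created_at_jst = created_at_utc + datetime.timedelta(hours=9)
print(created_at_jst)  # 2014-12-14 21:00:00
```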
