ETL processing for a large number of GTFS Realtime files (Python edition)

A great predecessor

Professor Ito has already published "How to merge GTFS files and put them into PostgreSQL + PostGIS", which explains how to use tools to load GTFS data into PostgreSQL.

Motivation

I learned that Japanese GTFS Realtime data is archived in GCS, so I decided to play with that data.

What this post covers

A large number of protocol buffer files stored in Google Cloud Storage, the file storage service of Google Cloud Platform, are downloaded in one go and converted into a structured dataset.

STEP 1: Download the data from GCS using gsutil.
STEP 2: Convert the files to structured data with Python. There are two conversion targets: a pandas DataFrame and a CSV file.

Implementation

About the environment

The code below is meant to be pasted into JupyterLab or a similar environment; it is not packaged as a standalone tool or a library. It was written and run on a Mac.

STEP 1: Copying the data with gsutil

GCP also provides a Python client library, but since setting up accounts and credentials is a bit involved, I use the command-line tool gsutil to download all the files in the folder to a local directory.

download.sh


gsutil cp -r gs://BUCKET_NAME/realtime/unobus.co.jp/vehicle_position/2020/20200801/ ~/dev/unobus/

Commentary

- gsutil cp works like the UNIX cp command.
- -r is the option to process recursively.
- gs://BUCKET_NAME/realtime/un... is the copy source (a dummy string this time).
- ~/dev/unobus/ is the copy destination.

In general, gsutil -m cp copies in parallel (multi-threaded) and is faster. However, it did not work this time because of the bucket's access permissions.
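For reference, the Python client route mentioned above would look roughly like the sketch below. This is my own illustration, not what the post actually used: it assumes the google-cloud-storage package is installed and that application default credentials are configured; the bucket name and prefix are the same dummy values as in download.sh.

import os
from google.cloud import storage

bucket_name = 'BUCKET_NAME'  # dummy value, as in download.sh
prefix = 'realtime/unobus.co.jp/vehicle_position/2020/20200801/'
dest_dir = os.path.expanduser('~/dev/unobus/20200801')
os.makedirs(dest_dir, exist_ok=True)

client = storage.Client()  # requires credentials to be set up
for blob in client.list_blobs(bucket_name, prefix=prefix):
    if blob.name.endswith('/'):
        continue  # skip folder placeholder objects
    # Save each object locally under dest_dir using its base file name.
    local_path = os.path.join(dest_dir, os.path.basename(blob.name))
    blob.download_to_filename(local_path)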

STEP 2: Reading and structuring the files

Converting to a DataFrame

pq2df.py


from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()

start = time.time()
i = 0
temp_dict = {}
for file in files:
    with open(path+'/'+file, 'rb') as f:
        data = f.read()
        feed.ParseFromString(data)
        for entity in feed.entity:
            temp_dict[i] = [
                  entity.id,                         #Vehicle ID
                  entity.vehicle.vehicle.id,         #Vehicle number
                  entity.vehicle.trip.trip_id,       #Route number?
                  entity.vehicle.timestamp,          #Vehicle time
                  entity.vehicle.position.longitude, #Vehicle longitude
                  entity.vehicle.position.latitude,  #Vehicle latitude
                  entity.vehicle.occupancy_status #Congestion degree
            ]
            i +=1
df = pd.DataFrame.from_dict(temp_dict, orient='index',columns=['id' , 'vehicle_id', 'trip_id','vehicle_timestamp','longitude','latitude','occupancy_status'])
elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

Output to CSV

pq2csv.py


from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
csvfilename = 'unobus_20200801.csv'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()
with open(csvfilename, 'a') as csv:
    start = time.time()
    for file in files:
        with open(path+'/'+file, 'rb') as f:
            data = f.read()
            feed.ParseFromString(data)
            for entity in feed.entity:
                print(
                    entity.id,                         #Vehicle ID
                    entity.vehicle.vehicle.id,         #Vehicle number
                    entity.vehicle.trip.trip_id,       #Route number?
                    entity.vehicle.timestamp,          #Vehicle time
                    entity.vehicle.position.longitude, #Vehicle longitude
                    entity.vehicle.position.latitude,  #Vehicle latitude
                    entity.vehicle.occupancy_status,   #Congestion degree
                    sep=',',file=csv)
    elapsed_time = time.time() - start
    print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

Failed attempt: converting to a DataFrame (ultra-slow)

This code is about 220 times slower than pq2df.py. The cause is df.append (see the sketch after the listing below).

pq2df_VeryLowSpeed.py


from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()

df = pd.DataFrame(columns=['id' , 'vehicle_id', 'trip_id','vehicle_timestamp','longitude','latitude','occupancy_status'])

start = time.time()
for file in files:
    with open(path+'/'+file, 'rb') as f:
        data = f.read()
        feed.ParseFromString(data)
        for entity in feed.entity:
            tmp_se = pd.Series( [
                  entity.id,                         #Vehicle ID
                  entity.vehicle.vehicle.id,         #Vehicle number
                  entity.vehicle.trip.trip_id,       #Route number?
                  entity.vehicle.timestamp,          #Vehicle time
                  entity.vehicle.position.longitude, #Vehicle longitude
                  entity.vehicle.position.latitude,  #Vehicle latitude
                  entity.vehicle.occupancy_status #Congestion degree
            ], index=df.columns )
            df = df.append( tmp_se, ignore_index=True ) #This is no good!!

elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

In closing

I walked through the process of downloading archived GTFS Realtime data from the cloud and converting it into structured data. Depending on how it is written, appending rows to a DataFrame can be very slow, which gave me a hard time.

It's a small contribution, but I hope it helps with the utilization of bus data.
