Traitement ETL pour un grand nombre de fichiers GTFS Realtime (édition Python)

Grand ancêtre

Déjà Professor Ito sur Comment fusionner des fichiers GTFS et les mettre dans PostgreSQL + PostGIS , Comment utiliser l'outil pour entrer des données dans PostgreSQL est publié.

Cette motivation

Maintenant que j'ai l'information que les données japonaises GTFS sont stockées dans GCS, je me prépare à jouer avec ces données.

Que faire cette fois

Un grand nombre de fichiers au format de tampons de protocole stockés dans Google Cloud Strage, qui est un service de fichiers de Google Clound Platform, sont téléchargés à la fois et convertis en un lot de données. スクリーンショット 2020-08-08 21.28.41.png

ÉTAPE 1: Téléchargez les données de GCS à l'aide de gsutil ÉTAPE2: Convertissez des fichiers en données structurées à l'aide de python. Il existe deux types de cibles de conversion: le format de trame de données et le CSV.

la mise en oeuvre

À propos de l'environnement

Ce code est censé être utilisé en le collant dans le laboratoire Jupyter, etc. Il ne peut pas être utilisé comme un outil en soi, ni comme une bibliothèque. L'environnement implémenté est Mac.

STEP1 Copie de données à l'aide de gsutil

GCP dispose également d'un outil pour python, mais comme la configuration du compte, etc. est compliquée, utilisez l'outil de ligne de commande gsutil Et téléchargez tous les fichiers du dossier localement.

dowload.sh


gsutil cp -r gs://BACKET_NAME/realtime/unobus.co.jp/vehicle_position/2020/20200801/ ~/dev/unobus/

Commentaire

-- gsutil cp est similaire à la commande UNIX cp. ---r Option pour traiter récursivement --gs: // BACKET_NAME / realtime / un ....Ceci est la source de la copie. Cette fois, une chaîne de caractères factice --~ / dev / unobus /Copier la destination

En général, gsutil -m cp fonctionne en multithread et s'accélère. Cependant, cela n'a pas fonctionné en raison du problème d'autorisation d'accès du compartiment cette fois.

STEP2 Lire et structurer les fichiers

Convertir en bloc de données

pq2df.py


from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()

start = time.time()
i = 0;
temp_dict = {}
for file in files:
    with open(path+'/'+file, 'rb') as f:
        data = f.read()
        feed.ParseFromString(data)
        for entity in feed.entity:
            temp_dict[i] = [
                  entity.id,                         #ID du véhicule
                  entity.vehicle.vehicle.id,         #Numéro de véhicule
                  entity.vehicle.trip.trip_id,       #Numéro de route?
                  entity.vehicle.timestamp,          #Temps de véhicule
                  entity.vehicle.position.longitude, #Latitude du véhicule
                  entity.vehicle.position.latitude,  #Longitude du véhicule
                  entity.vehicle.occupancy_status #Degré de congestion
            ]
            i +=1
df = pd.DataFrame.from_dict(temp_dict, orient='index',columns=['id' , 'vehicle_id', 'trip_id','vehicle_timestamp','longitude','latitude','occupancy_status'])
elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

Sortie au format CSV

pq2csv.py


from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
csvfilename = 'unobus_20200801.csv'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()
with open(csvfilename, 'a') as csv :
    start = time.time()
    for file in files:
        with open(path+'/'+file, 'rb') as f:
            data = f.read()
            feed.ParseFromString(data)
            for entity in feed.entity:
                print(
                    entity.id,                         #ID du véhicule
                    entity.vehicle.vehicle.id,         #Numéro de véhicule
                    entity.vehicle.trip.trip_id,       #Numéro de route?
                    entity.vehicle.timestamp,          #Temps de véhicule
                    entity.vehicle.position.longitude, #Latitude du véhicule
                    entity.vehicle.position.latitude,  #Longitude du véhicule
                    entity.vehicle.occupancy_status,   #Degré de congestion
                    sep=',',file=csv)
    elapsed_time = time.time() - start
    print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

Conversion en trame de données échouée (ultra-lente)

Le code est 220 fois plus lent que pq2df.py. La cause est «df.append».

pq2df_VeryLowSpeed.py


from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()

df = pd.DataFrame(columns=['id' , 'vehicle_id', 'trip_id','vehicle_timestamp','longitude','latitude','occupancy_status'])

start = time.time()
for file in files:
    with open(path+'/'+file, 'rb') as f:
        data = f.read()
        feed.ParseFromString(data)
        for entity in feed.entity:
            tmp_se = pd.Series( [
                  entity.id,                         #ID du véhicule
                  entity.vehicle.vehicle.id,         #Numéro de véhicule
                  entity.vehicle.trip.trip_id,       #Numéro de route?
                  entity.vehicle.timestamp,          #Temps de véhicule
                  entity.vehicle.position.longitude, #Latitude du véhicule
                  entity.vehicle.position.latitude,  #Longitude du véhicule
                  entity.vehicle.occupancy_status #Degré de congestion
            ], index=df.columns )
            df = df.append( tmp_se, ignore_index=True ) #Ce n'est pas bien! !!

elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

À la fin

Nous avons effectué le processus du téléchargement à la conversion des anciennes données GTFS stockées sur le cloud en données structurées. Le processus d'ajout de données avec DataFrame était très lent selon la méthode d'écriture, et j'ai eu du mal.

C'est un peu de puissance, mais j'espère que cela peut contribuer à l'utilisation des données du bus.

Recommended Posts

Traitement ETL pour un grand nombre de fichiers GTFS Realtime (édition Python)
Organisez un grand nombre de fichiers dans des dossiers
[Python] Générer de manière aléatoire un grand nombre de noms de personne en anglais
Consolider un grand nombre de fichiers CSV dans des dossiers avec python (données sans en-tête)
Scrapy-Redis est recommandé pour l'exploration d'un grand nombre de domaines
L'exécution d'un grand nombre de Python3 Executor.submit peut consommer beaucoup de mémoire.
J'ai créé beaucoup de fichiers pour la connexion RDP avec Python
Convertissez un grand nombre de fichiers PDF en fichiers texte à l'aide de pdfminer
Connectez un grand nombre de vidéos ensemble!
L'histoire du traitement A du blackjack (python)
One-liner qui crée un grand nombre de fichiers de test à la fois sous Linux
Téléchargez un grand nombre d'images sur Wordpress
Traitement d'image? L'histoire du démarrage de Python pour
Vérifiez le temps de traitement et le nombre d'appels pour chaque processus avec python (cProfile)
Accélérez un grand nombre de requêtes simples dans MySQL
[Python] Un programme qui compte le nombre de vallées
Python: obtenir une liste de méthodes pour un objet
Algorithme A * (édition Python)
Divers traitements de Python
Lambda + Python est efficace pour restreindre l'accès à un grand nombre de listes d'adresses IP
Obtenez le nombre d'éléments spécifiques dans la liste python
Un ensemble de fichiers de script qui font wordcloud avec Python3
Tourner un tableau de chaînes avec une instruction for (Python3)
[Python] Lecture facile des fichiers image du numéro de série avec OpenCV
Post-traitement de python (NG)
[Python] Traitement itératif (for, while)
Mémorandum de l'outil de gestion de paquets Python ez_setup
Créer et tester un environnement CI pour plusieurs versions de Python
Un bref résumé de Graphviz en python (expliqué uniquement pour mac)
TensorFlow Pour apprendre d'un grand nombre d'images ... ~ (presque) solution ~
L'histoire de la création d'un pilote standard pour db avec python.
Une fonction qui mesure le temps de traitement d'une méthode en python
[python] J'ai créé un diagramme de corrélation de suivi pour Twitter (édition Gremlin)
Obtenez une liste de fichiers dans un dossier avec python sans chemin
Obtenez le nombre de lecteurs d'articles sur Mendeley en Python