ETL-Verarbeitung für eine große Anzahl von GTFS-Echtzeitdateien (Python Edition)

Großer Vorfahr

Bereits Professor Ito unter Zusammenführen von GTFS-Dateien und Senden an PostgreSQL + PostGIS , Wie man das Tool zur Eingabe von Daten in PostgreSQL verwendet, wird veröffentlicht.

Diese Motivation

Nachdem ich die Informationen habe, dass japanische GTFS-Daten in GCS gespeichert sind, bereite ich mich darauf vor, mit diesen Daten zu spielen.

Was ist diesmal zu tun?

Eine große Anzahl von Dateien im Protokollpufferformat, die in Google Cloud Strage, einem Dateidienst der Google Clound Platform, gespeichert sind, werden sofort heruntergeladen und in einen Datenstapel konvertiert. スクリーンショット 2020-08-08 21.28.41.png

SCHRITT 1: Laden Sie Daten mit gsutil von GCS herunter SCHRITT 2: Konvertieren Sie Dateien mit Python in strukturierte Daten. Es gibt zwei Arten von Konvertierungszielen: Datenrahmenformat und CSV.

Implementierung

Über die Umwelt

Dieser Code soll durch Einfügen in Jupyter Lab usw. verwendet werden. Es kann weder als eigenständiges Werkzeug noch als Bibliothek verwendet werden. Die implementierte Umgebung ist Mac.

SCHRITT 1 Datenkopie mit gsutil

GCP hat auch ein Tool für Python, aber da die Kontoeinstellung usw. kompliziert ist, verwenden Sie das Befehlszeilentool gsutil. Laden Sie alle Dateien im Ordner lokal herunter.

dowload.sh


gsutil cp -r gs://BACKET_NAME/realtime/unobus.co.jp/vehicle_position/2020/20200801/ ~/dev/unobus/

Kommentar

-- gsutil cp ähnelt dem UNIX-Befehl cp. ---r Option zur rekursiven Verarbeitung --gs: // BACKET_NAME / realtime / un ....Dies ist die Kopierquelle. Diesmal eine Dummy-Zeichenkette --~ / dev / unobus /Ziel kopieren

Im Allgemeinen arbeitet gsutil -m cp multithreaded und beschleunigt. Diesmal funktionierte es jedoch aufgrund des Zugriffsberechtigungsproblems des Buckets nicht.

SCHRITT 2 Dateien lesen und strukturieren

In Datenrahmen konvertieren

pq2df.py


from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()

start = time.time()
i = 0;
temp_dict = {}
for file in files:
    with open(path+'/'+file, 'rb') as f:
        data = f.read()
        feed.ParseFromString(data)
        for entity in feed.entity:
            temp_dict[i] = [
                  entity.id,                         #Fahrzeugidentifikation
                  entity.vehicle.vehicle.id,         #Fahrzeugnummer
                  entity.vehicle.trip.trip_id,       #Routennummer?
                  entity.vehicle.timestamp,          #Fahrzeugzeit
                  entity.vehicle.position.longitude, #Fahrzeugspielraum
                  entity.vehicle.position.latitude,  #Fahrzeuglänge
                  entity.vehicle.occupancy_status #Überlastungsgrad
            ]
            i +=1
df = pd.DataFrame.from_dict(temp_dict, orient='index',columns=['id' , 'vehicle_id', 'trip_id','vehicle_timestamp','longitude','latitude','occupancy_status'])
elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

Ausgabe an CSV

pq2csv.py


from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
csvfilename = 'unobus_20200801.csv'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()
with open(csvfilename, 'a') as csv :
    start = time.time()
    for file in files:
        with open(path+'/'+file, 'rb') as f:
            data = f.read()
            feed.ParseFromString(data)
            for entity in feed.entity:
                print(
                    entity.id,                         #Fahrzeugidentifikation
                    entity.vehicle.vehicle.id,         #Fahrzeugnummer
                    entity.vehicle.trip.trip_id,       #Routennummer?
                    entity.vehicle.timestamp,          #Fahrzeugzeit
                    entity.vehicle.position.longitude, #Fahrzeugspielraum
                    entity.vehicle.position.latitude,  #Fahrzeuglänge
                    entity.vehicle.occupancy_status,   #Überlastungsgrad
                    sep=',',file=csv)
    elapsed_time = time.time() - start
    print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

In fehlerhaften Datenrahmen konvertieren (extrem langsam)

Der Code ist 220-mal langsamer als pq2df.py. Die Ursache ist "df.append".

pq2df_VeryLowSpeed.py


from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()

df = pd.DataFrame(columns=['id' , 'vehicle_id', 'trip_id','vehicle_timestamp','longitude','latitude','occupancy_status'])

start = time.time()
for file in files:
    with open(path+'/'+file, 'rb') as f:
        data = f.read()
        feed.ParseFromString(data)
        for entity in feed.entity:
            tmp_se = pd.Series( [
                  entity.id,                         #Fahrzeugidentifikation
                  entity.vehicle.vehicle.id,         #Fahrzeugnummer
                  entity.vehicle.trip.trip_id,       #Routennummer?
                  entity.vehicle.timestamp,          #Fahrzeugzeit
                  entity.vehicle.position.longitude, #Fahrzeugspielraum
                  entity.vehicle.position.latitude,  #Fahrzeuglänge
                  entity.vehicle.occupancy_status #Überlastungsgrad
            ], index=df.columns )
            df = df.append( tmp_se, ignore_index=True ) #Das ist nicht gut! !!

elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

Am Ende

Wir haben den Prozess vom Herunterladen bis zur Konvertierung von in der Cloud gespeicherten GTFS-Daten in strukturierte Daten durchgeführt. Das Hinzufügen von Daten mit DataFrame war je nach Schreibmethode sehr langsam, und es fiel mir schwer.

Es ist ein wenig Leistung, aber ich hoffe, es kann zur Nutzung von Busdaten beitragen.

Recommended Posts

ETL-Verarbeitung für eine große Anzahl von GTFS-Echtzeitdateien (Python Edition)
Organisieren Sie eine große Anzahl von Dateien in Ordnern
[Python] Generieren Sie zufällig eine große Anzahl englischer Personennamen
Konsolidieren Sie eine große Anzahl von CSV-Dateien in Ordnern mit Python (Daten ohne Header).
Scrapy-Redis wird zum Crawlen einer großen Anzahl von Domänen empfohlen
Das Ausführen einer großen Anzahl von Python3 Executor.submit verbraucht möglicherweise viel Speicher.
Ich habe viele Dateien für die RDP-Verbindung mit Python erstellt
Konvertieren Sie eine große Anzahl von PDF-Dateien mit pdfminer in Textdateien
Verbinde eine große Anzahl von Videos miteinander!
Die Geschichte der Verarbeitung A von Blackjack (Python)
Einzeiler, der unter Linux eine große Anzahl von Testdateien gleichzeitig erstellt
Laden Sie eine große Anzahl von Bildern in Wordpress hoch
Bildverarbeitung? Die Geschichte, Python für zu starten
Überprüfen Sie die Verarbeitungszeit und die Anzahl der Aufrufe für jeden Prozess mit Python (cProfile).
Beschleunigen Sie eine große Anzahl einfacher Abfragen in MySQL
[Python] Ein Programm, das die Anzahl der Täler zählt
Python: Ruft eine Liste der Methoden für ein Objekt ab
Ein * Algorithmus (Python Edition)
Verschiedene Verarbeitung von Python
Lambda + Python kann den Zugriff auf eine große Anzahl von IP-Adresslisten gut einschränken
Holen Sie sich die Anzahl der spezifischen Elemente in der Python-Liste
Eine Reihe von Skriptdateien, die Wordcloud mit Python3 ausführen
Drehen Sie ein Array von Zeichenfolgen mit einer for-Anweisung (Python3).
[Python] Mit OpenCV können Sie problemlos Bilddateien mit Seriennummern lesen
Nachbearbeitung von Python (NG)
[Python] Iterative Verarbeitung (für, während)
Memorandum des Python-Paketverwaltungstools ez_setup
Erstellen und testen Sie eine CI-Umgebung für mehrere Versionen von Python
Eine kurze Zusammenfassung von Graphviz in Python (nur für Mac erklärt)
TensorFlow Aus einer großen Anzahl von Bildern lernen ... ~ (fast) Lösung ~
Die Geschichte, einen Standardtreiber für db mit Python zu erstellen.
Eine Funktion, die die Verarbeitungszeit einer Methode in Python misst
[Python] Ich habe ein Follow-Korrelationsdiagramm für Twitter erstellt (Gremlin-Ausgabe)
Ruft eine Liste der Dateien in einem Ordner mit Python ohne Pfad ab
Holen Sie sich die Anzahl der Leser von Artikeln über Mendeley in Python