In Dataflow implementiert, um die hierarchische Struktur von Google Drive in Google Cloud Storage zu kopieren

Einführung

Ich habe es implementiert, weil ich häufig Google Drive (im Folgenden: Drive) und Google Cloud Storage (im Folgenden: GCS) synchronisieren wollte. Da GCS nicht über das Konzept eines Verzeichnisses verfügt, kann das Kopieren parallelisiert werden, wenn der Dateipfad bekannt ist.

[Drive]                            [GCS]

root/                              gs:root/
 ├ hoge.txt                         ├ hoge.txt
 ├ folderA/                         ├ folderA/fuga.txt
 │  └ fuga.txt                      ├ folderB/folderC/hogehoge.txt 
 ├ folderB/               ----->    └ piyo.txt
 │  └ folderC/                      
 │   └ hogehoge.txt/             
 └ piyo.txt

*Bild, in dem der Dateipfad auf dem Laufwerk zum Dateinamen in GCS wird

Warum Datenfluss?

Zuerst habe ich die parallele Kopierverarbeitung mit Google App Engine (im Folgenden: GAE) geschrieben. Wenn jedoch die parallelen Kopieraufgaben verteilt sind, ist es schwierig zu erkennen, dass alle Kopien abgeschlossen wurden. Außerdem ist GAE einfach nicht gut in der Stapelverarbeitung, und ich war kürzlich bei der Arbeit dem Datenfluss ausgesetzt. Mit Dataflow können Sie warten, bis der verteilte Prozess abgeschlossen ist. Danach dachte ich, dass es besser wäre, Pub / Sub oder CustomIO zu schreiben und es mit der nachfolgenden Verarbeitung zu verbinden.

Bedarf

Kopieren Sie die hierarchische Struktur direkt unter dem Ordner mit Drive (im Folgenden als Stammordner bezeichnet) parallel nach GCS. Dateien, die nicht kopiert werden können, z. B. Tabellenkalkulation, sind ausgeschlossen. Was ist mit Dateien mit demselben Dateinamen im selben Ordner?

Implementierungsübersicht

Suchen Sie in der Stammordner-ID nach Folgendem und erstellen Sie eine Liste von Objekten mit Datei-IDs und Dateipfaden. Verteilen Sie die erstellten Objekte auf jede Aufgabe und parallelisieren Sie den Teil "Dateien vom Laufwerk herunterladen und auf GCS hochladen".

Pipeline-Design

―― Gibt es DriveIO als Standard?

Einfacher Pipeline-Code

// *Punkt 1:Es ist ein Fehler, zuerst eine Eingabe mit einem geeigneten Wert vorzunehmen
p.apply("First Input",Create.of("A")).setCoder(StringUtf8Coder.of())
 
 .apply("Read Drive", ParDo.of(new ReadDriveDoFn(rootFolderId)))
 .apply("Write GCS", ParDo.of(new WriteStorageDoFn()));
 
 // *Punkt 2:Ich möchte warten, bis die gesamte Kopierverarbeitung abgeschlossen ist, also nehme ich den Gesamtwert der Ausgabe
 .apply("Combine!", Sum.integersGlobally()))
 
 .apply("Die Kopie ist fertig, also machen Sie bitte mit der anschließenden Bearbeitung, was Sie wollen!")

p.run();

--ReadDriveDoFn: Erstellen Sie eine Dateiliste direkt unter dem Stammordner --Suchen Sie rekursiv nach der Stammordner-ID und erstellen Sie eine Liste mit der Datei-ID und ihrem Pfad als Objekte.

	public class ReadDriveDoFn extends DoFn<String, File> {

		private List<File> file;

	    @ProcessElement
	    public void processElement(ProcessContext c) {
	        recursiveSearch(rootFolderID, filePath); //Erstelle eine Liste
	        for (File file : fileList) {
	            c.output(file); //Verteile die Liste!
	        }
	    }
	}

--WriteStorageDoFn: Laden Sie die Datei von Drive herunter und laden Sie sie auf GCS hoch

	public class WriteStorageDoFn extends DoFn<File, Integer> {
	    @ProcessElement
	    public void processElement(ProcessContext c) {
	    	downloadFromDrive(fileId);
	    	uploadToGCS(filePath);
	    	c.output(1);
	    }
	}

--Sum.integersGlobally: Anzahl der Ausgabeelemente hinzufügen> Hier wird die Anzahl der kopierten Dateien angezeigt

schließlich

Es ist mehr als doppelt so schnell wie der Prozess, den ich ursprünglich in GAE / Go geschrieben habe. APIs vom Typ G Suite (Apps) sind jedoch äußerst fragil, nicht wahr? Es ist jetzt möglich, die Kopie zu verteilen, aber beim Versuch, eine große Anzahl von Dateien zu verarbeiten, tritt ein erheblicher Fehler auf. Lassen Sie uns den Wiederholungsvorgang richtig schreiben. Der Datenfluss ist immer noch nicht gut im Detail, aber ich denke, er hat unendlich viele Möglichkeiten, daher möchte ich ihn auch in Zukunft für verschiedene Zwecke verwenden.

Recommended Posts

In Dataflow implementiert, um die hierarchische Struktur von Google Drive in Google Cloud Storage zu kopieren
Kopieren Sie Daten von Amazon S3 mit Python (boto) in Google Cloud Storage.
Ich habe das in Google Cloud Dataflow vorinstallierte Python-Paket überprüft
So beheben Sie die Probleme beim Lesen von Google Cloud Storage-Bildern von Django, die auf GAE bereitgestellt wurden
Abrufen der Google Cloud Storage-Objektliste in Java
Verwendung der Google Cloud Translation API
Memorandum ((1) Kopieren und Einfügen aus einem anderen Buch (2) Siehe Vergleichstabelle mit openpyxl)
Erstellen Sie eine Kopie einer Google Drive-Datei aus Python
Senden Sie Protokolldaten vom Server an Splunk Cloud
So laden Sie Dateien in Google Drive mit Google Colaboratory
Senden Sie mithilfe von Google Cloud Messaging für Chrome eine Nachricht vom Server an die Chrome-Erweiterung
[Python] Ändern Sie die Cache-Steuerung von Objekten, die in den Cloud-Speicher hochgeladen wurden
Melden Sie sich von Selenium aus beim Fortigate (6.0) -Verwaltungsbildschirm an, um sich abzumelden
So melden Sie sich automatisch wie 1Password von der CLI an
Duplizieren Sie die in Google Drive erstellte Dokumentvorlage mit PyDrive2
Was ist Google Cloud Dataflow?
[GCP] So veröffentlichen Sie eine mit Cloud Storage signierte URL (temporäre URL) in Python
Senden Sie eine Nachricht von IBM Cloud Functions an Slack in Python
Herstellen einer Verbindung zum Cloud Firestore über Google Cloud-Funktionen mit Python-Code
Skript zum Sichern von Ordnern auf dem Server in Google Drive
Ändern Sie die aktive Version in Pyenv von Anaconda in einfaches Python
Geben Sie auf der AWS Cloud-Produktseite den Namen des AWS-Dienstes in csv ein
Laden Sie die Bilder und Videos herunter, die in den Tweets enthalten sind, die Ihnen auf Twitter gefallen haben, und laden Sie sie auf Google Drive hoch
Kopieren und Einfügen des Inhalts eines Blattes im JSON-Format mit einer Google-Tabelle (mithilfe von Google Colab)