Ich habe es implementiert, weil ich häufig Google Drive (im Folgenden: Drive) und Google Cloud Storage (im Folgenden: GCS) synchronisieren wollte. Da GCS nicht über das Konzept eines Verzeichnisses verfügt, kann das Kopieren parallelisiert werden, wenn der Dateipfad bekannt ist.
[Drive] [GCS]
root/ gs:root/
├ hoge.txt ├ hoge.txt
├ folderA/ ├ folderA/fuga.txt
│ └ fuga.txt ├ folderB/folderC/hogehoge.txt
├ folderB/ -----> └ piyo.txt
│ └ folderC/
│ └ hogehoge.txt/
└ piyo.txt
*Bild, in dem der Dateipfad auf dem Laufwerk zum Dateinamen in GCS wird
Zuerst habe ich die parallele Kopierverarbeitung mit Google App Engine (im Folgenden: GAE) geschrieben. Wenn jedoch die parallelen Kopieraufgaben verteilt sind, ist es schwierig zu erkennen, dass alle Kopien abgeschlossen wurden. Außerdem ist GAE einfach nicht gut in der Stapelverarbeitung, und ich war kürzlich bei der Arbeit dem Datenfluss ausgesetzt. Mit Dataflow können Sie warten, bis der verteilte Prozess abgeschlossen ist. Danach dachte ich, dass es besser wäre, Pub / Sub oder CustomIO zu schreiben und es mit der nachfolgenden Verarbeitung zu verbinden.
Kopieren Sie die hierarchische Struktur direkt unter dem Ordner mit Drive (im Folgenden als Stammordner bezeichnet) parallel nach GCS. Dateien, die nicht kopiert werden können, z. B. Tabellenkalkulation, sind ausgeschlossen. Was ist mit Dateien mit demselben Dateinamen im selben Ordner?
Suchen Sie in der Stammordner-ID nach Folgendem und erstellen Sie eine Liste von Objekten mit Datei-IDs und Dateipfaden. Verteilen Sie die erstellten Objekte auf jede Aufgabe und parallelisieren Sie den Teil "Dateien vom Laufwerk herunterladen und auf GCS hochladen".
―― Gibt es DriveIO als Standard?
// *Punkt 1:Es ist ein Fehler, zuerst eine Eingabe mit einem geeigneten Wert vorzunehmen
p.apply("First Input",Create.of("A")).setCoder(StringUtf8Coder.of())
.apply("Read Drive", ParDo.of(new ReadDriveDoFn(rootFolderId)))
.apply("Write GCS", ParDo.of(new WriteStorageDoFn()));
// *Punkt 2:Ich möchte warten, bis die gesamte Kopierverarbeitung abgeschlossen ist, also nehme ich den Gesamtwert der Ausgabe
.apply("Combine!", Sum.integersGlobally()))
.apply("Die Kopie ist fertig, also machen Sie bitte mit der anschließenden Bearbeitung, was Sie wollen!")
p.run();
--ReadDriveDoFn: Erstellen Sie eine Dateiliste direkt unter dem Stammordner --Suchen Sie rekursiv nach der Stammordner-ID und erstellen Sie eine Liste mit der Datei-ID und ihrem Pfad als Objekte.
public class ReadDriveDoFn extends DoFn<String, File> {
private List<File> file;
@ProcessElement
public void processElement(ProcessContext c) {
recursiveSearch(rootFolderID, filePath); //Erstelle eine Liste
for (File file : fileList) {
c.output(file); //Verteile die Liste!
}
}
}
--WriteStorageDoFn: Laden Sie die Datei von Drive herunter und laden Sie sie auf GCS hoch
public class WriteStorageDoFn extends DoFn<File, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
downloadFromDrive(fileId);
uploadToGCS(filePath);
c.output(1);
}
}
--Sum.integersGlobally: Anzahl der Ausgabeelemente hinzufügen> Hier wird die Anzahl der kopierten Dateien angezeigt
Es ist mehr als doppelt so schnell wie der Prozess, den ich ursprünglich in GAE / Go geschrieben habe. APIs vom Typ G Suite (Apps) sind jedoch äußerst fragil, nicht wahr? Es ist jetzt möglich, die Kopie zu verteilen, aber beim Versuch, eine große Anzahl von Dateien zu verarbeiten, tritt ein erheblicher Fehler auf. Lassen Sie uns den Wiederholungsvorgang richtig schreiben. Der Datenfluss ist immer noch nicht gut im Detail, aber ich denke, er hat unendlich viele Möglichkeiten, daher möchte ich ihn auch in Zukunft für verschiedene Zwecke verwenden.
Recommended Posts