I implemented this because I often wanted to synchronize Google Drive (hereinafter Drive) with Google Cloud Storage (hereinafter GCS). GCS has no real concept of directories, so as long as you know each file's path, the copies can be done independently and in parallel.
[Drive]                          [GCS]
root/                            gs://root/
├ hoge.txt                       ├ hoge.txt
├ folderA/                       ├ folderA/fuga.txt
│ └ fuga.txt          ----->     ├ folderB/folderC/hogehoge.txt
├ folderB/                       └ piyo.txt
│ └ folderC/
│   └ hogehoge.txt
└ piyo.txt
*The file path on Drive becomes the object name on GCS
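Concretely, the GCS object name is just the file's path relative to the root folder. A minimal sketch of that mapping (buildObjectName is a hypothetical helper of mine, not something from the article):

// Hypothetical helper: GCS has no real directories, so the whole relative path
// simply becomes part of the object name.
static String buildObjectName(String relativeFolderPath, String fileName) {
    if (relativeFolderPath == null || relativeFolderPath.isEmpty()) {
        return fileName;                          // e.g. "hoge.txt"
    }
    return relativeFolderPath + "/" + fileName;   // e.g. "folderB/folderC/hogehoge.txt"
}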
At first I wrote the parallel copy processing on Google App Engine (GAE). However, once the copy tasks are distributed it is hard to detect that all of the copies have finished, and GAE is simply not well suited to batch processing. I have also been working with Dataflow at work recently. With Dataflow you can wait for the distributed processing to finish, and after that you can publish to Pub/Sub or write a custom IO and connect it to the downstream processing.
Copy the hierarchy directly under a Drive folder (hereinafter the root folder) to GCS in parallel. Files that cannot be copied as-is, such as Spreadsheets, are excluded. Files with the same name in the same folder are out of scope here.
Starting from the root folder ID, search recursively and build a list of objects that hold each file's ID and file path. Distribute those objects across tasks and parallelize the "download from Drive and upload to GCS" part.
- I wonder whether a standard DriveIO exists, by the way?
// Point 1: the trick is to kick the pipeline off with a single dummy input element
p.apply("First Input", Create.of("A")).setCoder(StringUtf8Coder.of())
    .apply("Read Drive", ParDo.of(new ReadDriveDoFn(rootFolderId)))
    .apply("Write GCS", ParDo.of(new WriteStorageDoFn()))
    // Point 2: to wait until all of the copy processing has finished, take the sum of the outputs
    .apply("Combine!", Sum.integersGlobally());
    // The copies are finished at this point, so hook whatever downstream processing you like in here
p.run();
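For that last step, one possibility (a sketch of mine, not from the article) is to follow the Sum with a ParDo that receives the total and triggers whatever comes next, for example just logging it or publishing a completion message:

// Sketch only: c.element() is the global sum, i.e. the number of copied files.
// Publishing to Pub/Sub or calling another service would go in the same place.
.apply("After Copy", ParDo.of(new DoFn<Integer, Void>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        System.out.println("Copied " + c.element() + " files from Drive to GCS");
    }
}));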
- ReadDriveDoFn: creates the list of files under the root folder
  - Searches recursively from the root folder ID and builds a list of objects holding each file ID and its path
  - Loops over the list and outputs each element so that the work gets distributed
public class ReadDriveDoFn extends DoFn<String, File> {
    private final String rootFolderId;

    public ReadDriveDoFn(String rootFolderId) {
        this.rootFolderId = rootFolderId;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        List<File> fileList = new ArrayList<>();
        recursiveSearch(rootFolderId, "", fileList); // build the list of file ID + file path pairs
        for (File file : fileList) {
            c.output(file); // emit each file so the copies get distributed across workers
        }
    }
}
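recursiveSearch itself is not shown in the article. Assuming an authenticated Drive v3 client named drive, and that File here is a small serializable value class holding the ID and relative path (both assumptions of mine), it could look roughly like this:

// Sketch only: "drive" is a Drive v3 client and "File" is assumed to be a small
// serializable value class (id + path), since the Drive model has no path field.
private void recursiveSearch(String folderId, String path, List<File> out) {
    try {
        FileList result = drive.files().list()
                .setQ("'" + folderId + "' in parents and trashed = false")
                .setFields("files(id, name, mimeType)")
                .execute(); // paging omitted for brevity
        for (com.google.api.services.drive.model.File f : result.getFiles()) {
            if ("application/vnd.google-apps.folder".equals(f.getMimeType())) {
                recursiveSearch(f.getId(), path + f.getName() + "/", out); // descend into subfolders
            } else {
                out.add(new File(f.getId(), path + f.getName())); // leaf file: record its ID and path
            }
        }
    } catch (IOException e) {
        throw new RuntimeException("Drive files.list failed for folder " + folderId, e);
    }
}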
- WriteStorageDoFn: downloads the file from Drive and uploads it to GCS
  - Takes the file ID from the input object and downloads the file, then saves it to GCS under the file path
  - The output here is simply "1" (anything would do, but this way you can also count the number of copied files)
public class WriteStorageDoFn extends DoFn<File, Integer> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        File file = c.element();                          // carries the Drive file ID and the file path
        byte[] content = downloadFromDrive(file.getId()); // download the file body from Drive
        uploadToGCS(file.getPath(), content);             // upload to GCS; the file path becomes the object name
        c.output(1);                                      // output "1" per file so copies can be counted downstream
    }
}
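downloadFromDrive and uploadToGCS are also left out of the article. A rough sketch using the Drive v3 client and the google-cloud-storage library (the drive client, BUCKET_NAME, and passing the bytes explicitly are my assumptions):

import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

// Sketch only: stream the file body out of Drive by its ID...
private byte[] downloadFromDrive(String fileId) {
    try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        drive.files().get(fileId).executeMediaAndDownloadTo(out); // Drive v3 media download
        return out.toByteArray();
    } catch (IOException e) {
        throw new RuntimeException("Drive download failed for file " + fileId, e);
    }
}

// ...and write it to GCS, with the Drive file path (e.g. "folderA/fuga.txt") as the object name.
private void uploadToGCS(String filePath, byte[] content) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    storage.create(BlobInfo.newBuilder(BUCKET_NAME, filePath).build(), content); // BUCKET_NAME is assumed
}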
- Sum.integersGlobally: sums up the output elements; the total here is the number of copied files
  - By adding this step, which sums every output element, you can wait for all of the file copies to finish, and you also get the number of copied files.
It is more than twice as fast as the process I originally wrote on GAE/Go. That said, the G Suite (Apps) APIs are remarkably fragile. Distributing the copies works, but when handling a large number of files a fair number of errors occur, so make sure to implement the retry handling properly. Dataflow still has rough edges in the details, but I think it has endless possibilities, so I would like to keep using it for all sorts of things.
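For those retries, one simple approach (my own sketch, not from the article) is exponential backoff around each Drive call:

// A simple exponential-backoff wrapper for the fragile Drive/G Suite API calls.
private <T> T withRetry(java.util.concurrent.Callable<T> call) throws Exception {
    int maxAttempts = 5;
    long waitMillis = 1000;
    for (int attempt = 1; ; attempt++) {
        try {
            return call.call();
        } catch (IOException e) {          // Drive API errors generally surface as IOExceptions
            if (attempt >= maxAttempts) {
                throw e;                   // give up after the last attempt
            }
            Thread.sleep(waitMillis);
            waitMillis *= 2;               // back off: 1s, 2s, 4s, 8s, ...
        }
    }
}

Usage would be along the lines of withRetry(() -> drive.files().get(fileId).execute()).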