Copying the Google Drive folder hierarchy to Google Cloud Storage with Dataflow

Introduction

I built this because I often want to synchronize Google Drive (hereinafter Drive) with Google Cloud Storage (hereinafter GCS). GCS has no real concept of directories, so once you know each file's path you can copy the files in parallel as independent objects.

[Drive]                            [GCS]

root/                              gs://root/
 ├ hoge.txt                         ├ hoge.txt
 ├ folderA/                         ├ folderA/fuga.txt
 │  └ fuga.txt                      ├ folderB/folderC/hogehoge.txt
 ├ folderB/               ----->    └ piyo.txt
 │  └ folderC/
 │   └ hogehoge.txt
 └ piyo.txt

The file path on Drive becomes the object name on GCS.
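To illustrate that flat namespace, here is a small standalone sketch using the google-cloud-storage Java client (the bucket and object names are just examples, not from the original post). Writing an object whose name contains slashes is all it takes to reproduce the "directory" structure:

	import com.google.cloud.storage.BlobId;
	import com.google.cloud.storage.BlobInfo;
	import com.google.cloud.storage.Storage;
	import com.google.cloud.storage.StorageOptions;

	public class FlatKeyExample {
	    public static void main(String[] args) {
	        Storage storage = StorageOptions.getDefaultInstance().getService();
	        // "folderB/folderC/hogehoge.txt" is just an object name; GCS has no real directories
	        BlobId blobId = BlobId.of("root", "folderB/folderC/hogehoge.txt");
	        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
	        storage.create(blobInfo, "hello".getBytes());
	    }
	}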

Why Dataflow?

At first I wrote the parallel copy on Google App Engine (GAE). However, once the copy tasks are distributed, it is hard to detect that all of them have finished. GAE is also simply not well suited to batch processing, and I had recently been working with Dataflow at my job. With Dataflow you can wait for the distributed work to complete, and after that you can connect the result to downstream processing via Pub/Sub or a custom IO.

Requirements

Copy the hierarchy directly under a given Drive folder (hereinafter the root folder) to GCS in parallel. Files that cannot be downloaded as-is, such as Spreadsheets, are excluded. How to handle files with the same name in the same folder is left as an open question.
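For reference, a minimal sketch of how that exclusion could be decided (an assumption; the post does not show it), based on the MIME type reported by the Drive v3 API:

	// Hedged helper (not from the original post): Google-editor files (Sheets, Docs, Slides,
	// and also folders) all use the "application/vnd.google-apps.*" MIME prefix and have no
	// binary content to download, so they are skipped. Folders are traversed, not copied.
	private static boolean isCopyable(com.google.api.services.drive.model.File f) {
	    return !f.getMimeType().startsWith("application/vnd.google-apps");
	}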

Implementation overview

Recursively search from the root folder ID and build a list of objects, each holding a file ID and its file path. Distribute those objects to the workers and parallelize the "download from Drive and upload to GCS" step.
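The DoFns later in the post pass around elements of type File; the post never shows that class, but presumably it is a small serializable value object along these lines (this sketch is an assumption; in real code a name like DriveFile would avoid confusion with java.io.File and the Drive API's File):

	import java.io.Serializable;

	// Assumed value object (not shown in the original post): one Drive file ID plus the path
	// the file will be stored under in GCS. Serializable so Beam can encode it with the
	// default SerializableCoder.
	public class File implements Serializable {
	    private final String id;    // Drive file ID
	    private final String path;  // e.g. "folderB/folderC/hogehoge.txt"

	    public File(String id, String path) {
	        this.id = id;
	        this.path = path;
	    }

	    public String getId() { return id; }
	    public String getPath() { return path; }
	}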

Pipeline design

- Does a DriveIO exist as a standard IO? It does not, so the read from Drive is implemented in a plain DoFn (ReadDriveDoFn below).

Simple pipeline code

// Point 1: the trick is to seed the pipeline with a single dummy input value first
p.apply("First Input", Create.of("A")).setCoder(StringUtf8Coder.of())

 .apply("Read Drive", ParDo.of(new ReadDriveDoFn(rootFolderId)))
 .apply("Write GCS", ParDo.of(new WriteStorageDoFn()))

 // Point 2: to wait until every copy has finished, take the sum of the outputs
 .apply("Combine!", Sum.integersGlobally());

 // The copy is finished at this point, so attach whatever downstream processing you like here

p.run();
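The post does not show how `p` is created. A minimal sketch of the surrounding boilerplate, assuming the Apache Beam Java SDK with the Dataflow runner (the project ID and bucket are placeholders):

	import org.apache.beam.runners.dataflow.DataflowRunner;
	import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
	import org.apache.beam.sdk.Pipeline;
	import org.apache.beam.sdk.options.PipelineOptionsFactory;

	public class DriveToGcsPipeline {
	    public static void main(String[] args) {
	        DataflowPipelineOptions options =
	                PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
	        options.setRunner(DataflowRunner.class);       // run on the Dataflow service
	        options.setProject("my-project");              // placeholder project ID
	        options.setTempLocation("gs://my-bucket/tmp"); // placeholder staging location
	        Pipeline p = Pipeline.create(options);

	        // ... apply the transforms shown above ...

	        p.run();
	    }
	}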

- ReadDriveDoFn: creates the list of files directly under the root folder
  - Recursively searches from the root folder ID and builds a list of objects holding each file ID and its path (a sketch of that recursiveSearch helper follows the snippet below)
  - Loops over the list and outputs each element so that the downstream copy work is distributed

	public class ReadDriveDoFn extends DoFn<String, File> {

	    private final String rootFolderId;
	    private List<File> fileList;

	    public ReadDriveDoFn(String rootFolderId) {
	        this.rootFolderId = rootFolderId;
	    }

	    @ProcessElement
	    public void processElement(ProcessContext c) {
	        fileList = new ArrayList<>();
	        recursiveSearch(rootFolderId, ""); // build the list of (file ID, file path) objects
	        for (File file : fileList) {
	            c.output(file); // output each file so the copies are distributed across workers
	        }
	    }
	}
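The recursiveSearch helper itself is not shown in the post. A possible sketch, assuming the Drive v3 Java client is available as a `drive` field on the DoFn (built, for example, in a @Setup method), reusing the isCopyable check and the File value object sketched earlier; error handling is omitted, and a real version would need the retries mentioned in the conclusion:

	// Hedged sketch (not from the original post) of the recursive Drive traversal.
	private void recursiveSearch(String folderId, String parentPath) throws IOException {
	    String pageToken = null;
	    do {
	        com.google.api.services.drive.model.FileList result = drive.files().list()
	                .setQ("'" + folderId + "' in parents and trashed = false")
	                .setFields("nextPageToken, files(id, name, mimeType)")
	                .setPageToken(pageToken)
	                .execute();
	        for (com.google.api.services.drive.model.File f : result.getFiles()) {
	            if ("application/vnd.google-apps.folder".equals(f.getMimeType())) {
	                // sub-folder: descend and extend the path
	                recursiveSearch(f.getId(), parentPath + f.getName() + "/");
	            } else if (isCopyable(f)) {
	                // regular file: record its ID and the path it will get on GCS
	                fileList.add(new File(f.getId(), parentPath + f.getName()));
	            }
	        }
	        pageToken = result.getNextPageToken();
	    } while (pageToken != null);
	}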

- WriteStorageDoFn: downloads the file from Drive and uploads it to GCS
  - Takes the file ID from the input object, downloads the file, and saves it to GCS under the file path (a sketch of the two helpers follows the snippet below)
  - The output is simply set to "1" per file (any value would do, but this way you also learn the number of copied files)

	public class WriteStorageDoFn extends DoFn<File, Integer> {
	    @ProcessElement
	    public void processElement(ProcessContext c) {
	        String fileId = c.element().getId();     // Drive file ID carried on the input element
	        String filePath = c.element().getPath(); // path that becomes the GCS object name
	        downloadFromDrive(fileId);               // download the file body from Drive
	        uploadToGCS(filePath);                   // upload it to GCS
	        c.output(1);                             // emit "1" per copied file, summed downstream
	    }
	}
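Likewise, downloadFromDrive and uploadToGCS are not shown in the post. A possible sketch, assuming `drive` (Drive v3 client), `storage` (google-cloud-storage client) and `bucketName` are fields initialized when the DoFn starts, with a simple buffer carrying the bytes between the two calls:

	private ByteArrayOutputStream buffer; // holds the downloaded bytes between the two calls

	private void downloadFromDrive(String fileId) {
	    try {
	        buffer = new ByteArrayOutputStream();
	        // Drive v3 media download of a regular (non-Google-editor) file
	        drive.files().get(fileId).executeMediaAndDownloadTo(buffer);
	    } catch (IOException e) {
	        // the conclusion notes the Drive API is flaky; a real version should retry here
	        throw new RuntimeException("Drive download failed for " + fileId, e);
	    }
	}

	private void uploadToGCS(String filePath) {
	    // GCS has no directories: the Drive file path simply becomes the object name
	    BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, filePath)).build();
	    storage.create(blobInfo, buffer.toByteArray());
	}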

- Sum.integersGlobally: sums the output elements, which equals the number of copied files
  - By inserting this transform, which adds up every output element, you can wait for all file copies to finish, and you also learn how many files were copied (a sketch of a possible downstream hook follows below)
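Nothing after the sum is shown in the post. One possible downstream hook, sketched here with an illustrative class name and message: because Sum.integersGlobally() emits its single result only after every output of WriteStorageDoFn has been counted, a DoFn applied to that result effectively runs once all copies are done.

	public class AfterCopyDoFn extends DoFn<Integer, Void> {
	    @ProcessElement
	    public void processElement(ProcessContext c) {
	        // c.element() is the total number of copied files
	        System.out.println("Copied " + c.element() + " files from Drive to GCS");
	        // e.g. publish a Pub/Sub message here to kick off the next stage
	    }
	}

It would be attached to the end of the chain with .apply("After Copy", ParDo.of(new AfterCopyDoFn())).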

In closing

It's more than twice as fast as the process I originally wrote on GAE/Go. However, the G Suite (Apps) APIs are remarkably fragile. The copy itself can be distributed, but when handling a large number of files a fair number of errors occur, so write proper retry logic. Dataflow is still rough around the edges in places, but I think it has a lot of potential, and I would like to keep using it for various purposes.
