Below is the architecture image of this time.
・ Two Topic in Pub / Sub ┗ Name them "topic_1" and "topic_2"
・ Cloud Scheduler ┗ Set the topic to "topic_1" and the payload to "hello"
・ Two Functions with Cloud Functions ┗ Name them "function_1" and "function_2" Trigger of "function_1" sets "topic_1" of Pub / Sub Trigger of "function_2" sets "topic_2" of Pub / Sub
In the following "event_message", the character string such as the payload "hello" set by Cloud Scheduler via topic_1 is stored.
If N Cloud Schedulers are changed only in payload and set to the same settings, event_message can be judged in function_1 and subsequent processing can be performed.
def main(event, context):
event_message = base64.b64decode(event['data']).decode('utf-8')
Then suppose you pass a list to function_2 Pub / Sub can only pass text, so you need to encode it
from google.cloud import pubsub_v1
PROJECT_ID = os.getenv('GCP_PROJECT')
client = pubsub_v1.PublisherClient()
topic_id = "topic_2" #Set the topic to pass next
topic_path = client.topic_path(PROJECT_ID, topic_id)
pub_text = ["Apple", "gorilla", "rap"]
data = pub_text.encode() #Encode here
client.publish(topic_path, data=data) #This is topic_2 to pub_text is pushed
In the last line above,'[" apple", "gorilla", "trumpet"]' is passed to topic_2 and function_2 is ignited.
In the event_message below,'[" apple", "gorilla", "trumpet"]' is included, so eval it and return it to the python list. What is the subsequent processing?
def main(event, context):
event_message = base64.b64decode(event['data']).decode('utf-8')
fruit_lst = eval(event_message)
Cloud Functions can be started in parallel as shown in the official document below. https://cloud.google.com/functions/quotas?hl=ja#scalability
Therefore, if you publish function_1 to Pub / Sub while looping as follows, function_2 will get 3 fruits.
function_1.py
from google.cloud import pubsub_v1
PROJECT_ID = os.getenv('GCP_PROJECT')
client = pubsub_v1.PublisherClient()
topic_id = "topic_2"
topic_path = client.topic_path(PROJECT_ID, topic_id)
fruit_lst = ["Apple", "gorilla", "rap"]
for fruit in fruit_lst:
data = fruit.encode()
client.publish(topic_path, data=data)
https://cloud.google.com/solutions/streaming-data-from-cloud-storage-into-bigquery-using-cloud-functions?hl=ja https://cloud.google.com/functions/quotas?hl=ja#scalability
If you also encode () the list and dict, pass it to Pub / Sub, and eval it on the receiving side, it will be restored, so it was convenient when you want to perform multiple light processes on a regular basis.
Recommended Posts