I want to get Marketo data. Make a note of the sample to get the Leads data in bulk. Can also be used for other data resources.
Getting Started gives you a feel for the REST API REST API
You can grasp the atmosphere by doing the bulk acquisition explanation page Bulk Extract
import requests
import json
import time
import os
import pendulum
from google.cloud import bigquery
#↓ Can be obtained from the management screen
# https://developers.marketo.com/rest-api/
ENDPOINT = 'YOUR_ENDPOINT'
CLIENT_ID = 'YOUR_CLIENT_ID'
CLIENT_SECRET = 'YOUR_CLIENT_SECRET'
class MarketoAPI:
def __init__(self, target_date):
self.target_date = target_date
self.token = self.get_token()
self.header = {
'Authorization': 'Bearer {}'.format(self.token),
'content-type': 'application/json'
}
self.export_id = None #Set when creating a job
# CLIENT_ID and CLIENT_Issue a token from SECRET
#Note that the token issued from the management screen will be expired immediately.
def get_token(self):
URL = '{}/identity/oauth/token?client_id={}&client_secret={}&grant_type=client_credentials'.format(
ENDPOINT, CLIENT_ID, CLIENT_SECRET
)
headers = {'content-type': 'application/json'}
r = requests.get(URL, headers=headers)
if r.status_code != requests.codes.ok:
r.raise_for_status()
return r.json()['access_token']
#This time we will use the Bulk Extract API
#First, create a job to get LEAD
# https://developers.marketo.com/rest-api/bulk-extract/bulk-lead-extract/
def create(self):
URL = '{}/bulk/v1/leads/export/create.json'.format(ENDPOINT)
payload = {
#Specify the field you want to get
#The fields that can be specified are`GET /rest/v1/leads/describe.json`Can be confirmed with the API of
"fields": [
"id",
"email",
"createdAt",
"updatedAt"
],
"format": "CSV",
#Specify a filter
#The fileter that can be specified is written in ↓ doc
# https://developers.marketo.com/rest-api/bulk-extract/bulk-lead-extract/
"filter": {
"createdAt": {
"startAt": self.target_date.isoformat(),
"endAt": self.target_date.end_of('day').isoformat()
}
}
}
r = requests.post(URL, data=json.dumps(payload), headers=self.header)
if r.status_code != requests.codes.ok:
r.raise_for_status()
export_id = r.json()['result'][0]['exportId']
self.export_id = export_id
return export_id
# create()Jobs are not processed just by creating in
#You can make it wait for execution by enqueue the job here
def enqueue(self):
URL = '{}/bulk/v1/leads/export/{}/enqueue.json'.format(
ENDPOINT, self.export_id
)
r = requests.post(URL, headers=self.header)
if r.status_code != requests.codes.ok:
r.raise_for_status()
#Method to polling statusp until processing of enqueued job is completed
def check_until_done(self):
URL = '{}/bulk/v1/leads/export/{}/status.json'.format(
ENDPOINT, self.export_id
)
#Wait 3 minutes before request
#Run 3 times until Completed
for i in range(3):
time.sleep(60 * 3)
r = requests.get(URL, headers=self.header)
if r.status_code == requests.codes.ok:
if r.json()['result'][0]['status'] == 'Completed':
return True
if r.status_code != requests.codes.ok:
r.raise_for_status()
else:
raise('job status is: ' + r.json()['result'][0]['status'])
#Method to download file when job is completed
def dl_file(self):
URL = '{}/bulk/v1/leads/export/{}/file.json'.format(
ENDPOINT, self.export_id
)
r = requests.get(URL, headers=self.header)
if r.status_code != requests.codes.ok:
r.raise_for_status()
with open('./marketo_lead_file_{}.csv'.format(self.target_date.strftime('%Y%m%d')), mode='w') as f:
f.write(r.text)
Under the condition of this create, the following file will be downloaded
id,email,createdAt,updatedAt
111111,[email protected],2020-01-02T15:35:37Z,2020-01-02T15:35:36Z
222222,[email protected],2030-01-02T22:58:07Z,2020-01-02T22:58:07Z
Recommended Posts