In this example, the stream is named "test" and it has a single shard.
Main.py
# -*- coding: utf-8 -*-
# Main.py: write a sample record to the Kinesis stream "test" every 10 seconds.
import time

from boto import kinesis

# IAM credentials passed to boto; replace the placeholders with real keys.
auth = {"aws_access_key_id": "Please enter the IAM ACCESS KEY",
        "aws_secret_access_key": "Please enter the IAM SECRET ACCESS KEY"}

if __name__ == '__main__':
    # kinesis.connect_to_region('region', **IAM credentials)
    connection = kinesis.connect_to_region('us-east-1', **auth)
    # Fixed: `while true:` raised NameError — Python's boolean is `True`.
    while True:
        # put_record(stream name, data to write, partition key)
        put_response = connection.put_record('test', "hogehoge", 'one')
        # Fixed: bare `sleep` was undefined — use time.sleep.
        time.sleep(10)
That's all there is to it.
A worker is attached to each shard to read its data, so processing keeps up even if the number of shards increases.
show.py
# -*- coding: utf-8 -*-
import time
import base64
import multiprocessing
from boto import kinesis
import threading
auth = {"aws_access_key_id":"Please enter the IAM ACCESS KEY", "aws_secret_access_key":"Please enter the IAM SECRET ACCESS KEY"}
#The name of the stream
STREAM_NAME='test'
def worker(connect, kinesis_iterator):
    """Poll one shard forever, printing every record that arrives.

    connect: an open boto Kinesis connection.
    kinesis_iterator: the response dict from get_shard_iterator().
    """
    iterator = kinesis_iterator['ShardIterator']
    while True:
        result = connect.get_records(iterator)
        # Each response carries the iterator for the next batch.
        iterator = result['NextShardIterator']
        time.sleep(1)
        # Show the records written to this shard.
        for record in result['Records']:
            print(record)
def get_record():
    """Start one reader thread per shard of STREAM_NAME.

    Attaching a worker to every shard keeps reads flowing even when the
    stream has more than one shard.
    """
    conn = kinesis.connect_to_region('us-east-1', **auth)
    description = conn.describe_stream(STREAM_NAME)
    # Walk the shard list and hand each shard its own thread.
    for shard in description['StreamDescription']['Shards']:
        iterator = conn.get_shard_iterator(
            STREAM_NAME,
            shard['ShardId'],
            shard_iterator_type='TRIM_HORIZON')
        threading.Thread(target=worker, args=(conn, iterator)).start()


if __name__ == '__main__':
    get_record()
With that, I was able to get it working.
Recommended Posts