Au fait, cette fois, le nom du fragment est "test" et le nombre est un.
Main.py
#-*- coding: utf-8 -*-
from boto import kinesis
auth = {"aws_access_key_id":"Veuillez saisir la CLÉ D'ACCÈS IAM", "aws_secret_access_key":"Veuillez saisir la CLÉ D'ACCÈS SECRET d'IAM"}
if __name__ == '__main__':
# kinesis.connect_to_region('Région',Informations d'identification IAM)
Connection = kinesis.connect_to_region('us-east-1',**auth)
while true:
#Écrire
# Connection.put_record(Nom du flux,Données à écrire, PartitionKey)
put_response = Connection.put_record('test' , "hogehoge" , 'one')
sleep(10)
seulement ça.
Un worker est attaché à chaque fragment afin qu'il puisse être traité même si le nombre de fragments augmente.
show.py
# -*- coding: utf-8 -*-
import time
import base64
import multiprocessing
from boto import kinesis
import threading
auth = {"aws_access_key_id":"Veuillez saisir la CLÉ D'ACCÈS IAM", "aws_secret_access_key":"Veuillez saisir la CLÉ D'ACCÈS SECRET d'IAM"}
#Le nom du flux
STREAM_NAME='test'
def worker(connect, kinesis_iterator):
next_iterator = kinesis_iterator['ShardIterator']
while True:
response = connect.get_records(next_iterator)
next_iterator = response['NextShardIterator']
time.sleep(1)
#Afficher le contenu écrit dans la partition
for data in response['Records']:
print(data)
def get_record():
connect = kinesis.connect_to_region('us-east-1',**auth)
stream = connect.describe_stream(STREAM_NAME)
#Obtenez une liste de fragments
shards = stream['StreamDescription']['Shards']
#Installez un worker pour chaque partition et obtenez des données
for shard in shards:
kinesis_iterator = connect.get_shard_iterator(STREAM_NAME, shard['ShardId'], shard_iterator_type='TRIM_HORIZON')
job = threading.Thread(target=worker, args=(connect, kinesis_iterator))
job.start()
if __name__ == '__main__':
get_record()
J'ai pu aller comme ça.
Recommended Posts