Übrigens ist diesmal der Name der Scherbe "Test" und die Nummer ist eins.
Main.py
#-*- coding: utf-8 -*-
from boto import kinesis
auth = {"aws_access_key_id":"Bitte geben Sie den IAM ACCESS KEY ein", "aws_secret_access_key":"Bitte geben Sie den SECRET ACCESS KEY von IAM ein"}
if __name__ == '__main__':
# kinesis.connect_to_region('Region',IAM-Anmeldeinformationen)
Connection = kinesis.connect_to_region('us-east-1',**auth)
while true:
#Schreiben
# Connection.put_record(Stream-Name,Zu schreibende Daten, PartitionKey)
put_response = Connection.put_record('test' , "hogehoge" , 'one')
sleep(10)
nur das.
An jedem Shard ist ein Arbeiter angebracht, damit er auch dann verarbeitet werden kann, wenn die Anzahl der Shards zunimmt.
show.py
# -*- coding: utf-8 -*-
import time
import base64
import multiprocessing
from boto import kinesis
import threading
auth = {"aws_access_key_id":"Bitte geben Sie den IAM ACCESS KEY ein", "aws_secret_access_key":"Bitte geben Sie den SECRET ACCESS KEY von IAM ein"}
#Der Name des Streams
STREAM_NAME='test'
def worker(connect, kinesis_iterator):
next_iterator = kinesis_iterator['ShardIterator']
while True:
response = connect.get_records(next_iterator)
next_iterator = response['NextShardIterator']
time.sleep(1)
#Zeigen Sie den in Shard geschriebenen Inhalt an
for data in response['Records']:
print(data)
def get_record():
connect = kinesis.connect_to_region('us-east-1',**auth)
stream = connect.describe_stream(STREAM_NAME)
#Holen Sie sich eine Liste der Scherben
shards = stream['StreamDescription']['Shards']
#Installieren Sie einen Worker für jeden Shard und rufen Sie Daten ab
for shard in shards:
kinesis_iterator = connect.get_shard_iterator(STREAM_NAME, shard['ShardId'], shard_iterator_type='TRIM_HORIZON')
job = threading.Thread(target=worker, args=(connect, kinesis_iterator))
job.start()
if __name__ == '__main__':
get_record()
Ich konnte so gehen.