Dieser Artikel beschreibt ein Python-Skript, das Kinesis-Streams mit Standardeingaben von einem lokalen Host oder EC2 füllt.
・ Amazon Linux ・ Sprache: Python 2.7, Shell -Eingabedatenformat: CSV
Das in Kinesis Streams zu speichernde Python-Skript wird unten beschrieben. In diesem Skript wird es aggregiert und mit 500 Datensätzen / Sekunde * gespeichert. .. (Bitte beachten Sie, dass die Bedeutung der Aggregation nicht bedeutet, dass KPL zum Aggrigieren eines Datensatzes verwendet wird.)
script
buffer_insert.py
import sys
import json
import random
import boto3
import time
def create_json(buffered_data, streamname):
jdat = {}
dat = []
jdat["StreamName"] = streamname
for rec in buffered_data :
dat.append({"Data" : rec, "PartitionKey" : str(random.randint(1,1000))})
jdat["Records"] = dat
return jdat
if __name__ == '__main__':
args = sys.argv
streamname=args[1]
cnt = 0
buf = []
client = boto3.client('kinesis')
while 1:
if len(buf) == 500:
ret = client.put_records(**create_json(buf,streamname ))
time.sleep(1)
print ret
buf = []
line = sys.stdin.readline()
if not line:
break
buf.append(line[:-1])
Da das obige Skript die Anmeldeinformationen für den Zugriffsschlüssel und den geheimen Schlüssel nicht festlegt, legen Sie sie gegebenenfalls in client () fest. Weitere Informationen finden Sie im folgenden boto3-Dokument. http://boto3.readthedocs.io/en/latest/guide/configuration.html
Die Ausführung ist wie folgt. -Erstellt von kinesis_streams_test of Streams ・ Geben Sie die Daten test.csv ein
>Lauf
cat test.csv | python buffer_insert.py kinesis_streams_test
>Ergebnis
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265378143459129484557577879554'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888401204447079899259962654738'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265379352384949099186752585730'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265380561310768713815927291906'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265381770236588328445101998082'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265382979162407943074276704258'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888402413372899513889137360914'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888403622298719128518312067090'}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'e8d1fc47-17eb-1f03-b9e4-17c6595f9a22'}}
Unten finden Sie ein Skript, mit dem überprüft werden kann, ob es in Kinesis Streams gespeichert werden kann. (Tragen wir es seitwärts und von der AWS CLI anstelle von Python ... Sie können es natürlich von Python beziehen.)
get_record.sh
#!/bin/bash
stream_name=$1
shard_array_index=${2:-0}
shard_iterator_type=${3:-TRIM_HORIZON}
shard_id=$(aws kinesis describe-stream --stream-name ${stream_name} --query "StreamDescription.Shards[${shard_array_index}].ShardId" --output text)
echo $shard_id
shard_iterator=$(aws kinesis get-shard-iterator --stream-name ${stream_name} --shard-id ${shard_id} --shard-iterator-type ${shard_iterator_type} --query "ShardIterator" --output text)
echo $shard_iterator
aws kinesis get-records --shard-iterator ${shard_iterator}
Da die put_recorded-Daten Base64-codiert sind, muss die Decodierungsverarbeitung auf der Verbraucherseite durchgeführt werden. Die AWS CLI unterstützt base64 nicht, daher müssen Sie einen Base64-Decoder verwenden (z. B. https://www.base64decode.org/).
Nächstes Mal werde ich ein Skript einführen, das Daten mithilfe von KPL aggregiert und in Kinesis Streams speichert.
Recommended Posts