Cet article décrit un script python qui remplit Kinesis Streams avec une entrée standard d'un hôte local ou EC2.
・ Amazon Linux ・ Langage: python 2.7, shell -Format des données d'entrée: CSV
Le script python à stocker dans Kinesis Streams est décrit ci-dessous. Dans ce script, il est agrégé et stocké à 500 enregistrements / seconde *. .. (Veuillez noter que le sens de l'agrégation ne signifie pas que KPL est utilisé pour agréger un enregistrement.)
script
buffer_insert.py
import sys
import json
import random
import boto3
import time
def create_json(buffered_data, streamname):
jdat = {}
dat = []
jdat["StreamName"] = streamname
for rec in buffered_data :
dat.append({"Data" : rec, "PartitionKey" : str(random.randint(1,1000))})
jdat["Records"] = dat
return jdat
if __name__ == '__main__':
args = sys.argv
streamname=args[1]
cnt = 0
buf = []
client = boto3.client('kinesis')
while 1:
if len(buf) == 500:
ret = client.put_records(**create_json(buf,streamname ))
time.sleep(1)
print ret
buf = []
line = sys.stdin.readline()
if not line:
break
buf.append(line[:-1])
Puisque les informations critiques de la clé d'accès et de la clé secrète ne sont pas définies dans le script ci-dessus, veuillez les définir dans client () si nécessaire. Voir le document boto3 ci-dessous pour plus de détails. http://boto3.readthedocs.io/en/latest/guide/configuration.html
L'exécution est la suivante. -Créé par kinesis_streams_test of Streams ・ Entrez les données test.csv
>Courir
cat test.csv | python buffer_insert.py kinesis_streams_test
>résultat
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265378143459129484557577879554'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888401204447079899259962654738'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265379352384949099186752585730'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265380561310768713815927291906'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265381770236588328445101998082'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265382979162407943074276704258'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888402413372899513889137360914'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888403622298719128518312067090'}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'e8d1fc47-17eb-1f03-b9e4-17c6595f9a22'}}
Vous trouverez ci-dessous un script pour vérifier s'il peut être stocké dans Kinesis Streams. (Portons-le sur le côté et à partir de l'AWS CLI au lieu de python ... vous pouvez bien sûr l'obtenir à partir de python.)
get_record.sh
#!/bin/bash
stream_name=$1
shard_array_index=${2:-0}
shard_iterator_type=${3:-TRIM_HORIZON}
shard_id=$(aws kinesis describe-stream --stream-name ${stream_name} --query "StreamDescription.Shards[${shard_array_index}].ShardId" --output text)
echo $shard_id
shard_iterator=$(aws kinesis get-shard-iterator --stream-name ${stream_name} --shard-id ${shard_id} --shard-iterator-type ${shard_iterator_type} --query "ShardIterator" --output text)
echo $shard_iterator
aws kinesis get-records --shard-iterator ${shard_iterator}
Puisque les données put_recorded sont codées en Base64, il est nécessaire d'effectuer un traitement de décodage côté consommateur. L'AWS CLI ne prend pas en charge base64, vous devez donc utiliser un décodeur Base64 (tel que https://www.base64decode.org/).
La prochaine fois, je présenterai un script qui agrège les données à l'aide de KPL et les stocke dans Kinesis Streams.
Recommended Posts