[PYTHON] Speichern von CSV-Daten in Amazon Kinesis Streams mit Standardeingabe

Dieser Artikel beschreibt ein Python-Skript, das Kinesis-Streams mit Standardeingaben von einem lokalen Host oder EC2 füllt.

Umgebung

・ Amazon Linux ・ Sprache: Python 2.7, Shell -Eingabedatenformat: CSV

Das in Kinesis Streams zu speichernde Python-Skript wird unten beschrieben. In diesem Skript wird es aggregiert und mit 500 Datensätzen / Sekunde * gespeichert. .. (Bitte beachten Sie, dass die Bedeutung der Aggregation nicht bedeutet, dass KPL zum Aggrigieren eines Datensatzes verwendet wird.)

script

buffer_insert.py


import sys
import json
import random
import boto3
import time

def create_json(buffered_data, streamname):
    jdat = {}
    dat = []
    jdat["StreamName"] = streamname      
    for rec in buffered_data :
        dat.append({"Data" : rec, "PartitionKey" : str(random.randint(1,1000))})
    jdat["Records"] = dat
    return jdat

if __name__ == '__main__':

  args = sys.argv
  streamname=args[1]

  cnt = 0
  buf = []

  client = boto3.client('kinesis')
  while 1:
      if len(buf) == 500:
          ret = client.put_records(**create_json(buf,streamname ))
          time.sleep(1)
          print ret
          buf = []
      line = sys.stdin.readline()
      if not line:
          break
      buf.append(line[:-1])

Da das obige Skript die Anmeldeinformationen für den Zugriffsschlüssel und den geheimen Schlüssel nicht festlegt, legen Sie sie gegebenenfalls in client () fest. Weitere Informationen finden Sie im folgenden boto3-Dokument. http://boto3.readthedocs.io/en/latest/guide/configuration.html

Die Ausführung ist wie folgt. -Erstellt von kinesis_streams_test of Streams ・ Geben Sie die Daten test.csv ein

>Lauf
cat test.csv | python buffer_insert.py kinesis_streams_test

>Ergebnis
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265378143459129484557577879554'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888401204447079899259962654738'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265379352384949099186752585730'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265380561310768713815927291906'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265381770236588328445101998082'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265382979162407943074276704258'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888402413372899513889137360914'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888403622298719128518312067090'}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'e8d1fc47-17eb-1f03-b9e4-17c6595f9a22'}}

Unten finden Sie ein Skript, mit dem überprüft werden kann, ob es in Kinesis Streams gespeichert werden kann. (Tragen wir es seitwärts und von der AWS CLI anstelle von Python ... Sie können es natürlich von Python beziehen.)

get_record.sh


#!/bin/bash

stream_name=$1
shard_array_index=${2:-0}
shard_iterator_type=${3:-TRIM_HORIZON}


shard_id=$(aws kinesis describe-stream --stream-name ${stream_name} --query "StreamDescription.Shards[${shard_array_index}].ShardId" --output text)
echo $shard_id
shard_iterator=$(aws kinesis get-shard-iterator --stream-name ${stream_name} --shard-id ${shard_id} --shard-iterator-type ${shard_iterator_type} --query "ShardIterator" --output text)
echo $shard_iterator
aws kinesis get-records --shard-iterator ${shard_iterator}

Da die put_recorded-Daten Base64-codiert sind, muss die Decodierungsverarbeitung auf der Verbraucherseite durchgeführt werden. Die AWS CLI unterstützt base64 nicht, daher müssen Sie einen Base64-Decoder verwenden (z. B. https://www.base64decode.org/).

Nächstes Mal werde ich ein Skript einführen, das Daten mithilfe von KPL aggregiert und in Kinesis Streams speichert.

Recommended Posts

Speichern von CSV-Daten in Amazon Kinesis Streams mit Standardeingabe
Erstellen von CSV-Beispieldaten mit Hypothese
Geben Sie Zaim-Daten mit Logstash in den Amazon Elasticsearch Service ein
[Django] Wie man Eingabewerte im Voraus mit ModelForm angibt
Umgang mit unausgeglichenen Daten
Aufblasen von Daten (Datenerweiterung) mit PyTorch
[Python] Speichern einer CSV-Datei als eindimensionale Array-Daten
Dateneingabe / -ausgabe in Python (CSV, JSON)
So konvertieren Sie csv in tsv in CLI
So arbeiten Sie mit BigQuery in Python
Lesen von CSV-Dateien mit Pandas
Wie man Problemdaten mit Paiza liest
Umgang mit Speicherlecks in matplotlib.pyplot
[REAPER] Wie man Reascript mit Python spielt
Zusammenfassung zum Lesen numerischer Daten mit Python [CSV, NetCDF, Fortran Binary]
Umgang mit Laufzeitfehlern in subprocess.call
So kratzen Sie Pferderenndaten mit Beautiful Soup
So erstellen Sie Daten für CNN (Chainer)
Wie man tkinter mit Python in Pyenv benutzt
Lesen von Zeitreihendaten in PyTorch
Schreiben Sie CSV-Daten mit AWS-Lambda + Python in AWS-S3
So geben Sie "Ketsumaimo" standardmäßig in Python aus
So verbergen Sie Benutzereingaben in der PySimple-Benutzeroberfläche
Verwendung von Fixture in Django zur Eingabe von Beispieldaten für das Benutzermodell
So verbessern Sie die Überwachung von Modellmetriken mit Amazon SageMaker
So konvertieren / wiederherstellen Sie einen String mit [] in Python
Verwendung von xgboost: Mehrklassenklassifizierung mit Irisdaten
So wenden Sie mit matplotlib Marker nur auf bestimmte Daten an
So kratzen Sie Bilddaten von Flickr mit Python
So führen Sie eine Hash-Berechnung mit Salt in Python durch
So konvertieren Sie horizontal gehaltene Daten mit Pandas in vertikal gehaltene Daten
Umgang mit Pyenv-Initialisierungsfehlern bei Fischen 3.1.0
So führen Sie mit OpenCV ein Null-Padding in einer Zeile durch
So führen Sie Tests zusammen mit Python unittest aus
So laden Sie Dateien in Google Drive mit Google Colaboratory
Zugriff mit dem Cache beim Lesen von_json mit Pandas
So erhalten Sie mit SQLAlchemy + MySQLdb mehr als 1000 Daten
So extrahieren Sie mit Pandas Daten, denen der Wert nan nicht fehlt
So geben Sie die CSV eines mehrzeiligen Headers mit Pandas aus
So konvertieren Sie eine JSON-Datei mit Python Pandas in eine CSV-Datei
Wie klicke ich mit der rechten Maustaste über die Tastatureingabe in RPA?
Lesen von CSVs, die in Python nur Ganzzahlen enthalten
Vorgehensweise beim Ausführen von Transaktionen: In Anaconda fehlgeschlagen
So extrahieren Sie mit Pandas Daten, denen der Wert nan nicht fehlt
Lesen von Text mit Standardeingabe oder Dateinamen wie cat in Python
So berechnen Sie die Summe oder den Durchschnitt von Zeitreihen-CSV-Daten in einem Augenblick
[Python / Ruby] Mit Code verstehen Wie man Daten aus dem Internet abruft und in CSV schreibt
[TensorFlow 2 / Keras] Ausführen des Lernens mit CTC Loss in Keras
[Go language] So erhalten Sie Terminaleingaben in Echtzeit
So debuggen Sie eine Standard-Python-Bibliothek in Visual Studio
So geben Sie ein Dokument im PDF-Format mit Sphinx aus
So extrahieren Sie einen Termin in Google Kalender mit Python
So überprüfen Sie das Verhalten von ORM mit einer Datei mit django
Speichern Sie auf Japanisch in StringProperty im Google App Engine-Datenspeicher
[Für Anfänger] Zusammenfassung der Standardeingabe in Python (mit Erklärung)
Einfallsreichtum beim speichersparenden Umgang mit Daten mit Pandas
So manipulieren Sie das DOM im Iframe mit Selen
[AWS] Umgang mit dem Fehler "Ungültiger Codepunkt" in CloudSearch
Für Anfänger, wie man mit häufigen Fehlern in Keras umgeht