Verwenden Sie Python-Programme mit dem Ausführungs-Plugin exec_filter von fluentd

exec_filter Output Plugin (out_ecec_filter) ist im Grunde ein Filterprozess, der mit jedem Programm durchgeführt werden kann. Sie können beispielsweise unnötige Datensätze für den nächsten Prozess löschen oder als Fehler irgendwo speichern. Verwenden wir jetzt ein Python-Programm.

Aufbau

Ich habe es wie folgt versucht.

<match hoge>
  type       exec_filter
  command    /path/to/python -m myapp.fluent_stream_filter

  time_key   time
  in_format  json
  out_format msgpack

  buffer_type file
  buffer_path /path/to/buffer
  buffer_chunk_limit 8m
  buffer_queue_limit 64
  flush_interval 10s

  tag        fuga
</match>

Ich führe es als Modul mit "Befehl" aus, damit der Import gut funktioniert. In diesem Fall muss PYTHONPATH nicht angegeben werden.

Ich verwende einen Dateipuffer, weil ich fluend neu starten muss, wenn ich den Code ändere. Im Gegensatz zum Exec Output-Plug-In wartet das Exec_filter Output-Plug-In nach dem Start auf die Standardeingabe für das angegebene Skript.

Skript

Wie vom exec_filter Output Plugin angefordert, muss das Skript die Standardeingabe übernehmen und das Ergebnis in die Standardausgabe ausgeben.

fluent_stream_filter.py


# coding=utf-8
"""
fließend aus_exec_Vom Filter aufgerufener Code

Verarbeitet zeilenweise von der Standardeingabe und gibt das Ergebnis an die Standardausgabe aus
"""
import json
import logging
import logging.config
import sys
import traceback

import msgpack

logging.config.fileConfig('/path/to/logging.conf')
logger = logging.getLogger('fluent-exec')
logger.propagate = False

def main():
    stdin = sys.stdin
    output(convert(validate(parse(readline(stdin)))))

def readline(stdin):
    for line in stdin:
        yield line

def parse(lines):
    for line in lines:
        yield json.loads(line)

def validate(rows):
    """Entfernung von Fehlerdaten"""
    for row in rows:
        try:
		     #Eine Art Inspektion
        except Exception, e:
            logger.warn(e)
            logger.warn(row)
            continue
        yield row

def convert(rows):
    """Einige Konvertierungsprozesse"""
    for row in rows:
        # do something
        yield row

def output(rows):
	"""Atmen Sie zur Standardausgabe aus"""
    for row in rows:
        sys.stdout.write(msgpack.packb(row))

if __name__ == '__main__':
    logger.info('Start main')
    try:
        main()
    except Exception, e:
        logger.error(traceback.format_exc())
        raise e
    logger.info('End main')

Es ist üblich, "name" an logging.getLogger zu übergeben, aber hier ist der Inhalt "__ main __", da das Skript gestartet wird und der Name des Loggers durch eine Zeichenfolge angegeben wird.

Testcode

Es ist möglicherweise besser, wenn die Methode, die die Standardeingabe verarbeitet, stdio als Parameter empfängt. Ich konnte keine Möglichkeit finden, die Standardeingabe wie im Test zu verwenden. Daher habe ich eine Instanz von StringIO übergeben, damit die Testdaten gelesen werden können.

# coding=utf-8
import types
from StringIO import StringIO

from nose.tools import ok_, eq_

from myapp import fluent_stream_filter


class TestAll(object):
    def test_readline(self):
        """Readline-Test, sys.Übergeben Sie StringIO anstelle von stdin"""
        stdin = StringIO()
        stdin.write(open('/path/to/test_data.log').read())
        stdin.seek(0)

        stream = fluent_stream_filter.readline(stdin)
        ok_(isinstance(stream, types.GeneratorType))
        eq_(stream.next(), u'{Inhalt der ersten Zeile}')

    def test_parse(self):
        """Analysetest"""
        stream = fluent_stream_filter.parse(iter(['{...}', '{...}']))
        ok_(isinstance(stream, types.GeneratorType))
        eq_(stream.next(), {...})
        eq_(stream.next(), {...})

    # (Abkürzung)

Achtung in der Protokollausgabe

Wenn Sie ein anderes Protokoll als das Verarbeitungsergebnis an die Standardausgabe ausspucken, kann es natürlich nicht als MessagePack oder JSON analysiert werden, sodass auf der fließenden Seite ein Fehler auftritt. Um zu verhindern, dass Unfälle wie der Konsolenhandler im Root-Logger festgelegt werden und unbeabsichtigt Protokolle auf die Standardausgabe spucken

logger.propagate = False

Es ist sicher zu tun

Recommended Posts

Verwenden Sie Python-Programme mit dem Ausführungs-Plugin exec_filter von fluentd
Verwenden Sie Python-Programme mit dem exec Output Plugin von fluentd
[Python] Ein Programm, das Treppen mit # erstellt
Ein Programm, das Python zum Abspielen von Junk verwendet
[Python] Generieren Sie ValueObject mit dem vollständigen Konstruktor mithilfe von Datenklassen
Versuchen Sie, Python mit pybind11 in ein C ++ - Programm einzubetten
Registrieren Sie Tickets mit der Redmine-API mithilfe von Python-Anforderungen
Vom Kauf eines Computers bis zur Ausführung eines Programms auf Python
[Python] Erstellen Sie mit cx_Freeze eine Verteilungsdatei für das Tkinter-Programm
Erstellen Sie ein Wox-Plugin (Python)
[S3] CRUD mit S3 unter Verwendung von Python [Python]
Verwenden von Quaternion mit Python ~ numpy-quaternion ~
Probieren Sie die Python-Ausgabe mit Haxe 3.2 aus
[Python] Verwenden von OpenCV mit Python (Basic)
Verwenden eines Druckers mit Debian 10
Machen Sie eine Lotterie mit Python
Erstellen Sie ein Verzeichnis mit Python
Verwenden von OpenCV mit Python @Mac
Senden Sie mit Python mit Google Mail
Ich habe schnell ein Programm geschrieben, um DI mit Python zu lernen
[Python] Kapitel 01-03 Über Python (Schreiben und Ausführen eines Programms mit PyCharm)
Vervollständigung von Python mit Emacs mit Company-Jedi
Konvertieren Sie in eine Zeichenfolge, während Sie die Standardausgabe mit dem Python-Unterprozess ausgeben
Harmonischer Mittelwert von Python (mit SciPy)
[Python] Was ist eine with-Anweisung?
Starten eines mit Jupyter Notebook erstellten Python-Programms
Löse ABC163 A ~ C mit Python
Steuern Sie den Motor mit einem Motortreiber mit Python auf Raspberry Pi 3!
Bedienen Sie den Belegdrucker mit Python
Python-Grafikhandbuch mit Matplotlib.
[Python] Verwenden von OpenCV mit Python (Bildfilterung)
Ich habe einen Line-Bot mit Python gemacht!
Erstellen Sie mit tkinter eine Python-GUI
[Python] Verwenden von OpenCV mit Python (Bildtransformation)
Lassen Sie uns eine GUI mit Python erstellen.
Zeichnen einer Silbersteinkurve mit Python
Löse ABC166 A ~ D mit Python
[Python] Verwenden von OpenCV mit Python (Kantenerkennung)
Ausgabe in eine CSV-Datei mit Python
Eingabe / Ausgabe mit Python (Python-Lernnotiz ⑤)
Debuggen Sie das Python-Multiprozessprogramm mit VSCode
Erstellen Sie eine virtuelle Umgebung mit Python!
Ich habe mit Python eine Lotterie gemacht.
Erstellen einer virtuellen Umgebung mit Python 3
Installieren Sie das Python-Plug-In mit Netbeans 8.0.2
Löse ABC168 A ~ C mit Python
[Hinweis] Hallo Weltausgabe mit Python
Unit Test Log Ausgabe mit Python
Erstellen Sie ein Empfehlungssystem mit Python
[Ev3dev] Erstellen Sie ein Programm, das das LCD (Bildschirm) mit Python erfasst
Lesen Sie die Datei in Python mit einem relativen Pfad aus dem Programm
Schreiben Sie das Vim-Plugin in Python
[Python] Generiere ein Passwort mit Slackbot
Löse ABC162 A ~ C mit Python
Ich habe versucht, eine ToDo-App mit einer Flasche mit Python zu erstellen
Hinweise zur Verwendung von rstrip mit Python.
Löse ABC167 A ~ C mit Python
Lassen Sie uns ein Diagramm mit Python erstellen! !!
Erstellen Sie eine Python-Ausführungsumgebung mithilfe der GPU mit der GCP Compute Engine