exec_filter Output Plugin (out_ecec_filter) ist im Grunde ein Filterprozess, der mit jedem Programm durchgeführt werden kann. Sie können beispielsweise unnötige Datensätze für den nächsten Prozess löschen oder als Fehler irgendwo speichern. Verwenden wir jetzt ein Python-Programm.
Ich habe es wie folgt versucht.
<match hoge>
type exec_filter
command /path/to/python -m myapp.fluent_stream_filter
time_key time
in_format json
out_format msgpack
buffer_type file
buffer_path /path/to/buffer
buffer_chunk_limit 8m
buffer_queue_limit 64
flush_interval 10s
tag fuga
</match>
Ich führe es als Modul mit "Befehl" aus, damit der Import gut funktioniert. In diesem Fall muss PYTHONPATH nicht angegeben werden.
Ich verwende einen Dateipuffer, weil ich fluend neu starten muss, wenn ich den Code ändere. Im Gegensatz zum Exec Output-Plug-In wartet das Exec_filter Output-Plug-In nach dem Start auf die Standardeingabe für das angegebene Skript.
Wie vom exec_filter Output Plugin angefordert, muss das Skript die Standardeingabe übernehmen und das Ergebnis in die Standardausgabe ausgeben.
fluent_stream_filter.py
# coding=utf-8
"""
fließend aus_exec_Vom Filter aufgerufener Code
Verarbeitet zeilenweise von der Standardeingabe und gibt das Ergebnis an die Standardausgabe aus
"""
import json
import logging
import logging.config
import sys
import traceback
import msgpack
logging.config.fileConfig('/path/to/logging.conf')
logger = logging.getLogger('fluent-exec')
logger.propagate = False
def main():
stdin = sys.stdin
output(convert(validate(parse(readline(stdin)))))
def readline(stdin):
for line in stdin:
yield line
def parse(lines):
for line in lines:
yield json.loads(line)
def validate(rows):
"""Entfernung von Fehlerdaten"""
for row in rows:
try:
#Eine Art Inspektion
except Exception, e:
logger.warn(e)
logger.warn(row)
continue
yield row
def convert(rows):
"""Einige Konvertierungsprozesse"""
for row in rows:
# do something
yield row
def output(rows):
"""Atmen Sie zur Standardausgabe aus"""
for row in rows:
sys.stdout.write(msgpack.packb(row))
if __name__ == '__main__':
logger.info('Start main')
try:
main()
except Exception, e:
logger.error(traceback.format_exc())
raise e
logger.info('End main')
Es ist üblich, "name" an logging.getLogger zu übergeben, aber hier ist der Inhalt "__ main __", da das Skript gestartet wird und der Name des Loggers durch eine Zeichenfolge angegeben wird.
Es ist möglicherweise besser, wenn die Methode, die die Standardeingabe verarbeitet, stdio als Parameter empfängt. Ich konnte keine Möglichkeit finden, die Standardeingabe wie im Test zu verwenden. Daher habe ich eine Instanz von StringIO übergeben, damit die Testdaten gelesen werden können.
# coding=utf-8
import types
from StringIO import StringIO
from nose.tools import ok_, eq_
from myapp import fluent_stream_filter
class TestAll(object):
def test_readline(self):
"""Readline-Test, sys.Übergeben Sie StringIO anstelle von stdin"""
stdin = StringIO()
stdin.write(open('/path/to/test_data.log').read())
stdin.seek(0)
stream = fluent_stream_filter.readline(stdin)
ok_(isinstance(stream, types.GeneratorType))
eq_(stream.next(), u'{Inhalt der ersten Zeile}')
def test_parse(self):
"""Analysetest"""
stream = fluent_stream_filter.parse(iter(['{...}', '{...}']))
ok_(isinstance(stream, types.GeneratorType))
eq_(stream.next(), {...})
eq_(stream.next(), {...})
# (Abkürzung)
Wenn Sie ein anderes Protokoll als das Verarbeitungsergebnis an die Standardausgabe ausspucken, kann es natürlich nicht als MessagePack oder JSON analysiert werden, sodass auf der fließenden Seite ein Fehler auftritt. Um zu verhindern, dass Unfälle wie der Konsolenhandler im Root-Logger festgelegt werden und unbeabsichtigt Protokolle auf die Standardausgabe spucken
logger.propagate = False
Es ist sicher zu tun
Recommended Posts