Es ist eine Woche her, seit ich angefangen habe, es in echten Schlachten einzusetzen, aber fließend ist sehr praktisch. Versuchen Sie dann, die Daten mit dem exec Output Plugin an das Python-Programm zu übergeben.
Hier ist ein Beispiel für Python in der Dokumentation (http://docs.fluentd.org/articles/out_exec):
td-agent.conf
<match fizzbuzz>
type exec
command python /path/to/fizzbuzz.py
buffer_path /path/to/buffer_path
format tsv
keys fizzbuzz
flush_interval 5s # for debugging/checking
</match>
Wenn ich es jedoch tatsächlich benutze, möchte ich den Python angeben, den ich in virtualenv eingefügt habe, und ich habe mich gefragt, was mit PYTHONPATH passiert ist, also habe ich versucht, `` `command``` anzugeben.
td-agent.conf
#Python gibt an, was Sie in virtualenv einfügen
#Mein Code myapp ist eingerichtet.Schreiben Sie py und installieren Sie das Paket im Voraus
command /path/to/myapp/bin/python −m myapp.command.xxxx arg1
Jetzt müssen Sie sich keine Sorgen mehr um PYTHONPATH machen und keinen langen Dateipfad schreiben. Lassen Sie uns setup.py ohne Probleme schreiben.
Es wird versucht, eine Umgebungsvariable festzulegen
td-agent.conf
#Geben Sie die Umgebungsvariable an(Erhalten Sie einen Fehler)
command HOGE=FUGA /path/to/myapp/bin/python -m myapp.command.xxx arg1
Dann habe ich folgenden Fehler bekommen. Wenn Sie etwas übergeben möchten, verwenden Sie das Argument. Ich habe 32512 zum ersten Mal gesehen.
td-agent.log
2014-08-03 17:59:54 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2014-08-03 17:59:57 +0900 error_class="RuntimeError" error="command returns 32512: HOGE=FUGA /path/to/myapp/bin/python /path/to/myapp/myapp/command/xxx.py /path/to/buffer_path/.20140803.q4ffb5d85bf81f0d4.log"
Der Pfad der Pufferdatei wird am Ende des Arguments übergeben. Der Code, der Zeile für Zeile liest und verarbeitet, sieht folgendermaßen aus. Wenn Sie jeden Prozess aus dem Testcode aufrufen möchten, trennen Sie die Methoden. Wenn Sie die Stream-Verarbeitung vom Lesen der Datei bis zur endgültigen Verarbeitung durchführen, können Sie wunderbar schreiben, und mit PyMongo können Sie den Generator so wie er ist an die Einfügemethode der Sammlung übergeben, sodass auch die Leistung gut ist.
/path/to/myapp/myapp/command/xxx.py
# coding=utf-8
import json
import logging
import logging.config
import traceback
logging.config.fileConfig('logging.conf')
logger = logging.getLogger('fluent-exec-out')
def main():
file_path = parse_args()
do_something(exclude_error_row(parse(readline(file_path))))
def parse_args():
return sys.argv[-1]
def readline(file_path):
with file(file_path) as input:
for line in input:
yield line
def parse(lines):
#Wenn das Eingabeformat json ist
for line in lines:
yield json.loads(line)
def exclude_error_row(rows):
for row in rows:
#Validierung und Protokollausgabe
if xxxxxx:
logger.warn("Invalid line found %s", row)
continue
yield row
def do_something(rows):
#Etwas tun
if __name__ == '__main__':
logger.info('Start')
try:
main()
except:
logger.error(traceback.format_exc())
logger.info('End')
Um die Hauptmethode zu testen, erstellen Sie eine Datei im selben Format wie die von fluentd übergebene Pufferdatei und übergeben Sie sie als Argument.
test_xxx.py
# coding=utf-8
from nose.tools import ok_, eq_
from myapp.command import xxx
original_argv = sys.argv
class TestAll(object):
def teardown(self):
sys.argv = original_argv
def test_main(self):
sys.argv.append('./tests/data/fluentd_buffer.log')
xxx.main()
#Teste etwas
def test_readline(self):
gen = xxx.readline('./tests/data/fluentd_buffer.log'):
#Teste etwas
Es könnte cool sein, etwas wie fluent-logger-python zu verwenden und das Protokoll an fluentd zu übergeben.
Im obigen Code wird auch gefiltert, aber es kann für Fluentd schön sein, dies mit der vorherigen Übereinstimmung mit dem Exec_filter Output Plugin zu tun. In jedem Fall ist es Geschmackssache.
Ich denke, es ist in Ordnung, eine Exec-Ausgabe zum Filtern zu verwenden, aber wenn Sie zu viele Prozesse packen, werden Sie sich mehr Sorgen darüber machen, was zu tun ist, wenn sich die Prozesse in der Mitte befinden. Es wäre besser, die Exec-Ausgabe in separate Codeaufrufe zu unterteilen.
Recommended Posts