Using a Python program with fluentd's exec_filter Output Plugin

The exec_filter Output Plugin (out_exec_filter) passes records through an arbitrary external program, so any program can act as a filter. For example, you can drop records that the next stage does not need, or set bad records aside somewhere as errors. Here, let's use a Python program.
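As a rough illustration of those two uses, here is a small generator that drops unneeded records and sets invalid ones aside. The `level` field, the two rules and the in-memory `errors` list are hypothetical; in a real filter the rejected records would more likely go to a file.

```python
import json


def split_records(rows, is_needed, is_valid, errors):
    """Yield rows for the next stage; drop unneeded rows, collect invalid ones."""
    for row in rows:
        if not is_valid(row):
            errors.append(json.dumps(row))  # set aside as an error
            continue
        if not is_needed(row):
            continue  # dropped: the next stage never sees it
        yield row


rows = [
    {"level": "debug", "msg": "noise"},
    {"level": "error", "msg": "disk full"},
    {"msg": "no level"},
]
errors = []
kept = list(
    split_records(
        rows,
        is_needed=lambda r: r["level"] != "debug",  # checked only for valid rows
        is_valid=lambda r: "level" in r,
        errors=errors,
    )
)
print(kept)    # [{'level': 'error', 'msg': 'disk full'}]
print(errors)  # ['{"msg": "no level"}']
```

Validity is checked first so that the `is_needed` rule can assume the `level` key exists.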

Configuration

I configured it as follows.

<match hoge>
  type       exec_filter
  command    /path/to/python -m myapp.fluent_stream_filter

  time_key   time
  in_format  json
  out_format msgpack

  buffer_type file
  buffer_path /path/to/buffer
  buffer_chunk_limit 8m
  buffer_queue_limit 64
  flush_interval 10s

  tag        fuga
</match>

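In command, I run the script as a module (python -m) so that its imports resolve correctly. This way there is no need to set PYTHONPATH, as long as the myapp package can be found from the working directory the command is started in.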
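A small experiment to check this, assuming the command is started from a directory that contains the myapp package. python -m puts the current working directory on sys.path, while running a file path puts only that file's own directory there. The helper module below is hypothetical, and PYTHONPATH is removed from the child's environment on purpose.

```python
import os
import subprocess
import sys
import tempfile


def run(args, cwd):
    """Run a child interpreter without PYTHONPATH; return (returncode, stdout)."""
    env = {k: v for k, v in os.environ.items()
           if k not in ("PYTHONPATH", "PYTHONSAFEPATH")}
    proc = subprocess.run([sys.executable] + args, cwd=cwd, env=env,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                          universal_newlines=True)
    return proc.returncode, proc.stdout.strip()


with tempfile.TemporaryDirectory() as root:
    # Hypothetical layout: myapp/fluent_stream_filter.py imports myapp.helper.
    pkg = os.path.join(root, "myapp")
    os.mkdir(pkg)
    open(os.path.join(pkg, "__init__.py"), "w").close()
    with open(os.path.join(pkg, "helper.py"), "w") as f:
        f.write("VALUE = 'imported'\n")
    with open(os.path.join(pkg, "fluent_stream_filter.py"), "w") as f:
        f.write("from myapp import helper\nprint(helper.VALUE)\n")

    # python -m: the working directory is on sys.path, so myapp is found.
    as_module = run(["-m", "myapp.fluent_stream_filter"], cwd=root)
    # python myapp/fluent_stream_filter.py: only myapp/ itself is on sys.path.
    as_script = run([os.path.join("myapp", "fluent_stream_filter.py")], cwd=root)

print(as_module)          # (0, 'imported')
print(as_script[0] != 0)  # True: the import of myapp fails
```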

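I use a file buffer because fluentd has to be restarted whenever I modify the code, and a file buffer keeps pending records on disk across the restart. Unlike the exec Output Plugin, which runs its command once per buffer chunk, the exec_filter Output Plugin starts the specified command once and keeps it running, waiting on standard input. Code changes therefore take effect only after fluentd is restarted.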
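To make the "started once, then fed through standard input" behavior concrete, here is a minimal simulation of the parent side using subprocess. It is only a sketch: it uses JSON in both directions (instead of out_format msgpack) to stay within the standard library, and it reads each reply right after writing, which is a simplification rather than how fluentd itself schedules I/O.

```python
import json
import subprocess
import sys

# Hypothetical child filter: one JSON record per line in, one per line out,
# flushing after every record so the parent sees it immediately.
CHILD = r'''
import json, sys
for line in iter(sys.stdin.readline, ""):
    row = json.loads(line)
    row["filtered"] = True
    sys.stdout.write(json.dumps(row) + "\n")
    sys.stdout.flush()
'''

# Start the filter once and keep it running, as exec_filter does.
child = subprocess.Popen([sys.executable, "-c", CHILD],
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                         universal_newlines=True)

results = []
for record in [{"id": 1}, {"id": 2}]:
    child.stdin.write(json.dumps(record) + "\n")
    child.stdin.flush()
    results.append(json.loads(child.stdout.readline()))

child.stdin.close()  # EOF ends the child's read loop
child.wait()
print(results)  # [{'id': 1, 'filtered': True}, {'id': 2, 'filtered': True}]
```

The same process handles both records, which is why edits to the filter script are not picked up until it is restarted.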

Script

As required by the exec_filter Output Plugin, the script must read records from standard input and write the results to standard output.

fluent_stream_filter.py


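# coding=utf-8
"""
Code called by fluentd's out_exec_filter

Processes records line by line from standard input and writes the results to standard output
"""
import json
import logging
import logging.config
import sys
import traceback

import msgpack

logging.config.fileConfig('/path/to/logging.conf')
logger = logging.getLogger('fluent-exec')
# Keep records away from root-logger handlers that might write to stdout
logger.propagate = False


def main():
    stdin = sys.stdin
    output(convert(validate(parse(readline(stdin)))))


def readline(stdin):
    # iter(readline, '') yields each line as soon as it arrives; on Python 2,
    # "for line in stdin" uses a read-ahead buffer that can delay records
    for line in iter(stdin.readline, ''):
        yield line


def parse(lines):
    for line in lines:
        yield json.loads(line)


def validate(rows):
    """Remove invalid records"""
    for row in rows:
        try:
            pass  # Some kind of inspection: raise an exception for an invalid row
        except Exception as e:
            logger.warning(e)
            logger.warning(row)
            continue
        yield row


def convert(rows):
    """Some conversion process"""
    for row in rows:
        # do something
        yield row


def output(rows):
    """Write each record to standard output as MessagePack"""
    # packb() returns bytes; on Python 3 they must go to the binary buffer
    out = getattr(sys.stdout, 'buffer', sys.stdout)
    for row in rows:
        out.write(msgpack.packb(row))
        # Flush per record so fluentd does not wait for the pipe buffer to fill
        out.flush()


if __name__ == '__main__':
    logger.info('Start main')
    try:
        main()
    except Exception:
        logger.error(traceback.format_exc())
        raise  # a bare raise keeps the original traceback
    logger.info('End main')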

It is customary to pass __name__ to logging.getLogger, but because this file is started as the main module, __name__ is '__main__' here, so the logger name is given as an explicit string.
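If you would rather not hard-code the string, one alternative (my suggestion, not the approach used above) on Python 3.4+ is __spec__.name: when a file is run with -m, __name__ becomes '__main__' but __spec__ still carries the dotted module name. The check below builds a throwaway copy of the hypothetical myapp package to show both values.

```python
import os
import subprocess
import sys
import tempfile

# Hypothetical module body: print __name__, the spec name, and the logger name.
MODULE = (
    "import logging\n"
    "name = __spec__.name if __spec__ is not None else __name__\n"
    "print(__name__, name, logging.getLogger(name).name)\n"
)

env = {k: v for k, v in os.environ.items()
       if k not in ("PYTHONPATH", "PYTHONSAFEPATH")}

with tempfile.TemporaryDirectory() as root:
    os.mkdir(os.path.join(root, "myapp"))
    open(os.path.join(root, "myapp", "__init__.py"), "w").close()
    with open(os.path.join(root, "myapp", "fluent_stream_filter.py"), "w") as f:
        f.write(MODULE)
    out = subprocess.check_output(
        [sys.executable, "-m", "myapp.fluent_stream_filter"],
        cwd=root, env=env, universal_newlines=True,
    ).split()

print(out)  # ['__main__', 'myapp.fluent_stream_filter', 'myapp.fluent_stream_filter']
```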

Test code

Functions that read standard input are easier to test if they receive the stream as a parameter instead of using sys.stdin directly. I couldn't find a way to feed the real standard input in a test, so I pass a StringIO instance loaded with the test data instead.
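A minimal sketch of both options on Python 3: passing the stream explicitly, and, as an alternative to parameter passing, temporarily replacing sys.stdin with unittest.mock.patch.object. The readline function here is a stand-in written for this example, with an optional stream argument that the article's version does not have.

```python
import io
import sys
from unittest import mock


def readline(stdin=None):
    """Yield lines one at a time; fall back to sys.stdin when no stream is given."""
    stream = sys.stdin if stdin is None else stdin
    for line in iter(stream.readline, ""):
        yield line


# 1) Pass a StringIO explicitly.
explicit = list(readline(io.StringIO('{"a": 1}\n{"a": 2}\n')))

# 2) Swap sys.stdin itself. Consume the generator inside the with-block,
#    because it looks up sys.stdin only when it starts running.
with mock.patch.object(sys, "stdin", io.StringIO('{"b": 3}\n')):
    patched = list(readline())

print(explicit)  # ['{"a": 1}\n', '{"a": 2}\n']
print(patched)   # ['{"b": 3}\n']
```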

# coding=utf-8
import types

try:
    from StringIO import StringIO  # Python 2
except ImportError:
    from io import StringIO  # Python 3

from nose.tools import ok_, eq_

from myapp import fluent_stream_filter


class TestAll(object):
    def test_readline(self):
        """readline test: pass a StringIO instead of sys.stdin"""
        with open('/path/to/test_data.log') as f:
            stdin = StringIO(f.read())

        stream = fluent_stream_filter.readline(stdin)
        ok_(isinstance(stream, types.GeneratorType))
        eq_(next(stream), u'{Contents of the first line}')

    def test_parse(self):
        """parse test"""
        stream = fluent_stream_filter.parse(iter(['{...}', '{...}']))
        ok_(isinstance(stream, types.GeneratorType))
        eq_(next(stream), {...})
        eq_(next(stream), {...})

    # (omitted)

Caution about log output

If anything other than the processing result is written to standard output, fluentd cannot parse it as MessagePack or JSON, and an error occurs on the fluentd side. A console handler on the root logger that writes to standard output (for example, a StreamHandler configured with args=(sys.stdout,) in logging.conf) can cause exactly this kind of accident, so it is safest to set

logger.propagate = False

on the logger the script uses.
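To see what propagate = False changes, here is a small check that uses StringIO objects in place of stdout and stderr. The logger name fluent-exec-demo is made up for the demo; the root handler plays the role of a console handler that writes to standard output.

```python
import io
import logging

fake_stdout = io.StringIO()  # stands in for the stream fluentd reads
fake_stderr = io.StringIO()  # where the script's own logs should go

# Risky setup: a root-logger handler that writes to "stdout".
root = logging.getLogger()
root_handler = logging.StreamHandler(fake_stdout)
root.addHandler(root_handler)

logger = logging.getLogger("fluent-exec-demo")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(fake_stderr))

logger.propagate = True
logger.info("propagating")      # reaches both handlers
logger.propagate = False
logger.info("not propagating")  # reaches only the logger's own handler

root.removeHandler(root_handler)
print(repr(fake_stdout.getvalue()))  # 'propagating\n'
print(repr(fake_stderr.getvalue()))  # 'propagating\nnot propagating\n'
```

Propagation ignores the root logger's own level and hands the record straight to its handlers, which is why the first message leaks to "stdout" even though the root logger defaults to WARNING.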
