Using a Python program with fluentd's exec Output Plugin

I have been using fluentd in production for about a week, and it is very convenient. This post covers passing data to a Python program with the exec Output Plugin.

Start command

The documentation (http://docs.fluentd.org/articles/out_exec) includes a Python example:

td-agent.conf


<match fizzbuzz>
  type exec
  command python /path/to/fizzbuzz.py
  buffer_path    /path/to/buffer_path
  format tsv
  keys fizzbuzz
  flush_interval 5s # for debugging/checking
</match>

In practice, however, I wanted to run the Python installed in my virtualenv, and I was unsure how PYTHONPATH would be handled, so I wrote `command` like this:

td-agent.conf


  # Point to the python installed in the virtualenv
  # Write a setup.py for your own code (myapp) and install the package in advance
  command /path/to/myapp/bin/python -m myapp.command.xxx arg1

Now there is no need to worry about PYTHONPATH or to write out a long file path. Writing a setup.py is worth the small effort.

Trying to set an environment variable

td-agent.conf


  # Set an environment variable (this causes an error)
  command HOGE=FUGA /path/to/myapp/bin/python -m myapp.command.xxx arg1

This produced the following error. To pass a value to the program, use an argument instead. It was the first time I had seen 32512.

td-agent.log


2014-08-03 17:59:54 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2014-08-03 17:59:57 +0900 error_class="RuntimeError" error="command returns 32512: HOGE=FUGA /path/to/myapp/bin/python /path/to/myapp/myapp/command/xxx.py /path/to/buffer_path/.20140803.q4ffb5d85bf81f0d4.log"
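
The number 32512 makes more sense once decoded. Assuming fluentd reports the raw POSIX wait status here (an assumption; the log itself does not say so), the exit code sits in the high byte, and 32512 is 127 shifted left by 8 bits. Exit code 127 is the conventional "command not found" status, which would fit `HOGE=FUGA` being treated as the program name. A minimal sketch of the arithmetic, where `decode_wait_status` is a hypothetical helper name:

```python
def decode_wait_status(status):
    """Split a 16-bit POSIX wait status into (exit_code, signal_number)."""
    exit_code = (status >> 8) & 0xFF   # high byte: exit code of the process
    signal_number = status & 0x7F      # low 7 bits: terminating signal, 0 if none
    return exit_code, signal_number


if __name__ == "__main__":
    print(decode_wait_status(32512))
```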

Executable file

The path of the buffer file is passed as the last argument. Code that reads and processes the file line by line looks like this. Keep each step in its own function so that the test code can call it separately. Chaining generators lets everything from reading the file to the final processing run as one stream, which keeps the code tidy. With PyMongo you can pass the generator straight to the insert method of Collection, so performance is good too.

/path/to/myapp/myapp/command/xxx.py


# coding=utf-8

import json
import logging
import logging.config
import sys
import traceback

logging.config.fileConfig('logging.conf')
logger = logging.getLogger('fluent-exec-out')


def main():
    file_path = parse_args()
    do_something(exclude_error_row(parse(readline(file_path))))


def parse_args():
    # fluentd appends the buffer file path as the last argument
    return sys.argv[-1]


def readline(file_path):
    # Yield lines lazily so the whole file is never held in memory
    with open(file_path) as f:
        for line in f:
            yield line


def parse(lines):
    # When the input format is json
    for line in lines:
        yield json.loads(line)


def is_invalid(row):
    # Placeholder check; replace with your own validation
    return not isinstance(row, dict)


def exclude_error_row(rows):
    for row in rows:
        # Validation and log output
        if is_invalid(row):
            logger.warning("Invalid line found %s", row)
            continue
        yield row


def do_something(rows):
    # Do something; iterating here is what drives the generators above
    for row in rows:
        pass


if __name__ == '__main__':
    logger.info('Start')
    try:
        main()
    except Exception:
        logger.error(traceback.format_exc())
    logger.info('End')
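
Because environment variables cannot be set in `command`, settings have to travel as arguments, with the buffer file path arriving last. Here is a minimal sketch of a `parse_args` that handles both with `argparse`. The `--env` option and its values are hypothetical, made up purely for illustration:

```python
import argparse


def parse_args(argv):
    """Parse options given in `command`; fluentd appends the buffer path last."""
    parser = argparse.ArgumentParser(description="fluentd exec output handler")
    # Hypothetical option standing in for what an environment variable would carry.
    parser.add_argument("--env", default="development")
    # The positional argument receives the buffer file path appended by fluentd.
    parser.add_argument("buffer_path")
    return parser.parse_args(argv)


if __name__ == "__main__":
    args = parse_args(["--env", "production", "/path/to/buffer_path/example.log"])
    print(args.env, args.buffer_path)
```

In a real script this would be called as `parse_args(sys.argv[1:])`, and the `command` line in td-agent.conf would end with something like `-m myapp.command.xxx --env production`.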

Test code

To test main(), create a file in the same format as the buffer file that fluentd passes, and give its path as an argument.

test_xxx.py


# coding=utf-8

import sys

from nose.tools import ok_, eq_

from myapp.command import xxx

# Copy the list; a plain reference would be mutated by append() below
original_argv = list(sys.argv)


class TestAll(object):
    def teardown(self):
        sys.argv = list(original_argv)

    def test_main(self):
        sys.argv.append('./tests/data/fluentd_buffer.log')
        xxx.main()
        # Test something

    def test_readline(self):
        gen = xxx.readline('./tests/data/fluentd_buffer.log')
        # Test something
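
When a sample buffer file is not at hand, one can be generated on the fly. The following is a minimal sketch, assuming the buffer holds one JSON object per line (the json format case). `write_buffer_file` and `read_rows` are hypothetical helpers, with `read_rows` standing in for the read-and-parse part of the command module:

```python
import json
import os
import tempfile


def write_buffer_file(records):
    """Write records as JSON lines, mimicking a fluentd buffer file (assumed format)."""
    fd, path = tempfile.mkstemp(suffix=".log")
    with os.fdopen(fd, "w") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")
    return path


def read_rows(file_path):
    """Stand-in for the read-and-parse pipeline: yield one dict per line."""
    with open(file_path) as f:
        for line in f:
            yield json.loads(line)


def test_read_rows():
    records = [{"fizzbuzz": "1"}, {"fizzbuzz": "Fizz"}]
    path = write_buffer_file(records)
    try:
        assert list(read_rows(path)) == records
    finally:
        os.remove(path)  # clean up the temporary file


if __name__ == "__main__":
    test_read_rows()
```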

Log output

It might be nice to use something like fluent-logger-python and send the program's own logs to fluentd as well.

Filtering

The code above also does the filtering itself, but it may be cleaner to let fluentd handle it in a preceding match block with the exec_filter Output Plugin. Either way, it is a matter of taste.

If it is only filtering, I think it is fine to fold it into a single exec Output. But the more processing you pack into one program, the more you have to worry about what happens when it fails partway through. In that case it is better to split the work across separate exec Outputs, each calling its own code.
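
For comparison, a script for exec_filter reads events from standard input and writes the surviving events to standard output, rather than receiving a buffer file path. Below is a minimal sketch, assuming both the input and output formats are configured as json. The `filter_stream` function and the rule that a record must contain a `fizzbuzz` key are hypothetical:

```python
import io
import json
import sys


def filter_stream(in_stream, out_stream, required_key="fizzbuzz"):
    """Copy JSON lines from in_stream to out_stream, dropping invalid records."""
    for line in in_stream:
        try:
            record = json.loads(line)
        except ValueError:
            continue  # skip lines that are not valid JSON
        if not isinstance(record, dict) or required_key not in record:
            continue  # skip records missing the (hypothetical) required key
        out_stream.write(json.dumps(record) + "\n")
        out_stream.flush()  # avoid holding filtered events in the pipe buffer


if __name__ == "__main__":
    # Demo with an in-memory stream; a real filter would read sys.stdin instead.
    sample = io.StringIO('{"fizzbuzz": "Fizz"}\n{"other": 1}\nnot json\n')
    filter_stream(sample, sys.stdout)
```

Keeping the logic in a function that takes streams means the same nose-style tests used above can exercise it with `io.StringIO` objects.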
