While building a system that uses fluentd, I wanted a quick way to test that data was actually being sent to fluentd in the first place, so I looked into a few approaches.
Since I want to keep CI runs as short as possible, the data sent by FluentLogger should show up in the output as close to real time as possible.
I used to write my own code that opened a socket and listened, but as versions went up and SSL got involved it became a hassle, so now I'd rather rely on plugins where possible.
Since I often use docker to spin up surrounding middleware for CI, I did once try adding Elasticsearch to the middleware set, but building an Elasticsearch that the application never calls directly, and pulling in elasticsearch-ruby just for the tests, is a bother. In particular, the official image doesn't simply run as a single node, and chasing version bumps across every microservice is tedious, so I'd like to test without extra middleware as much as possible.
Fluentd buffering is normally something you tune via flush_interval, but for testing it seems flush_mode immediate will do.
This time I wondered whether I could reuse Redis, which I already use for ActiveJob and the like, and found a good plugin in the official plugin list, so I gave it a try. There was also a MySQL plugin, but the structure of the data sent from FluentLogger doesn't map onto it very well, so the redis plugin, which stores records as-is, was the better fit.
With the approach decided, let's install the necessary gems and configure fluentd. As for gems, the fluent-plugin-redis-store plugin on top of fluentd itself is enough, so install them together:
command
gem install fluentd fluent-plugin-redis-store --no-document
I'll skip the Redis installation itself; set up redis with brew or docker.
fluentd receives events via forward and sends them on to redis_store, so fluent.conf looks like this:
fluent.conf
<source>
@type forward
@id input1
@label @mainstream
port 24224
</source>
<label @mainstream>
<filter **>
@type record_transformer
<record>
tag ${tag}
</record>
</filter>
<match **>
@type copy
<store>
@type stdout
</store>
<store>
@type redis_store
key_path tag
<buffer>
flush_mode immediate
</buffer>
</store>
</match>
</label>
What this does:
- receives events via forward on port 24224 and routes them to the @mainstream label
- inside @mainstream, a record_transformer filter copies the tag into each record
- the match copies events to both stdout and redis_store, with flush_mode immediate so nothing sits in a buffer
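Conceptually, the record_transformer step just merges the tag into each record. In Ruby terms, a rough sketch (not the plugin's internals):

```ruby
# What the `tag ${tag}` line in record_transformer effectively does to each event
record = { 'key' => 'sample' }
tag    = 'debug.test'

record = record.merge('tag' => tag)
# record is now {"key"=>"sample", "tag"=>"debug.test"}
```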
Sending to stdout is just a bonus, but having that output around makes debugging easier later.
Write this to ./fluent.conf and start fluentd:
fluentd -c fluent.conf
Now that everything is ready, let's send some data. It's easier to follow what happens if you watch Redis, so open another window and monitor it:
redis-cli monitor
As for sending data to fluentd, a real application would use fluent-logger-ruby, but this time let's use fluent-cat as a stand-in:
echo '{ "key" : "sample" }' | fluent-cat debug.test
This sends the record {"key": "sample"} with the tag debug.test and the current time as the timestamp. With no destination specified it goes to localhost, so the data reaches the fluentd server started above.
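What fluent-cat hands to fluentd is essentially a (tag, time, record) triple. Conceptually, ignoring the actual msgpack wire format, it is this sketch:

```ruby
require 'json'

# Conceptual shape of the event fluentd receives: [tag, time, record]
record = JSON.parse('{ "key" : "sample" }')
event  = ['debug.test', Time.now.to_f, record]

event[0]        # the tag passed to fluent-cat
event[2]['key'] # => "sample"
```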
The fluentd side then prints the following to standard output:
fluentd run window
2020-11-03 09:37:28.016591000 +0900 debug.test: {"key":"sample","tag":"debug.test"}
This is printed by the @type stdout configured in fluent.conf, and it confirms that after the data is received, the filter adds the tag, producing a record of this shape.
Looking at the window monitoring Redis, you should see the following:
redis-cli run window
1604363848.025602 [0 172.28.0.1:40998] "zadd" "debug.test" "1604363848.016591" "{\"key\":\"sample\",\"tag\":\"debug.test\"}"
The record is stored with zadd, just as the plugin's README describes. The key_path tag setting takes effect for the key, so debug.test, the value of tag contained in the record, is used as the key.
Let's fetch the data back from Redis:
redis-cli zrange debug.test 0 -1 withscores
1) "{\"key\":\"sample\",\"tag\":\"debug.test\"}"
2) "1604363848.016591"
Because the timestamp is used as the score, entries stay sorted no matter how many you put in.
By the way, from ruby it looks like this:
require 'redis'
Redis.new.zrange 'debug.test', 0, -1, withscores: true
output
=> [["{\"key\":\"sample\",\"tag\":\"debug.test\"}", 1604363848.016591]]
Values come back in a simple form, so this seems fairly easy to work with.
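Since the stored member is the JSON-serialized record and the score is the event time, a fetched pair can be turned back into a hash and a Time with just the stdlib (using the entry shown above):

```ruby
require 'json'

# A (member, score) pair as returned by zrange ... withscores: true
raw, score = ['{"key":"sample","tag":"debug.test"}', 1604363848.016591]

record = JSON.parse(raw)
record['tag']  # => "debug.test"
Time.at(score) # recovers the original event time
```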
When running tests in parallel with parallel_tests or test-queue, you need to tell which test process a given log entry belongs to. In that case you can include the process ID in the data you send and use it in the key.
I already want the process ID, time, and request ID in the Rails log file anyway, so including the pid shouldn't do any harm.
In that case, the data to be sent will be as follows
{ "key" : "sample", "pid" : 123 }
Along with this, change fluent.conf as follows:
<label @mainstream>
<filter **>
@type record_transformer
<record>
tag ${tag}
+ tag_with_pid '${tag}.${record["pid"]}'
</record>
</filter>
<match **>
@type copy
<store>
@type stdout
</store>
<store>
@type redis_store
- key_path tag
+ key_path tag_with_pid
<buffer>
flush_mode immediate
</buffer>
</store>
</match>
</label>
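The tag_with_pid the filter builds is just the tag with the pid appended, so the same key can be reproduced on the test side. This sketch mirrors the '${tag}.${record["pid"]}' template above:

```ruby
record = { 'key' => 'sample', 'pid' => 123 }
tag    = 'debug.test'

# Mirrors the '${tag}.${record["pid"]}' template in record_transformer
tag_with_pid = "#{tag}.#{record['pid']}"
# => "debug.test.123"
```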
Now that the process ID is embedded in the Redis key, each test process can use its own pid to pick out its logs and verify that the data was actually sent.
If you need a different Redis host or database, the README says you can configure them like this:
<label @mainstream>
<filter **>
@type record_transformer
<record>
tag ${tag}
</record>
</filter>
<match **>
@type copy
<store>
@type stdout
</store>
<store>
@type redis_store
key_path tag
+ host 10.0.0.1
+ db 11
<buffer>
flush_mode immediate
</buffer>
</store>
</match>
</label>
When actually using this in tests, a helper like the following makes it convenient:
# frozen_string_literal: true

require 'redis'

module FluentdLogHelper
  # Fetch all entries stored under the tag (suffixed with the pid when given)
  def fetch_fluentd_log_by(tag:, pid: nil)
    redis_key = pid ? "#{tag}.#{pid}" : tag
    redis.zrange redis_key, 0, -1
  end

  # Memoized connection; options only take effect on the first call
  def redis(options = {})
    options[:db] ||= 0
    @redis ||= Redis.new(options)
  end
end
Calling with just a tag
fetch_fluentd_log_by tag: 'debug.test'
Calling with a process ID
fetch_fluentd_log_by tag: 'debug.test', pid: Process.pid
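If you want to exercise the helper itself without a live Redis, a minimal in-memory stand-in is enough. The FakeRedis class below is hypothetical and implements only the zrange call the helper uses; the redis connection is injected instead of memoized:

```ruby
require 'json'

# Hypothetical in-memory stand-in for Redis, implementing only zrange
class FakeRedis
  def initialize(data)
    @data = data
  end

  def zrange(key, start, stop)
    (@data[key] || [])[start..stop] || []
  end
end

# Same key logic as the helper above, with the redis connection injected
module FluentdLogHelper
  def fetch_fluentd_log_by(tag:, pid: nil)
    redis_key = pid ? "#{tag}.#{pid}" : tag
    redis.zrange redis_key, 0, -1
  end
end

class LogChecker
  include FluentdLogHelper
  attr_reader :redis

  def initialize(redis)
    @redis = redis
  end
end

fake = FakeRedis.new(
  'debug.test.123' => ['{"key":"sample","pid":123,"tag_with_pid":"debug.test.123"}']
)
entries = LogChecker.new(fake).fetch_fluentd_log_by(tag: 'debug.test', pid: 123)
JSON.parse(entries.first)['key'] # => "sample"
```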