[GO] I investigated Fluentd's http output

In this post, I share what I found when investigating Fluentd's http output plugin.

What is Fluentd's http output plugin?

Fluentd lets you configure output plugins that send logs to various destinations, such as files and Elasticsearch. The http output plugin is one of them, but its destination is not fixed: it is a general-purpose plugin that can send to any middleware able to receive HTTP requests. With http output, you can build something like a webhook. The receiving HTTP server can be implemented in any language, so you are free to do whatever you like when a log arrives.

out_http is built in since v1.7.0

out_http is built into Fluentd as of v1.7.0, which means it can be used without installing a plugin.

The fluentd/fluentd:latest image published on Docker Hub was v1.3 when I checked, so out_http did not work with it. You could probably use it there by installing the out_http plugin separately.

This time, I decided to use fluentd/fluentd:v1.11-1.

The settings required to use out_http are very simple

Next, I investigated the settings required to use out_http with Fluentd. The official out_http documentation lists many configuration items, but the minimum configuration was as follows.

my_fluentd.conf


<match nginx.access>
  @type http
  endpoint http://go:8000/
</match>

This example sends the logs to http://go:8000/. The other settings use their default values, so the behavior is as follows:

- Method: POST
- Content-Type: application/x-ndjson
- Send timing: 60 seconds later (the default flush interval)

ndjson may be an unfamiliar data format: it separates JSON values with newline characters. It seems to be the de facto standard data format in the logging area.

Reference: What is ndjson? (Qiita)
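As a small illustration of the format, the following Go sketch writes two records as ndjson. It relies on json.Encoder.Encode appending a newline after each value. The helper name encodeNDJSON and the sample records are made up for this example and are not part of Fluentd.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// encodeNDJSON is a hypothetical helper: it serializes each record as one
// JSON value per line. json.Encoder.Encode writes the trailing newline.
func encodeNDJSON(records []map[string]string) (string, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, r := range records {
		if err := enc.Encode(r); err != nil {
			return "", err
		}
	}
	return buf.String(), nil
}

func main() {
	// Sample records invented for this sketch.
	out, err := encodeNDJSON([]map[string]string{
		{"method": "GET", "path": "/"},
		{"method": "POST", "path": "/login"},
	})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Print(out)
}
```

Each record ends up on its own line, which is what lets a receiver process the body one JSON value at a time.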

Isn't sending the logs after 60 seconds too slow?

I wanted to process the logs as close to real time as possible, so sending them after the default 60 seconds felt too slow. The delay can be shortened by changing the flush_interval setting of the <buffer> directive.

my_fluentd.conf


<match nginx.access>
  @type http
  endpoint http://go:8000/
  <buffer>
    flush_interval 3s
  </buffer>
</match>

With this setting, the logs will be sent after 3 seconds.

HTTP request sent by out_http

The HTTP request sent by out_http looks like the following. This example is from an nginx access log.

POST / HTTP/1.1
Host: go:8000
Accept: */*
Accept-Encoding: gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Content-Length: 648
Content-Type: application/x-ndjson
User-Agent: Ruby

{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}

Each line appears to be the JSON data produced by Fluentd's nginx parser.

Filtering logs to send

out_http sends every log it receives. So how do you send only specific logs?

To send only specific logs, use a filter. The grep plugin (https://docs.fluentd.org/filter/grep) pattern-matches on specific fields to narrow down the logs. The following configuration sends only logs whose method is POST with out_http.

my_fluentd.conf


<filter nginx.access>
  @type grep
  <regexp>
    key method
    pattern /^POST$/
  </regexp>
</filter>

<match nginx.access>
  @type http
  endpoint http://go:8000/
  <buffer>
    flush_interval 3s
  </buffer>
</match>
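For readers more at home in Go than in Fluentd configuration, the matching rule of the grep filter can be sketched as follows. This only illustrates the condition "the method field matches ^POST$"; it is not how Fluentd implements the filter, and keepRecord is a hypothetical name.

```go
package main

import (
	"fmt"
	"regexp"
)

// postOnly mirrors the pattern /^POST$/ from the configuration.
// Note: Ruby's ^ and $ match at line boundaries, while Go's match at text
// boundaries by default; for a single-line value both behave the same.
var postOnly = regexp.MustCompile(`^POST$`)

// keepRecord is a hypothetical helper that reports whether a record would
// pass a filter on its "method" field.
func keepRecord(record map[string]string) bool {
	return postOnly.MatchString(record["method"])
}

func main() {
	// Sample records invented for this sketch.
	records := []map[string]string{
		{"method": "GET", "path": "/"},
		{"method": "POST", "path": "/login"},
	}
	for _, r := range records {
		fmt.Printf("%s %s keep=%v\n", r["method"], r["path"], keepRecord(r))
	}
}
```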

Issues with transmitted log data

Log identity

The logs sent by out_http do not carry a UUID. Therefore, the receiver cannot determine whether a log is new or a resend.

This problem can apparently be solved by using the add-uuid plugin.

Nginx lets you add $request_id to the log format, but that is a unique ID per request, not an ID per log record.
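If each record did carry a unique ID, the receiver could detect resends with a simple lookup. The following is a minimal sketch of that idea, assuming the ID arrives in a field named uuid; that field name, the seenIDs type, and the sample IDs are assumptions for illustration, and the actual key depends on how the plugin is configured.

```go
package main

import (
	"fmt"
	"sync"
)

// seenIDs remembers IDs that have already been processed. It lives in
// memory only, so it is lost on restart and grows without bound; a real
// receiver would need persistent storage and some form of expiry.
type seenIDs struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newSeenIDs() *seenIDs {
	return &seenIDs{ids: make(map[string]struct{})}
}

// firstTime records id and reports whether it was seen for the first time.
func (s *seenIDs) firstTime(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.ids[id]; ok {
		return false
	}
	s.ids[id] = struct{}{}
	return true
}

func main() {
	seen := newSeenIDs()
	// "uuid" is an assumed field name; the IDs are made-up sample values.
	records := []map[string]string{
		{"uuid": "a-1", "path": "/"},
		{"uuid": "a-2", "path": "/login"},
		{"uuid": "a-1", "path": "/"}, // resent record
	}
	for _, r := range records {
		if seen.firstTime(r["uuid"]) {
			fmt.Println("new:", r["uuid"])
		} else {
			fmt.Println("duplicate:", r["uuid"])
		}
	}
}
```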

Date and time

If the log itself does not contain a date and time, out_http will not convey any date and time information. The side that writes the log needs to output the date and time.

Implementation example of receiving endpoint

Here, we consider how to process the logs sent by Fluentd. The HTTP server that handles the logs is implemented in Go.

The request sent by Fluentd is an ordinary HTTP request, so an HTTP server built with Go's net/http package can process it. The following sample code just dumps the request.

main.go


package main

import (
	"fmt"
	"log"
	"net/http"
	"net/http/httputil"
)

func handleRequest(res http.ResponseWriter, req *http.Request) {
	dump, _ := httputil.DumpRequest(req, true)
	fmt.Printf("%s\n\n", dump)
	fmt.Fprintf(res, "OK")
}

func main() {
	http.HandleFunc("/", handleRequest)
	log.Fatal(http.ListenAndServe(":8000", nil))
}

If you wait for requests with this implementation, you will see output like the following.

$ go run main.go
POST / HTTP/1.1
Host: go:8000
Accept: */*
Accept-Encoding: gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Content-Length: 648
Content-Type: application/x-ndjson
User-Agent: Ruby

{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}

First, as a minimum check, verify that the method is POST and that the Content-Type is ndjson.

	if req.Method != http.MethodPost {
		res.WriteHeader(http.StatusMethodNotAllowed)
		res.Write([]byte("Method not allowed"))
		return
	}

	if req.Header.Get("Content-type") != "application/x-ndjson" {
		res.WriteHeader(http.StatusBadRequest)
		res.Write([]byte("Only application/x-ndjson content is allowed"))
		return
	}
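The exact string comparison works for the request shown in this post, but it would reject a header that carries parameters such as a charset. A slightly more tolerant check can be sketched with mime.ParseMediaType from the standard library; isNDJSON is a name made up for this example.

```go
package main

import (
	"fmt"
	"mime"
)

// isNDJSON reports whether a Content-Type header value denotes
// application/x-ndjson, ignoring parameters such as charset.
func isNDJSON(contentType string) bool {
	mediaType, _, err := mime.ParseMediaType(contentType)
	if err != nil {
		// An empty or malformed header is treated as "not ndjson".
		return false
	}
	return mediaType == "application/x-ndjson"
}

func main() {
	for _, v := range []string{
		"application/x-ndjson",
		"application/x-ndjson; charset=utf-8",
		"application/json",
		"",
	} {
		fmt.Printf("%q -> %v\n", v, isNDJSON(v))
	}
}
```

In the handler, the condition would then become !isNDJSON(req.Header.Get("Content-Type")).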

Next we need to parse the ndjson in the request body, which can be done with the encoding/json package alone. You can decode one value at a time by calling the More method of json.Decoder. The following sample code is an example of ndjson parsing using json.Decoder.

ndjsondemo.go


package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	data := `{"base": "white rice", "proteins": ["tofu"]}
{"base": "salad", "proteins": ["tuna", "salmon"]}
`
	decoder := json.NewDecoder(strings.NewReader(data))
	for decoder.More() {
		var value interface{}
		if err := decoder.Decode(&value); err != nil {
			fmt.Printf("parse error: %v\n", err)
			return
		}
		fmt.Printf("value: %#v\n", value)
	}
}

The following sample code is the server implementation with ndjson parsing.

main.go


package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func handleRequest(res http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodPost {
		res.WriteHeader(http.StatusMethodNotAllowed)
		res.Write([]byte("Method not allowed"))
		return
	}

	if req.Header.Get("Content-type") != "application/x-ndjson" {
		res.WriteHeader(http.StatusBadRequest)
		res.Write([]byte("Only application/x-ndjson content is allowed"))
		return
	}

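	decoder := json.NewDecoder(req.Body)
	for decoder.More() {
		var value interface{}
		if err := decoder.Decode(&value); err != nil {
			// After a syntax error the decoder keeps returning the same
			// error, so stop instead of looping forever.
			log.Printf("parse error: %v", err)
			http.Error(res, "Invalid ndjson", http.StatusBadRequest)
			return
		}
		fmt.Printf("value: %#v\n", value)
	}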

	fmt.Fprintf(res, "OK")
}

func main() {
	http.HandleFunc("/", handleRequest)
	log.Fatal(http.ListenAndServe(":8000", nil))
}

If you start this server and Fluentd sends logs to this server, you will see output similar to the following:

value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}
value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}
value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}

From this output result, you can see that JSON is parsed for each line of ndjson.
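Decoding into interface{} is convenient for inspection, but a typed struct is usually easier to work with afterwards. The following sketch decodes the same ndjson into a struct whose fields mirror the keys seen in the request body in this post. The accessLog type and the decodeAccessLogs helper are names invented here, and the second sample line is made up; in a server you would pass req.Body to json.NewDecoder instead of a string.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// accessLog mirrors the keys seen in the request body in this post.
// All values arrive as JSON strings, so the fields are strings as well.
type accessLog struct {
	Remote       string `json:"remote"`
	Host         string `json:"host"`
	User         string `json:"user"`
	Method       string `json:"method"`
	Path         string `json:"path"`
	Code         string `json:"code"`
	Size         string `json:"size"`
	Referer      string `json:"referer"`
	Agent        string `json:"agent"`
	ForwardedFor string `json:"http_x_forwarded_for"`
}

// decodeAccessLogs parses ndjson text and returns one accessLog per value.
// It stops at the first parse error and returns what was decoded so far.
func decodeAccessLogs(ndjson string) ([]accessLog, error) {
	var logs []accessLog
	dec := json.NewDecoder(strings.NewReader(ndjson))
	for dec.More() {
		var entry accessLog
		if err := dec.Decode(&entry); err != nil {
			return logs, fmt.Errorf("parse error: %w", err)
		}
		logs = append(logs, entry)
	}
	return logs, nil
}

func main() {
	// The first line is taken from this post; the second is a made-up sample.
	body := `{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","method":"POST","path":"/login","code":"302","size":"0"}
`
	logs, err := decodeAccessLogs(body)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, l := range logs {
		fmt.Printf("%s %s -> %s (agent=%q)\n", l.Method, l.Path, l.Code, l.Agent)
	}
}
```

Keys that are missing from a record simply leave the corresponding field as an empty string.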

What if the log receiving server is down?

If the endpoint that receives logs from Fluentd is down, what happens to the logs sent during that time?

To confirm this, stop the Go server and let Fluentd send the logs.

The following warning then appeared in the Fluentd log. As far as I can tell, it says that the TCP connection to go:8000 could not be opened. Fluentd then retried at exponentially growing intervals (after 1 second, 2 seconds, 4 seconds, 6 seconds, and so on) up to retry_time=7.

2020-11-02 07:19:39 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-11-02 07:19:40 +0000 chunk="5b31a91682a46b9ed00331d272b9caf7" error_class=SocketError error="Failed to open TCP connection to go:8000 (getaddrinfo: Name does not resolve)"

After confirming the warning, wait a while and then start the Go server. What happens?

A few seconds after the Go server started, Fluentd sent the logs that had been created while the server was down. Fluentd seems to repeat retries up to the number of times set by retry_limit (in the Fluentd v1 <buffer> section, the corresponding parameter is named retry_max_times). This time the transmission appears to have succeeded on the 8th retry.

If you set it to about 18, Fluentd will keep retrying for more than a day. However, because the retry interval keeps growing, logs may stay undelivered for a long time even after the destination has recovered. I therefore think it is necessary either to combine it with other options so that the retry interval does not grow too large, or to send a USR1 signal to force a flush.

Reference: About typical options of BufferedOutput plugin (Qiita)
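To get a feel for how quickly the retry interval grows, here is a rough calculation in Go. It assumes a 1-second initial wait that doubles on every retry (which I believe matches the defaults of retry_wait and retry_exponential_backoff_base in Fluentd v1) and it ignores the random jitter Fluentd adds. The 30-second cap stands in for a retry_max_interval setting and is an arbitrary value chosen for this sketch. The function names are made up.

```go
package main

import (
	"fmt"
	"time"
)

// retrySchedule returns the wait before each retry for an exponential
// backoff that starts at wait, multiplies by base each time, and is capped
// at maxInterval (0 means no cap). Random jitter is ignored.
func retrySchedule(wait time.Duration, base float64, maxInterval time.Duration, retries int) []time.Duration {
	schedule := make([]time.Duration, 0, retries)
	next := float64(wait)
	for i := 0; i < retries; i++ {
		d := time.Duration(next)
		if maxInterval > 0 && d > maxInterval {
			d = maxInterval
		}
		schedule = append(schedule, d)
		next *= base
	}
	return schedule
}

// total sums all waits in a schedule.
func total(schedule []time.Duration) time.Duration {
	var sum time.Duration
	for _, d := range schedule {
		sum += d
	}
	return sum
}

func main() {
	uncapped := retrySchedule(time.Second, 2, 0, 18)
	capped := retrySchedule(time.Second, 2, 30*time.Second, 18)
	fmt.Println("uncapped: total", total(uncapped), "last wait", uncapped[len(uncapped)-1])
	fmt.Println("capped:   total", total(capped), "last wait", capped[len(capped)-1])
}
```

Under these assumptions, 18 uncapped retries span roughly 73 hours and the last wait alone exceeds 36 hours, while a 30-second cap brings the whole sequence down to about 7 minutes. This is why capping the interval matters if you want logs to arrive soon after the destination recovers.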

What if the log receiver returns a 5xx error?

Above, we verified the case where the log receiver is completely stopped. But what if the log receiver becomes unstable, for example when it keeps returning 5xx response codes?

Try rewriting the Go server implementation so that it always returns a 500 status, then let Fluentd send the logs.

main.go


func handleRequest(res http.ResponseWriter, req *http.Request) {
	// Always respond with 500 to simulate an unstable receiver.
	res.WriteHeader(http.StatusInternalServerError)
	return
	// ...
}

The following warning appeared in the Fluentd log. Unlike the server-down case, no retry with exponential backoff seems to occur here.

2020-11-02 07:27:25 +0000 [warn]: #0 got unrecoverable error in primary and no secondary error_class=Fluent::UnrecoverableError error="500 Internal Server Error "

Fix the Go server code and restart the Go server. What happens?

Even after waiting a while, the logs were not resent.

As far as I can tell from the docs, out_http's retryable_response_codes needs to be set. With this setting, Fluentd tries to resend the logs when the response has one of the specified status codes. Configure it as follows:

my_fluentd.conf


<match nginx.access>
  @type http
  endpoint http://go:8000/
  retryable_response_codes [500, 503]
  <buffer>
    flush_interval 3s
  </buffer>
</match>

After adding this setting, I ran the same verification again. This time, when the Go server returned a 500 response, the Fluentd log changed as follows. You can see that retries are now being performed.

2020-11-02 07:33:31 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-11-02 07:33:32 +0000 chunk="5b31ac31236dc81f666960c6649cbfdc" error_class=Fluent::Plugin::HTTPOutput::RetryableResponse error="500 Internal Server Error "

After a while, I fixed the code on the Go server and restarted it. The logs were then resent and arrived at the Go server.
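Instead of editing and restarting the server by hand, the same experiment could be driven by a handler that fails a fixed number of times and then recovers. The following is a sketch of such a handler; the flaky type is invented for this example, the failure count of 3 is arbitrary, and main exercises the handler in-process with net/http/httptest rather than listening on a port.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// flaky answers the first failCount requests with 500 and all later
// requests with 200.
type flaky struct {
	calls     int64 // number of requests seen so far, updated atomically
	failCount int64
}

// nextStatus returns the status code to use for the next request.
func (f *flaky) nextStatus() int {
	if atomic.AddInt64(&f.calls, 1) <= f.failCount {
		return http.StatusInternalServerError
	}
	return http.StatusOK
}

// ServeHTTP makes flaky usable as an http.Handler.
func (f *flaky) ServeHTTP(res http.ResponseWriter, req *http.Request) {
	status := f.nextStatus()
	res.WriteHeader(status)
	fmt.Fprintln(res, http.StatusText(status))
}

func main() {
	h := &flaky{failCount: 3} // 3 is an arbitrary value for this sketch
	for i := 1; i <= 5; i++ {
		rec := httptest.NewRecorder()
		req := httptest.NewRequest(http.MethodPost, "/", nil)
		h.ServeHTTP(rec, req)
		fmt.Printf("request %d -> %d\n", i, rec.Code)
	}
}
```

To try it against Fluentd, the handler could be registered with http.Handle("/", h) and served with http.ListenAndServe(":8000", nil), as in main.go. With retryable_response_codes including 500, the buffered chunk should then be delivered once the handler starts returning 200.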

Verification code

The verification code used for this investigation can be found on GitHub: https://github.com/suin/golang_playground/tree/7623374f54c509e9e02360184f0c196183fefb28/fluentd_http_out
