In this post, I will share the results of my research on Fluentd's http output plugin (out_http).
Fluentd lets you configure output plugins that send logs to various destinations such as files and Elasticsearch. The http output plugin is one of them, but it is not tied to any particular middleware: it is a general-purpose plugin that can send to anything that accepts HTTP requests. With out_http, you can build something like a webhook. The receiving HTTP server can be implemented in any language, so you can do whatever you want when a log arrives.
Since v1.7.0, out_http is built into Fluentd, so it can be used without installing a plugin.
The fluentd/fluentd:latest image published on Docker Hub is v1.3, so out_http did not work with it. You could probably use it there by installing the out_http plugin separately.
This time, I decided to use fluentd/fluentd:v1.11-1.
Next, I investigated the settings required to use out_http. The official out_http documentation lists many parameters, but the minimum configuration was as follows.
my_fluentd.conf
<match nginx.access>
@type http
endpoint http://go:8000/
</match>
This example sends logs to http://go:8000/. All other parameters keep their default values, so the behavior is as follows:
- Method: POST
- Content-Type: application/x-ndjson
- Send timing: after 60 seconds
ndjson may be an unfamiliar data format: it is JSON values separated by newline characters. It seems to be a de facto standard format in the logging area.
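To make the format concrete, here is a minimal Go sketch that produces ndjson with the standard encoding/json package. json.Encoder.Encode writes each value followed by a newline, which yields exactly one record per line. The accessLog type and its fields are hypothetical, chosen only to resemble the nginx records that appear later in this post.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// accessLog is a hypothetical record type used only for illustration.
type accessLog struct {
	Method string `json:"method"`
	Path   string `json:"path"`
	Code   string `json:"code"`
}

// encodeNDJSON writes each record as one JSON value terminated by "\n".
func encodeNDJSON(records []accessLog) (string, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, r := range records {
		// Encode appends a newline after every value, which is what makes this ndjson.
		if err := enc.Encode(r); err != nil {
			return "", err
		}
	}
	return buf.String(), nil
}

func main() {
	out, err := encodeNDJSON([]accessLog{
		{Method: "GET", Path: "/", Code: "200"},
		{Method: "POST", Path: "/login", Code: "302"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```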
I wanted to process logs as close to real time as possible, so the default 60-second delay seemed too slow. It can be shortened by changing the flush_interval setting in the <buffer> section.
my_fluentd.conf
<match nginx.access>
@type http
endpoint http://go:8000/
<buffer>
flush_interval 3s
</buffer>
</match>
With this setting, buffered logs are sent every 3 seconds.
The HTTP request sent by out_http looks like this. The example below carries nginx access logs.
POST / HTTP/1.1
Host: go:8000
Accept: */*
Accept-Encoding: gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Content-Length: 648
Content-Type: application/x-ndjson
User-Agent: Ruby
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
Each line appears to be a JSON record produced by Fluentd's nginx parser.
out_http sends every log that reaches its <match> block. So how do you send only specific logs?
To send only specific logs, use a filter. The grep plugin (https://docs.fluentd.org/filter/grep) matches a field against a pattern to narrow down the records. The following configuration sends only logs with the POST method to out_http.
my_fluentd.conf
<filter nginx.access>
@type grep
<regexp>
key method
pattern /^POST$/
</regexp>
</filter>
<match nginx.access>
@type http
endpoint http://go:8000/
<buffer>
flush_interval 3s
</buffer>
</match>
The logs sent by out_http do not carry a UUID, so the receiver cannot tell whether a log is new or a resend.
This problem seems to be solvable with the add-uuid plugin.
Nginx lets you add $request_id to the log format. It is a unique ID for each request, not an ID for each log entry.
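If each record carries an identifier, for example one added by the add-uuid plugin, the receiver can skip records it has already processed. Below is a minimal in-memory sketch, assuming the identifier lives in a field named "uuid" (a hypothetical name; use whatever key your configuration actually produces). A real receiver would need persistent storage and an expiry policy, because this map grows without bound and is lost on restart.

```go
package main

import (
	"fmt"
	"sync"
)

// dedup remembers identifiers it has seen. It is guarded by a mutex
// because net/http may run handlers on multiple goroutines.
type dedup struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newDedup() *dedup {
	return &dedup{seen: make(map[string]struct{})}
}

// firstTime reports whether id has not been seen before, and records it.
func (d *dedup) firstTime(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[id]; ok {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

func main() {
	d := newDedup()
	// "uuid" is a hypothetical field name; a resent chunk repeats the same IDs.
	batches := [][]map[string]string{
		{{"uuid": "a", "path": "/"}, {"uuid": "b", "path": "/login"}},
		{{"uuid": "b", "path": "/login"}}, // resent record
	}
	for _, batch := range batches {
		for _, r := range batch {
			if d.firstTime(r["uuid"]) {
				fmt.Println("process", r["path"])
			} else {
				fmt.Println("skip duplicate", r["uuid"])
			}
		}
	}
}
```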
If the log record itself does not contain the date and time, out_http does not send any date/time information. The side that writes the log needs to include it.
Here, we will consider how to process the logs sent by Fluentd. The HTTP server that handles the logs is implemented in Go.
The request sent by Fluentd is an ordinary HTTP request, so an HTTP server built with Go's net/http package can handle it. The following sample code simply dumps the request.
main.go
package main
import (
"fmt"
"log"
"net/http"
"net/http/httputil"
)
func handleRequest(res http.ResponseWriter, req *http.Request) {
dump, _ := httputil.DumpRequest(req, true)
fmt.Printf("%s\n\n", dump)
fmt.Fprintf(res, "OK")
}
func main() {
http.HandleFunc("/", handleRequest)
log.Fatal(http.ListenAndServe(":8000", nil))
}
If you run this server and let Fluentd send logs to it, you will see output like the following.
$ go run main.go
POST / HTTP/1.1
Host: go:8000
Accept: */*
Accept-Encoding: gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Content-Length: 648
Content-Type: application/x-ndjson
User-Agent: Ruby
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
First, as a minimal check, verify that the method is POST and that the Content-Type is ndjson.
if req.Method != http.MethodPost {
res.WriteHeader(http.StatusMethodNotAllowed)
res.Write([]byte("Method not allowed"))
return
}
if req.Header.Get("Content-type") != "application/x-ndjson" {
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte("Only application/x-ndjson content is allowed"))
return
}
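The exact string comparison above rejects a header value such as application/x-ndjson; charset=utf-8. If you want to tolerate parameters, one option is to parse the header with the standard mime.ParseMediaType first. This is a sketch; isNDJSON is a hypothetical helper name. The out_http request shown earlier sends the bare media type, so the strict comparison also works for it.

```go
package main

import (
	"fmt"
	"mime"
)

// isNDJSON reports whether a Content-Type header value names application/x-ndjson,
// ignoring parameters such as charset. ParseMediaType returns the media type lowercased.
func isNDJSON(contentType string) bool {
	mediaType, _, err := mime.ParseMediaType(contentType)
	return err == nil && mediaType == "application/x-ndjson"
}

func main() {
	for _, ct := range []string{
		"application/x-ndjson",
		"application/x-ndjson; charset=utf-8",
		"application/json",
		"",
	} {
		fmt.Printf("%q -> %v\n", ct, isNDJSON(ct))
	}
}
```

Inside the handler, the check would then read `if !isNDJSON(req.Header.Get("Content-Type")) { ... }`.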
Next, we need to parse the ndjson in the request body, which can be done with just the standard encoding/json package. You can decode one record at a time by calling Decode in a loop while the More method of json.Decoder returns true. The following sample code is an example of ndjson parsing using json.Decoder.
ndjsondemo.go
package main
import (
"encoding/json"
"fmt"
"strings"
)
func main() {
data := `{"base": "white rice", "proteins": ["tofu"]}
{"base": "salad", "proteins": ["tuna", "salmon"]}
`
decoder := json.NewDecoder(strings.NewReader(data))
for decoder.More() {
var value interface{}
if err := decoder.Decode(&value); err != nil {
// fmt.Errorf only builds an error value; print it so the failure is visible.
fmt.Printf("parse error: %v\n", err)
return
}
fmt.Printf("value: %#v\n", value)
}
}
The following sample code is the server implementation with ndjson parsing.
main.go
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
)
func handleRequest(res http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
res.WriteHeader(http.StatusMethodNotAllowed)
res.Write([]byte("Method not allowed"))
return
}
if req.Header.Get("Content-type") != "application/x-ndjson" {
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte("Only application/x-ndjson content is allowed"))
return
}
decoder := json.NewDecoder(req.Body)
for decoder.More() {
var value interface{}
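if err := decoder.Decode(&value); err != nil {
// Decode errors are sticky: More() keeps returning true on the same bad
// input, so stop here instead of looping forever.
log.Printf("parse error: %v", err)
http.Error(res, "Invalid ndjson", http.StatusBadRequest)
return
}
fmt.Printf("value: %#v\n", value)
}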
fmt.Fprintf(res, "OK")
}
func main() {
http.HandleFunc("/", handleRequest)
log.Fatal(http.ListenAndServe(":8000", nil))
}
If you start this server and Fluentd sends logs to this server, you will see output similar to the following:
value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}
value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}
value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}
This output shows that each line of the ndjson body is parsed as a separate JSON value.
If the endpoint that receives logs from Fluentd is down, what happens to the logs sent during that time?
To confirm this, stop the Go server and let Fluentd send the logs.
The following warning then appeared in the Fluentd log. As far as I can tell, it says that the TCP connection to go:8000 could not be opened. Fluentd also retried the send after 1 second, 2 seconds, 4 seconds, 6 seconds, and so on, with exponentially growing intervals, up to retry_time=7.
2020-11-02 07:19:39 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-11-02 07:19:40 +0000 chunk="5b31a91682a46b9ed00331d272b9caf7" error_class=SocketError error="Failed to open TCP connection to go:8000 (getaddrinfo: Name does not resolve)"
After seeing the warning, wait a while and then start the Go server again. What happens?
A few seconds after the Go server started, Fluentd sent the logs that had been generated while it was down. Fluentd appears to repeat retries up to the number set by retry_limit (called retry_max_times in the v1 <buffer> section). This time, the transmission succeeded on the 8th retry.
The Qiita article "About typical options of BufferedOutput plugin" notes: "If you set it to about 18, it will retry for more than a day. However, since the retry interval keeps growing, sending may remain difficult for a long time even after the destination is restored. Therefore, I think it is necessary to combine it with other options so that the retry interval does not become too large, or to send a USR1 signal to force a flush."
Above, we verified the case where the log receiver is completely stopped. But what if the receiver becomes unstable, for example when it keeps returning 5xx responses?
Try rewriting the Go server implementation to always return a 500 status. Then let Fluentd send the logs.
main.go
func handleRequest(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(http.StatusInternalServerError)
return
// ...
}
The following warning appeared in the Fluentd log. Unlike the server-down case, Fluentd does not seem to retry with exponentially growing intervals.
2020-11-02 07:27:25 +0000 [warn]: #0 got unrecoverable error in primary and no secondary error_class=Fluent::UnrecoverableError error="500 Internal Server Error "
Fix the Go server code and restart it. What happens?
Even after a while, the log was not resent.
As far as I can tell from the docs, out_http's retryable_response_codes needs to be set. With this parameter, Fluentd retries sending the log when the server responds with one of the listed status codes. Set it as follows:
my_fluentd.conf
<match nginx.access>
@type http
endpoint http://go:8000/
retryable_response_codes [500, 503]
<buffer>
flush_interval 3s
</buffer>
</match>
After adding this setting, I ran the same test again. This time, when the Go server returned a 500 response, Fluentd logged the following, showing that retries are now performed.
2020-11-02 07:33:31 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-11-02 07:33:32 +0000 chunk="5b31ac31236dc81f666960c6649cbfdc" error_class=Fluent::Plugin::HTTPOutput::RetryableResponse error="500 Internal Server Error "
After a while, I fixed the Go server code and restarted it. The log was then resent and arrived at the Go server.
The validation code used for this study can be found on GitHub: https://github.com/suin/golang_playground/tree/7623374f54c509e9e02360184f0c196183fefb28/fluentd_http_out