[GO] Ich habe die http-Ausgabe von Fluentd untersucht

In diesem Beitrag werde ich Ihnen die Ergebnisse meiner Forschung zum http-Ausgabe-Plug-In von Fluend erläutern.

Was ist das http-Ausgabe-Plug-In von Fluentd?

Mit Fluentd können Sie ein Ausgabe-Plug-In festlegen, um Protokolle an verschiedene Speicher wie Dateien und Elasticsearch zu senden. Das http-Ausgabe-Plug-In ist eines davon, aber die Middleware, an die gesendet werden soll, ist nicht festgelegt. Es ist ein vielseitiger Player, der an jede Middleware senden kann, die HTTP-Anforderungen empfangen kann. Sie können Webhook-ähnliche Dinge mit http-Ausgabe tun. Der empfangende HTTP-Server kann in jeder Sprache implementiert werden, sodass Sie beim Empfang des Protokolls alles tun können, was Sie möchten.

out_http ist in Version 1.7.0 integriert

Out_http ist in Fluentd integriert, dh es kann verwendet werden, ohne das Plug-In ab Version 1.7.0 zu installieren.

Das aktuell auf Docker Hub veröffentlichte Bild fluentd / fluentd: ist v1.3, daher hat es nicht funktioniert. Sie können es wahrscheinlich verwenden, indem Sie das Plug-In out_http installieren.

Dieses Mal habe ich mich für fluentd / fluentd: v1.11-1 entschieden.

Die für die Installation von out_http erforderlichen Einstellungen sind sehr einfach

Als nächstes untersuchte ich die Einstellungen, die erforderlich sind, um out_http mit Fluentd einzuführen. Sie können sehen, dass es in [Offizielles Dokument] out_http viele Einstellungselemente gibt, aber die Mindesteinstellungen waren wie folgt.

my_fluentd.conf


<match nginx.access>
  @type http
  endpoint http://go:8000/
</match>

In diesem Beispiel wird das Protokoll an "http: // go: 8000 /" gesendet. Die verschiedenen Einstellungen funktionieren mit den Standardwerten, daher ist das Verhalten wie folgt:

--Methode: POST

Dies ist ein unbekanntes Datenformat namens ndjson, ein Datenformat, bei dem JSON-Werte durch Zeilenvorschubzeichen getrennt sind. Es scheint der De-facto-Standarddatentyp im Protokollbereich zu sein.

Ist es nicht zu spät, das Protokoll nach 60 Sekunden zu senden?

Ich wollte die Protokolle so weit wie möglich in Echtzeit verarbeiten, daher dachte ich, dass die Standardeinstellung 60 Sekunden später zu langsam wäre. Dies kann verkürzt werden, indem die Einstellung flush_interval des Direktors <buffer> geändert wird.

my_fluentd.conf


<match nginx.access>
  @type http
  endpoint http://go:8000/
  <buffer>
    flush_interval 3s
  </buffer>
</match>

Mit dieser Einstellung wird das Protokoll nach 3 Sekunden gesendet.

HTTP-Anfrage von out_http gesendet

Die von out_http gesendete HTTP-Anfrage sieht folgendermaßen aus: Das folgende Beispiel ist ein Nginx-Zugriffsprotokoll.

POST / HTTP/1.1
Host: go:8000
Accept: */*
Accept-Encoding: gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Content-Length: 648
Content-Type: application/x-ndjson
User-Agent: Ruby

{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}

Der Inhalt jeder Zeile scheint die JSON-Daten zu sein, die von Fluentds Nginx-Parser verarbeitet werden.

Zu sendende Protokolle filtern

out_http dient zum Senden aller Protokolle. Wie senden Sie nur ein bestimmtes Protokoll?

Verwenden Sie Filter, um nur bestimmte Protokolle zu senden. Verwenden Sie das grep-Plug-In (https://docs.fluentd.org/filter/grep), um Musterübereinstimmungen vorzunehmen und bestimmte Felder einzugrenzen. Das folgende Konfigurationsbeispiel ist ein Beispiel für das Senden nur des POST-Methodenprotokolls mit out_http.

my_fluentd.conf


<filter nginx.access>
 @type grep
  <regexp>
    key method
    pattern /^POST$/
  </regexp>
</filter>

<match nginx.access>
  @type http
  endpoint http://go:8000/
  <buffer>
    flush_interval 3s
  </buffer>
</match>

Probleme mit zu sendenden Protokolldaten

Identität protokollieren

Die UUID wird dem von out_http gesendeten Protokoll nicht zugewiesen. Daher kann der Empfänger nicht feststellen, ob das Protokoll neu ist oder erneut gesendet wird.

Dieses Problem scheint mit dem Add-UUID-Plug-In gelöst zu werden.

Mit Nginx können Sie dem Protokollformat "$ request_id" hinzufügen. Dies ist eine eindeutige ID für jede Anforderung. Es ist nicht eine ID für jedes Protokoll.

Datum (und Uhrzeit

Wenn das Protokoll Datum und Uhrzeit nicht enthält, übermittelt out_http keine Datums- und Uhrzeitinformationen. Es ist erforderlich, Datum und Uhrzeit auf der Seite auszugeben, auf der das Protokoll geschrieben wird.

Implementierungsbeispiel für den Empfang von Endpunkten

Hier werden wir uns überlegen, wie das von Fluentd gesendete Protokoll verarbeitet wird. Der HTTP-Server, der Protokolle verarbeitet, ist in Go implementiert.

Die von Fluentd gesendete Anforderung ist eine normale HTTP-Anforderung. Wenn Sie also einen HTTP-Server im Go-Modul "net / http" implementieren, kann er verarbeitet werden. Der folgende Beispielcode gibt die Anforderung nur aus.

main.go


package main

import (
	"fmt"
	"log"
	"net/http"
	"net/http/httputil"
)

func handleRequest(res http.ResponseWriter, req *http.Request) {
	dump, _ := httputil.DumpRequest(req, true)
	fmt.Printf("%s\n\n", dump)
	fmt.Fprintf(res, "OK")
}

func main() {
	http.HandleFunc("/", handleRequest)
	log.Fatal(http.ListenAndServe(":8000", nil))
}

Wenn Sie mit dieser Implementierung auf eine Anforderung warten, wird das folgende Ausführungsergebnis angezeigt.

$ go run main.go
POST / HTTP/1.1
Host: go:8000
Accept: */*
Accept-Encoding: gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Content-Length: 648
Content-Type: application/x-ndjson
User-Agent: Ruby

{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}

Überprüfen Sie zunächst mindestens, ob die Methode "POST" ist und ob der Inhaltstyp "ndjson" ist.

	if req.Method != http.MethodPost {
		res.WriteHeader(http.StatusMethodNotAllowed)
		res.Write([]byte("Method not allowed"))
		return
	}

	if req.Header.Get("Content-type") != "application/x-ndjson" {
		res.WriteHeader(http.StatusBadRequest)
		res.Write([]byte("Only application/x-ndjson content is allowed"))
		return
	}

Als nächstes müssen wir den ndjson des Anforderungskörpers analysieren, was nur mit dem Modul encoding / json möglich ist. Sie können Zeile für Zeile dekodieren, indem Sie die "More" -Methode von "json.Decoder" drücken. Der folgende Beispielcode ist ein Beispiel für das Parsen von ndjson mit json.Decoder.

ndjsondemo.go


package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	data := `{"base": "white rice", "proteins": ["tofu"]}
{"base": "salad", "proteins": ["tuna", "salmon"]}
`
	decoder := json.NewDecoder(strings.NewReader(data))
	for decoder.More() {
		var value interface{}
		if err := decoder.Decode(&value); err != nil {
			fmt.Errorf("parse error: %w", err)
			return
		}
		fmt.Printf("value: %#v\n", value)
	}
}

Der folgende Beispielcode fügt der Serverimplementierung ndjson-Parsing hinzu.

main.go


package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func handleRequest(res http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodPost {
		res.WriteHeader(http.StatusMethodNotAllowed)
		res.Write([]byte("Method not allowed"))
		return
	}

	if req.Header.Get("Content-type") != "application/x-ndjson" {
		res.WriteHeader(http.StatusBadRequest)
		res.Write([]byte("Only application/x-ndjson content is allowed"))
		return
	}

	decoder := json.NewDecoder(req.Body)
	for decoder.More() {
		var value interface{}
		if err := decoder.Decode(&value); err != nil {
			fmt.Errorf("parse error: %w\n", err)
		} else {
			fmt.Printf("value: %#v\n", value)
		}
	}

	fmt.Fprintf(res, "OK")
}

func main() {
	http.HandleFunc("/", handleRequest)
	log.Fatal(http.ListenAndServe(":8000", nil))
}

Wenn Sie diesen Server starten und Fluentd Protokolle an diesen Server sendet, wird eine Ausgabe ähnlich der folgenden angezeigt:

value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}
value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}
value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}

Anhand dieses Ausgabeergebnisses können Sie erkennen, dass JSON für jede Zeile von ndjson analysiert wird.

Was ist, wenn der Protokollempfängerserver nicht verfügbar ist?

Was passiert mit den während dieser Zeit gesendeten Protokollen, wenn der Endpunkt, der die Protokolle von Fluentd empfängt, nicht verfügbar ist?

Um dies zu bestätigen, stoppen Sie den Go-Server und lassen Sie Fluentd das Protokoll senden.

Anschließend wurde die folgende Warnung im Fluentd-Protokoll angezeigt. Soweit ich lesen kann, scheint dies eine Warnung für unterwegs zu sein: 8000 TCP-Verbindung wird nicht geöffnet. Zusätzlich wurde in diesem Protokoll nach 1 Sekunde, 2 Sekunden, 4 Sekunden, 6 Sekunden und Sekunden der Exponentialfunktion eine Wiederholungsübertragung durchgeführt, bis retry_time = 7.

2020-11-02 07:19:39 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-11-02 07:19:40 +0000 chunk="5b31a91682a46b9ed00331d272b9caf7" error_class=SocketError error="Failed to open TCP connection to go:8000 (getaddrinfo: Name does not resolve)"

Versuchen Sie nach Überprüfung der Warnung, den Go-Server nach einer Weile zu starten. Was wird passieren?

Einige Sekunden nach dem Start des Go-Servers schickte mir Fluentd ein Protokoll, das beim Ausfall erstellt wurde. Fließende Wiederholungsversuche scheinen so oft wiederholt zu werden, wie von retry_limit festgelegt. Diesmal scheint die Übertragung bei der 8. Wiederholung erfolgreich gewesen zu sein.

Wenn Sie ungefähr 18 einstellen, wird es länger als einen Tag wiederholt. Da das Wiederholungsintervall jedoch immer größer wird, ist das Senden auch nach Wiederherstellung des Ziels weiterhin schwierig. Daher denke ich, dass es notwendig ist, es so anzupassen, dass das Wiederholungsintervall in Kombination mit anderen Optionen nicht groß wird, oder ein USR1-Signal zu senden, um es gewaltsam zu spülen. Über typische Optionen des BufferedOutput-Plugins --Qiita

Was ist, wenn der Protokollempfänger einen Fehler der Serie 500 auslöst?

Oben haben wir den Fall überprüft, in dem der Protokollempfänger vollständig gestoppt ist. Was aber, wenn der Protokollempfänger instabil wird? In diesem Fall wird beispielsweise der Antwortcode der 500er-Serie kontinuierlich zurückgegeben.

Schreiben Sie als Test die Go-Server-Implementierung so um, dass sie immer den Status 500 zurückgibt. Lassen Sie dann Fluentd das Protokoll senden.

main.go


func handleRequest(res http.ResponseWriter, req *http.Request) {
	res.WriteHeader(http.StatusInternalServerError)
	return
    // ...
}

Das Fluentd-Protokoll zeigte eine Warnung ähnlich der folgenden an: In diesem Fall scheint es im Gegensatz zum Serverausfall nicht zu einem erneuten Senden nach Sekunden der Exponentialfunktion zu kommen.

2020-11-02 07:27:25 +0000 [warn]: #0 got unrecoverable error in primary and no secondary error_class=Fluent::UnrecoverableError error="500 Internal Server Error "

Versuchen Sie, den Go-Servercode zu korrigieren und den Go-Server neu zu starten. Was wird passieren?

Das Protokoll wurde nach einer Weile nicht erneut gesendet.

Soweit ich die Dokumentation gelesen habe, scheint es, dass Fluentds "retryable_response_codes" von out_http gesetzt werden muss. Wenn dies festgelegt ist, scheint es zu versuchen, das Protokoll erneut zu senden, wenn der Statuscode angegeben wird. Stellen Sie diese Einstellung wie folgt ein:

my_fluentd.conf


<match nginx.access>
  @type http
  endpoint http://go:8000/
  retryable_response_codes [500, 503]
  <buffer>
    flush_interval 3s
  </buffer>
</match>

Versuchen Sie nach dem Hinzufügen dieser Einstellung dieselbe Überprüfung erneut. Dann änderte sich der Protokollinhalt von Fluentd, als der Go-Server 500 Antworten zurückgab, wie folgt. Sie können sehen, dass jetzt Wiederholungsversuche durchgeführt werden.

2020-11-02 07:33:31 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-11-02 07:33:32 +0000 chunk="5b31ac31236dc81f666960c6649cbfdc" error_class=Fluent::Plugin::HTTPOutput::RetryableResponse error="500 Internal Server Error "

Nach einer Weile habe ich den Code auf dem Go-Server repariert und den Go-Server neu gestartet. Dann wurde das Protokoll erneut gesendet und erreichte den Go-Server.

Verifizierungs-Schlüssel

Der für diese Studie verwendete Bestätigungscode befindet sich auf GitHub: https://github.com/suin/golang_playground/tree/7623374f54c509e9e02360184f0c196183fefb28/fluentd_http_out

Recommended Posts

Ich habe die http-Ausgabe von Fluentd untersucht
Ich habe den Mechanismus der Flaschenanmeldung untersucht!
Ich habe untersucht, wie das Zielfernrohr aussieht
Ich habe die Gerätebaumüberlagerung untersucht