[GO] J'ai étudié la sortie http de Fluentd

Dans cet article, je vais vous parler des résultats de mes recherches sur le plug-in de sortie http de Fluend.

Qu'est-ce que le plug-in de sortie http de Fluentd?

Fluentd vous permet de définir un plug-in de sortie pour envoyer des journaux à divers stockages tels que des fichiers et Elasticsearch. Le plug-in de sortie http en fait partie, mais le middleware vers lequel envoyer n'est pas fixe, et c'est un lecteur polyvalent qui peut envoyer à n'importe quel middleware pouvant recevoir des requêtes HTTP. Vous pouvez effectuer des opérations de type webhook avec la sortie http. Le serveur HTTP de réception peut être implémenté dans n'importe quelle langue, vous pouvez donc faire ce que vous voulez lorsque vous recevez le journal.

out_http est intégré à la v1.7.0

Out_http est intégré à Fluentd, c'est-à-dire qu'il peut être utilisé sans installer le plug-in de la v1.7.0.

Le fluentd / fluentd: la dernière image publiée sur Docker Hub est v1.3, donc cela ne fonctionnait pas avec. Vous pouvez probablement l'utiliser en installant le plug-in out_http.

Cette fois, j'ai décidé d'utiliser fluentd / fluentd: v1.11-1.

Les paramètres requis pour installer out_http sont très simples

Ensuite, j'ai étudié les paramètres requis pour introduire out_http avec Fluentd. Vous pouvez voir qu'il existe de nombreux éléments de paramétrage dans [Official Document] out_http, mais les paramètres minimum étaient les suivants.

my_fluentd.conf


<match nginx.access>
  @type http
  endpoint http://go:8000/
</match>

Cet exemple envoie le journal à http: // go: 8000 /. Les différents paramètres fonctionnent avec les valeurs par défaut, le comportement est donc le suivant:

--Méthode: POST

Il s'agit d'un format de données inconnu appelé ndjson, qui est un format de données dans lequel les valeurs JSON sont séparées par des caractères de saut de ligne. Cela semble être le type de données standard de facto dans la zone de journalisation.

N'est-il pas trop tard pour envoyer le journal après 60 secondes?

Je voulais traiter les journaux autant que possible en temps réel, j'ai donc pensé que la valeur par défaut 60 secondes plus tard serait trop lente. Ceci peut être raccourci en changeant le paramètre flush_interval du directeur <buffer>.

my_fluentd.conf


<match nginx.access>
  @type http
  endpoint http://go:8000/
  <buffer>
    flush_interval 3s
  </buffer>
</match>

Avec ce paramètre, le journal sera envoyé après 3 secondes.

Requête HTTP envoyée par out_http

La requête HTTP envoyée par out_http ressemble à ceci: L'exemple suivant est un journal d'accès nginx.

POST / HTTP/1.1
Host: go:8000
Accept: */*
Accept-Encoding: gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Content-Length: 648
Content-Type: application/x-ndjson
User-Agent: Ruby

{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}

Le contenu de chaque ligne semble être les données JSON traitées par l'analyseur nginx de Fluentd.

Filtrage des journaux à envoyer

out_http est conçu pour envoyer tous les journaux. Alors, comment envoyer uniquement un journal spécifique?

Pour envoyer uniquement des journaux spécifiques, utilisez Filter. Utilisez le plug-in grep (https://docs.fluentd.org/filter/grep) pour mettre en correspondance des modèles et affiner des champs spécifiques. L'exemple de configuration suivant est un exemple d'envoi uniquement du journal de la méthode POST avec out_http.

my_fluentd.conf


<filter nginx.access>
 @type grep
  <regexp>
    key method
    pattern /^POST$/
  </regexp>
</filter>

<match nginx.access>
  @type http
  endpoint http://go:8000/
  <buffer>
    flush_interval 3s
  </buffer>
</match>

Problèmes de données de journal à envoyer

Identité du journal

L'UUID n'est pas affecté au journal envoyé par out_http. Par conséquent, le récepteur ne peut pas déterminer si le journal est nouveau ou renvoyé.

Ce problème semble être résolu en utilisant le plug-in add-uuid.

Nginx vous permet d'ajouter $ request_id au format du journal, qui est un identifiant unique pour chaque demande. Ce n'est pas un ID pour chaque journal.

Date et l'heure

Si le journal ne contient pas la date et l'heure, out_http ne transmettra pas les informations de date et d'heure. Il est nécessaire de sortir la date et l'heure du côté qui écrit le journal.

Exemple d'implémentation du point final de réception

Ici, nous examinerons comment traiter le journal envoyé par Fluentd. Le serveur HTTP qui gère les journaux est implémenté dans Go.

La requête envoyée par Fluentd est une requête HTTP normale, donc si vous implémentez un serveur HTTP dans le module net / http de Go, il peut être traité. L'exemple de code suivant vide simplement la demande.

main.go


package main

import (
	"fmt"
	"log"
	"net/http"
	"net/http/httputil"
)

func handleRequest(res http.ResponseWriter, req *http.Request) {
	dump, _ := httputil.DumpRequest(req, true)
	fmt.Printf("%s\n\n", dump)
	fmt.Fprintf(res, "OK")
}

func main() {
	http.HandleFunc("/", handleRequest)
	log.Fatal(http.ListenAndServe(":8000", nil))
}

Si vous attendez une demande avec cette implémentation, vous verrez le résultat d'exécution suivant.

$ go run main.go
POST / HTTP/1.1
Host: go:8000
Accept: */*
Accept-Encoding: gzip;q=1.0,deflate;q=0.6,identity;q=0.3
Content-Length: 648
Content-Type: application/x-ndjson
User-Agent: Ruby

{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}
{"remote":"172.21.0.1","host":"-","user":"-","method":"GET","path":"/","code":"200","size":"612","referer":"-","agent":"HTTPie/2.2.0","http_x_forwarded_for":"-"}

Tout d'abord, vérifiez au minimum si la méthode est POST et si le Content-Type est ndjson.

	if req.Method != http.MethodPost {
		res.WriteHeader(http.StatusMethodNotAllowed)
		res.Write([]byte("Method not allowed"))
		return
	}

	if req.Header.Get("Content-type") != "application/x-ndjson" {
		res.WriteHeader(http.StatusBadRequest)
		res.Write([]byte("Only application/x-ndjson content is allowed"))
		return
	}

Ensuite, nous devons analyser le corps de la requête ndjson, ce qui ne peut être fait qu'avec le module ʻencoding / json. Vous pouvez décoder ligne par ligne en appuyant sur la méthode More de json.Decoder. L'exemple de code suivant est un exemple d'analyse ndjson à l'aide de json.Decoder`.

ndjsondemo.go


package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	data := `{"base": "white rice", "proteins": ["tofu"]}
{"base": "salad", "proteins": ["tuna", "salmon"]}
`
	decoder := json.NewDecoder(strings.NewReader(data))
	for decoder.More() {
		var value interface{}
		if err := decoder.Decode(&value); err != nil {
			fmt.Errorf("parse error: %w", err)
			return
		}
		fmt.Printf("value: %#v\n", value)
	}
}

L'exemple de code suivant ajoute l'analyse ndjson à l'implémentation du serveur.

main.go


package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func handleRequest(res http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodPost {
		res.WriteHeader(http.StatusMethodNotAllowed)
		res.Write([]byte("Method not allowed"))
		return
	}

	if req.Header.Get("Content-type") != "application/x-ndjson" {
		res.WriteHeader(http.StatusBadRequest)
		res.Write([]byte("Only application/x-ndjson content is allowed"))
		return
	}

	decoder := json.NewDecoder(req.Body)
	for decoder.More() {
		var value interface{}
		if err := decoder.Decode(&value); err != nil {
			fmt.Errorf("parse error: %w\n", err)
		} else {
			fmt.Printf("value: %#v\n", value)
		}
	}

	fmt.Fprintf(res, "OK")
}

func main() {
	http.HandleFunc("/", handleRequest)
	log.Fatal(http.ListenAndServe(":8000", nil))
}

Si vous démarrez ce serveur et que Fluentd envoie des journaux à ce serveur, vous verrez une sortie similaire à ce qui suit:

value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}
value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}
value: map[string]interface {}{"agent":"HTTPie/2.2.0", "code":"200", "host":"-", "http_x_forwarded_for":"-", "method":"GET", "path":"/", "referer":"-", "remote":"172.21.0.1", "size":"612", "user":"-"}

À partir de ce résultat de sortie, vous pouvez voir que JSON est analysé pour chaque ligne de ndjson.

Que faire si le serveur de réception de journaux est arrêté?

Qu'arrive-t-il aux journaux envoyés pendant cette période si le point de terminaison qui reçoit les journaux de Fluentd est en panne?

Pour confirmer cela, arrêtez le serveur Go et laissez Fluentd envoyer le journal.

Ensuite, l'avertissement suivant a été affiché dans le journal Fluentd. Pour autant que je sache, cela semble être un avertissement sur le départ: la connexion 8000 TCP n'est pas ouverte. De plus, ce journal a nécessité une nouvelle transmission après 1 seconde, 2 secondes, 4 secondes, 6 secondes et des secondes de fonction exponentielle jusqu'à ce que retry_time = 7.

2020-11-02 07:19:39 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-11-02 07:19:40 +0000 chunk="5b31a91682a46b9ed00331d272b9caf7" error_class=SocketError error="Failed to open TCP connection to go:8000 (getaddrinfo: Name does not resolve)"

Après avoir vérifié l'avertissement, essayez de démarrer le serveur Go après un certain temps. Que va-t-il se passer?

Quelques secondes après le démarrage du serveur Go, Fluentd m'a envoyé un journal créé lors de sa panne. Les tentatives Fluentd semblent être répétées autant de fois que défini par retry_limit. Cette fois, il semble que la transmission ait réussi lors de la 8e tentative.

Si vous le définissez sur environ 18, il réessaiera pendant plus d'un jour. Cependant, étant donné que l'intervalle de relance devient de plus en plus grand, il restera difficile à envoyer même après la restauration de la destination. Par conséquent, je pense qu'il est nécessaire de l'ajuster pour que l'intervalle de relance ne devienne pas grand en combinaison avec d'autres options, ou d'envoyer un signal USR1 pour le purger de force. À propos des options typiques du plugin BufferedOutput --Qiita

Que faire si le récepteur de journal génère une erreur de la série 500?

Ci-dessus, nous avons vérifié le cas où le récepteur de journal est complètement arrêté. Mais que faire si le récepteur de journal devient instable? Par exemple, il s'agit d'un cas où le code de réponse de la série 500 est renvoyé en continu.

À titre de test, réécrivez l'implémentation du serveur Go afin qu'elle renvoie toujours un état 500. Ensuite, laissez Fluentd envoyer le journal.

main.go


func handleRequest(res http.ResponseWriter, req *http.Request) {
	res.WriteHeader(http.StatusInternalServerError)
	return
    // ...
}

Le journal Fluentd affichait un avertissement similaire au suivant: Dans ce cas, contrairement au serveur arrêté, le renvoi après des secondes de fonction exponentielle ne semble pas se produire.

2020-11-02 07:27:25 +0000 [warn]: #0 got unrecoverable error in primary and no secondary error_class=Fluent::UnrecoverableError error="500 Internal Server Error "

Essayez de corriger le code du serveur Go et de redémarrer le serveur Go. Que va-t-il se passer?

Le journal n'a pas été renvoyé après un certain temps.

Pour autant que j'ai lu la documentation, il semble que les retryable_response_codes de Fluentd's out_http doivent être définis. Si cela est défini, il semble qu'il essaiera de renvoyer le journal lorsque le code d'état est spécifié. Définissez ce paramètre comme suit:

my_fluentd.conf


<match nginx.access>
  @type http
  endpoint http://go:8000/
  retryable_response_codes [500, 503]
  <buffer>
    flush_interval 3s
  </buffer>
</match>

Après avoir ajouté ce paramètre, réessayez la même vérification. Ensuite, lorsque le serveur Go a renvoyé une réponse 500, le contenu du journal de Fluentd a changé comme suit. Vous pouvez voir que des tentatives sont en cours.

2020-11-02 07:33:31 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-11-02 07:33:32 +0000 chunk="5b31ac31236dc81f666960c6649cbfdc" error_class=Fluent::Plugin::HTTPOutput::RetryableResponse error="500 Internal Server Error "

Après un certain temps, j'ai corrigé le code sur le serveur Go et redémarré le serveur Go. Ensuite, le journal a été renvoyé et est arrivé au serveur Go.

code de vérification

Le code de vérification utilisé pour cette étude est disponible sur GitHub: https://github.com/suin/golang_playground/tree/7623374f54c509e9e02360184f0c196183fefb28/fluentd_http_out

Recommended Posts

J'ai étudié la sortie http de Fluentd
J'ai étudié le mécanisme de connexion flask!
J'ai étudié à quoi ressemble la lunette
J'ai étudié la superposition de l'arborescence des appareils