[PYTHON] Créez des règles personnalisées avec ElastAlert

Cet article est l'article du 9ème jour du MicroAd Advent Calendar 2019.

introduction

Tout le monde aime la recherche élastique, Un outil appelé [Elast Alert](https://github.com/ Yelp / elastalert) est utile lorsque vous souhaitez surveiller en permanence les données sur Elasticsearch et ignorer les alertes lorsqu'un modèle spécifique se produit.

ElastAlert est livré avec un certain nombre de modèles de surveillance appelés «Règle» par défaut, mais il y a des moments où cela ne suffit pas pour répondre à vos besoins. Dans un tel cas, résolvons le problème en créant un nouveau modèle de surveillance appelé règle personnalisée.

Comment faire

Ces deux ont été utilisés comme référence lors de la création de règles personnalisées. Surtout ce dernier est à voir car il a un code pratique.

Règle personnalisée à créer cette fois

Dans cet article

Créez une règle qui peut l'être.

Par exemple, s'il y a field_a et field_b dans un index d'Elasticsearch,

-0.001 < sum(field_a) - sum(field_b) < 0.001

Dans cet état, je veux faire quelque chose comme ** alerte lorsque ** disparaît.

À titre d’exemple de l’utilisation spécifique de l’auteur, «Ce devrait être la même chose, mais c’est mauvais si les valeurs sont différentes les unes des autres.» Définissez cette règle pour deux données, et s’il s’agit de valeurs différentes, immédiatement C'est comme sauter les notifications vers.

Parmi les règles par défaut, «MetricAggregationRule» a une fonction similaire, mais comme cette règle ne peut utiliser que le résultat de l'agrégation d'un seul champ, il n'est pas possible de «surveiller le résultat du calcul entre les résultats agrégés».

Créer des règles personnalisées

Tout en se référant à la classe MetricAggregationRule, elle est la base des règles qui gèrent les données agrégées [BaseAggregationRule](https://github.com/ Yelp / elastalert / blob / 325f1dfe7a45f3ca2a2cc00127ab71fcd4f9cead / elastalert / ruletypes.py" ) Est hérité pour créer une classe appelée BinaryOperationOnAggregatedMetricRule.

elastalert/elastalert_modules/custum_rules.py


import operator as op
from elastalert.util import EAException
from elastalert.ruletypes import BaseAggregationRule

class BinaryOperationOnAggregatedMetricRule(BaseAggregationRule):

    required_options = frozenset([
        'metric_agg_key_first', 'metric_agg_key_second', 'metric_agg_type_first', 'metric_agg_type_second',
        'binary_operator'
    ])
    allowed_aggregations = frozenset(['min', 'max', 'avg', 'sum', 'cardinality', 'value_count'])
    allowed_binary_operators = {'add': {'func': op.add, 'sign': '+'},
                                'subtract': {'func': op.sub, 'sign': '-'},
                                'multiply': {'func': op.mul, 'sign': '*'},
                                'divide': {'func': op.truediv, 'sign': '/'}}

    def __init__(self, *args):
        super(BinaryOperationOnAggregatedMetricRule, self).__init__(*args)
        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
        self.metric_key_first = 'metric_' + self.rules['metric_agg_key_first'] + '_' + self.rules['metric_agg_type_first']
        self.metric_key_second = 'metric_' + self.rules['metric_agg_key_second'] + '_' + self.rules['metric_agg_type_second']
        self.binary_operator = self.allowed_binary_operators[self.rules['binary_operator']]
        self.rules['aggregation_query_element'] = self.generate_aggregation_query()

        if not self.rules['metric_agg_type_first'] in self.allowed_aggregations \
            or not self.rules['metric_agg_type_second'] in self.allowed_aggregations:
            raise EAException("metric_agg_type must be one of %s" % (str(self.allowed_aggregations)))

        if not self.rules['binary_operator'] in self.allowed_binary_operators.keys():
            raise EAException("binary_operator must be one of %s" % (str(self.allowed_binary_operators.keys())))

        if 'max_threshold' not in self.rules and 'min_threshold' not in self.rules:
            raise EAException("BinaryOperationOnAggregatedMetricRule must have at least one of either max_threshold or min_threshold")

    def get_match_str(self, match):
        message = 'Threshold violation, %s_%s %s %s_%s = %s (min: %s max : %s)\n\n' % (
            self.rules['metric_agg_key_first'],
            self.rules['metric_agg_type_first'],
            self.binary_operator['sign'],
            self.rules['metric_agg_key_second'],
            self.rules['metric_agg_type_second'],
            str(self.binary_operator['func'](*[match[self.metric_key_first],match[self.metric_key_second]])),
            self.rules.get('min_threshold'),
            self.rules.get('max_threshold')
        )
        if self.rules.get('delete_ruletype_text'):
            message = ''

        top_events = [[key[11:], counts] for key, counts in match.items() if key.startswith('top_events_')]

        def events_to_message(items):
            message = ''
            items = sorted(items, key=lambda x: x[1], reverse=True)
            for term, count in items:
                message += '%s : %s\n' % (term, count)
            return message

        for key, counts in top_events:
            message += '%s:\n' % (key)
            message += '%s\n' % (events_to_message(counts.items()))

        return message

    def generate_aggregation_query(self):
        """
        custom_top_count_keys: A list of fields.
            ElastAlert will perform a terms query for the top X most common values for each of the fields,
            where X is 5 by default, or custom_top_count_number if it exists.
        custom_top_count_number: The number of terms to list if custom_top_count_keys is set. (Optional, integer, default 5)
        """

        query = {
            "all_matching_docs": {
                "filters": {
                    "filters": {
                        "all": {
                            "match_all": {}
                        }
                    }
                },
                'aggs': {
                    'topx_match_aggs': {
                        "filter": {
                            "bool": {
                                "must": []
                            }
                        },
                        'aggregations': {
                        }
                    },
                    self.metric_key_first: {
                        self.rules['metric_agg_type_first']: {
                            'field': self.rules['metric_agg_key_first']
                        }
                    },
                    self.metric_key_second: {
                        self.rules['metric_agg_type_second']: {
                            'field': self.rules['metric_agg_key_second']
                        }
                    },
                    'binary_operation': {
                        'bucket_script': {
                            'buckets_path': {
                                'first': self.metric_key_first,
                                'second': self.metric_key_second
                            },
                            'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
                        }
                    }
                }
            }
        }

        if self.rules.get('custom_top_count_keys'):
            number = self.top_count_number = self.rules.get('custom_top_count_number', 5)
            keys = self.top_count_keys = self.rules.get('custom_top_count_keys')
            for key in keys:
                child_query = {
                    'terms': {
                        'field': key,
                        'order': {'_count': 'desc'},
                        'size': number
                    },
                    'aggs': {
                        'metric_aggregation_first': {
                            self.rules['metric_agg_type_first']: {'field': self.rules['metric_agg_key_first']}
                        },
                        'metric_aggregation_second': {
                            self.rules['metric_agg_type_second']: {'field': self.rules['metric_agg_key_second']}
                        },
                        'metric_aggregation': {
                            'bucket_script': {
                                'buckets_path': {
                                    'first': 'metric_aggregation_first',
                                    'second': 'metric_aggregation_second'
                                },
                                'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
                            }
                        }

                    }
                }
                query['all_matching_docs']['aggs']['topx_match_aggs']['aggregations'][key] = child_query
        return query

    def check_matches(self, timestamp, query_key, aggregation_data):
        if "compound_query_key" in self.rules:
            self.check_matches_recursive(timestamp, query_key, aggregation_data, self.rules['compound_query_key'], dict())
        else:
            metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
            metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
            binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']

            if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
                match = {
                    self.rules['timestamp_field']: timestamp,
                    self.metric_key_first: metric_val_first,
                    self.metric_key_second: metric_val_second,
                    'binary_operation': binary_operation
                }
                if query_key is not None:
                    match[self.rules['query_key']] = query_key

                # Set TopX counts
                if self.rules.get('custom_top_count_keys'):
                    counts = self.get_top_counts(aggregation_data)
                    match.update(counts)

                self.add_match(match)

    def check_matches_recursive(self, timestamp, query_key, aggregation_data, compound_keys, match_data):
        if len(compound_keys) < 1:
            # shouldn't get to this point, but checking for safety
            return

        match_data[compound_keys[0]] = aggregation_data['key']
        if 'bucket_aggs' in aggregation_data:
            for result in aggregation_data['bucket_aggs']['buckets']:
                self.check_matches_recursive(timestamp,
                                             query_key,
                                             result,
                                             compound_keys[1:],
                                             match_data)
        else:
            metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
            metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
            binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']

            if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
                match_data[self.rules['timestamp_field']] = timestamp
                match_data[self.metric_key_first] = metric_val_first
                match_data[self.metric_key_second] = metric_val_second
                match_data['binary_operation'] = binary_operation

                # add compound key to payload to allow alerts to trigger for every unique occurrence
                compound_value = [match_data[key] for key in self.rules['compound_query_key']]
                match_data[self.rules['query_key']] = ",".join([str(value) for value in compound_value])

                # Set TopX counts
                if self.rules.get('custom_top_count_keys'):
                    counts = self.get_top_counts(aggregation_data)
                    match_data.update(counts)

                self.add_match(match_data)

    def get_top_counts(self, aggregation_data):
        """
        Counts the number of events for each unique value for each key field.
        Returns a dictionary with top_events_<key> mapped to the top 5 counts for each key.
        """
        all_counts = {}
        number = self.top_count_number
        keys = self.top_count_keys
        for key in keys:

            hits_terms = aggregation_data['all_matching_docs']['buckets']['all'].get('topx_match_aggs').get(key, None)
            if hits_terms is None:
                top_events_count = {}
            else:
                buckets = hits_terms.get('buckets')

                terms = {}
                for bucket in buckets:
                    terms[bucket['key']] = bucket['metric_aggregation']['value']
                counts = terms.items()
                counts = sorted(counts, key=lambda x: x[1], reverse=True)
                top_events_count = dict(counts[:number])

            # Save a dict with the top 5 events by key
            all_counts['top_events_%s' % (key)] = top_events_count

        return all_counts

    def fulfill_condition(self, metric_val_first, metric_val_second, binary_operator):
        if metric_val_first is None or metric_val_second is None:
            return False
        if metric_val_second == 0 and binary_operator == op.truediv:
            return False
        if 'max_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) > self.rules['max_threshold']:
            return True
        if 'min_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) < self.rules['min_threshold']:
            return True
        return False

Le code est en python, mais l'essentiel est de comprendre la structure des «requêtes Elasticsearch» émises en interne.

Dans cette requête, il était nécessaire de comprendre le contenu de «bucket_script» et de «buckets_path».

Créer un fichier de configuration

Les éléments spécifiés dans le fichier de configuration qui utilise cette règle sont les suivants.

Peut être utilisé respectivement.

rule_setting.yaml



es_host: <host_name>
es_port: <port_number>

name: your rule name
type: "elastalert_modules.custom_rules.BinaryOperationOnAggregatedMetricRule"

index: <index_name>
timestamp_field: <timestamp_field_name>
doc_type: <doc_type_name>

# metric_agg_type must be one of ['min', 'max', 'avg', 'sum', 'cardinality', 'value_count']
# binary_operator must be one of ['add', 'subtract', 'multiply', 'divide']
metric_agg_key_first: fielde_a
metric_agg_type_first: sum
metric_agg_key_second: fielde_b
metric_agg_type_second: sum
binary_operator: subtract

min_threshold: -0.0001
max_threshold: 0.00001

query_key:
  - xxxxxx_id

custom_top_count_keys:
  - zzzzzz_id

Exemple d'alerte

Si les paramètres et les programmes fonctionnent, vous devriez recevoir une alerte comme celle-ci:

your rule name

Threshold violation, fielde_a_sum - fielde_b_sum = 0.25 (min: -0.0001 max : 0.00001)

zzzzzz_id:
19 : 0.25

binary_operation: 0.25
xxxxxx_id: 19
time: 2019-12-08T00:00:00.00000000Z
metric_fielde_a_sum: 1.0
metric_fielde_b_sum: 0.75
num_hits: 5000
num_matches: 1

en conclusion

La plupart des modèles de surveillance sont réalisables tant que vous pouvez émettre des requêtes qui fonctionnent comme prévu.

Cependant, il y a des cas où cela n'est un peu gênant que lors de la personnalisation de la pièce qui spécifie l'heure. En effet, la requête est construite à un emplacement différent de la règle personnalisée pour la partie qui spécifie l'heure, il est donc difficile de personnaliser la spécification d'heure telle quelle.

mais c'est d'accord. Comme c'est joli. Je voudrais vous présenter la situation qui peut être gérée à la prochaine occasion.

Recommended Posts

Créez des règles personnalisées avec ElastAlert
Créez des jeux avec Pygame
Créer un filtre avec scipy
Créer un environnement avec virtualenv
Créez Cloud TPU avec tf-nightly
Créer une API avec Django
Créer / rechercher / créer une table avec PynamoDB
Créer un gif 3D avec python3
tracé Créer un graphique avec un bouton
Créer une page d'accueil avec django
Créer une visionneuse d'images avec Tkinter
Créez une carte des brevets avec Streamlit
Créer un répertoire avec python
Créer un fichier xlsx avec XlsxWriter
Créer une animation de tracé avec Python + Matplotlib
Créer Awaitable avec l'API Python / C
Créer un défilement d'écran avec Pythonista + scene
[AWS] Créer une API avec API Gateway + Lambda
Créez un environnement virtuel avec Python!
Utilisez des scripts personnalisés Python avec StackStorm
Faisons une image LGTM avec GIMP! (Python-fu)
Créez dynamiquement de nouvelles trames de données avec des pandas
Créez des données audio bruyantes avec SoX
Créez des algorithmes de base avec Smart Trade
Créez une API en utilisant hug avec mod_wsgi
Créez une tranche d'âge avec les pandas
Création d'une commande de recherche personnalisée Splunk, partie 2
[GUI avec Python] PyQt5-Widget personnalisé-
Créez un stepper de poisson avec numpy.random
Créer des pages github avec lektor partie 1
Créer un téléchargeur de fichiers avec Django
Utiliser un noyau personnalisé avec WSL2