[PYTHON] Erstellen Sie benutzerdefinierte Regeln mit ElastAlert

Dieser Artikel ist der 9. Tagesartikel von MicroAd Adventskalender 2019.

Einführung

Jeder liebt elastische Suche, Ein Tool namens Elast Alert ist nützlich, wenn Sie Daten in Elasticsearch ständig überwachen und Warnungen überspringen möchten, wenn ein bestimmtes Muster auftritt.

ElastAlert wird standardmäßig mit einer Reihe von Überwachungsmustern geliefert, die als "Regel" bezeichnet werden. Manchmal reicht es jedoch nicht aus, Ihre Anforderungen zu erfüllen. In einem solchen Fall lösen wir das Problem, indem wir ein neues Überwachungsmuster erstellen, das als benutzerdefinierte Regel bezeichnet wird.

Wie man ... macht

Diese beiden wurden als Referenz beim Erstellen benutzerdefinierter Regeln verwendet. Insbesondere letzteres ist ein Muss, da es praktischen Code enthält.

Benutzerdefinierte Regel zum Erstellen dieser Zeit

In diesem Artikel

Erstellen Sie eine Regel, die sein kann.

Wenn beispielsweise in einem Index von Elasticsearch field_a und field_b vorhanden sind,

-0.001 < sum(field_a) - sum(field_b) < 0.001

In diesem Zustand möchte ich so etwas wie ** Alarm ausführen, wenn ** verschwindet.

Als Beispiel für die spezifische Verwendung des Autors: "Es sollte gleich sein, aber es ist schlecht, wenn sich die Werte voneinander unterscheiden." Legen Sie diese Regel für zwei Daten fest und wenn es sich um unterschiedliche Werte handelt, sofort Es ist wie das Überspringen von Benachrichtigungen.

Unter den Standardregeln hat "MetricAggregationRule" eine ähnliche Funktion. Da diese Regel jedoch nur das Ergebnis der Aggregation eines einzelnen Felds verwenden kann, ist es nicht möglich, "das Berechnungsergebnis zwischen aggregierten Ergebnissen zu überwachen".

Erstellen Sie benutzerdefinierte Regeln

Unter Bezugnahme auf die MetricAggregationRule-Klasse ist dies die Basis der Regeln, die aggregierte Daten verarbeiten [BaseAggregationRule](https://github.com/ Yelp / elastalert / blob / 325f1dfe7a45f3ca2a2cc00127ab71fcd4f9cead / elastalert. ) Wird geerbt, um eine Klasse namens "BinaryOperationOnAggregatedMetricRule" zu erstellen.

elastalert/elastalert_modules/custum_rules.py


import operator as op
from elastalert.util import EAException
from elastalert.ruletypes import BaseAggregationRule

class BinaryOperationOnAggregatedMetricRule(BaseAggregationRule):

    required_options = frozenset([
        'metric_agg_key_first', 'metric_agg_key_second', 'metric_agg_type_first', 'metric_agg_type_second',
        'binary_operator'
    ])
    allowed_aggregations = frozenset(['min', 'max', 'avg', 'sum', 'cardinality', 'value_count'])
    allowed_binary_operators = {'add': {'func': op.add, 'sign': '+'},
                                'subtract': {'func': op.sub, 'sign': '-'},
                                'multiply': {'func': op.mul, 'sign': '*'},
                                'divide': {'func': op.truediv, 'sign': '/'}}

    def __init__(self, *args):
        super(BinaryOperationOnAggregatedMetricRule, self).__init__(*args)
        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
        self.metric_key_first = 'metric_' + self.rules['metric_agg_key_first'] + '_' + self.rules['metric_agg_type_first']
        self.metric_key_second = 'metric_' + self.rules['metric_agg_key_second'] + '_' + self.rules['metric_agg_type_second']
        self.binary_operator = self.allowed_binary_operators[self.rules['binary_operator']]
        self.rules['aggregation_query_element'] = self.generate_aggregation_query()

        if not self.rules['metric_agg_type_first'] in self.allowed_aggregations \
            or not self.rules['metric_agg_type_second'] in self.allowed_aggregations:
            raise EAException("metric_agg_type must be one of %s" % (str(self.allowed_aggregations)))

        if not self.rules['binary_operator'] in self.allowed_binary_operators.keys():
            raise EAException("binary_operator must be one of %s" % (str(self.allowed_binary_operators.keys())))

        if 'max_threshold' not in self.rules and 'min_threshold' not in self.rules:
            raise EAException("BinaryOperationOnAggregatedMetricRule must have at least one of either max_threshold or min_threshold")

    def get_match_str(self, match):
        message = 'Threshold violation, %s_%s %s %s_%s = %s (min: %s max : %s)\n\n' % (
            self.rules['metric_agg_key_first'],
            self.rules['metric_agg_type_first'],
            self.binary_operator['sign'],
            self.rules['metric_agg_key_second'],
            self.rules['metric_agg_type_second'],
            str(self.binary_operator['func'](*[match[self.metric_key_first],match[self.metric_key_second]])),
            self.rules.get('min_threshold'),
            self.rules.get('max_threshold')
        )
        if self.rules.get('delete_ruletype_text'):
            message = ''

        top_events = [[key[11:], counts] for key, counts in match.items() if key.startswith('top_events_')]

        def events_to_message(items):
            message = ''
            items = sorted(items, key=lambda x: x[1], reverse=True)
            for term, count in items:
                message += '%s : %s\n' % (term, count)
            return message

        for key, counts in top_events:
            message += '%s:\n' % (key)
            message += '%s\n' % (events_to_message(counts.items()))

        return message

    def generate_aggregation_query(self):
        """
        custom_top_count_keys: A list of fields.
            ElastAlert will perform a terms query for the top X most common values for each of the fields,
            where X is 5 by default, or custom_top_count_number if it exists.
        custom_top_count_number: The number of terms to list if custom_top_count_keys is set. (Optional, integer, default 5)
        """

        query = {
            "all_matching_docs": {
                "filters": {
                    "filters": {
                        "all": {
                            "match_all": {}
                        }
                    }
                },
                'aggs': {
                    'topx_match_aggs': {
                        "filter": {
                            "bool": {
                                "must": []
                            }
                        },
                        'aggregations': {
                        }
                    },
                    self.metric_key_first: {
                        self.rules['metric_agg_type_first']: {
                            'field': self.rules['metric_agg_key_first']
                        }
                    },
                    self.metric_key_second: {
                        self.rules['metric_agg_type_second']: {
                            'field': self.rules['metric_agg_key_second']
                        }
                    },
                    'binary_operation': {
                        'bucket_script': {
                            'buckets_path': {
                                'first': self.metric_key_first,
                                'second': self.metric_key_second
                            },
                            'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
                        }
                    }
                }
            }
        }

        if self.rules.get('custom_top_count_keys'):
            number = self.top_count_number = self.rules.get('custom_top_count_number', 5)
            keys = self.top_count_keys = self.rules.get('custom_top_count_keys')
            for key in keys:
                child_query = {
                    'terms': {
                        'field': key,
                        'order': {'_count': 'desc'},
                        'size': number
                    },
                    'aggs': {
                        'metric_aggregation_first': {
                            self.rules['metric_agg_type_first']: {'field': self.rules['metric_agg_key_first']}
                        },
                        'metric_aggregation_second': {
                            self.rules['metric_agg_type_second']: {'field': self.rules['metric_agg_key_second']}
                        },
                        'metric_aggregation': {
                            'bucket_script': {
                                'buckets_path': {
                                    'first': 'metric_aggregation_first',
                                    'second': 'metric_aggregation_second'
                                },
                                'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
                            }
                        }

                    }
                }
                query['all_matching_docs']['aggs']['topx_match_aggs']['aggregations'][key] = child_query
        return query

    def check_matches(self, timestamp, query_key, aggregation_data):
        if "compound_query_key" in self.rules:
            self.check_matches_recursive(timestamp, query_key, aggregation_data, self.rules['compound_query_key'], dict())
        else:
            metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
            metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
            binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']

            if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
                match = {
                    self.rules['timestamp_field']: timestamp,
                    self.metric_key_first: metric_val_first,
                    self.metric_key_second: metric_val_second,
                    'binary_operation': binary_operation
                }
                if query_key is not None:
                    match[self.rules['query_key']] = query_key

                # Set TopX counts
                if self.rules.get('custom_top_count_keys'):
                    counts = self.get_top_counts(aggregation_data)
                    match.update(counts)

                self.add_match(match)

    def check_matches_recursive(self, timestamp, query_key, aggregation_data, compound_keys, match_data):
        if len(compound_keys) < 1:
            # shouldn't get to this point, but checking for safety
            return

        match_data[compound_keys[0]] = aggregation_data['key']
        if 'bucket_aggs' in aggregation_data:
            for result in aggregation_data['bucket_aggs']['buckets']:
                self.check_matches_recursive(timestamp,
                                             query_key,
                                             result,
                                             compound_keys[1:],
                                             match_data)
        else:
            metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
            metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
            binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']

            if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
                match_data[self.rules['timestamp_field']] = timestamp
                match_data[self.metric_key_first] = metric_val_first
                match_data[self.metric_key_second] = metric_val_second
                match_data['binary_operation'] = binary_operation

                # add compound key to payload to allow alerts to trigger for every unique occurrence
                compound_value = [match_data[key] for key in self.rules['compound_query_key']]
                match_data[self.rules['query_key']] = ",".join([str(value) for value in compound_value])

                # Set TopX counts
                if self.rules.get('custom_top_count_keys'):
                    counts = self.get_top_counts(aggregation_data)
                    match_data.update(counts)

                self.add_match(match_data)

    def get_top_counts(self, aggregation_data):
        """
        Counts the number of events for each unique value for each key field.
        Returns a dictionary with top_events_<key> mapped to the top 5 counts for each key.
        """
        all_counts = {}
        number = self.top_count_number
        keys = self.top_count_keys
        for key in keys:

            hits_terms = aggregation_data['all_matching_docs']['buckets']['all'].get('topx_match_aggs').get(key, None)
            if hits_terms is None:
                top_events_count = {}
            else:
                buckets = hits_terms.get('buckets')

                terms = {}
                for bucket in buckets:
                    terms[bucket['key']] = bucket['metric_aggregation']['value']
                counts = terms.items()
                counts = sorted(counts, key=lambda x: x[1], reverse=True)
                top_events_count = dict(counts[:number])

            # Save a dict with the top 5 events by key
            all_counts['top_events_%s' % (key)] = top_events_count

        return all_counts

    def fulfill_condition(self, metric_val_first, metric_val_second, binary_operator):
        if metric_val_first is None or metric_val_second is None:
            return False
        if metric_val_second == 0 and binary_operator == op.truediv:
            return False
        if 'max_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) > self.rules['max_threshold']:
            return True
        if 'min_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) < self.rules['min_threshold']:
            return True
        return False

Der Code ist Python, aber der größte Teil besteht darin, die intern ausgegebene "Elasticsearch-Abfragestruktur" zu verstehen.

In dieser Abfrage war es notwendig, den Inhalt von "Bucket_Script" und "Buckets_path" zu verstehen.

Konfigurationsdatei erstellen

Die in der Konfigurationsdatei angegebenen Elemente, die diese Regel verwenden, lauten wie folgt.

Kann jeweils verwendet werden.

rule_setting.yaml



es_host: <host_name>
es_port: <port_number>

name: your rule name
type: "elastalert_modules.custom_rules.BinaryOperationOnAggregatedMetricRule"

index: <index_name>
timestamp_field: <timestamp_field_name>
doc_type: <doc_type_name>

# metric_agg_type must be one of ['min', 'max', 'avg', 'sum', 'cardinality', 'value_count']
# binary_operator must be one of ['add', 'subtract', 'multiply', 'divide']
metric_agg_key_first: fielde_a
metric_agg_type_first: sum
metric_agg_key_second: fielde_b
metric_agg_type_second: sum
binary_operator: subtract

min_threshold: -0.0001
max_threshold: 0.00001

query_key:
  - xxxxxx_id

custom_top_count_keys:
  - zzzzzz_id

Beispielalarm

Wenn die Einstellungen und Programme funktionieren, sollten Sie eine Warnung wie folgt erhalten:

your rule name

Threshold violation, fielde_a_sum - fielde_b_sum = 0.25 (min: -0.0001 max : 0.00001)

zzzzzz_id:
19 : 0.25

binary_operation: 0.25
xxxxxx_id: 19
time: 2019-12-08T00:00:00.00000000Z
metric_fielde_a_sum: 1.0
metric_fielde_b_sum: 0.75
num_hits: 5000
num_matches: 1

abschließend

Die meisten Überwachungsmuster sind möglich, solange Sie Abfragen ausgeben können, die wie beabsichtigt funktionieren.

Es gibt jedoch Fälle, in denen es nur beim Anpassen des Teils, das die Zeit angibt, etwas problematisch ist. Dies liegt daran, dass die Abfrage an einer anderen Stelle als der benutzerdefinierten Regel für den Teil erstellt wird, der die Zeit angibt. Daher ist es schwierig, die Zeitspezifikation so anzupassen, wie sie ist.

aber es ist okay. Wie hübsch. Ich möchte die Situation vorstellen, die bei der nächsten Gelegenheit bewältigt werden kann.

Recommended Posts

Erstellen Sie benutzerdefinierte Regeln mit ElastAlert
Erstelle Spiele mit Pygame
Erstellen Sie einen Filter mit scipy
Erstellen Sie eine Umgebung mit virtualenv
Erstellen Sie Cloud-TPU mit tf-nightly
Erstellen Sie eine API mit Django
Erstellen / Suchen / Erstellen einer Tabelle mit PynamoDB
Erstellen Sie ein 3D-GIF mit Python3
Plotly Erstellen Sie ein Diagramm mit einer Schaltfläche
Erstellen Sie eine Homepage mit Django
Erstellen Sie den Image Viewer mit Tkinter
Erstellen Sie eine Patentkarte mit Streamlit
Erstellen Sie ein Verzeichnis mit Python
Erstellen Sie eine Xlsx-Datei mit XlsxWriter
Erstellen Sie eine Plotanimation mit Python + Matplotlib
Erstellen Sie Awaitable mit der Python / C-API
Erstellen Sie eine Bildlaufrolle mit Pythonista + -Szene
[AWS] API mit API Gateway + Lambda erstellen
Erstellen Sie eine virtuelle Umgebung mit Python!
Verwenden Sie benutzerdefinierte Python-Skripte mit StackStorm
Machen wir ein LGTM-Bild mit GIMP! (Python-Fu)
Erstellen Sie mit SoX rauschgefüllte Audiodaten
Erstellen Sie mit Smart Trade grundlegende Algorithmen
Erstellen Sie eine API mit hug mit mod_wsgi
Erstellen Sie eine Altersgruppe mit Pandas
Erstellung eines benutzerdefinierten Splunk-Suchbefehls Teil 2
[GUI mit Python] PyQt5 -Custom Widget-
Erstellen Sie einen Poisson-Stepper mit numpy.random
Erstellen Sie Github-Seiten mit Lektor Part 1
Erstellen Sie mit Django einen Datei-Uploader
Verwenden Sie einen benutzerdefinierten Kernel mit WSL2