[PYTHON] Create custom rules with ElastAlert

This article is the Day 9 entry of the MicroAd Advent Calendar 2019.

Introduction

Everyone loves Elasticsearch. When you want to constantly monitor data in Elasticsearch and send an alert whenever a specific pattern occurs, a tool called [ElastAlert](https://github.com/Yelp/elastalert) is useful.

ElastAlert ships with a number of monitoring patterns, called rules, but there are times when the built-in rules alone cannot meet your needs. In such a case, you can solve the problem by creating a new monitoring pattern as a custom rule.

How to make

I used these two resources as references when creating the custom rule. The latter in particular is a must-see because it contains practical code.

Custom rule to create this time

In this article, we create a rule that can monitor the result of a calculation between two aggregated values.

For example, suppose an Elasticsearch index contains the fields field_a and field_b, and the following condition normally holds:

-0.001 < sum(field_a) - sum(field_b) < 0.001

The goal is to send an alert when this condition no longer holds.
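
As a rough illustration of the condition above, here is a minimal Python sketch that works on in-memory rows instead of Elasticsearch. The function name `difference_in_range`, the sample rows, and the default bounds are assumptions made for this example only.

```python
def difference_in_range(rows, lower=-0.001, upper=0.001):
    """Return True while lower < sum(field_a) - sum(field_b) < upper holds."""
    total_a = sum(row["field_a"] for row in rows)
    total_b = sum(row["field_b"] for row in rows)
    return lower < total_a - total_b < upper


# Hypothetical documents: the totals agree in the first set but not in the second.
matching_rows = [{"field_a": 1.5, "field_b": 1.0}, {"field_a": 0.5, "field_b": 1.0}]
drifting_rows = [{"field_a": 1.5, "field_b": 1.0}, {"field_a": 0.5, "field_b": 0.5}]

print(difference_in_range(matching_rows))  # condition holds, no alert needed
print(difference_in_range(drifting_rows))  # condition broken, the rule should alert
```

The custom rule described below performs the same comparison, except that the sums are computed by Elasticsearch aggregations.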

As a concrete example from my own use: for two pieces of data that should be identical, and where any difference is a problem, I set this rule so that a notification is sent immediately if the values differ.

Among the default rules, MetricAggregationRule has a similar function, but because it can only use the result of aggregating a single field, it cannot monitor the result of a calculation between aggregated results.

Create custom rules

Using the MetricAggregationRule class as a reference, we create a class called BinaryOperationOnAggregatedMetricRule that inherits from [BaseAggregationRule](https://github.com/Yelp/elastalert/blob/325f1dfe7a45f3ca2a2cc00127ab71fcd4f9cead/elastalert/ruletypes.py#L972), the base class of the rules that handle aggregated data.

elastalert/elastalert_modules/custom_rules.py


import operator as op
from elastalert.util import EAException
from elastalert.ruletypes import BaseAggregationRule

class BinaryOperationOnAggregatedMetricRule(BaseAggregationRule):

    required_options = frozenset([
        'metric_agg_key_first', 'metric_agg_key_second', 'metric_agg_type_first', 'metric_agg_type_second',
        'binary_operator'
    ])
    allowed_aggregations = frozenset(['min', 'max', 'avg', 'sum', 'cardinality', 'value_count'])
    allowed_binary_operators = {'add': {'func': op.add, 'sign': '+'},
                                'subtract': {'func': op.sub, 'sign': '-'},
                                'multiply': {'func': op.mul, 'sign': '*'},
                                'divide': {'func': op.truediv, 'sign': '/'}}

    def __init__(self, *args):
        super(BinaryOperationOnAggregatedMetricRule, self).__init__(*args)

        # Validate the options before using them, so that a bad value raises
        # EAException with a readable message instead of a bare KeyError.
        if self.rules['metric_agg_type_first'] not in self.allowed_aggregations \
                or self.rules['metric_agg_type_second'] not in self.allowed_aggregations:
            raise EAException("metric_agg_type must be one of %s" % (str(self.allowed_aggregations)))

        if self.rules['binary_operator'] not in self.allowed_binary_operators:
            raise EAException("binary_operator must be one of %s" % (str(list(self.allowed_binary_operators))))

        if 'max_threshold' not in self.rules and 'min_threshold' not in self.rules:
            raise EAException("BinaryOperationOnAggregatedMetricRule must have at least one of either max_threshold or min_threshold")

        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
        self.metric_key_first = 'metric_' + self.rules['metric_agg_key_first'] + '_' + self.rules['metric_agg_type_first']
        self.metric_key_second = 'metric_' + self.rules['metric_agg_key_second'] + '_' + self.rules['metric_agg_type_second']
        self.binary_operator = self.allowed_binary_operators[self.rules['binary_operator']]
        self.rules['aggregation_query_element'] = self.generate_aggregation_query()

    def get_match_str(self, match):
        message = 'Threshold violation, %s_%s %s %s_%s = %s (min: %s max : %s)\n\n' % (
            self.rules['metric_agg_key_first'],
            self.rules['metric_agg_type_first'],
            self.binary_operator['sign'],
            self.rules['metric_agg_key_second'],
            self.rules['metric_agg_type_second'],
            str(self.binary_operator['func'](*[match[self.metric_key_first],match[self.metric_key_second]])),
            self.rules.get('min_threshold'),
            self.rules.get('max_threshold')
        )
        if self.rules.get('delete_ruletype_text'):
            message = ''

        top_events = [[key[11:], counts] for key, counts in match.items() if key.startswith('top_events_')]

        def events_to_message(items):
            message = ''
            items = sorted(items, key=lambda x: x[1], reverse=True)
            for term, count in items:
                message += '%s : %s\n' % (term, count)
            return message

        for key, counts in top_events:
            message += '%s:\n' % (key)
            message += '%s\n' % (events_to_message(counts.items()))

        return message

    def generate_aggregation_query(self):
        """
        custom_top_count_keys: A list of fields.
            ElastAlert will perform a terms query for the top X most common values for each of the fields,
            where X is 5 by default, or custom_top_count_number if it exists.
        custom_top_count_number: The number of terms to list if custom_top_count_keys is set. (Optional, integer, default 5)
        """

        query = {
            "all_matching_docs": {
                "filters": {
                    "filters": {
                        "all": {
                            "match_all": {}
                        }
                    }
                },
                'aggs': {
                    'topx_match_aggs': {
                        "filter": {
                            "bool": {
                                "must": []
                            }
                        },
                        'aggregations': {
                        }
                    },
                    self.metric_key_first: {
                        self.rules['metric_agg_type_first']: {
                            'field': self.rules['metric_agg_key_first']
                        }
                    },
                    self.metric_key_second: {
                        self.rules['metric_agg_type_second']: {
                            'field': self.rules['metric_agg_key_second']
                        }
                    },
                    'binary_operation': {
                        'bucket_script': {
                            'buckets_path': {
                                'first': self.metric_key_first,
                                'second': self.metric_key_second
                            },
                            'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
                        }
                    }
                }
            }
        }

        if self.rules.get('custom_top_count_keys'):
            number = self.top_count_number = self.rules.get('custom_top_count_number', 5)
            keys = self.top_count_keys = self.rules.get('custom_top_count_keys')
            for key in keys:
                child_query = {
                    'terms': {
                        'field': key,
                        'order': {'_count': 'desc'},
                        'size': number
                    },
                    'aggs': {
                        'metric_aggregation_first': {
                            self.rules['metric_agg_type_first']: {'field': self.rules['metric_agg_key_first']}
                        },
                        'metric_aggregation_second': {
                            self.rules['metric_agg_type_second']: {'field': self.rules['metric_agg_key_second']}
                        },
                        'metric_aggregation': {
                            'bucket_script': {
                                'buckets_path': {
                                    'first': 'metric_aggregation_first',
                                    'second': 'metric_aggregation_second'
                                },
                                'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
                            }
                        }

                    }
                }
                query['all_matching_docs']['aggs']['topx_match_aggs']['aggregations'][key] = child_query
        return query

    def check_matches(self, timestamp, query_key, aggregation_data):
        if "compound_query_key" in self.rules:
            self.check_matches_recursive(timestamp, query_key, aggregation_data, self.rules['compound_query_key'], dict())
        else:
            metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
            metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
            binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']

            if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
                match = {
                    self.rules['timestamp_field']: timestamp,
                    self.metric_key_first: metric_val_first,
                    self.metric_key_second: metric_val_second,
                    'binary_operation': binary_operation
                }
                if query_key is not None:
                    match[self.rules['query_key']] = query_key

                # Set TopX counts
                if self.rules.get('custom_top_count_keys'):
                    counts = self.get_top_counts(aggregation_data)
                    match.update(counts)

                self.add_match(match)

    def check_matches_recursive(self, timestamp, query_key, aggregation_data, compound_keys, match_data):
        if len(compound_keys) < 1:
            # shouldn't get to this point, but checking for safety
            return

        match_data[compound_keys[0]] = aggregation_data['key']
        if 'bucket_aggs' in aggregation_data:
            for result in aggregation_data['bucket_aggs']['buckets']:
                self.check_matches_recursive(timestamp,
                                             query_key,
                                             result,
                                             compound_keys[1:],
                                             match_data)
        else:
            metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
            metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
            binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']

            if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
                match_data[self.rules['timestamp_field']] = timestamp
                match_data[self.metric_key_first] = metric_val_first
                match_data[self.metric_key_second] = metric_val_second
                match_data['binary_operation'] = binary_operation

                # add compound key to payload to allow alerts to trigger for every unique occurrence
                compound_value = [match_data[key] for key in self.rules['compound_query_key']]
                match_data[self.rules['query_key']] = ",".join([str(value) for value in compound_value])

                # Set TopX counts
                if self.rules.get('custom_top_count_keys'):
                    counts = self.get_top_counts(aggregation_data)
                    match_data.update(counts)

                self.add_match(match_data)

    def get_top_counts(self, aggregation_data):
        """
        Counts the number of events for each unique value for each key field.
        Returns a dictionary with top_events_<key> mapped to the top 5 counts for each key.
        """
        all_counts = {}
        number = self.top_count_number
        keys = self.top_count_keys
        for key in keys:

            hits_terms = aggregation_data['all_matching_docs']['buckets']['all'].get('topx_match_aggs').get(key, None)
            if hits_terms is None:
                top_events_count = {}
            else:
                buckets = hits_terms.get('buckets')

                terms = {}
                for bucket in buckets:
                    terms[bucket['key']] = bucket['metric_aggregation']['value']
                counts = terms.items()
                counts = sorted(counts, key=lambda x: x[1], reverse=True)
                top_events_count = dict(counts[:number])

            # Save a dict with the top 5 events by key
            all_counts['top_events_%s' % (key)] = top_events_count

        return all_counts

    def fulfill_condition(self, metric_val_first, metric_val_second, binary_operator):
        if metric_val_first is None or metric_val_second is None:
            return False
        if metric_val_second == 0 and binary_operator == op.truediv:
            return False
        if 'max_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) > self.rules['max_threshold']:
            return True
        if 'min_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) < self.rules['min_threshold']:
            return True
        return False

The code is Python, but the main point is to understand the structure of the Elasticsearch query that is issued internally.

For this query in particular, I needed to understand how bucket_script and buckets_path work.
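
To make the roles of bucket_script and buckets_path more concrete, the sketch below builds only the metric part of the aggregation for the sum / subtract case. The helper name `build_metric_aggs` is hypothetical, and the surrounding all_matching_docs wrapper from the rule is omitted. buckets_path maps the script variables first and second to the two sibling metric aggregations, and the script combines them through params.

```python
import json


def build_metric_aggs(key_first, type_first, key_second, type_second, sign):
    """Build two metric aggregations plus a bucket_script that combines them."""
    name_first = "metric_%s_%s" % (key_first, type_first)
    name_second = "metric_%s_%s" % (key_second, type_second)
    return {
        name_first: {type_first: {"field": key_first}},
        name_second: {type_second: {"field": key_second}},
        "binary_operation": {
            "bucket_script": {
                # script variable name -> name of a sibling aggregation
                "buckets_path": {"first": name_first, "second": name_second},
                "script": "params.first" + sign + "params.second",
            }
        },
    }


aggs = build_metric_aggs("field_a", "sum", "field_b", "sum", "-")
print(json.dumps(aggs, indent=2))
```

bucket_script is a pipeline aggregation that runs inside a multi-bucket parent, which is presumably why the rule nests everything under the all_matching_docs filters aggregation.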

Creating a configuration file

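The configuration file for this rule uses the following items: metric_agg_key_first, metric_agg_type_first, metric_agg_key_second, metric_agg_type_second, binary_operator, and at least one of min_threshold and max_threshold. custom_top_count_keys and custom_top_count_number are optional.

For metric_agg_type_first and metric_agg_type_second, min, max, avg, sum, cardinality, and value_count can be used. For binary_operator, add, subtract, multiply, and divide can be used.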

rule_setting.yaml



es_host: <host_name>
es_port: <port_number>

name: your rule name
type: "elastalert_modules.custom_rules.BinaryOperationOnAggregatedMetricRule"

index: <index_name>
timestamp_field: <timestamp_field_name>
doc_type: <doc_type_name>

# metric_agg_type must be one of ['min', 'max', 'avg', 'sum', 'cardinality', 'value_count']
# binary_operator must be one of ['add', 'subtract', 'multiply', 'divide']
metric_agg_key_first: fielde_a
metric_agg_type_first: sum
metric_agg_key_second: fielde_b
metric_agg_type_second: sum
binary_operator: subtract

min_threshold: -0.0001
max_threshold: 0.00001

query_key:
  - xxxxxx_id

custom_top_count_keys:
  - zzzzzz_id

Sample alert

If the configuration and the program work correctly, you should get an alert like this:

your rule name

Threshold violation, fielde_a_sum - fielde_b_sum = 0.25 (min: -0.0001 max : 0.00001)

zzzzzz_id:
19 : 0.25

binary_operation: 0.25
xxxxxx_id: 19
time: 2019-12-08T00:00:00.00000000Z
metric_fielde_a_sum: 1.0
metric_fielde_b_sum: 0.75
num_hits: 5000
num_matches: 1
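
The numbers in this alert can be traced by hand. The sketch below uses a hand-written dictionary that imitates the shape of the aggregation response read in check_matches (it is not real Elasticsearch output), then applies the same kind of threshold test as fulfill_condition. The names `response` and `violates_threshold` are assumptions for illustration.

```python
import operator as op

# Hand-written stand-in for the aggregation response; values taken from the sample alert.
response = {
    "all_matching_docs": {
        "buckets": {
            "all": {
                "metric_fielde_a_sum": {"value": 1.0},
                "metric_fielde_b_sum": {"value": 0.75},
                "binary_operation": {"value": 0.25},
            }
        }
    }
}


def violates_threshold(first, second, func, min_threshold=None, max_threshold=None):
    """Return True when func(first, second) falls outside the configured range."""
    if first is None or second is None:
        return False  # nothing was aggregated, so there is nothing to compare
    if func is op.truediv and second == 0:
        return False  # skip division by zero instead of raising
    result = func(first, second)
    if max_threshold is not None and result > max_threshold:
        return True
    if min_threshold is not None and result < min_threshold:
        return True
    return False


bucket = response["all_matching_docs"]["buckets"]["all"]
first = bucket["metric_fielde_a_sum"]["value"]
second = bucket["metric_fielde_b_sum"]["value"]

print(op.sub(first, second))
print(violates_threshold(first, second, op.sub, min_threshold=-0.0001, max_threshold=0.00001))
```

Because the difference is larger than max_threshold, the check reports a violation, which corresponds to the single match in the alert.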

Conclusion

Most monitoring patterns are feasible as long as you can issue a query that works as intended.

However, customizing the part that specifies the time range can be a little troublesome. The time-range part of the query is constructed outside the custom rule, so it is difficult to customize it directly from the rule.

That said, it can be managed. I would like to introduce how at the next opportunity.
