Dieser Artikel ist der 9. Tagesartikel von MicroAd Adventskalender 2019.
Jeder liebt elastische Suche, Ein Tool namens Elast Alert ist nützlich, wenn Sie Daten in Elasticsearch ständig überwachen und Warnungen überspringen möchten, wenn ein bestimmtes Muster auftritt.
ElastAlert wird standardmäßig mit einer Reihe von Überwachungsmustern geliefert, die als "Regel" bezeichnet werden. Manchmal reicht es jedoch nicht aus, Ihre Anforderungen zu erfüllen. In einem solchen Fall lösen wir das Problem, indem wir ein neues Überwachungsmuster erstellen, das als benutzerdefinierte Regel bezeichnet wird.
Diese beiden wurden als Referenz beim Erstellen benutzerdefinierter Regeln verwendet. Insbesondere letzteres ist ein Muss, da es praktischen Code enthält.
In diesem Artikel
Erstellen Sie eine Regel, die sein kann.
Wenn beispielsweise in einem Index von Elasticsearch field_a
und field_b
vorhanden sind,
-0.001 < sum(field_a) - sum(field_b) < 0.001
In diesem Zustand möchte ich so etwas wie ** Alarm ausführen, wenn ** verschwindet.
Als Beispiel für die spezifische Verwendung des Autors: "Es sollte gleich sein, aber es ist schlecht, wenn sich die Werte voneinander unterscheiden." Legen Sie diese Regel für zwei Daten fest und wenn es sich um unterschiedliche Werte handelt, sofort Es ist wie das Überspringen von Benachrichtigungen.
Unter den Standardregeln hat "MetricAggregationRule" eine ähnliche Funktion. Da diese Regel jedoch nur das Ergebnis der Aggregation eines einzelnen Felds verwenden kann, ist es nicht möglich, "das Berechnungsergebnis zwischen aggregierten Ergebnissen zu überwachen".
Unter Bezugnahme auf die MetricAggregationRule-Klasse ist dies die Basis der Regeln, die aggregierte Daten verarbeiten [BaseAggregationRule](https://github.com/ Yelp / elastalert / blob / 325f1dfe7a45f3ca2a2cc00127ab71fcd4f9cead / elastalert. ) Wird geerbt, um eine Klasse namens "BinaryOperationOnAggregatedMetricRule" zu erstellen.
elastalert/elastalert_modules/custum_rules.py
import operator as op
from elastalert.util import EAException
from elastalert.ruletypes import BaseAggregationRule
class BinaryOperationOnAggregatedMetricRule(BaseAggregationRule):
required_options = frozenset([
'metric_agg_key_first', 'metric_agg_key_second', 'metric_agg_type_first', 'metric_agg_type_second',
'binary_operator'
])
allowed_aggregations = frozenset(['min', 'max', 'avg', 'sum', 'cardinality', 'value_count'])
allowed_binary_operators = {'add': {'func': op.add, 'sign': '+'},
'subtract': {'func': op.sub, 'sign': '-'},
'multiply': {'func': op.mul, 'sign': '*'},
'divide': {'func': op.truediv, 'sign': '/'}}
def __init__(self, *args):
super(BinaryOperationOnAggregatedMetricRule, self).__init__(*args)
self.ts_field = self.rules.get('timestamp_field', '@timestamp')
self.metric_key_first = 'metric_' + self.rules['metric_agg_key_first'] + '_' + self.rules['metric_agg_type_first']
self.metric_key_second = 'metric_' + self.rules['metric_agg_key_second'] + '_' + self.rules['metric_agg_type_second']
self.binary_operator = self.allowed_binary_operators[self.rules['binary_operator']]
self.rules['aggregation_query_element'] = self.generate_aggregation_query()
if not self.rules['metric_agg_type_first'] in self.allowed_aggregations \
or not self.rules['metric_agg_type_second'] in self.allowed_aggregations:
raise EAException("metric_agg_type must be one of %s" % (str(self.allowed_aggregations)))
if not self.rules['binary_operator'] in self.allowed_binary_operators.keys():
raise EAException("binary_operator must be one of %s" % (str(self.allowed_binary_operators.keys())))
if 'max_threshold' not in self.rules and 'min_threshold' not in self.rules:
raise EAException("BinaryOperationOnAggregatedMetricRule must have at least one of either max_threshold or min_threshold")
def get_match_str(self, match):
message = 'Threshold violation, %s_%s %s %s_%s = %s (min: %s max : %s)\n\n' % (
self.rules['metric_agg_key_first'],
self.rules['metric_agg_type_first'],
self.binary_operator['sign'],
self.rules['metric_agg_key_second'],
self.rules['metric_agg_type_second'],
str(self.binary_operator['func'](*[match[self.metric_key_first],match[self.metric_key_second]])),
self.rules.get('min_threshold'),
self.rules.get('max_threshold')
)
if self.rules.get('delete_ruletype_text'):
message = ''
top_events = [[key[11:], counts] for key, counts in match.items() if key.startswith('top_events_')]
def events_to_message(items):
message = ''
items = sorted(items, key=lambda x: x[1], reverse=True)
for term, count in items:
message += '%s : %s\n' % (term, count)
return message
for key, counts in top_events:
message += '%s:\n' % (key)
message += '%s\n' % (events_to_message(counts.items()))
return message
def generate_aggregation_query(self):
"""
custom_top_count_keys: A list of fields.
ElastAlert will perform a terms query for the top X most common values for each of the fields,
where X is 5 by default, or custom_top_count_number if it exists.
custom_top_count_number: The number of terms to list if custom_top_count_keys is set. (Optional, integer, default 5)
"""
query = {
"all_matching_docs": {
"filters": {
"filters": {
"all": {
"match_all": {}
}
}
},
'aggs': {
'topx_match_aggs': {
"filter": {
"bool": {
"must": []
}
},
'aggregations': {
}
},
self.metric_key_first: {
self.rules['metric_agg_type_first']: {
'field': self.rules['metric_agg_key_first']
}
},
self.metric_key_second: {
self.rules['metric_agg_type_second']: {
'field': self.rules['metric_agg_key_second']
}
},
'binary_operation': {
'bucket_script': {
'buckets_path': {
'first': self.metric_key_first,
'second': self.metric_key_second
},
'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
}
}
}
}
}
if self.rules.get('custom_top_count_keys'):
number = self.top_count_number = self.rules.get('custom_top_count_number', 5)
keys = self.top_count_keys = self.rules.get('custom_top_count_keys')
for key in keys:
child_query = {
'terms': {
'field': key,
'order': {'_count': 'desc'},
'size': number
},
'aggs': {
'metric_aggregation_first': {
self.rules['metric_agg_type_first']: {'field': self.rules['metric_agg_key_first']}
},
'metric_aggregation_second': {
self.rules['metric_agg_type_second']: {'field': self.rules['metric_agg_key_second']}
},
'metric_aggregation': {
'bucket_script': {
'buckets_path': {
'first': 'metric_aggregation_first',
'second': 'metric_aggregation_second'
},
'script': 'params.first' + self.binary_operator['sign'] + 'params.second'
}
}
}
}
query['all_matching_docs']['aggs']['topx_match_aggs']['aggregations'][key] = child_query
return query
def check_matches(self, timestamp, query_key, aggregation_data):
if "compound_query_key" in self.rules:
self.check_matches_recursive(timestamp, query_key, aggregation_data, self.rules['compound_query_key'], dict())
else:
metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']
if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
match = {
self.rules['timestamp_field']: timestamp,
self.metric_key_first: metric_val_first,
self.metric_key_second: metric_val_second,
'binary_operation': binary_operation
}
if query_key is not None:
match[self.rules['query_key']] = query_key
# Set TopX counts
if self.rules.get('custom_top_count_keys'):
counts = self.get_top_counts(aggregation_data)
match.update(counts)
self.add_match(match)
def check_matches_recursive(self, timestamp, query_key, aggregation_data, compound_keys, match_data):
if len(compound_keys) < 1:
# shouldn't get to this point, but checking for safety
return
match_data[compound_keys[0]] = aggregation_data['key']
if 'bucket_aggs' in aggregation_data:
for result in aggregation_data['bucket_aggs']['buckets']:
self.check_matches_recursive(timestamp,
query_key,
result,
compound_keys[1:],
match_data)
else:
metric_val_first = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_first]['value']
metric_val_second = aggregation_data['all_matching_docs']['buckets']['all'][self.metric_key_second]['value']
binary_operation = aggregation_data['all_matching_docs']['buckets']['all']['binary_operation']['value']
if self.fulfill_condition(metric_val_first, metric_val_second, self.binary_operator['func']):
match_data[self.rules['timestamp_field']] = timestamp
match_data[self.metric_key_first] = metric_val_first
match_data[self.metric_key_second] = metric_val_second
match_data['binary_operation'] = binary_operation
# add compound key to payload to allow alerts to trigger for every unique occurrence
compound_value = [match_data[key] for key in self.rules['compound_query_key']]
match_data[self.rules['query_key']] = ",".join([str(value) for value in compound_value])
# Set TopX counts
if self.rules.get('custom_top_count_keys'):
counts = self.get_top_counts(aggregation_data)
match_data.update(counts)
self.add_match(match_data)
def get_top_counts(self, aggregation_data):
"""
Counts the number of events for each unique value for each key field.
Returns a dictionary with top_events_<key> mapped to the top 5 counts for each key.
"""
all_counts = {}
number = self.top_count_number
keys = self.top_count_keys
for key in keys:
hits_terms = aggregation_data['all_matching_docs']['buckets']['all'].get('topx_match_aggs').get(key, None)
if hits_terms is None:
top_events_count = {}
else:
buckets = hits_terms.get('buckets')
terms = {}
for bucket in buckets:
terms[bucket['key']] = bucket['metric_aggregation']['value']
counts = terms.items()
counts = sorted(counts, key=lambda x: x[1], reverse=True)
top_events_count = dict(counts[:number])
# Save a dict with the top 5 events by key
all_counts['top_events_%s' % (key)] = top_events_count
return all_counts
def fulfill_condition(self, metric_val_first, metric_val_second, binary_operator):
if metric_val_first is None or metric_val_second is None:
return False
if metric_val_second == 0 and binary_operator == op.truediv:
return False
if 'max_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) > self.rules['max_threshold']:
return True
if 'min_threshold' in self.rules and binary_operator(*[metric_val_first, metric_val_second]) < self.rules['min_threshold']:
return True
return False
Der Code ist Python, aber der größte Teil besteht darin, die intern ausgegebene "Elasticsearch-Abfragestruktur" zu verstehen.
In dieser Abfrage war es notwendig, den Inhalt von "Bucket_Script" und "Buckets_path" zu verstehen.
Die in der Konfigurationsdatei angegebenen Elemente, die diese Regel verwenden, lauten wie folgt.
+, -, ×, ÷
Kann jeweils verwendet werden.
rule_setting.yaml
es_host: <host_name>
es_port: <port_number>
name: your rule name
type: "elastalert_modules.custom_rules.BinaryOperationOnAggregatedMetricRule"
index: <index_name>
timestamp_field: <timestamp_field_name>
doc_type: <doc_type_name>
# metric_agg_type must be one of ['min', 'max', 'avg', 'sum', 'cardinality', 'value_count']
# binary_operator must be one of ['add', 'subtract', 'multiply', 'divide']
metric_agg_key_first: fielde_a
metric_agg_type_first: sum
metric_agg_key_second: fielde_b
metric_agg_type_second: sum
binary_operator: subtract
min_threshold: -0.0001
max_threshold: 0.00001
query_key:
- xxxxxx_id
custom_top_count_keys:
- zzzzzz_id
Wenn die Einstellungen und Programme funktionieren, sollten Sie eine Warnung wie folgt erhalten:
your rule name
Threshold violation, fielde_a_sum - fielde_b_sum = 0.25 (min: -0.0001 max : 0.00001)
zzzzzz_id:
19 : 0.25
binary_operation: 0.25
xxxxxx_id: 19
time: 2019-12-08T00:00:00.00000000Z
metric_fielde_a_sum: 1.0
metric_fielde_b_sum: 0.75
num_hits: 5000
num_matches: 1
Die meisten Überwachungsmuster sind möglich, solange Sie Abfragen ausgeben können, die wie beabsichtigt funktionieren.
Es gibt jedoch Fälle, in denen es nur beim Anpassen des Teils, das die Zeit angibt, etwas problematisch ist. Dies liegt daran, dass die Abfrage an einer anderen Stelle als der benutzerdefinierten Regel für den Teil erstellt wird, der die Zeit angibt. Daher ist es schwierig, die Zeitspezifikation so anzupassen, wie sie ist.
aber es ist okay. Wie hübsch. Ich möchte die Situation vorstellen, die bei der nächsten Gelegenheit bewältigt werden kann.
Recommended Posts