[Monitoring] Alert aggregation | Implementing alert recall

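As a monitoring system matures, its rules and data become more complete and its coverage grows. At that point an incident produces a flood of alerts, or alerts that keep firing. While an alert persists, low, medium and high severity versions of it, plus related alerts, all fire repeatedly. Extracting the key information at the moment an alert fires is therefore critical.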

Application alerts:

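Our application alerts mainly come from Prometheus. In Alertmanager we group and aggregate alerts by alertname, instance and pod, which covers both container metrics and custom metrics. We also use the defined severity levels so that a higher-severity alert silences (inhibits) lower-severity ones within the same group, on the same host, or in the same cluster.
Reference: https://www.noalert.cn/post/ru-he-yong-yuan-sheng-prometheus-jian-kong-da-gui-mo-kubernetes-ji-qun/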
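In Alertmanager this is configured through the route's `group_by` and through `inhibit_rules`, which combine source/target matchers with a list of `equal` labels. The Python sketch below only illustrates the inhibition semantics; it is not Alertmanager's implementation. It drops any alert when another alert in the same scope has a strictly higher severity. The label names (`severity`, `instance`) and the level names `low`/`medium`/`high` are assumptions.

```python
from typing import Dict, List, Sequence

# Assumed severity order: a larger number means more severe.
LEVEL_RANK = {"low": 0, "medium": 1, "high": 2}


def inhibit(alerts: List[Dict[str, str]],
            scope_labels: Sequence[str] = ("cluster", "instance")) -> List[Dict[str, str]]:
    """Return the alerts that survive inhibition.

    An alert is suppressed when another alert with the same values for every
    label in `scope_labels` (like Alertmanager's `equal`) is more severe.
    """
    def scope(alert: Dict[str, str]) -> tuple:
        return tuple(alert.get(label) for label in scope_labels)

    # Highest severity seen in each scope.
    top: Dict[tuple, int] = {}
    for a in alerts:
        top[scope(a)] = max(top.get(scope(a), -1), LEVEL_RANK[a["severity"]])
    # Keep only alerts that are at the top severity of their scope.
    return [a for a in alerts if LEVEL_RANK[a["severity"]] >= top[scope(a)]]


alerts = [
    {"alertname": "HighLatency", "instance": "node-1", "severity": "high"},
    {"alertname": "HighLatency", "instance": "node-1", "severity": "low"},
    {"alertname": "HighLatency", "instance": "node-2", "severity": "low"},
]
kept = inhibit(alerts, scope_labels=("instance",))
```

The low-severity alert on `node-1` is inhibited by the high-severity one on the same host. The alert on `node-2` has no higher-severity peer, so it survives.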

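Business alerts:

Logical architecture:

Business modules:

All of our business alerts come from monitoring and analyzing production logs. In production we collect performance logs and business logs from essentially every component. The log consumption pipeline, problems in the consumption process, and logging conventions are out of scope here.

Business alert rules: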

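  • Ratio analysis and keyword extraction over the full log volume, based on given status codes or latency
  • Alert check interval: one minute by default
  • The full data set for the check defaults to type:_doc; some rules define their own
  • Each alert rule carries a business-module attribute
  • Each alert rule carries an alert push URL
  • Each alert rule carries a ratio-threshold attribute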
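At its core, the ratio check is simple. Within one check window, count the log entries that match a "bad" predicate (a watched status code, or a latency above a threshold) and divide by the total. Below is a minimal sketch of that step. It assumes the documents for the one-minute window have already been fetched as dicts. The `ret` field comes from the rule template below; `cost_ms` and the 500 ms threshold are hypothetical.

```python
from typing import Callable, Dict, Iterable, Tuple


def ratio_match(docs: Iterable[Dict], is_bad: Callable[[Dict], bool]) -> Tuple[float, int, int]:
    """Return (percentage, matched, total) over one check window of log documents."""
    total = 0
    matched = 0
    for doc in docs:
        total += 1
        if is_bad(doc):
            matched += 1
    pct = 100.0 * matched / total if total else 0.0  # avoid dividing by zero on an empty window
    return pct, matched, total


def by_code(doc: Dict) -> bool:
    """Status-code rule: the request returned ret=11004."""
    return doc.get("ret") in {11004}


def by_latency(doc: Dict) -> bool:
    """Latency rule (hypothetical field `cost_ms`): the request took longer than 500 ms."""
    return doc.get("cost_ms", 0) > 500


window = [{"ret": 0, "cost_ms": 20}] * 990 + [{"ret": 11004, "cost_ms": 800}] * 10
pct, matched, total = ratio_match(window, by_code)
```

Passing the predicate in as a function lets status-code rules and latency rules share the same counting code.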

Business rule template

```yaml
index: access-xxxx-*
name: aiuiathenaosstimeout
type: percentage_match

percentage_config:
  items:            # level -> [lower, upper] match percentage
    高: [5, 100]    # high
    中: [1, 5]      # medium
    低: [0.1, 1]    # low
percentage_format_string: "%.3f"
match_min_num: 10
aiui_group: ['aiui-athena-nlu']
minutes: 1

filter:
- term:
    _type: doc
match_bucket_filter:
- terms:
    ret: [11004]
http_post_url: "http://xxxxxxx:8088/alert"
http_post_static_payload:
    subject: Shanghai-Entry-ComponentName-11004 abnormal status code monitor
    message: Shanghai-Component abnormal ret-11004
```
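`percentage_config`, `match_min_num` and `aiui_group` do not appear to be standard elastalert `percentage_match` options, so they are most likely the author's own extensions. The sketch below shows one plausible reading of how they map a match percentage to a level. Two details are assumptions:

- Each range is treated as lower-bound inclusive and upper-bound exclusive, except that exactly 100 counts as `高`.
- `match_min_num` is treated like elastalert's `min_denominator`, i.e. a minimum number of documents in the window.

```python
from typing import Dict, List, Optional, Tuple

# Mirrors percentage_config.items from the template (高 = high, 中 = medium, 低 = low).
PERCENTAGE_ITEMS: Dict[str, List[float]] = {
    "高": [5, 100],
    "中": [1, 5],
    "低": [0.1, 1],
}


def classify(pct: float, total: int, items: Dict[str, List[float]] = PERCENTAGE_ITEMS,
             match_min_num: int = 10, fmt: str = "%.3f") -> Optional[Tuple[str, str]]:
    """Map a match percentage to (level, formatted percentage), or None for no alert."""
    if total < match_min_num:
        return None  # assumed: too few documents in the window to judge
    for level, (low, high) in items.items():
        # Assumed interval semantics: [low, high), plus exactly 100 for the top range.
        if low <= pct < high or pct == high == 100:
            return level, fmt % pct  # formatted like percentage_format_string
    return None  # below every configured range
```

For example, `classify(0.11, 1000)` yields `("低", "0.110")`, which matches the `_level` and `match_percentage` fields in the sample payload.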

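Business alert format (example payload; the first key name is truncated in the source)

```python
{'…_group': ['athena', 'aiui-test'], '_level': '低', 'num_matches': 1, 'message': 'alert message 2', 'match_percentage': '0.110', 'subject': 'alert title 2'}
```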

Talk is cheap, show me the code.

## Code environment

  • Python, Tornado framework
  • elastalert

## Main changes
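```python
import json
import random
import time

from tornado import ioloop

from event_logger import EventLogger

AGG_LOOP_MS = 65000     # aggregation tick interval, in milliseconds
MAX_AGG_SECONDS = 3600  # events older than this are ignored, in seconds
ALERT_TTL = 90          # per-alert TTL, in seconds
EVENT_LOG_FILE = "./test.log"

# Inside the HTTP handler that receives elastalert's POST to /alert:
# `data` is the decoded JSON payload (the handler itself is not shown).
now = time.time()
unique_value = str(random.randint(0, 3000000000))
logger = EventLogger(EVENT_LOG_FILE)
logger.write(now, "alert", "merge", unique_value, ALERT_TTL, json.dumps(data, ensure_ascii=False))

# Framework wiring: run cronjob_aggregation every AGG_LOOP_MS milliseconds.
ioloop.PeriodicCallback(cronjob_aggregation, AGG_LOOP_MS).start()
ioloop.IOLoop.current().start()
```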
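`PeriodicCallback` runs `cronjob_aggregation` about once every `AGG_LOOP_MS` milliseconds. Every alert written between two ticks therefore ends up in the same aggregated message, which makes the tick interval the effective aggregation window. The idealized timeline below ignores callback jitter and processing time. `batches_by_tick` is a hypothetical helper, and the sketch assumes the first tick fires one interval after start.

```python
from typing import Dict, List

AGG_LOOP_MS = 65000  # same tick interval as the main code


def batches_by_tick(arrival_ts: List[float], start_ts: float,
                    interval_ms: int = AGG_LOOP_MS) -> Dict[int, List[float]]:
    """Group alert arrival times by the (1-based) tick that would pick them up."""
    interval = interval_ms / 1000.0
    out: Dict[int, List[float]] = {}
    for ts in sorted(arrival_ts):
        # An alert arriving in [start + (k-1)*interval, start + k*interval) is read at tick k.
        tick = int((ts - start_ts) // interval) + 1
        out.setdefault(tick, []).append(ts)
    return out


timeline = batches_by_tick([0, 10, 64, 66, 130], start_ts=0)
```

Here the alerts at 0 s, 10 s and 64 s are merged into one message at the first tick, while the alerts at 66 s and 130 s each wait for a later tick.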

event_logger.py: writes and reads alert events

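```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Append-only event log used for alert aggregation.

Each line holds six ';'-separated fields: ts;event_type;status;uniq_value;ttl;msg
"""
import json
import os
import pathlib

# Field indexes within one event row.
TS = 0
EVENT_TYPE = 1
STATUS = 2
UNIQ_VALUE = 3
TTL = 4
MSG = 5


class EventLogger(object):
    def __init__(self, file_path):
        self.file_path = file_path
        self.content = []  # per-instance list of parsed rows (not shared between instances)
        self.read()

    def write(self, ts, event_type, status, uniq_value, ttl, msg):
        """Append one event to the file and to the in-memory content."""
        if isinstance(msg, dict):
            msg = json.dumps(msg, ensure_ascii=False)
        ts = int(ts)
        ttl = int(ttl)
        with open(self.file_path, "a", encoding="utf-8") as f:
            f.write("{};{};{};{};{};{}\n".format(ts, event_type, status, uniq_value, ttl, msg))
        self.content.append([ts, event_type, status, uniq_value, ttl, msg])
        print("alert insert ok")

    def read(self):
        """Load all events from the file, creating the file if it does not exist."""
        if not os.path.isfile(self.file_path):
            pathlib.Path(self.file_path).touch()
        self.content = []
        with open(self.file_path, "r", encoding="utf-8") as f:
            for line in f.read().splitlines():
                # The JSON message may itself contain ';', so split off only the first five fields.
                fields = line.split(";", MSG)
                if len(fields) <= MSG:
                    continue  # skip blank or malformed lines
                fields[TS] = int(fields[TS])
                fields[TTL] = int(fields[TTL])
                self.content.append(fields)
```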
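The line format is the contract between the HTTP handler and the aggregation job. Because the JSON message can contain `;`, a reader must split off only the first five fields; `str.split(";", 5)` does that in one call. Below is a standalone round trip using hypothetical helpers `format_event`/`parse_event` that mirror `EventLogger.write` and `EventLogger.read`:

```python
import json
from typing import List, Union


def format_event(ts: float, event_type: str, status: str, uniq: str, ttl: int, msg: str) -> str:
    """Serialize one event in the layout EventLogger.write uses."""
    return "{};{};{};{};{};{}".format(int(ts), event_type, status, uniq, int(ttl), msg)


def parse_event(line: str) -> List[Union[int, str]]:
    """Parse one line back into [ts, event_type, status, uniq, ttl, msg]."""
    ts, event_type, status, uniq, ttl, msg = line.split(";", 5)  # msg keeps any inner ';'
    return [int(ts), event_type, status, uniq, int(ttl), msg]


payload = json.dumps({"subject": "a;b", "message": "x"})
line = format_event(1700000000.7, "alert", "merge", "42", 90, payload)
row = parse_event(line)
```

The timestamp is truncated to whole seconds on write, and the `;` inside the subject survives the round trip.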

Core aggregation logic:

```python
import json
import time

from event_logger import EventLogger, TS, EVENT_TYPE, STATUS, UNIQ_VALUE, TTL, MSG

# EVENT_LOG_FILE and MAX_AGG_SECONDS are the constants defined in the main code.
# parse_user_info, send2tag and send2group are the author's own helpers (not shown):
# they resolve WeChat recipient ids and push a message to users/tags or to a group.
HIGH_LEVELS = ("高", "disaster")


def cronjob_aggregation():
    logger = EventLogger(EVENT_LOG_FILE)
    content = list(reversed(logger.content))  # newest events first
    now = time.time()
    left_time = now - MAX_AGG_SECONDS

    msg_to_send = {}        # uniq_value -> merge rows not sent yet
    wechatid_to_send = []   # recipient ids, without duplicates
    already_send_set = {}   # uniq_value -> expiry time of its "send" record
    to_group = False
    # Collect every aggregated message that has not been sent yet.
    for row in content:
        if row[TS] < left_time:
            break  # older than the maximum aggregation window: stop scanning
        if row[EVENT_TYPE] != "alert" or row[UNIQ_VALUE] in already_send_set:
            continue
        if row[STATUS] == "send":
            already_send_set[row[UNIQ_VALUE]] = int(row[TTL]) + int(row[TS])  # mark its expiry time
            continue
        if row[STATUS] != "merge":
            continue
        msg = json.loads(row[MSG])  # decode once instead of once per check
        msg_to_send.setdefault(row[UNIQ_VALUE], []).append(row)
        if "aiui_group" in msg:
            user_id = parse_user_info(msg)["id"]
            if user_id not in wechatid_to_send:
                wechatid_to_send.append(user_id)
        if msg.get("level") in HIGH_LEVELS:
            to_group = True  # high-severity alerts also go to the group

    if not msg_to_send:
        return  # nothing new this tick (previously raised NameError on msg_decoded)

    # Merge the pending messages and record a "send" event for each key.
    list_alert_content = []
    last_subject = ""
    for k, v in msg_to_send.items():
        if int(now) <= already_send_set.get(k, 0):
            continue  # still within its TTL: skip
        msg_decoded = [json.loads(x[MSG]) for x in v]
        list_alert_content += [x["message"] for x in msg_decoded]
        last_subject = msg_decoded[-1]["subject"]
        logger.write(now, "alert", "send", k, v[-1][TTL],
                     json.dumps({"subject": "[Aggregated] {}".format(last_subject),
                                 "message": "\n".join(list_alert_content)}, ensure_ascii=False))
    alert_content = "\n".join(list_alert_content)
    receivers = "|".join(wechatid_to_send)

    if len(msg_to_send) > 1:
        subject = "[Aggregated] {} ({} alerts in total)".format(last_subject, len(msg_to_send))
        send2tag(subject, alert_content, receivers)
        if to_group:
            send2group(subject, alert_content)
    else:
        subject = "[Business alert] {}".format(last_subject)
        send2tag(subject, alert_content, receivers)
```
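`cronjob_aggregation` mixes file I/O, batching and delivery, which makes it hard to test. One option is to factor the batching decision into a pure function over event rows, using the same layout as `EventLogger.content`. The sketch below does that. The name `plan_batch` and its return shape are hypothetical, and it leaves recipient lookup out.

```python
import json
from typing import Dict, List, Optional, Sequence

# Same row layout as EventLogger.content: ts;event_type;status;uniq_value;ttl;msg
TS, EVENT_TYPE, STATUS, UNIQ_VALUE, TTL, MSG = range(6)
HIGH_LEVELS = {"高", "disaster"}


def plan_batch(rows: Sequence[Sequence], now: float, max_age: int = 3600) -> Optional[Dict]:
    """Decide what the next aggregation tick would send, without sending anything."""
    sent = set()
    pending: Dict[str, List[dict]] = {}
    escalate = False
    for row in reversed(rows):  # newest first, as in cronjob_aggregation
        if row[TS] < now - max_age:
            break
        if row[EVENT_TYPE] != "alert" or row[UNIQ_VALUE] in sent:
            continue
        if row[STATUS] == "send":
            sent.add(row[UNIQ_VALUE])
        elif row[STATUS] == "merge":
            msg = json.loads(row[MSG])
            pending.setdefault(row[UNIQ_VALUE], []).append(msg)
            escalate = escalate or msg.get("level") in HIGH_LEVELS
    if not pending:
        return None
    messages = [m["message"] for msgs in pending.values() for m in msgs]
    last_subject = list(pending.values())[-1][-1]["subject"]
    if len(pending) > 1:
        subject = "[Aggregated] {} ({} alerts in total)".format(last_subject, len(pending))
    else:
        subject = "[Business alert] {}".format(last_subject)
    return {
        "keys": list(pending),
        "subject": subject,
        "content": "\n".join(messages),
        # Mirrors the original: the group is only notified for multi-alert batches.
        "to_group": escalate and len(pending) > 1,
    }


rows = [
    [1000, "alert", "merge", "a", 90, json.dumps({"subject": "s1", "message": "m1", "level": "低"})],
    [1010, "alert", "merge", "b", 90, json.dumps({"subject": "s2", "message": "m2", "level": "高"})],
    [1020, "alert", "send", "a", 90, json.dumps({"subject": "x", "message": "y"})],
]
plan = plan_batch(rows, now=1030)
```

Key `a` already has a newer `send` record, so only `b` is pending. `to_group` is false because a single alert is never escalated to the group.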

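High-severity alerts can also be recalled, i.e. re-surfaced within their TTL window based on their alert level. To do this, adapt the code above by changing which rows are fetched and the sending rules.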
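A minimal sketch of the "fetch" side of such a recall, assuming that recall means re-surfacing every high-severity `merge` event whose `ts + ttl` has not yet passed. The function name `recall_high` is hypothetical, and how the result is re-sent would follow the existing sending rules.

```python
import json
from typing import List, Sequence

# Same row layout as EventLogger.content: ts;event_type;status;uniq_value;ttl;msg
TS, EVENT_TYPE, STATUS, UNIQ_VALUE, TTL, MSG = range(6)
HIGH_LEVELS = {"高", "disaster"}


def recall_high(rows: Sequence[Sequence], now: float) -> List[dict]:
    """Return high-severity alerts still inside their TTL, newest first, one per unique value."""
    seen = set()
    recalled = []
    for row in reversed(rows):  # newest first
        if row[EVENT_TYPE] != "alert" or row[STATUS] != "merge":
            continue
        if row[UNIQ_VALUE] in seen or now > row[TS] + row[TTL]:
            continue  # duplicate key, or TTL already expired
        msg = json.loads(row[MSG])
        if msg.get("level") in HIGH_LEVELS:
            seen.add(row[UNIQ_VALUE])
            recalled.append(msg)
    return recalled


rows = [
    [1000, "alert", "merge", "a", 90, json.dumps({"subject": "s1", "message": "m1", "level": "高"})],
    [1050, "alert", "merge", "b", 90, json.dumps({"subject": "s2", "message": "m2", "level": "低"})],
    [1060, "alert", "merge", "c", 90, json.dumps({"subject": "s3", "message": "m3", "level": "disaster"})],
]
```

At `now=1100`, alert `a` has expired (1000 + 90 < 1100), so only `c` is recalled. At `now=1080`, both `c` and `a` are still within their TTL. The low-level alert `b` is never recalled.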