Alerting in Python on data collected by Metricbeat and Heartbeat
1. System architecture
IP | Hostname | Role | Notes |
---|---|---|---|
11.0.1.11 | kafka1 | Kafka and MySQL | |
11.0.1.12 | kafka2 | Kafka | |
11.0.1.13 | kafka3 | Kafka | |
11.0.1.14 | demo1 | Metricbeat and Heartbeat | |
2. Deploy Kafka
Omitted.
3. Deploy Metricbeat and Heartbeat
Metricbeat configuration:
metricbeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
fields:
  ip: 11.0.1.14
output.kafka:
  hosts: ["11.0.1.11:9092","11.0.1.12:9092","11.0.1.13:9092"]
  topic: "ELK-metricbeat"
Heartbeat configuration:
heartbeat.config.monitors:
  path: ${path.config}/monitors.d/*.yml
  reload.enabled: false
  reload.period: 5s
# ---------------------------- Kafka Output ----------------------------
output.kafka:
  hosts: ["11.0.1.11:9092","11.0.1.12:9092","11.0.1.13:9092"]
  topic: "ELK-heartbeat"
Heartbeat tcp.yml monitor configuration:
- type: tcp
  id: my-tcp-monitor
  name: My TCP monitor
  enabled: true
  schedule: '@every 20s'
  hosts: ["11.0.1.14:80","11.0.1.13:80","11.0.1.12:80"]
  ipv4: true
  ipv6: true
  mode: all
4. MariaDB table structure
cmdb_app table (stores application system information):
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for cmdb_app
-- ----------------------------
DROP TABLE IF EXISTS `cmdb_app`;
CREATE TABLE `cmdb_app` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`ops_user` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`ops_tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`ops_dep` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;
SET FOREIGN_KEY_CHECKS = 1;
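Columns:
app_name: application system name
ops_user: name of the operations engineer
ops_tel: mobile number of the operations engineer
ops_dep: department responsible for operations
cmdb_os table (stores server information):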
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for cmdb_os
-- ----------------------------
DROP TABLE IF EXISTS `cmdb_os`;
CREATE TABLE `cmdb_os` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`eip` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`module` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;
SET FOREIGN_KEY_CHECKS = 1;
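Columns:
app_name: name of the application system the server belongs to (matches cmdb_app.app_name)
eip: server IP
module: purpose of the server
alert_list table (stores alert records):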
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for alert_list
-- ----------------------------
DROP TABLE IF EXISTS `alert_list`;
CREATE TABLE `alert_list` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`timestamp` datetime NULL DEFAULT NULL,
`url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`status` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`ops_user` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`ops_tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`ops_dep` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`module` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;
SET FOREIGN_KEY_CHECKS = 1;
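The cmdb_os and cmdb_app tables are related through app_name: a server IP resolves to an application, and the application resolves to its owner. The sketch below illustrates that relationship with a single JOIN. It is an illustration only: an in-memory SQLite database stands in for MariaDB, only the columns needed for the join are created, and lookup_owner, make_demo_db, and the sample row values are hypothetical.

```python
import sqlite3


def lookup_owner(conn, ip):
    """Return (app_name, module, ops_user, ops_tel, ops_dep) for a server IP, or None."""
    sql = (
        "SELECT o.app_name, o.module, a.ops_user, a.ops_tel, a.ops_dep "
        "FROM cmdb_os AS o JOIN cmdb_app AS a ON a.app_name = o.app_name "
        "WHERE o.eip = ?"
    )
    return conn.execute(sql, (ip,)).fetchone()


def make_demo_db():
    # Assumption: in-memory SQLite replaces MariaDB, with made-up sample rows.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE cmdb_app (app_name TEXT, ops_user TEXT, ops_tel TEXT, ops_dep TEXT)")
    conn.execute("CREATE TABLE cmdb_os (app_name TEXT, eip TEXT, module TEXT)")
    conn.execute("INSERT INTO cmdb_app VALUES ('demo-app', 'alice', '000', 'ops')")
    conn.execute("INSERT INTO cmdb_os VALUES ('demo-app', '11.0.1.14', 'demo1')")
    return conn


if __name__ == "__main__":
    print(lookup_owner(make_demo_db(), "11.0.1.14"))
```

An IP that is not registered in cmdb_os yields None, which is the case any consumer of this lookup has to handle.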
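5. Use a Python program to read data from Kafka, match the cmdb_os and cmdb_app records by the IP in each message, and write the enriched data to a new Kafka topic
Install dependencies (mysql-connector-python is needed by heartbeat_alarm.py later on):
pip install kafka-python pymysql apscheduler pyyaml mysql-connector-python
First, metricbeat_replace.py: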
import json
import logging
from logging.handlers import RotatingFileHandler
import pymysql
from kafka import KafkaConsumer, KafkaProducer
import yaml
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger


class DatabaseConnectionError(Exception):
    def __init__(self, message="Database connection failed"):
        self.message = message
        super().__init__(self.message)


class KafkaCMDBProcessor:
    def __init__(self, kafka_config, mysql_config):
        self.kafka_config = kafka_config
        self.mysql_config = mysql_config
        self.logger = self.setup_logger()
        self.cmdb_data = None
        # Initialize the scheduler
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()
        # Reload the CMDB data at minutes 0, 10, 20, 30, 40 and 50 of every hour
        self.scheduler.add_job(self.load_cmdb_data, CronTrigger(minute='0,10,20,30,40,50'))

    @staticmethod
    def setup_logger():
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        # Console handler at DEBUG level
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        # File handler at DEBUG level: rotate at 1 MB, keep 3 backup files
        fh = RotatingFileHandler('metricbeat_replace.log', maxBytes=1_000_000, backupCount=3)
        fh.setLevel(logging.DEBUG)
        # Create the formatter
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        # Attach the formatter to both handlers
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)
        # Attach the handlers to the logger
        logger.addHandler(ch)
        logger.addHandler(fh)
        return logger

    def start_processing(self):
        self.connect_to_database()  # Verify database connectivity once at startup
        self.load_cmdb_data()  # Load the CMDB data into memory at startup
        self.logger.info("Starting processing...")
        consumer = KafkaConsumer(
            self.kafka_config['input_topic'],
            group_id=self.kafka_config['consumer_group_id'],
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            auto_offset_reset='earliest'
        )
        self.logger.info("Kafka consumer created.")
        producer = KafkaProducer(bootstrap_servers=self.kafka_config['bootstrap_servers'])
        self.logger.info("Kafka producer created.")
        try:
            for msg in consumer:
                try:
                    metricbeat_data = msg.value.decode('utf-8')
                    ip = self.extract_ip(metricbeat_data)
                    cmdb_data = self.get_cmdb_data(ip)
                    self.process_and_send_message(producer, metricbeat_data, cmdb_data)
                except Exception as e:
                    # Log and skip a bad message instead of stopping the whole consumer
                    self.logger.error(f"Failed to process message: {e}")
        except KeyboardInterrupt:
            self.logger.info("Received KeyboardInterrupt. Shutting down gracefully.")
        except Exception as e:
            # Consumer-level failures (for example a lost broker connection) end up here
            self.logger.error(f"An error occurred: {e}")
        finally:
            consumer.close()
            producer.close()

    def connect_to_database(self):
        db = None  # Defined up front so the finally block is safe when connect() fails
        try:
            self.logger.info("Connecting to the database...")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db']
            )
            self.logger.info("Database connection succeeded.")
            self.db_connection_error_logged = False  # Reset the error flag after a successful connection
        except pymysql.Error as e:
            error_message = f"Error while connecting to the database: {e}"
            self.logger.error(error_message.split('\n')[0])
            raise DatabaseConnectionError(error_message) from e
        finally:
            if db:
                db.close()

    def load_cmdb_data(self):
        db = None
        cursor = None
        try:
            self.logger.info("Loading CMDB data.")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db']
            )
            cursor = db.cursor()
            # Query the cmdb_os table
            sql_cmdb_os = "SELECT app_name, eip, module FROM cmdb_os"
            cursor.execute(sql_cmdb_os)
            cmdb_os_result = cursor.fetchall()
            # Query the cmdb_app table
            sql_cmdb_app = "SELECT app_name, ops_user, ops_tel, ops_dep FROM cmdb_app"
            cursor.execute(sql_cmdb_app)
            cmdb_app_result = cursor.fetchall()
            # Keep the rows in memory
            self.cmdb_data = {
                "cmdb_os": cmdb_os_result,
                "cmdb_app": cmdb_app_result
            }
            self.logger.info("CMDB data loaded.")
        except pymysql.Error as e:
            error_message = f"Database error while loading data: {e}"
            self.logger.error(error_message.split('\n')[0])
            self.logger.warning("Database connection failed; keeping the previously loaded CMDB data.")
        finally:
            if cursor:
                cursor.close()
            if db:
                db.close()

    @staticmethod
    def extract_ip(metricbeat_data):
        data = json.loads(metricbeat_data)
        return data.get('fields', {}).get('ip', '')

    def get_cmdb_data(self, ip):
        if not self.cmdb_data:
            return None
        # Look up the server row by IP in the in-memory data
        cmdb_os_data = [row for row in self.cmdb_data["cmdb_os"] if row[1] == ip]
        if not cmdb_os_data:
            return None  # IP is not registered in cmdb_os
        cmdb_app_data = [row for row in self.cmdb_data["cmdb_app"] if row[0] == cmdb_os_data[0][0]]
        if not cmdb_app_data:
            return None  # No matching application in cmdb_app
        return cmdb_os_data, cmdb_app_data

    def process_and_send_message(self, producer, original_data, cmdb_data):
        original_data_str = original_data.decode('utf-8') if isinstance(original_data, bytes) else original_data
        new_message = json.loads(original_data_str)
        # Only attach CMDB fields when both the server row and the application row were found
        if cmdb_data and all(cmdb_data):
            cmdb_os_data, cmdb_app_data = cmdb_data
            new_message["cmdb_data"] = {
                "app_name": cmdb_os_data[0][0],
                "eip": cmdb_os_data[0][1],
                "module": cmdb_os_data[0][2],
                "ops_user": cmdb_app_data[0][1],
                "ops_tel": cmdb_app_data[0][2],
                "ops_dep": cmdb_app_data[0][3]
            }
        else:
            new_message["cmdb_data"] = None
        producer.send(self.kafka_config['output_topic'], value=json.dumps(new_message, ensure_ascii=False).encode('utf-8'))
        producer.flush()


if __name__ == "__main__":
    try:
        with open('application.yml', 'r', encoding='utf-8') as config_file:
            config_data = yaml.safe_load(config_file)
        kafka_config_data = config_data.get('kafka', {})
        mysql_config_data = config_data.get('mysql', {})
        processor = KafkaCMDBProcessor(kafka_config_data, mysql_config_data)
        processor.start_processing()
    except FileNotFoundError:
        print("Error: configuration file 'application.yml' not found.")
    except Exception as e:
        print(f"Unexpected error: {e}")
The application.yml configuration is as follows:
kafka:
  bootstrap_servers:
    - '11.0.1.11:9092'
    - '11.0.1.12:9092'
    - '11.0.1.13:9092'
  consumer_group_id: 'metricbeat_replace'
  input_topic: 'ELK-metricbeat'
  output_topic: 'ELK-system_metricbeat'
mysql:
  host: '11.0.1.11'
  port: 13306
  user: 'root'
  password: '123456'
  db: 'zll_python_test'
The processed data looks like this:
{"@timestamp": "2024-01-20T14:02:34.706Z", "@metadata": {"beat": "metricbeat", "type": "_doc", "version": "8.11.1"}, "host": {"name": "demo1"}, "agent": {"type": "metricbeat", "version": "8.11.1", "ephemeral_id": "979b3ab7-80af-4ab5-a552-3692165b7000", "id": "982d0bd1-d0d9-45b5-bc78-0a5f25911c12", "name": "demo1"}, "metricset": {"name": "memory", "period": 10000}, "event": {"module": "system", "duration": 120280, "dataset": "system.memory"}, "service": {"type": "system"}, "system": {"memory": {"used": {"pct": 0.2325, "bytes": 919130112}, "free": 3034763264, "cached": 529936384, "actual": {"used": {"pct": 0.1493, "bytes": 590319616}, "free": 3363573760}, "swap": {"total": 2147479552, "used": {"bytes": 0, "pct": 0}, "free": 2147479552}, "total": 3953893376}}, "fields": {"ip": "11.0.1.14"}, "ecs": {"version": "8.0.0"}, "cmdb_data": {"app_name": "应用系统", "eip": "11.0.1.14", "module": "demo1", "ops_user": "运维仔", "ops_tel": "12345678901", "ops_dep": "运维部"}}
heartbeat_replace.py is as follows:
import json
import logging
from logging.handlers import RotatingFileHandler
import pymysql
from kafka import KafkaConsumer, KafkaProducer
import yaml
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger


class DatabaseConnectionError(Exception):
    def __init__(self, message="Database connection failed"):
        self.message = message
        super().__init__(self.message)


class KafkaCMDBProcessor:
    def __init__(self, kafka_config, mysql_config):
        self.kafka_config = kafka_config
        self.mysql_config = mysql_config
        self.logger = self.setup_logger()
        self.cmdb_data = None
        # Initialize the scheduler
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()
        # Reload the CMDB data at minutes 0, 10, 20, 30, 40 and 50 of every hour
        self.scheduler.add_job(self.load_cmdb_data, CronTrigger(minute='0,10,20,30,40,50'))

    @staticmethod
    def setup_logger():
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        # Console handler at DEBUG level
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        # File handler at DEBUG level: rotate at 1 MB, keep 3 backup files
        fh = RotatingFileHandler('heartbeat_replace.log', maxBytes=1_000_000, backupCount=3)
        fh.setLevel(logging.DEBUG)
        # Create the formatter
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        # Attach the formatter to both handlers
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)
        # Attach the handlers to the logger
        logger.addHandler(ch)
        logger.addHandler(fh)
        return logger

    def start_processing(self):
        self.connect_to_database()  # Verify database connectivity once at startup
        self.load_cmdb_data()  # Load the CMDB data into memory at startup
        self.logger.info("Starting processing...")
        consumer = KafkaConsumer(
            self.kafka_config['input_topic'],
            group_id=self.kafka_config['consumer_group_id'],
            bootstrap_servers=self.kafka_config['bootstrap_servers'],
            auto_offset_reset='earliest'
        )
        self.logger.info("Kafka consumer created.")
        producer = KafkaProducer(bootstrap_servers=self.kafka_config['bootstrap_servers'])
        self.logger.info("Kafka producer created.")
        try:
            for msg in consumer:
                try:
                    heartbeat_data = msg.value.decode('utf-8')
                    ip = self.extract_url_domain(heartbeat_data)
                    cmdb_data = self.get_cmdb_data(ip)
                    self.process_and_send_message(producer, heartbeat_data, cmdb_data)
                except Exception as e:
                    # Log and skip a bad message instead of stopping the whole consumer
                    self.logger.error(f"Failed to process message: {e}")
        except KeyboardInterrupt:
            self.logger.info("Received KeyboardInterrupt. Shutting down gracefully.")
        except Exception as e:
            # Consumer-level failures (for example a lost broker connection) end up here
            self.logger.error(f"An error occurred: {e}")
        finally:
            consumer.close()
            producer.close()

    def connect_to_database(self):
        db = None  # Defined up front so the finally block is safe when connect() fails
        try:
            self.logger.info("Connecting to the database...")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db']
            )
            self.logger.info("Database connection succeeded.")
            self.db_connection_error_logged = False  # Reset the error flag after a successful connection
        except pymysql.Error as e:
            error_message = f"Error while connecting to the database: {e}"
            self.logger.error(error_message.split('\n')[0])
            raise DatabaseConnectionError(error_message) from e
        finally:
            if db:
                db.close()

    def load_cmdb_data(self):
        db = None
        cursor = None
        try:
            self.logger.info("Loading CMDB data.")
            db = pymysql.connect(
                host=self.mysql_config['host'],
                port=self.mysql_config['port'],
                user=self.mysql_config['user'],
                password=self.mysql_config['password'],
                database=self.mysql_config['db']
            )
            cursor = db.cursor()
            # Query the cmdb_os table
            sql_cmdb_os = "SELECT app_name, eip, module FROM cmdb_os"
            cursor.execute(sql_cmdb_os)
            cmdb_os_result = cursor.fetchall()
            # Query the cmdb_app table
            sql_cmdb_app = "SELECT app_name, ops_user, ops_tel, ops_dep FROM cmdb_app"
            cursor.execute(sql_cmdb_app)
            cmdb_app_result = cursor.fetchall()
            # Keep the rows in memory
            self.cmdb_data = {
                "cmdb_os": cmdb_os_result,
                "cmdb_app": cmdb_app_result
            }
            self.logger.info("CMDB data loaded.")
        except pymysql.Error as e:
            error_message = f"Database error while loading data: {e}"
            self.logger.error(error_message.split('\n')[0])
            self.logger.warning("Database connection failed; keeping the previously loaded CMDB data.")
        finally:
            if cursor:
                cursor.close()
            if db:
                db.close()

    @staticmethod
    def extract_url_domain(heartbeat_data):
        data = json.loads(heartbeat_data)
        return data.get('url', {}).get('domain', '')

    def get_cmdb_data(self, ip):
        if not self.cmdb_data:
            return None
        # Look up the server row by IP in the in-memory data
        cmdb_os_data = [row for row in self.cmdb_data["cmdb_os"] if row[1] == ip]
        if not cmdb_os_data:
            return None  # IP is not registered in cmdb_os
        cmdb_app_data = [row for row in self.cmdb_data["cmdb_app"] if row[0] == cmdb_os_data[0][0]]
        if not cmdb_app_data:
            return None  # No matching application in cmdb_app
        return cmdb_os_data, cmdb_app_data

    def process_and_send_message(self, producer, original_data, cmdb_data):
        original_data_str = original_data.decode('utf-8') if isinstance(original_data, bytes) else original_data
        new_message = json.loads(original_data_str)
        # Only attach CMDB fields when both the server row and the application row were found
        if cmdb_data and all(cmdb_data):
            cmdb_os_data, cmdb_app_data = cmdb_data
            new_message["cmdb_data"] = {
                "app_name": cmdb_os_data[0][0],
                "eip": cmdb_os_data[0][1],
                "module": cmdb_os_data[0][2],
                "ops_user": cmdb_app_data[0][1],
                "ops_tel": cmdb_app_data[0][2],
                "ops_dep": cmdb_app_data[0][3]
            }
        else:
            new_message["cmdb_data"] = None
        producer.send(self.kafka_config['output_topic'], value=json.dumps(new_message, ensure_ascii=False).encode('utf-8'))
        producer.flush()


if __name__ == "__main__":
    try:
        with open('application.yml', 'r', encoding='utf-8') as config_file:
            config_data = yaml.safe_load(config_file)
        kafka_config_data = config_data.get('kafka', {})
        mysql_config_data = config_data.get('mysql', {})
        processor = KafkaCMDBProcessor(kafka_config_data, mysql_config_data)
        processor.start_processing()
    except FileNotFoundError:
        print("Error: configuration file 'application.yml' not found.")
    except Exception as e:
        print(f"Unexpected error: {e}")
The application.yml configuration is as follows:
kafka:
  bootstrap_servers:
    - '11.0.1.11:9092'
    - '11.0.1.12:9092'
    - '11.0.1.13:9092'
  consumer_group_id: 'heartbeat_replace'
  input_topic: 'ELK-heartbeat'
  output_topic: 'ELK-system_heartbeat'
mysql:
  host: '11.0.1.11'
  port: 13306
  user: 'root'
  password: '123456'
  db: 'zll_python_test'
The processed data looks like this:
{"@timestamp": "2024-01-20T14:03:37.102Z", "@metadata": {"beat": "heartbeat", "type": "_doc", "version": "8.11.1"}, "monitor": {"name": "My ICMP Monitor", "type": "icmp", "id": "my-icmp-monitor", "status": "up", "check_group": "b4caac6d-b79c-11ee-bf86-000c29a1adec-1", "duration": {"us": 131}, "ip": "11.0.1.14", "timespan": {"gte": "2024-01-20T14:03:37.102Z", "lt": "2024-01-20T14:03:53.102Z"}}, "url": {"domain": "11.0.1.14", "full": "icmp://11.0.1.14", "scheme": "icmp"}, "fields": {"nodename": "demo1"}, "summary": {"retry_group": "b4caac6d-b79c-11ee-bf86-000c29a1adec", "attempt": 1, "max_attempts": 1, "final_attempt": true, "up": 1, "down": 0, "status": "up"}, "state": {"id": "default-18d1d73022a-0", "up": 32661, "down": 0, "ends": null, "started_at": "2024-01-19T00:41:32.970059265+08:00", "duration_ms": "163324132", "status": "up", "checks": 32661, "flap_history": []}, "event": {"type": "heartbeat/summary", "dataset": "icmp"}, "icmp": {"requests": 1, "rtt": {"us": 83}}, "ecs": {"version": "8.0.0"}, "agent": {"name": "demo1", "type": "heartbeat", "version": "8.11.1", "ephemeral_id": "46819a45-3552-4e57-91f3-e58ffb12c72a", "id": "d56462aa-6f6b-4237-8bfc-a93c7bf933f4"}, "cmdb_data": {"app_name": "应用系统", "eip": "11.0.1.14", "module": "demo1", "ops_user": "运维仔", "ops_tel": "12345678901", "ops_dep": "运维部"}}
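Overall, metricbeat_replace.py and heartbeat_replace.py are nearly identical. Apart from heartbeat replacing metricbeat in a few names, the only real difference is how the lookup key is extracted: return data.get('fields', {}).get('ip', '') versus return data.get('url', {}).get('domain', '').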
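Because the two scripts differ only in where the lookup key lives, the enrichment step can be pictured as one shared function parameterised by that key path. The following is a sketch under that assumption, not the author's code: build_cmdb_index, enrich, and the sample rows are hypothetical, and a dictionary keyed by IP replaces the list scans so each lookup is a single hash access.

```python
import json


def build_cmdb_index(os_rows, app_rows):
    """Map each server IP to a flat dict of CMDB fields.

    os_rows:  (app_name, eip, module) tuples, as selected from cmdb_os
    app_rows: (app_name, ops_user, ops_tel, ops_dep) tuples, as selected from cmdb_app
    """
    apps = {row[0]: row for row in app_rows}
    index = {}
    for app_name, eip, module in os_rows:
        app = apps.get(app_name)
        if app is None:
            continue  # server whose application has no owner record
        index[eip] = {
            "app_name": app_name, "eip": eip, "module": module,
            "ops_user": app[1], "ops_tel": app[2], "ops_dep": app[3],
        }
    return index


def enrich(raw_message, index, key_path):
    """Attach cmdb_data (or None) to one JSON message; key_path locates the IP."""
    event = json.loads(raw_message)
    value = event
    for key in key_path:
        value = value.get(key, {}) if isinstance(value, dict) else {}
    event["cmdb_data"] = index.get(value) if isinstance(value, str) else None
    return event


if __name__ == "__main__":
    idx = build_cmdb_index([("demo-app", "11.0.1.14", "demo1")],
                           [("demo-app", "alice", "000", "ops")])
    # Metricbeat events carry the IP in fields.ip, Heartbeat events in url.domain.
    print(enrich('{"fields": {"ip": "11.0.1.14"}}', idx, ("fields", "ip")))
    print(enrich('{"url": {"domain": "11.0.1.14"}}', idx, ("url", "domain")))
```

With this shape, the index would be rebuilt whenever the CMDB rows are reloaded, and an unknown IP simply produces cmdb_data set to None.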
6. Heartbeat alerting
heartbeat_alarm.py is as follows:
# heartbeat_alarm.py
import json
import logging
import mysql.connector
from datetime import datetime, timedelta
from kafka import KafkaConsumer
import yaml

# Configure the logger
logging.basicConfig(
    level=logging.INFO,
    filename='heartbeat_checker.log',
    format='%(asctime)s [%(levelname)s] - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


class HeartbeatChecker:
    def __init__(self, config_path='application.yml'):
        # Initialize the HeartbeatChecker object
        self.config_path = config_path
        self.kafka_bootstrap_servers = None
        self.kafka_group_id = None
        self.kafka_topic = None
        self.mysql_host = None
        self.mysql_port = None
        self.mysql_user = None
        self.mysql_password = None
        self.mysql_database = None
        self.consecutive_down_threshold = None
        self.consecutive_up_threshold = None
        # Load the configuration from the YAML file
        self.load_config()
        self.kafka_consumer = None

    def load_config(self):
        try:
            # Load the configuration from the YAML file
            with open(self.config_path, 'r', encoding='utf-8') as file:
                config = yaml.safe_load(file)
            # Kafka settings
            self.kafka_bootstrap_servers = config['kafka']['bootstrap_servers']
            self.kafka_group_id = config['kafka']['group_id']
            self.kafka_topic = config['kafka']['topic']
            # MySQL settings
            self.mysql_host = config['mysql']['host']
            self.mysql_port = config['mysql']['port']
            self.mysql_user = config['mysql']['user']
            self.mysql_password = config['mysql']['password']
            self.mysql_database = config['mysql']['database']
            # Thresholds for consecutive down and consecutive up results
            self.consecutive_down_threshold = config['thresholds']['consecutive_down']
            self.consecutive_up_threshold = config['thresholds']['consecutive_up']
        except Exception as e:
            # Configuration loading failed
            logger.error(f"Error while loading the configuration: {e}")
            raise

    def create_kafka_consumer(self):
        try:
            # Create the Kafka consumer
            self.kafka_consumer = KafkaConsumer(
                self.kafka_topic,
                bootstrap_servers=self.kafka_bootstrap_servers,
                group_id=self.kafka_group_id,
                auto_offset_reset='latest',
                enable_auto_commit=True,
                value_deserializer=lambda x: json.loads(x.decode('utf-8'))
            )
        except Exception as e:
            # Consumer creation failed
            logger.error(f"Error while creating the Kafka consumer: {e}")
            raise

    def check_heartbeat_alerts(self):
        # Per-URL streak: url -> (last status, number of consecutive occurrences)
        streaks = {}
        mysql_connection = None
        try:
            # Create the Kafka consumer and connect to MySQL
            self.create_kafka_consumer()
            mysql_connection = mysql.connector.connect(
                host=self.mysql_host,
                port=self.mysql_port,
                user=self.mysql_user,
                password=self.mysql_password,
                database=self.mysql_database
            )
            # A buffered cursor reads each result set in full, so no unread rows block the next query
            mysql_cursor = mysql_connection.cursor(buffered=True)
            # Iterate over the Kafka messages
            for message in self.kafka_consumer:
                json_data = message.value
                url = json_data.get('url', {}).get('full')
                monitor_status = json_data.get('monitor', {}).get('status')
                timestamp_str = json_data.get('@timestamp')
                cmdb_data = json_data.get('cmdb_data')
                if not (url and monitor_status and timestamp_str):
                    continue
                timestamp = self.convert_to_local_time(timestamp_str)
                # Count consecutive identical statuses; any status change restarts the count
                last_status, count = streaks.get(url, (None, 0))
                count = count + 1 if monitor_status == last_status else 1
                streaks[url] = (monitor_status, count)
                alert_open = self.url_exists_down_in_mysql(mysql_cursor, url)
                if monitor_status == 'down' and not alert_open \
                        and count >= self.consecutive_down_threshold:
                    # Enough consecutive down results: raise the alert and record it
                    self.send_alert(url)
                    self.write_to_mysql(mysql_cursor, timestamp, url, monitor_status, mysql_connection, cmdb_data)
                    logger.info(f"URL: {url} - added to MySQL")
                elif monitor_status == 'up' and alert_open \
                        and count >= self.consecutive_up_threshold:
                    # Enough consecutive up results: clear the recorded alert
                    self.delete_from_mysql(mysql_cursor, url, mysql_connection)
                    logger.info(f"URL: {url} - removed from MySQL after consecutive up results")
        except Exception as e:
            # Runtime error
            logger.error(f"An error occurred: {e}")
        finally:
            # Close the Kafka consumer and the MySQL connection
            if self.kafka_consumer:
                self.kafka_consumer.close()
            if mysql_connection:
                mysql_connection.close()

    def send_alert(self, url):
        # Log the alert
        logger.info(f"Alert: URL {url} was down {self.consecutive_down_threshold} times in a row")

    @staticmethod
    def write_to_mysql(cursor, timestamp, url, status, connection, cmdb_data=None):
        try:
            # Insert the alert row; CMDB columns are left empty when cmdb_data is missing
            insert_query = """
                INSERT INTO alert_list (timestamp, url, status, app_name, module, ops_user, ops_tel, ops_dep)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            cmdb = cmdb_data or {}
            cursor.execute(insert_query, (
                timestamp,
                url,
                status,
                cmdb.get('app_name', ''),
                cmdb.get('module', ''),
                cmdb.get('ops_user', ''),
                cmdb.get('ops_tel', ''),
                cmdb.get('ops_dep', '')
            ))
            connection.commit()
            logger.info(f"Inserted into MySQL: URL {url}, Status {status}, cmdb_data {cmdb_data}")
        except Exception as e:
            # Insert failed
            logger.error(f"Error writing to MySQL: {e}")

    @staticmethod
    def delete_from_mysql(cursor, url, connection):
        try:
            # Delete the open alert for this URL
            delete_query = "DELETE FROM alert_list WHERE url = %s AND status = 'down'"
            cursor.execute(delete_query, (url,))
            connection.commit()
            logger.info(f"Deleted from MySQL: URL {url}")
        except Exception as e:
            # Delete failed
            logger.error(f"Error deleting from MySQL: {e}")

    @staticmethod
    def url_exists_down_in_mysql(cursor, url):
        try:
            # Check whether an open (down) alert exists for this URL
            query = "SELECT * FROM alert_list WHERE url = %s AND status = 'down'"
            cursor.execute(query, (url,))
            return bool(cursor.fetchone())
        except Exception as e:
            # Lookup failed
            logger.error(f"Error checking whether the URL exists in MySQL: {e}")
            return False

    @staticmethod
    def convert_to_local_time(timestamp_str):
        # Convert the UTC timestamp to local time (fixed UTC+8 offset)
        timestamp_utc = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S.%fZ")
        timestamp_local = timestamp_utc + timedelta(hours=8)
        return timestamp_local.strftime("%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    try:
        # Run the main program
        heartbeat_checker = HeartbeatChecker()
        heartbeat_checker.check_heartbeat_alerts()
    except KeyboardInterrupt:
        print("Exiting...")
application.yml is as follows:
# application.yml
kafka:
  bootstrap_servers:
    - '11.0.1.11:9092'
    - '11.0.1.12:9092'
    - '11.0.1.13:9092'
  group_id: 'python_alert'
  topic: 'ELK-system_heartbeat'
mysql:
  host: '11.0.1.11'
  port: 13306
  user: 'root'
  password: '123456'
  database: 'zll_python_test'
thresholds:
  consecutive_down: 1
  consecutive_up: 1
Here consecutive_down is the number of consecutive down results that triggers an alert, and consecutive_up is the number of consecutive up results that clears it.
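The effect of the two thresholds is easiest to see in isolation. The sketch below is a minimal illustration, assuming the open-alert state is held in memory rather than in the alert_list table; StreakTracker and its method names are hypothetical and not part of heartbeat_alarm.py.

```python
class StreakTracker:
    """Decide when to raise or clear an alert from a stream of up/down results."""

    def __init__(self, down_threshold, up_threshold):
        self.down_threshold = down_threshold
        self.up_threshold = up_threshold
        self._streak = {}        # url -> (last status, consecutive count)
        self._alerting = set()   # urls that currently have an open alert

    def observe(self, url, status):
        """Return 'alert', 'recover', or None for one monitor result."""
        last, count = self._streak.get(url, (None, 0))
        # A status change restarts the count, so only unbroken runs reach a threshold.
        count = count + 1 if status == last else 1
        self._streak[url] = (status, count)
        if status == "down" and url not in self._alerting and count >= self.down_threshold:
            self._alerting.add(url)
            return "alert"
        if status == "up" and url in self._alerting and count >= self.up_threshold:
            self._alerting.discard(url)
            return "recover"
        return None


if __name__ == "__main__":
    tracker = StreakTracker(down_threshold=2, up_threshold=2)
    for status in ["down", "up", "down", "down", "up", "up"]:
        print(status, tracker.observe("tcp://11.0.1.14:80", status))
```

With both thresholds set to 2, an isolated down between two up results raises nothing; only the second down in a row opens the alert, and only the second up in a row clears it. With the values 1 and 1 from the configuration above, every status change acts immediately.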
7. Metricbeat alerting
Metricbeat supports many kinds of alerts, such as CPU, memory, and filesystem usage. The Python code is as follows:
import logging
from logging.handlers import RotatingFileHandler
from kafka import KafkaConsumer, KafkaProducer
import yaml
import json


class KafkaAlertProcessor:
    def __init__(self, config_path):
        # Configure logging to the console and a file
        self.configure_logging()
        with open(config_path, 'r', encoding='utf-8') as config_file:
            config = yaml.safe_load(config_file)
        self.kafka_brokers = config['kafka']['brokers']
        self.input_topic = config['kafka']['input_topic']
        self.output_topic = config['kafka']['output_topic']
        self.group_id = config['kafka']['group_id']
        self.cpu_alert_threshold = config['alert_thresholds']['cpu']
        self.memory_alert_threshold = config['alert_thresholds']['memory']
        self.filesystem_alert_threshold = config['alert_thresholds']['filesystem']
        self.common_template = config['alert_templates']['common']
        self.cpu_alert_template = config['alert_templates']['cpu']
        self.memory_alert_template = config['alert_templates']['memory']
        self.filesystem_alert_template = config['alert_templates']['filesystem']
        self.consumer = None
        self.producer = None

    @staticmethod
    def configure_logging():
        # Configure the root logger to write to the console and a file
        logger = logging.getLogger('')
        logger.setLevel(logging.INFO)
        # Console output
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        # File output, rotated by size: at most 10 backup files of 1 MB each
        file_handler = RotatingFileHandler('metricbeat_alarm.log', maxBytes=1000000, backupCount=10,
                                           delay=False)
        file_handler.setLevel(logging.INFO)
        # Log format
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(formatter)
        file_handler.setFormatter(formatter)
        # Attach the handlers to the logger
        logger.addHandler(console_handler)
        logger.addHandler(file_handler)

    def initialize_consumer(self):
        self.consumer = KafkaConsumer(
            self.input_topic,
            group_id=self.group_id,
            bootstrap_servers=','.join(self.kafka_brokers),
            auto_offset_reset='latest',
            enable_auto_commit=False,
        )

    def initialize_producer(self):
        self.producer = KafkaProducer(
            bootstrap_servers=','.join(self.kafka_brokers),
            value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8')
        )

    def process_alert(self, data):
        # cmdb_data is null when the enrichment step found no CMDB match
        cmdb_data = data.get("cmdb_data") or {}
        common_message = self.common_template.format(
            cmdb_data_eip=cmdb_data.get("eip", ""),
            cmdb_data_app_name=cmdb_data.get("app_name", ""),
            cmdb_data_module=cmdb_data.get("module", ""),
            cmdb_data_ops_user=cmdb_data.get("ops_user", ""),
            cmdb_data_ops_tel=cmdb_data.get("ops_tel", ""),
            cmdb_data_ops_dep=cmdb_data.get("ops_dep", "")
        )
        # Check CPU usage (total.pct divided by the core count gives a per-core average)
        if "system" in data and "cpu" in data["system"] and "total" in data["system"]["cpu"] and "pct" in \
                data["system"]["cpu"]["total"] and data["system"]["cpu"].get("cores"):
            cpu_usage = data["system"]["cpu"]["total"]["pct"] / data["system"]["cpu"]["cores"]
            if cpu_usage > self.cpu_alert_threshold:
                cpu_alert_message = self.cpu_alert_template.format(cpu_usage=cpu_usage)
                alert_message = cpu_alert_message + common_message
                self.send_alert("CPU alert", alert_message)
        # Check memory usage
        if "system" in data and "memory" in data["system"] and "actual" in data["system"]["memory"] and "used" in \
                data["system"]["memory"]['actual'] and 'pct' in data["system"]["memory"]['actual']['used']:
            memory_usage = data["system"]["memory"]["actual"]["used"]["pct"]
            if memory_usage > self.memory_alert_threshold:
                memory_alert_message = self.memory_alert_template.format(
                    memory_actual_used_pct=memory_usage
                )
                alert_message = memory_alert_message + common_message
                self.send_alert("Memory alert", alert_message)
        # Check filesystem usage
        if "system" in data and "filesystem" in data["system"] and "used" in data["system"]["filesystem"] and "pct" in \
                data["system"]["filesystem"]["used"]:
            filesystem_usage = data["system"]["filesystem"]["used"]["pct"]
            if filesystem_usage > self.filesystem_alert_threshold:
                fs_alert_message = self.filesystem_alert_template.format(
                    filesystem_used_pct=filesystem_usage,
                    filesystem_mount_point=data["system"]["filesystem"].get("mount_point", "Unknown")
                )
                alert_message = fs_alert_message + common_message
                self.send_alert("Filesystem alert", alert_message)

    def send_alert(self, alert_type, alert_message):
        formatted_message = f"{alert_type} - {alert_message}"
        logging.warning(formatted_message)
        self.producer.send(self.output_topic,
                           value={"alert_type": alert_type, "alert_message": formatted_message})

    def run(self):
        self.initialize_consumer()
        self.initialize_producer()
        try:
            for msg in self.consumer:
                try:
                    data = json.loads(msg.value.decode('utf-8'))
                    self.process_alert(data)
                except json.JSONDecodeError as e:
                    logging.error(f"JSON decode error: {e}")
        except KeyboardInterrupt:
            pass
        finally:
            if self.consumer:
                self.consumer.close()
            if self.producer:
                # Deliver any buffered alerts before exiting
                self.producer.flush()
                self.producer.close()


if __name__ == "__main__":
    kafka_alert_processor = KafkaAlertProcessor(config_path="application.yml")
    kafka_alert_processor.run()
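The three checks in the script share one pattern: read a nested usage ratio from the event, skip it if the field is absent, and compare it with a threshold. The sketch below restates that pattern as a pure function that can be exercised without Kafka. It is an illustration, not the script itself: dig and check_thresholds are hypothetical names, and the CPU branch assumes, as the script does, that system.cpu.total.pct has to be divided by system.cpu.cores to get a per-core average.

```python
def dig(data, *keys):
    """Walk nested dicts; return None as soon as a key is missing."""
    for key in keys:
        if not isinstance(data, dict) or key not in data:
            return None
        data = data[key]
    return data


def check_thresholds(event, thresholds):
    """Return a list of (metric, value) pairs whose value exceeds its threshold."""
    readings = {}
    total_pct = dig(event, "system", "cpu", "total", "pct")
    cores = dig(event, "system", "cpu", "cores")
    if total_pct is not None and cores:
        readings["cpu"] = total_pct / cores  # per-core average
    memory = dig(event, "system", "memory", "actual", "used", "pct")
    if memory is not None:
        readings["memory"] = memory
    filesystem = dig(event, "system", "filesystem", "used", "pct")
    if filesystem is not None:
        readings["filesystem"] = filesystem
    return [(name, value) for name, value in readings.items() if value > thresholds[name]]


if __name__ == "__main__":
    sample = {"system": {"memory": {"actual": {"used": {"pct": 0.1493}}}}}
    print(check_thresholds(sample, {"cpu": 0.01, "memory": 0.1, "filesystem": 0.1}))
```

Each Metricbeat event normally carries a single metricset, so in practice one event yields at most one entry; the function still handles events that contain several.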
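The application.yml file is as follows:
kafka:
  brokers:
    - "11.0.1.11:9092"
    - "11.0.1.12:9092"
    - "11.0.1.13:9092"
  input_topic: "ELK-system_metricbeat"
  output_topic: "ELK-alarm"
  group_id: "ELK-alarm"
alert_thresholds:
  cpu: 0.01
  memory: 0.1
  filesystem: 0.1
alert_templates:
  common: "IP: {cmdb_data_eip}\nSystem name: {cmdb_data_app_name}\nModule: {cmdb_data_module}\nOps owner: {cmdb_data_ops_user}\nPhone: {cmdb_data_ops_tel}\nDepartment: {cmdb_data_ops_dep}\n"
  cpu: "[CPU usage alert]\nAlert: CPU usage exceeds the threshold. Current average: {cpu_usage}.\n"
  memory: "[Memory usage alert]\nAlert: memory usage exceeds the threshold. Current value: {memory_actual_used_pct}.\n"
  filesystem: "[Filesystem usage alert]\nAlert: filesystem usage exceeds the threshold. Current value: {filesystem_used_pct}. Mount point: {filesystem_mount_point}.\n"
The resulting alerts look like this:
[Memory usage alert]
Alert: memory usage exceeds the threshold. Current value: 0.1502.
IP: 11.0.1.14
System name: 应用系统
Module: demo1
Ops owner: 运维仔
Phone: 12345678901
Department: 运维部
[Filesystem usage alert]
Alert: filesystem usage exceeds the threshold. Current value: 0.1178. Mount point: /.
IP: 11.0.1.14
System name: 应用系统
Module: demo1
Ops owner: 运维仔
Phone: 12345678901
Department: 运维部
[CPU usage alert]
Alert: CPU usage exceeds the threshold. Current average: 0.002.
IP: 11.0.1.14
System name: 应用系统
Module: demo1
Ops owner: 运维仔
Phone: 12345678901
Department: 运维部
For now I send the alerts to Kafka, the console, and the log file. Readers can adapt this to their own needs and write the alerts to an HTTP API, Redis, or a database instead.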
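One way to make the destination swappable is to treat every output as a callable that accepts the alert dict. The sketch below assumes that design; it is not part of the scripts above, and log_sink, FileSink, and dispatch are hypothetical names. A JSON-lines file stands in for an HTTP API, Redis, or a database so the example runs without extra services.

```python
import json
import logging

log = logging.getLogger("alerts")


def log_sink(alert):
    """Write the alert to the application log."""
    log.warning("%s - %s", alert["alert_type"], alert["alert_message"])


class FileSink:
    """Append each alert as one JSON line (stand-in for an API, Redis, or a database)."""

    def __init__(self, path):
        self.path = path

    def __call__(self, alert):
        with open(self.path, "a", encoding="utf-8") as handle:
            handle.write(json.dumps(alert, ensure_ascii=False) + "\n")


def dispatch(alert, sinks):
    """Send one alert to every sink; a failing sink must not block the others."""
    failed = []
    for sink in sinks:
        try:
            sink(alert)
        except Exception:
            log.exception("sink %r failed", sink)
            failed.append(sink)
    return failed


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    alert = {"alert_type": "Memory alert", "alert_message": "usage above threshold"}
    dispatch(alert, [log_sink, FileSink("alerts.jsonl")])
```

Adding a new destination then means writing one more callable and appending it to the list passed to dispatch.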
8. Alert recovery
Still under investigation.