架构:
logstash配置:
vim /etc/logstash/conf.d/daemonset-filebeat.conf
# Consume the pod console logs that Filebeat ships to Kafka.
input {
  kafka {
    # Broker list; 9092 with no security_protocol set means the plugin default
    # (PLAINTEXT) applies.
    # NOTE(review): confirm these brokers are only reachable from a trusted
    # network, or enable TLS/SASL on the listener and here.
    bootstrap_servers => "10.0.7.53:9092,10.0.7.54:9092,10.0.7.55:9092"
    topics => [
      "daemonset-pod-console-log"
    ]
    # NOTE(review): no codec is set, so each Kafka record arrives as plain text
    # in [message]. The [fields][log_topic] conditionals further down can only
    # match if the record is decoded somewhere — confirm.
  }
}
# Trim shipper metadata that is not needed in the index.
filter {
  if [fields][log_topic] == "daemonset-pod-console-log" {
    mutate {
      # Only metadata is dropped; [message] itself is left untouched, so
      # everything downstream still sees the text exactly as the pod wrote it.
      remove_field => [
        "@version",
        "agent",
        "ecs"
      ]
    }
  }
}
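# Two sinks: every matching event is indexed, and events whose text contains
# one of the failure keywords also raise a webhook alert.
output {
  if [fields][log_topic] == "daemonset-pod-console-log" {
    elasticsearch {
      # NOTE(review): no credentials or TLS options are set on this output —
      # confirm the cluster is not reachable from untrusted networks.
      hosts => ["10.0.7.46:9200","10.0.7.47:9200","10.0.7.48:9200"]
      index => "daemonset-pod-console-log-%{+YYYY.MM.dd}"
    }
  }
  if 'error' in [message] or 'Error' in [message] or 'ERROR' in [message] or 'failed' in [message] or 'fail' in [message] or 'Fail' in [message] or 'Failed' in [message] or '异常' in [message] or 'exception' in [message] or 'Undefined index' in [message] or 'Invalid argument' in [message] or 'not found' in [message] {
    # stdout { codec => rubydebug }
    #
    # The alert is posted straight from Logstash instead of going through
    #   exec { command => "sh /root/b.sh \"%{message}\"" }
    # [message] is whatever a container printed, so interpolating it into a
    # command string lets log content decide what the Logstash host executes.
    # No quoting added around %{message} makes that safe; keep log text out of
    # command lines entirely.
    http {
      url => "你的企业wx的webhook地址"
      http_method => "post"
      # With format => "json" the plugin serialises the mapping itself, so
      # quotes, backslashes and newlines inside the log text are escaped
      # rather than breaking (or extending) the request body.
      format => "json"
      mapping => {
        "msgtype" => "markdown"
        "markdown" => {
          # %{message} is the event text as consumed from Kafka. It is rendered
          # as markdown by the receiver, so treat links or formatting that show
          # up in an alert as untrusted content.
          "content" => "应用启动故障报警
>报错日志: %{message}"
        }
      }
    }
  }
}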
调试:
/usr/share/logstash/bin/logstash -f daemonset-filebeat.conf
重启:
systemctl restart logstash
告警脚本:
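#!/bin/bash
# Post a markdown alert for one log event to an enterprise-WeChat webhook.
#
# Usage:   b.sh <event text...>
#          All arguments are joined with single spaces and handled as one event.
# Outputs: the webhook's HTTP response on stdout, diagnostics on stderr.
# Returns: curl's exit status, or 2 when no event text was given.
#
# The event text comes from container logs and is therefore untrusted. The
# escaping below keeps it from breaking out of the JSON string, but it cannot
# undo anything a shell already did to the text before this script started:
# callers must not assemble the command line that invokes this script from
# log content.
set -u

# NOTE(review): the webhook URL presumably doubles as the credential; keep this
# file readable only by the account that runs it.
webhook="你的企业wx的webhook地址"

#######################################
# Escape a string for use inside a JSON string literal.
# Arguments: $1 - raw text
# Outputs:   escaped text on stdout (no surrounding quotes)
#######################################
json_escape() {
  # Work byte-wise so multi-byte UTF-8 sequences are copied through unchanged
  # and only ASCII control bytes are ever classified as control characters.
  local LC_ALL=C
  local in=$1 out='' ch
  local -i i
  for ((i = 0; i < ${#in}; i++)); do
    ch=${in:i:1}
    case "$ch" in
      '\') out+='\\' ;;
      '"') out+='\"' ;;
      $'\n') out+='\n' ;;
      $'\r') out+='\r' ;;
      $'\t') out+='\t' ;;
      # Remaining control bytes are not legal inside a JSON string.
      [[:cntrl:]]) out+=' ' ;;
      *) out+=$ch ;;
    esac
  done
  printf '%s' "$out"
}

#######################################
# Print the text between the first and second occurrence of a marker
# (or up to the end when the marker occurs once); nothing if it is absent.
# Arguments: $1 - text to search, $2 - literal marker
#######################################
field_after() {
  local text=$1 marker=$2
  [[ $text == *"$marker"* ]] || return 0
  text=${text#*"$marker"}
  text=${text%%"$marker"*}
  printf '%s' "$text"
}

#######################################
# Build the webhook request body.
# Arguments: $1 - pod name, $2 - namespace, $3 - error text (all raw)
# Outputs:   one JSON document on stdout
#######################################
build_payload() {
  local pod ns err
  pod=$(json_escape "$1")
  ns=$(json_escape "$2")
  err=$(json_escape "$3")
  # The format string is constant; log-derived values only ever fill %s.
  # Line breaks inside the markdown text are sent as \n escapes because a raw
  # newline is not valid inside a JSON string.
  printf '{"msgtype":"markdown","markdown":{"content":"%s\\n>pod_name: %s\\n>namespace: %s\\n>报错日志: %s"}}' \
    "应用启动故障报警" "$pod" "$ns" "$err"
}

main() {
  if (($# == 0)); then
    printf 'usage: %s <event text>\n' "${0##*/}" >&2
    return 2
  fi

  # "$*" keeps the text as given; an unquoted expansion would glob-expand
  # characters such as * against the working directory and leak file names
  # into the alert.
  local message="$*"
  local after_msg after_pods rest
  local error_message namespace pod_name=''

  # Error text: what follows "message:" up to the next double quote.
  after_msg=$(field_after "$message" 'message:')
  error_message=${after_msg%%\"*}

  # Log path layout is pods/<namespace>_<pod>_<uid>/...
  after_pods=$(field_after "$message" 'pods/')
  namespace=${after_pods%%_*}
  if [[ $after_pods == *_* ]]; then
    rest=${after_pods#*_}
    pod_name=${rest%%_*}
  fi

  # The body goes over stdin so log text never appears in curl's argv, and the
  # timeout keeps a slow webhook from blocking the caller indefinitely.
  build_payload "$pod_name" "$namespace" "$error_message" |
    curl --silent --show-error --max-time 10 \
      -H 'Content-Type: application/json' \
      --data-binary @- \
      --url "$webhook"
}

main "$@"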
模拟故障:
[root@k8s-node1 filebeat-kafka]# pwd
/var/log/pods/kube-system_filebeat-kafka-kqnxn_86f08f9d-c56d-4b5b-800d-78e7574f4fcd/filebeat-kafka
[root@k8s-node1 filebeat-kafka]# vim a.log
Undefined index afj jlj wqerlj ljaflafkj alfj afl af
Invalid argument lwqkjreljwr sja falkfj alfjka f
not found; jwle;qjekrlqwjr lqwjr qr
exception: 自定义异常
Fatl ERROR: Uncaugh Exxor: AppProcessAchievementAchievementProcess class not esist. in /var/www/EasySwooleEvent.php:248
查看报警: