ELK日志
ELK
介绍
日志处理
- 日志主要包括系统日志、应用程序日志和安全日志。系统运维和开发人员可以通过日志了解服务器软硬件信息、检查配置过程中的错误及错误发生的原因, 经常分析日志可以了解服务器的负荷, 性能安全性, 从而及时采取措施纠正错误
理想的日志系统特点
-
收集
- 能够采集多种来源的日志数据
-
传输
- 能够稳定的把日志数据传输到中央系统
-
存储
- 如何存储日志数据
-
分析
- 可以支持 UI 分析
-
警告
- 能够提供错误报告, 监控机制(ElastAlert)
ELK
-
介绍
- ELK提供了一整套解决方案, 都是开源软件, 之间互相配合、 完美衔接, 高效的满足了很多场合的应用,是目前主流的日志系统。由ElasticSearch、Logstash和Kibana三个开源工具组成,后新增了FileBeat,亦称ELFK
-
组成
-
ElasticSearch
- 一个基于Lucene的开源分布式搜索服务器。基于RESTful web接口,用Java开发
- 分布式, 零配置, 自动发现, 索引自动分片, 索引副本机制, restful风格接口, 多数据源, 自动搜索负载
- 设计用于云计算中, 能够达到实时搜索, 稳定, 可靠, 快速, 安装使用方便
-
Logstash
- 对日志进行收集、过滤、分析,开源免费
- 一般为c/s架构, client端安装在需要收集日志的主机上, server端负责将收到的各节点日志进行过滤、修改等操作再一并发往elasticsearch上去
-
Kibana
- 基于浏览器页面的Elasticsearch前端展示工具,可以汇总、分析和搜索重要数据日志。开源免费
-
FileBeat
- 轻量级的日志收集处理工具(Agent),占用资源少, 适合于在各服务器上搜集日志后传给Logstash,官方推荐
-
-
工作原理
- 应用程序(AppServer)→FileBeat→Logstash→ElasticSearch→Kibana→浏览器(Browser)
- FileBeat收集应用产生的日志,发给Logstash进行处理,处理后存放到ElasticSearch集群中,Kibana从ES集群中查询数据生成图表返回给Browser(原始模式由Logstash收集并处理)
-
文档
-
elastic中文社区
- https://elasticsearch.cn/
-
Elasticsearch
- https://www.elastic.co/guide/en/elasticsearch/reference/index.html
-
Logstash
- https://www.elastic.co/guide/en/logstash/index.html
-
Kibana
- https://www.elastic.co/guide/en/kibana/index.html
-
Filebeat
- https://www.elastic.co/guide/en/beats/filebeat/index.html
-
部署准备
服务器
- 至少2台,配置至少2U3G
- 1.1.1.11 es1 Elasticsearch/kibana/head
- 1.1.1.12 es2 Elasticsearch/logstash
角色划分
- 2台机器全部安装jdk1.8, 因为elasticsearch是java开发的
- 2台全部安装elasticsearch (后续都简称为es)
- master节点(es1)上需要安装kibana以及head插件
- data(es2)上安装logstash
ELK版本信息
- Elasticsearch-6.4.1
- logstash-6.4.1
- kibana-6.4.1
- filebeat-6.4.1
- 所有软件版本保持一致
其他
- 所有机器做好互相解析,时间同步, 关闭防火墙, selinux
Elasticsearch
简介
-
可扩展性
- 通过索引分片机制实现,index为索引,每个index下面分为n个shard,真正的数据以doc(文档数据,类似于Gruntfile.js中格式)的形式存储在每个shard中
-
高可用
- 通过shard冗余备份、跨可用区域部署以及数据快照等方式实现,并能够应对集群节点故障和数据损坏
-
节点(Node)
- 一个运行中的Elasticsearch实例称为一个节点
- 集群是由一个或者多个拥有相同cluster.name配置的节点组成,它们共同承担数据和负载的压力
- 当有节点加入集群中或者从集群中移除节点时,集群将会重新平均分布所有的数据
-
主节点(Master Node)
- 当一个节点被选举成为主节点时, 它将负责管理集群范围内的所有变更,例如增加、删除索引,或者增加、删除节点等
- 主节点并不需要涉及到文档级别的变更和搜索等操作,所以当集群只拥有一个主节点的情况下,即使流量增加它也不会成为瓶颈
-
请求
- 用户可以将请求发送到集群中的任何节点 ,包括主节点
- 每个节点都知道任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点
- 无论请求发送到哪个节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将最终结果返回給客户端
-
索引(index)
- 为了将数据添加到Elasticsearch,我们需要索引(index) —— 一个存储关联数据的地方
- 实际上,索引只是一个用来指向一个或多个分片(shards)的"逻辑命名空间(logical namespace)"
-
分片(shard)
- 最小级别的工作单元 ,保存索引中所有数据的一部分。分为主分片(primary shard)或副本分片(replica shard)
- 数据存储在分片中,并且在分片中被索引,应用程序直接与索引通信,索引中的每个文档属于一个单独的主分片
- 分片分配到集群中的节点上,当集群扩容或缩小,Elasticsearch会自动在节点间迁移分片,以使集群保持平衡
- 分片的最大容量取决于使用状况:硬件存储大小、文档大小和复杂度、如何索引和查询文档、期望的响应时间
- 复制分片只是主分片的一个副本,它可以防止硬件故障导致的数据丢失,同时可以提供读请求
- 当索引创建完成的时候,主分片的数量就固定了,但是复制分片的数量可以随时调整
-
数据分布示例
-
例如一个索引中分配了3个主分片(默认是5个)和一个副本
-
一个节点
-
3个主分片分布在主节点
-
集群健康情况
- “status”: “yellow”,
“active_primary_shards”: 3,
“active_shards”: 3,
“unassigned_shards”: 3,
- “status”: “yellow”,
-
在同一个节点上保存相同的数据副本没有必要,如果这个节点故障了,所有的副本也会丢失
-
-
两个节点
-
3个主分片分布在主节点,3个复制分片分布在节点2
-
集群健康情况
- “status”: “green”,
“active_primary_shards”: 3,
“active_shards”: 6,
- “status”: “green”,
-
-
三个节点
- 每个节点分布两个分片,意味着每个节点的硬件资源(CPU、RAM、I/O)被较少的分片共享,这样每个分片就会有更好的表现
-
分片本身就是一个完整成熟的搜索引擎,它可以使用单一节点的所有资源。使用这6个分片(3主+3复制)最多可以扩展到6个节点,每个节点上有一个分片,这样每个分片就可以100%使用这个节点的资源了
-
-
服务端口
-
9200
- 传输数据
-
9300
- 集群通信
-
集群部署
-
布署java环境(略)
- 所有节点
-
安装elasticsearch
(所有节点)-
创建用户
- elasticsearch不能以root用户运行,必须要创建一个普通用户运行
- useradd es
- echo 123|passwd --stdin es
-
解压
- tar xf elasticsearch-6.4.1.tar.gz -C /usr/local/
- ln -s /usr/local/elasticsearch-6.4.1 /usr/local/es
-
创建数据目录
- mkdir -p /data/es/{data,logs}
-
授权
- chown -R es.es /data/es/ /usr/local/es/
-
-
配置本地解析
(所有节点)- vim /etc/hosts
1.1.1.11 es1
1.1.1.12 es2
- vim /etc/hosts
-
配置es-master
1.1.1.11
cluster.name: es-cluster # 集群名称,各节点配成相同的集群名称
node.name: es1 # 节点名称,各节点配置不同。
node.master: true # 指示某个节点是否符合成为主节点的条件
node.data: true # 指示节点是否为数据节点。数据节点包含并管理索引的一部分
path.data: /data/es/data # 数据存储目录
path.logs: /data/es/logs # 日志存储目录
bootstrap.memory_lock: true # 内存锁定,是否禁用交换
network.host: 0.0.0.0 # 绑定节点IP
http.port: 9200 # rest api端口
discovery.zen.ping.unicast.hosts: [“es1”, “es2”] #集群节点IP或主机
http.cors.enabled: true #允许http跨域访问
http.cors.allow-origin: “*” #允许的源地址
-
配置es-data
1.1.1.12- 在master的基础上作如下修改
node.name: es2
node.master: false
- 在master的基础上作如下修改
-
设置JVM堆大小
(所有节点)-
sed -i ‘s/-Xms1g/-Xms4g/’ /es/config/jvm.options
-
sed -i ‘s/-Xmx1g/-Xmx4g/’ /usr/local/es/config/jvm.options
-
注意
- 确保堆内存最小值(Xms)与最大值(Xmx)的大小相同,防止程序在运行时改变堆内存大小
- 如果系统内存足够大,将堆内存最大和最小值设置为31G,因为有一个32G性能瓶颈问题。堆内存大小不要超过系统内存的50%
- 这一步本实验中不做(虚拟机内存小), 若公司服务器性能较高, 为了提高es性能需要设置
-
-
修改资源限制
- [root@es ~]# vim /etc/security/limits.conf
es soft nofile 65536
es hard nofile 131072
es soft nproc 4096
es hard nproc 4096
es soft memlock unlimited
es hard memlock unlimited
- [root@es ~]# vim /etc/security/limits.conf
[root@es ~]# echo “vm.max_map_count=262144” >> /etc/sysctl.conf
[root@es es]# sysctl -p
vm.max_map_count = 262144
-
启动服务
-
先启动master, 再启动data
-
su - es -c ‘/usr/local/es/bin/elasticsearch -d’
-
-c
- 切换到es用户执行命令
-
-d
- es以守护进程方式运行
-
-
-
验证
-
查看端口
- netstat -tlnp |egrep “:9200|:9300”
-
查看服务情况
- curl 1.1.1.11:9200 uuid有值
- curl 1.1.1.12:9200
-
查看集群
健康状况-
curl 1.1.1.11:9200/_cluster/health?pretty
-
curl 1.1.1.12:9200/_cluster/health?pretty
-
字段
-
status
-
green
- 所有的主分片和副本分片都正常运行
-
yellow
- 所有的主分片都正常运行,但不是所有的副本分片都正常运行
-
red
- 有主分片没能正常运行
-
-
number_of_nodes、number_of_data_nodes
- 节点数量
-
active_primary_shards
- 集群中的主分片数量。涵盖了所有索引的汇总值
-
active_shards
- 涵盖了所有索引的所有分片的汇总值,即包括副本分片
-
relocating_shards
- 当前正在从一个节点迁往其他节点的分片的数量。通常来说应该是0,不过在ES发现集群不太均衡时,该值会上涨
-
initializing_shards
- 刚刚创建的分片的个数。比如,当你刚创建第一个索引,分片都会短暂的处于initializing状态
-
unassigned_shards
- 已经在集群状态中存在的分片,但是实际在集群里又找不着。通常未分配分片的来源是未分配的副本
-
-
-
-
安装问题
-
1
-
报错
- [WARN ][o.e.b.ElasticsearchUncaughtExceptionHandler] [es2] uncaught exception in thread [main]
org.elasticsearch.bootstrap.StartupException: java.lang.RuntimeException: can not run elasticsearch as root
- [WARN ][o.e.b.ElasticsearchUncaughtExceptionHandler] [es2] uncaught exception in thread [main]
-
解决
- 使用普通用户运行
-
-
2
-
es soft nofile 65536 #第一列用户名
es hard nofile 65536
- ulimit -n
- 查看当前用户的进程最大可同时打开文件数
- ulimit -a
- 查看所有限制
- 3
- 报错
- memory locking requested for elasticsearch process but memory is not locked
- 请求锁内存失败,系统默认能让进程锁住的最大内存为64k
- 解决
- # vim /etc/security/limits.conf
es soft memlock unlimited
es hard memlock unlimited
- 4
- 报错
- max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
- es用户拥有的内存权限太小了,至少需要262114
- 解决
- echo vm.max_map_count=262144 >> /etc/sysctl.conf
- sysctl -p
- 从配置文件"/etc/sysctl.conf"加载内核参数设置
- 使用问题分析实例
一个10节点的ES集群,某天碰到问题了,集群健康状态看起来像是这样:
{
"cluster_name": "elasticsearch_zach",
"status": "red",
"timed_out": false,
"number_of_nodes": 8,
"number_of_data_nodes": 8,
"active_primary_shards": 90,
"active_shards": 180,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 20
}
我们集群是 red ,意味着我们缺数据(主分片 + 副本分片)了。
我们知道我们集群原先有 10 个节点,但是在这个健康状态里列出来的只有 8 个数据节点。
有两个数据节点不见了。我们看到有 20 个未分配分片。
这就是我们能收集到的全部信息。那些缺失分片的情况依然是个谜:
我们是缺了 20 个索引,每个索引里少 1 个主分片?
还是缺 1 个索引里的 20 个主分片?
还是 10 个索引里的各 1 主 1 副本分片?
具体是哪个索引?
要回答这个问题,我们需要使用 level 参数让 cluster-health 答出更多一点的信息:
GET _cluster/health?level=indices
{
"cluster_name": "elasticsearch_zach",
"status": "red",
"timed_out": false,
"number_of_nodes": 8,
"number_of_data_nodes": 8,
"active_primary_shards": 90,
"active_shards": 180,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 20
"indices": {
"v1": {
"status": "green",
"number_of_shards": 10,
"number_of_replicas": 1,
"active_primary_shards": 10,
"active_shards": 20,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 0
},
"v2": {
"status": "red",
"number_of_shards": 10,
"number_of_replicas": 1,
"active_primary_shards": 0,
"active_shards": 0,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 20
},
"v3": {
"status": "green",
"number_of_shards": 10,
"number_of_replicas": 1,
"active_primary_shards": 10,
"active_shards": 20,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 0
},
....
}
}
我们可以看到 v2 索引就是让集群变 red 的那个索引。
由此明确了,20 个缺失分片全部来自这个索引。
我们还可以看到这个索引曾经有 10 个主分片和一个副本,而现在这 20 个分片全不见了。
可以推测,这 20 个索引就是位于从我们集群里不见了的那两个节点上。
安装head插件
1.1.1.11
-
下载
-
head插件
- https://github.com/mobz/elasticsearch-head/archive/refs/heads/master.zip
-
node
- https://mirrors.tuna.tsinghua.edu.cn/nodejs-release/v14.19.3/node-v14.19.3-linux-s390x.tar.gz
-
-
安装
export NODE_HOME=/usr/local/node
export NODE_PATH=
N
O
D
E
H
O
M
E
/
l
i
b
/
n
o
d
e
m
o
d
u
l
e
s
e
x
p
o
r
t
P
A
T
H
=
NODE_HOME/lib/node_modules export PATH=
NODEHOME/lib/nodemodulesexportPATH=NODE_HOME/bin:$PATH
- . /etc/profile.d/node.sh
- node -v
- head
- unzip elasticsearch-head-master.zip
- mv elasticsearch-head-master /usr/local
- 注意不能安装在es的插件目录plugins中
- cd /usr/local/elasticsearch-head-master
- npm install -g cnpm --registry=https://registry.npmmirror.com/
- 安装cnmp工具
- cnpm install -g grunt-cli
- grunt -version
- cnpm install
-
服务端口
- 9100
-
配置
-
启动
-
grunt server &
- 需在elasticsearch-head-master目录中启动
-
-
使用
-
http://1.1.1.11:9100
- 浏览器访问9100端口
-
连接集群
- 连接URL中localhost改为集群节点IP
-
索引操作
-
创建索引
-
默认为5分片1副本,最新版本貌似默认1分片,可自定义创建
-
默认
-
curl -X PUT “1.1.1.11:9200/twitter?pretty”
- 创建索引twitter
-
-
自定义
- 创建一个索引facebook,4分片2副本
- curl -X PUT “1.1.1.11:9200/facebook?pretty” -H ‘Content-Type: application/json’ -d’
{
“settings” : {
“index” : {
“number_of_shards” : 4,
“number_of_replicas” : 2
}
}
}’
-
-
删除索引
-
curl -X DELETE “1.1.1.11:9200/twitter?pretty”
-
网页操作
- 动作->删除
-
logstash
介绍
-
简介
- logstash是一个接收、处理、转发日志的工具。支持系统日志、webserver日志等所有可以抛出来的日志类型
- Logstash在ELK中担任搬运工的角色,它为数据存储、报表查询和日志解析创建了一个功能强大的管道链
- Logstash提供了多种多样的 input,filters,codecs和output组件,让使用者轻松实现强大的功能
-
文档
-
官方文档
- https://www.elastic.co/cn/products/logstash
-
Logstash 参考
- https://www.elastic.co/guide/en/logstash/index.html
-
标准日志grok正则
- https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
-
常用filter插件
- https://blog.csdn.net/wfs1994/article/details/80862952
-
-
服务端口
- 9600
工作阶段
-
input
- 数据输入端,可接收来自任何地方的源数据
-
filter
- 数据中转层, 主要进行格式处理, 数据类型转换、数据过滤、字段添加, 修改等
-
output
- 数据输出端,负责将数据输出到指定位置,兼容大多数应用
安装配置
1.1.1.12
-
安装
- tar xf logstash-6.4.1.tar.gz -C /usr/local
- ln -s /usr/local/logstash-6.4.1 /usr/local/logstash
-
配置
http.host: “0.0.0.0”
- ln -s /usr/local/logstash/bin/logstash /usr/bin/logstash
- 软链接
- cd /usr/local/logstash/config
- 进入配置目录
-
命令
-
logstash -f syslog-test.conf -t
- 检测文件有没有错误
-
logstash -f syslog-test.conf
- 启动服务,端口9600
-
选项
-
-f file
- 指定配置文件
-
-t
- 检测配置并退出
-
-
[input
插件](https://www.elastic.co/guide/en/logstash/6.4/input-plugins.html)
-
stdin
-
从标准输入读取
- input { stdin {} }
-
-
file
-
从本地文件中读取
-
配置项
-
path
- 输入文件的路径,可多个,类型为数组。必选
- path => [“/var/log/httpd/access_log”,“/var/log/message*”]
-
start_position
-
起始位置,beginning或end,类型为字符串
-
beginning
- 从文件开头位置开始读取,文件读完不会终止
-
end(默认)
- 从文件结尾位置开始读取,即读取新写入的内容
-
-
-
例
- input {
file {
path => [“/var/log/httpd/access_log”]
start_position => “beginning”
}
}
- input {
-
-
syslog
. @1.1.1.12
- systemctl restart rsyslog
-
beat
-
接收来自Filebeat的事件
-
配置项
-
port
- 必选
-
-
例
- input {
beats {
port => 5044
codec => json #直接将保存在message中的json字串解析出来
}
}
- input {
-
-
redis
- 从redis-server list中获取
-
kafka
- 从kafka消息队列中获取
filter插件
-
grok
-
介绍
- 正则捕获,通过正则解析任意文本,将非结构化日志数据弄成结构化和方便查询的结构
- 目前logstash中解析非结构化日志数据最好的方式
-
配置项
-
match
- 定义查找位置和模式的映射,类型为哈希
-
-
语法规则
- match => {“message” => “%{内置模式名称:自定义字段名称}”}
- pattern_definitions => { “自定义模式名称” => “正则表达式” }
match => {“message” => “%{自定义模式名称:自定义字段名称}”} - match => {“message” => “(?<自定义字段名称>正则表达式)”}
- patterns_dir => [“自定义结构化模式文件目录”]
match => {“message” => “%{自定义结构化模式名称}”}
-
内置模式
- 安装目录的vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns目录中
- 读取目录中所有文件
-
例(httpd日志)
-
input { stdin {} }
filter {
grok {
remove_field => [“message”]
match => {“message” => “%{COMBINEDAPACHELOG}”}
}
}
output { stdout {} }
- 自定义
- # vim grok2.conf
input { stdin {} }
filter {
grok {
remove_field => [“message”]
patterns_dir => [“/logstash/patterns/”]
match => {“message” => “%{APACHE_LOG}”}
}
}
output { stdout {} }
- # vim /logstash/patterns/httpd
IPADDR (\d+.){3}\d+
ALL .*
UPPER [A-Z]+
NUMBER \d+
APACHE_LOG %{IPADDR:ip} .* [%{ALL:time}] “(%{UPPER:method} %{ALL:url} HTTP/%{ALL:http_version}|-)” %{NUMBER:http_code} %{NUMBER:bytes} “(%{ALL:http_referer}|-)” “(%{ALL:http_agent}|-)”
-
mutate
-
数据修改插件,提供了丰富的基础类型数据处理能力,包括重命名、删除、替换、修改日志事件中的字段
-
convert
- 用于数据类型的转换, 可以设置的转换类型包括: “integer”, “float” 和"string"
-
gsub
- 通过正则表达式替换字段中匹配到的值,只对字符串字段有效
-
lowercase
- 将大写字母全部转换成小写. 这个需求非常常见, 因为ES默认是按照小写字母进行检索的, 建议对常用检索字段启用lowercase配置
-
filter {
grok {
match => {“message” => “%{COMBINEDAPACHELOG}”}
}
mutate { #执行顺序从上到下
rename => [“clientip”,“ip”] #将client字段重命名为ip
update #更新某个字段的内容, 如果字段不存在, 不会新建
replace #替代某个字段的内容, 如果字段不存在, 会新建
convert => [“response”,“integer”,“bytes”,“integer”] #将response和bytes数据类型转换为整型
gsub => [“clientip”,“.”,““] #将clientip字段的值中”.“替换成””
uppercase #将小写字母全部转换成大写
lowercase => [“verb”] #将verb字段的值中大写字母转换为小写
strip #去除字段前后的空格
remove_field => [“message”] #删除message字段
split => [“clientip”,“.”] #将ip地址以"."为分隔符分割
join #合并数组中的元素
merge #合并数组或哈希字段
}
}
-
-
geoip
- geoip是常见的免费的IP地址归类查询库,可以根据IP地址查询其地域信息,包括国别、省市、经纬度等等,此插件对于可视化地图和区域统计非常有用
- input { stdin {} }
filter {
grok { match => {“message” => “%{IPV4:ip}”} }
geoip {
source => “ip”
fields => [“ip”,“country_name”,“region_name”,“city_name”] #自定义geoip中的显示字段
}
}
output { stdout {} }
-
date
-
时间戳
-
@timestamp
- 默认字段。表示系统时间,elasticsearch用此字段标注日志的产生时间
-
timestamp
- 自定义或内置模式中定义的时间字段,日志的实际产生时间
-
问题
- 若是实时监控,这两个字段时间几乎一致,但若是导入历史日志记录,则这两个时间完全不同,会导致kibana中展示的日志产生时间全部都是导入时的系统时间
-
-
data插件作用
- 将自定义的时间字段的值,转存到(覆盖)@timestamp字段
-
配置
- filter {
grok {
match => { “message” => “%{HTTPDATE:time}”}
}
date {
match => [“time”,“dd/MMM/yyyy:HH:mm:ss Z”]
}
}
- filter {
-
时区
- es内部时间类型字段以及日志存储都是采用UTC(0时区)时间
- 在kibana平台上系统会自动读取浏览器的当前时区,并将时间类型字段转换为当前时区的时间
-
-
clone
- 复制事件, 可能添加或者删除字段
-
drop
- 完全丢弃事件, 如debug事件
-
通用配置项
-
可用于所有的filter插件
-
remove_field
- 删除字段,类型为数组
- remove_field => [“message”,“ident”]
-
remove_tag、periodic_flush、id、enable_metric、add_tag、add_field
-
output插件
-
stdout
- 输出到标准输出
-
elasticsearch
- 发送事件数据到Elasticsearch, 便于查询, 分析, 绘图
- output {
elasticsearch {
hosts => [“1.1.1.11:9200”] #es集群套接字
index => [“http_access_log-%{+YYYY.MM.dd}”] #自定义索引名-系统时间
}
}
-
file
- 将事件数据写入到磁盘文件上
-
mongodb
- 将事件数据发送至高性能NoSQL mongodb, 便于永久存储, 查询, 分析, 大数据分片
-
redis
- 将数据发送至redis-server, 常用于中间层暂时缓存
- output {
redis {
host => [“127.0.0.1”]
port => “6379”
data_type => “list”
key => “logstash:nginx_log_test”
}
}
-
graphite
- 发送事件数据到graphitehttp://graphite.wikidot.com/
-
statsd
- 发送事件数据到statsd
-
kafka
- 将数据发送至kafka
logstash收集nginx日志
方案一
-
nginx
- #vim /etc/nginx/nginx.conf
log_format json ‘{’
‘“client”:“KaTeX parse error: Double superscript at position 37: … '̲"time":"time_local”,’
‘“verb”:“KaTeX parse error: Double superscript at position 40: … '̲"url":"request_uri”,’
‘“status”:"KaTeX parse error: Double superscript at position 32: … '̲"size":body_bytes_sent,’
‘“referer”: “KaTeX parse error: Double superscript at position 38: … '̲"agent": "http_user_agent”’
‘}’;
access_log /var/log/nginx/access_json.log json;
- #vim /etc/nginx/nginx.conf
-
logstash
input {
file {
path => “/var/log/nginx/access_json.log”
codec => “json” #输入预定义好的JSON数据, 可以省略filter/grok 配置, 从而减轻logstash的负载
start_position => “beginning”
}
}
output {
elasticsearch {
hosts => [“1.1.1.11:9200”]
index => “nginx-log-%{+YYYY.MM.dd}”
}
}
方案二
- Logstash自带apache标准日志的grok正则,nginx只是最后多了$http_x_forwarded_for
- input {
file {
path => “/var/log/nginx/access.log”
start_position => beginning
}
}
filter {
grok {
match => { “message” => “%{COMBINEDAPACHELOG} %{QS:x_forwarded_for}”}
}
date {
match => [ “timestamp” , “dd/MMM/YYYY:HH:mm:ss Z” ]
}
geoip {
source => “clientip”
}
}
output {
elasticsearch {
hosts => “1.1.1.11:9200”
}
}
kibana
安装
1.1.1.11
- tar xf kibana-6.4.1-linux-x86_64.tar.gz -C /usr/local
- ln -s /usr/local/kibana-6.4.1-linux-x86_64 /usr/local/kibana
配置
server.port: 5601
server.host: “0.0.0.0”
elasticsearch.url: “http://1.1.1.11:9200” #es集群任意节点的套接字
logging.dest: /usr/local/kibana/logs
服务端口
- 5601
启动
- /usr/local/kibana/bin/kibana &
使用
-
浏览器访问:1.1.1.11:5601
-
添加索引
- Management->Index Patterns->Create Index Patterns->Index pattern搜索到唯一结果->Next step->配置Time Filter name选择timestamp->Create index pattern
-
索引管理
- Management->Index Management->勾选索引>Manage n indices->管理操作
-
查看索引
- Discover->选择索引->勾选索引>选择事件戳
filebeat
介绍
- beats是ELK体系中新增的一个轻量的日志采集器,前面日志采集使用的是logstash,但是logstash占用的资源比较大,官方推荐使用beats来作为日志采集工具。且beats可扩展,支持自定义构建
- Beats可以直接(或者通过Logstash)将数据发送到Elasticsearch,在es可以进一步处理和增强数据,然后在Kibana中将其可视化
- filebeat是beats家族中最常用的一种。其他成员:auditbeat,metricbeat,packetbeat,heartbeat,winlogbeat
安装
- tar xf filebeat-6.4.1-linux-x86_64.tar.gz
- mv filebeat-6.4.1-linux-x86_64 /usr/local/filebeat
基本配置
filebeat.inputs:
- type: log
enabled: true #注释掉或者改成true,默认false
paths:
- /var/log/httpd/access_log #指定需要收集的日志文件的路径
启动
-
cd /usr/local/filebeat
-
./filebeat -c filebeat.yml &
- -c指定配置文件
输出类型配置
-
标准输出
output.console:
enable: true -
发送到ES(默认)
output.elasticsearch:
hosts: [“1.1.1.11:9200”] #es集群节点的套接字 -
发送到kafka
output.kafka:
hosts: [“1.1.10.1:9092,1.1.10.2:9092,1.1.10.3:9092”]
topic: ‘nginx’
output.logstash:
hosts: [“1.1.1.12:5044”] #logstash监听套接字
filebeat&logstash
-
采集处理
单个日志-
filebeat
- filebeat.inputs:
-
-
type: log
paths:- /var/log/httpd/access_log
output.logstash:
hosts: [“1.1.1.12:5044”]
- logstash
- input {
beats {port => 5044}
}
-
采集处理
多个日志-
filebeat
- filebeat.inputs:
-
-
type: log
paths:- /var/log/httpd/access_log
fields:
filetype: web # 用于区别不同的日志
fields_under_root: true # 将自定义字段置于顶层
- /var/log/httpd/access_log
-
type: log
paths:- /var/log/secure
fields:
filetype: sys
fields_under_root: true
- /var/log/secure
output.logstash:
hosts: [“1.1.1.12:5044”]
- logstash
- input {
beats {port => 5044}
}
filter {
if [filetype] == “web” {
grok {
match => {
“message” => “%{COMBINEDAPACHELOG}”
}
remove_field => [“message”,“beat”,“offset”,“tags”,“prospector”]
}
}
}
output {
if [filetype] == “web” {
elasticsearch {
hosts => [“1.1.1.11:9200”] #es集群节点的套接字
index => “http-%{+YYYY.MM.dd}” #索引名自定义
}
}
else if [filetype] == “sys” {
elasticsearch {
hosts => [“1.1.1.11:9200”]
index => “syslog-%{+YYYY.MM.dd}”
}
}
}
es集群中的角色
Master Node
-
作用
- 主要负责集群中索引的创建、删除以及数据的Rebalance等操作
- 当Master节点失联或者挂掉的时候,ES集群会自动从其他Master节点选举出一个Leader
-
建议
- 稳定的主节点对集群的健康是非常重要的, 为了确保集群的稳定建议分离主节点和数据节点
- 为了防止脑裂,设置参数discovery.zen.minimum_master_nodes=N/2+1,N为Master个数
- 集群中Master节点的个数为奇数个,如3个或者5个
-
设置
- 配置文件:node.master: true
Data Node
-
作用
- 主要是存储索引数据的节点,主要对文档进行增删改查操作,聚合操作等
-
建议
- 数据节点对cpu,内存,io要求较高, 在优化的时候需要监控数据节点的状态,当资源不够的时候,需要在集群中添加新的节点
- 和Master节点分开,避免因为Data Node节点出问题影响到Master节点
-
设置
- 配置文件:node.data: true
Client Node
- 当master和data都设置为false的时候,该节点只能处理路由请求,处理搜索,分发索引操作等,从本质上来说该客户节点表现为智能负载平衡器
- 独立的客户端节点在一个比较大的集群中是非常有用的,他协调主节点和数据节点,客户端节点加入集群可以得到集群的状态,根据集群的状态可以直接路由请求
生产环境中
集群部署建议
- 设置3台以上的节点作为master节点,这些节点只负责成为主节点,维护整个集群的状态
- 根据数据量设置一批data节点,这些节点只负责存储数据,后期提供建立索引和查询索引的服务
- 如果用户请求比较频繁,在集群中建议再设置一批client节点,这些节点只负责处理用户请求,实现请求转发,负载均衡等功能, 以减轻data节点的压力
ELK+kafka
架构
-
架构图
-
分层
-
数据采集层
- 业务服务器集群,用filebeat做日志采集,同时把采集的日志分别发送给两个logstash服务
-
数据处理层,数据缓冲层
- logstash服务把接受到的日志经过格式处理,转存到本地的kafka broker+zookeeper集群中
-
数据转发层
- 这个单独的Logstash节点会实时去kafka broker集群拉数据,转发至ES DataNode
-
数据持久化存储
- ES DataNode 会把收到的数据,写磁盘,建索引库
-
数据检索,数据展示
- ES Master + Kibana 主要 协调 ES集群,处理数据检索请求,数据展示
-
消息队列介绍
-
是什么
- Message queue是一种进程间或同一进程的不同线程间的通信方式, 软件的贮列用来处理一系列的输入,可以是来自用户或应用服务
- 是一种异步通信, 每个贮列中的纪录包含数据的详细信息(发生的时间,输入设备,输入参数等)
- 消息的发送者和接收者不需要同时与消息队列交互,消息会保存在队列中, 直到接收者取回它
- 消息队列主要解决应用耦合、异步处理、流量削锋等问题
- 当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等, 而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能
-
使用场景
-
应用耦合
- 多应用间通过消息队列对同一消息进行处理, 避免调用接口失败导致整个过程失败
-
异步处理
- 多应用对消息队列中同一消息进行处理, 应用间并发处理消息, 相比串行处理, 减少处理时间
-
限流削峰
- 广泛应用于秒杀或抢购活动中, 避免流量过大导致应用系统挂掉的情况
-
-
优点
-
解耦
- 在项目启动之初来预测将来项目会碰到什么需求, 是极其困难的
- 消息系统在处理过程中间插入了一个隐含的、基于数据的接口层, 两边的处理过程都要实现这一接口
- 这允许你独立的扩展或修改两边的处理过程, 只要确保它们遵守同样的接口约束
-
冗余
- 有些情况下, 处理数据的过程会失败,除非数据被持久化, 否则将造成丢失
- 消息队列把数据进行持久化直到它们已经被完全处理, 通过这一方式规避了数据丢失风险
- 许多消息队列在把一个消息从队列中删除之前, 需要你的处理系统明确的指出该消息已经被处理完毕, 从而确保数据被安全的保存直到你使用完毕
-
扩展性
- 因为消息队列解耦了处理过程, 所以增大消息入队和处理的频率很容易, 只要另外增加处理过程即可,不需要改变代码、不需要调节参数
-
峰值处理
- 在访问量剧增的情况下, 应用仍然需要继续发挥作用, 但是这样的突发流量并不常见
- 如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费
- 使用消息队列能够使关键组件顶住突发的访问压力, 而不会因为突发的超负荷的请求而完全崩溃
-
可恢复性
- 系统的一部分组件失效时, 不会影响到整个系统
- 即使一个处理消息的进程挂掉, 加入队列中的消息仍然可以在系统恢复后被处理
-
顺序保证
- 在大多使用场景下, 数据处理的顺序都很重要
- 大部分消息队列本来就是排序的, 并且能保证数据会按照特定的顺序来处理
- 如Kafka能保证一个Partition内的消息的有序性
-
缓冲
- 在任何重要的系统中, 都会有需要不同的处理时间的元素
- 消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速
- 该缓冲有助于控制和优化数据流经过系统的速度
-
异步通信
- 很多时候, 用户不想也不需要立即处理消息
- 消息队列提供了异步处理机制, 允许用户把一个消息放入队列, 但并不立即处理它
- 想向队列中放入多少消息就放多少, 然后在需要的时候再去处理它们
-
-
模式
-
点对点模式
-
角色
- 消息队列
- 发送者 (生产者)
- 接收者(消费者)
-
特点
- 队列可以有多个接收者,但一个消息只能被其中之一接收
- 消息一旦被消费(接收者成功接收后要应答)就会被删除
- 发送者发送消息之后, 不管有没有接收者在运行, 都不影响发送者发送消息
- 当没有消费者可用时, 这个消息会被保存直到有 一个可用的消费者
-
-
发布/订阅模式
-
角色
- 角色主题(Topic)
- 发布者(Publisher)
- 订阅者(Subscriber)
-
特点
- 每个消息可以有多个订阅者,订阅者需要提前订阅该角色主题, 并保持在线运行
- 针对某个主题,必须创建订阅者之后,才能消费发布者发布的消息
- 当发布一个消息, 所有订阅这个topic的服务都能得到这个消息
-
-
-
常用开源消息队列
-
Kafka
- 常用于分布式架构,对性能要求高的可考虑Kafka
-
RocketMQ
- 思路来源于kafka, 改成了主从结构, 在事务性可靠性方面做了优化
-
RabbitMQ
-
kafka介绍
-
官网
- http://kafka.apache.org/documentation.html
-
概述
- 分布式消息队列。有高性能、持久化、多副本备份、横向扩展能力
- 一般在架构设计中起到解耦、削峰、异步处理的作用
- kafka对外使用topic的概念, 生产者往topic里写消息, 消费者从中读消息。消息保存根据topic归类
- 为了做到水平扩展, 一个topic实际是由多个partition组成的, 遇到瓶颈时, 可以通过增加partition的数量来进行横向扩容,单个parition内是保证消息有序
- 发送消息者称为Producer,消息接受者称为Consumer,集群的实例称为broker。它们都依赖于zookeeper来保证系统可用性,zookeeper集群保存一些meta信息
-
AMQP协议
- Advanced Message Queuing Protocol(高级消息队列协议),是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议
- AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台
- 前面说的几种比较流行的消息队列协议,要么支持AMQP协议,要么借鉴了AMQP协议的思想进行了开发、实现、设计
-
基本概念
-
消费者(Consumer)
- 从消息队列中请求消息的客户端应用程序
-
生产者(Producer)
- 向broker发布消息的应用程序
-
AMQP服务端(broker)
- 用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例
-
主题(Topic)
- 一个主题类似新闻中的体育、娱乐等分类概念,在实际工程中通常一个业务一个主题
-
分区(Partition)
- 一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO(First Input First Output,先入先出)的队列
- kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息
-
备份(Replication)
- 为了保证分布式可靠性,kafka0.8开始对每个分区的数据进行备份(不同的Broker上),防止其中一个Broker宕机造成分区上的数据不可用
-
-
支持的客户端语言
- C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
-
服务端口
- 9092
zookeeper介绍
-
官网
- http://zookeeper.apache.org
-
概述
- Zookeeper是一种在分布式系统中被广泛用来作为分布式状态管理、分布式协调管理、分布式配置管理、和分布式锁服务的集群
-
分布式配置管理
- 将多个分布式服务器的应用配置文件,保存在zookeeper的某个目录节点中,所有相关的应用程序对这个目录进行监听。若配置信息发生改变,每个应用程序就会收到zookeeper的通知,然后从zookeeper获取新的配置信息应用到系统中
-
kafka
- kafka增加和减少服务器都会在Zookeeper节点上触发相应的事件
- kafka系统会捕获这些事件,进行新一轮的负载均衡,客户端也会捕获这些事件来进行新一轮的处理
- Kafka集群是把状态保存在Zookeeper中的,首先要部署Zookeeper集群
-
服务端口
-
2181
- 客户端连接Zookeeper服务器的端口
-
2888
- leader和follower之间的通信端口,服务启动后只有leader会监听这个端口
-
3888
- leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口
-
-
集群
- Zookeeper集群的工作是超过半数才能对外提供服务,成员数量一般为奇数台
- zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来
部署集群
zookeeper
-
准备
-
软件版本
- zookeeper-3.4.12
-
服务器
- 1.1.1.21
1.1.1.22
1.1.1.23
- 1.1.1.21
-
-
布署java环境
- 略
-
安装zookeeper
- tar xf zookeeper-3.4.12.tar.gz -C /usr/local/
- ln -s /usr/local/zookeeper-3.4.12/ /usr/local/zookeeper
-
修改配置文件
dataDir=/data/zookeeper/data #快照日志的存储路径
dataLogDir=/data/zookeeper/datalog #事物日志的存储路径,默认存储到dataDir制定的目录(会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多)
server.1=1.1.1.21:2888:3888 #server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到数据目录下面myid文件里
server.2=1.1.1.22:2888:3888
server.3=1.1.1.23:2888:3888
- 其他默认配置注释
tickTime=2000 #Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔
initLimit=10 #这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,
#而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。
#当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,
#那么表明这个客户端连接失败。总的时间长度就是 10*2=20 秒
syncLimit=5 #这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,
#总的时间长度就是5*2=10秒
dataDir=/data/zookeeper/data #快照日志的存储路径
dataLogDir=/data/zookeeper/datalog #事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录
#这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
clientPort=2181 #这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求
server.1=192.168.10.21:2888:3888
server.2=192.168.10.22:2888:3888
server.3=192.168.10.23:2888:3888
#server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到数据目录下面myid文件里
#192.168.10.21为IP地址,第一个端口是leader和follower之间的通信端口,默认是2888,服务启动后,只有leader会监听这个端口
#第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888
-
创建数据目录及服务器id
-
mkdir /data/zookeeper/data{,log} -p
-
echo 1 > /data/zookeeper/data/myid
- id分别为1 2 3,与配置文件对应
-
-
启动服务
-
/usr/local/zookeeper/bin/zkServer.sh start
- 启动服务
-
/usr/local/zookeeper/bin/zkServer.sh status
- 查看状态
-
netstat -tanp |grep -E “2888|3888”
-
jps
- 看运行的进程号
-
-
重要说明
-
myid和server.myid
- 在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识
-
conf/zoo.cfg
- zookeeper配置文件
-
conf/log4j.properties
- zk的日志输出文件。用java写的程序基本上日志都用log4j来进行管理
-
bin/zkServer.sh
- 主管理程序文件
-
bin/zkEnv.sh
- 主要配置zookeeper集群启动时配置环境变量的文件
-
zookeeper不会主动的清除旧的快照和日志文件,这个是操作者的责任
-
清理方法
- 脚本+计划任务清理
- 使用bin/zkCleanup.sh这个脚本清理,具体使用方法找官方文档
- 3.4.0开始zookeeper可自动清理snapshot和事务日志,在zoo.cfg中配置:
-
vim conf/zoo.cfg
autopurge.purgeInterval=1 #清理频率,单位是小时,默认是0不开启
autopurge.snapRetainCount=3 #需要保留的文件数目, 默认是保留3个
部署集群
kafka
-
环境
-
已经搭建好的zookeeper集群(虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群)
-
软件版本
- kafka_2.12-2.1.0
-
-
安装kafka
- tar xf kafka_2.12-2.1.0.tgz -C /usr/local/
- ln -s /usr/local/kafka_2.12-2.1.0/ /usr/local/kafka
-
创建数据目录
- mkdir /data/kafka-logs
-
修改配置文件
broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://1.1.1.21:9092 #监听套接字,每台服务器独立
log.dirs=/data/kafka-logs #消息存放的目录,多个目录’,'分隔,num.io.threads要大于这个目录的个数,如果配置多个目录,新创建的topic把消息持久化在分区数最少那一个目录中
num.partitions=1 #默认的分区数,一个topic默认1个分区数
default.replication.factor=2 #kafka保存消息的副本数
zookeeper.connect=1.1.1.21:2181,1.1.1.22:2181,1.1.1.23:2181 #zookeeper集群
- 其他默认配置注释
broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://192.168.10.21:9092 #监听套接字
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
log.dirs=/data/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数
#如果配置多个目录,新创建的topic把消息持久化在分区数最少那一个目录中
num.partitions=1 #默认的分区数,一个topic默认1个分区数
num.recovery.threads.per.data.dir=1 #在启动时恢复日志和关闭时刷新日志时每个数据目录的线程的数量,默认1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数
replica.fetch.max.bytes=5242880 #取消息的最大字节数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间,到目录查看是否有过期的消息如果有,删除
zookeeper.connect=192.168.10.21:2181,192.168.10.22:2181,192.168.10.23:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
-
启动kafka
-
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
-
jps
- 看运行的进程号
-
-
验证
-
创建topic
- /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 1.1.1.21:2181 --replication-factor 2
-
创建发布者
- /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 1.1.1.22:9092 --topic qian
- 交互输入发布内容
-
创建订阅者
- /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 1.1.1.21:9092 --topic qian --from-beginning
- 如果都能接收到发布者发布的内容,说明kafka部署成功
-
topic、发布者、订阅者在kafka集群中任意服务器上创建均可
-
其他操作
-
查看所有topic
- /usr/local/kafka/bin/kafka-topics.sh --zookeeper 1.1.1.23:2181 --list
-
查看指定topic的详细信息
- /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 1.1.1.23:2181 --topic qian
-
删除topic
- /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 1.1.1.23:2181 --topic qian
-
-
-
filebeat->logstash->kafka
input {
beats {
port => 5044
}
}
filter {
grok {
match => {
“message” => “%{COMBINEDAPACHELOG}”
}
remove_field => [“message”,“beat”,“offset”,“tags”,“prospector”]
}
}
output {
kafka {
bootstrap_servers => “1.1.1.21:9092,1.1.1.22:9092,1.1.1.23:9092”
topic_id => “httpd-log”
compression_type => “snappy”
codec => “json”
}
}
eflk数据流
数据流
- appserver --> filebeat --> kafka --> logstash --> elasicsearch --> kibana
filebeat配置
filebeat.inputs:
- type: log
paths:- /var/log/nginx/access.log
json.keys_under_root: true
json.add_error_key: true
json.message_key: log
output.kafka:
hosts: [ “1.1.1.21:9092”,“1.1.1.22:9092”,“1.1.1.23:9092”]
topic: ‘nginx-access-log’
- /var/log/nginx/access.log
logstash配置
input {
kafka {
bootstrap_servers => “1.1.1.21:9092”,“1.1.1.22:9092”,“1.1.1.23:9092”
topics => “nginx-access-log”
auto_offset_reset => “earliest”
codec => “json”
decorate_events => true
}
}
filter {
grok {
match => {“log” => “%{COMBINEDAPACHELOG} %{QS:x-forwarded-for}”}
remove_field => [“error”,“beat”,“offset”,“auth”,“ident”,“log”]
}
}
output{
elasticsearch {
hosts => [“1.1.1.11:9200”]
index => “nginx-access-log-%{+YYYY.MM.dd}”
}
}