文章目录
- 一、日志收集所解决的问题
- 二、`Elastic Stack` 组件介绍
- 2.1 `Elasticsearch`
- 2.2 `Logstash`
- 2.3 `Kibana`
- 2.4 `Filebeat` beats
 
- 三、`ELK Stack`集群安装
- 3.1 安装`JAVA`环境(所有`ES`节点)
- 3.2 安装`ES`集群
- 3.2.1 `ES`单节点部署
- 3.2.2 `ES` `JAVA`调优:堆(`heap`)内存大小
- 3.2.3 `ES`使用自定义`JAVA`环境配置
- 3.2.3 部署`ES`高可用集群
 
- 3.3 `Kibana`安装(`elk03`节点)
- 3.4 `Logstash`安装(`elk02`)
- 3.5 `Filebeat`安装(`elk01`)
- 3.5.1 `Filebeat`架构
 
 
- 四、`Filebeat`日志收集
- 4.1 `Filebeat`标准输入和输出
- 4.2.2 `Filebeat `基于`Log`类型进行输入并且自定义`Tags`
- 4.2.8 `Filebeat`输出数据至`Elasticsearch`集群
- 4.2.12 `FIlebeat` 收集`JSON`格式的`nginx`访问日志
 
 
- 五、`Logstash` 日志收集
- 5.1 `Logstash`安装
- 5.3 `Logstash`基于`File`的形式进行`input`
- 5.4 `Logstash`基于`Filebeat`形式进行`input`
- 5.5 Logstash基于ES形式进行output
 
- 六、`Logstash grok`插件使用
- 6.1 `Logstash` 内置正则使用
- 6.4 `Logstash`删除指定字段
 
- 八、`Logstash useragent`插件分析用户客户端类型
- 九、`Logstash`多`if`分支
- 十、`Kibana`操作使用
- 10.1 `Kibana`手动创建索引模板
 
- 九、`kafka`简介和使用
- 9.1`kafka`相关概念
- 9.1.1 `Kafka`介绍
- 9.1.2 `Kafka`相关专业术语
 
- 9.2 `Zookeeper`使用和配置
- 9.2.1 简介
- 9.2.2 `Zookeeper`集群部署:三节点😀
- 9.2.3 `Zookeeper`角色划分
- 9.2.4 `zookeeper`配置内存堆栈
 
- 9.3 `Kafka`集群安装:三节点😀
- 9.4 `Kafka Topic`日常操作
- 9.4.1 查看`Topic`相关信息
- 9.4.2 创建`Topic`
 
- 9.5 `Filebeat`收集日志至`Kafka`
- 9.6 `Logstash`收集`Kafka Topic`日志
 
- 十、`Elasticsearch Restful`风格`API`实战
- 10.1 `ES`集群状态
- 10.2 `Elasticsearch`术语
 
一、日志收集所解决的问题
运维工程师首要职责维护服务的正常运行,什么是正常运行状态?服务的正常运行是万千报错中的一种“特例”,维护这个特例?
- 生产环境出现问题后,需要查看各种⽇志进行分析排错;
- 日常运维工作,日志文件分散,运维工作繁琐。可以实现日志聚合;
- 开发人员没有登陆服务器的权限。开发人员可以通过web界面查看日志;
- 面对大量的访问日志,可以统计出各项指标,比如PV UV。
二、Elastic Stack 组件介绍
 
Elasticsearch 9部署文档:https://www.elastic.co/docs/deploy-manage/deploy/self-managed/install-elasticsearch-from-archive-on-linux-macos
2.1 Elasticsearch
 
Elasticsearch(简称ES)是一个分布式、**RESTful**风格的开源搜索和数据分析引擎,专为全文搜索、结构化搜索、分析、和数据可视化而设计。支持几乎实时的搜索,并且具有高扩展性,能够处理PB级别的数据。
- Index(索引): 索引类似于关系型数据库中的表,它存储着相似结构的数据。在- Elasticsearch中,每个索引包含一个或多个文档(- Document)。
- Document(文档): 文档是- Elasticsearch的基本数据单位,相当于关系型数据库中的一行记录。文档以- JSON格式存储,包含字段和值的对。
- Shards(分片)和- Replicas(副本):- Elasticsearch将数据分为多个分片存储,每个索引可以包含一个或多个分片。分片可以提高并行查询的性能。副本是分片的备份,保证了数据的高可用性和容错性。
- Cluster(集群):- Elasticsearch集群由一个或多个节点(- Node)组成。每个节点负责存储部分数据,并参与集群中的索引和搜索操作。集群有一个主节点,用于管理集群状态。
- Node(节点): 节点是- Elasticsearch的运行实例,每个节点属于某个集群。节点可以充当主节点、数据节点。
- 使用场景: - 日志和事件数据分析: Elasticsearch常用于集中化日志管理和分析,帮助企业在海量日志中快速定位问题。
- 电商网站的搜索功能: Elasticsearch被广泛用于电商网站的产品搜索,提供快速、相关性高的搜索结果。
- 数据存储与检索:Elasticsearch能有效存储和检索结构化或非结构化数据,适用于大规模数据的管理。 JSON key=values
 
- 日志和事件数据分析: 
2.2 Logstash
 
 Logstash 是一个开源的数据收集、处理和传输引擎,主要用于实时的数据管道管理。是 Elastic Stack的重要组成部分,负责将各种来源的数据收集起来,进行过滤和格式化处理,并最终将其输出到指定的目标,如 Elasticsearch、文件或其他存储系统。
2.3 Kibana
 
 Kibana 提供强大的数据可视化能力,使用户可以在浏览器中实时查看、搜索和分析存储在 Elasticsearch 中的数据。它提供丰富的可视化选项,如图表、地图和表格,支持构建交互式仪表板。
2.4 Filebeat beats
 
Filebeat 是一个轻量级的日志收集和传输工具。它通常用于从各种数据源收集日志,并将这些日志传输到Logstash或 Elasticsearch 进行进一步处理和分析。Filebeat 是一个边车代理,安装在需要监控的服务器上,用于高效地读取和转发日志数据。它旨在减少资源消耗,提供可靠的日志传输,并确保在出现故障时不会丢失日志数据。
三、ELK Stack集群安装
 
3.1 安装JAVA环境(所有ES节点)
 
#>>> 上传jdk1.8安装包
$ ll jdk-8u381-linux-x64.tar.gz 
-rw-r--r--. 1 root root 139273048 7月  31 10:23 jdk-8u381-linux-x64.tar.gz#>>> 解压jdk到指定目录
$ tar xf jdk-8u381-linux-x64.tar.gz  -C /usr/local/#>>> 创建软链接
$ cd /usr/local/
$ ln -s jdk1.8.0_381/ java#>>> 声明Java环境变量
$ cat >> /etc/profile.d/jdk.sh <<-EOF
#!/bin/bash
export JAVA_HOME=/usr/local/java
export PATH=\$PATH:\$JAVA_HOME/bin
EOF#>>> 重新加载配置文件
$ source /etc/profile.d/jdk.sh #>>> 测试java
$ java -version
java version "1.8.0_381"
Java(TM) SE Runtime Environment (build 1.8.0_381-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.381-b09, mixed mode)

3.2 安装ES集群
 
3.2.1 ES单节点部署
 
#>>> 解压es安装包到指定目录
$ tar xf elasticsearch-7.17.11-linux-x86_64.tar.gz  -C /usr/local#>>> 创建es软链接
$ cd /usr/local/
$ ln -s elasticsearch-7.17.11/ es#>>> 声明ES环境变量
$ cat >> /etc/profile.d/es.sh <<-EOF
#! /bin/bash
export ES_HOME=/usr/local/es
export PATH=\$PATH:\$ES_HOME/bin
EOF#>>> 重新加载环境变量
$ source /etc/profile.d/es.sh #>>> 启动es实例
$ elasticsearch
此时启动会有报错信息,记住详细看报错信息哟!报错信息如下:此服务不能以超级管理员的身份运行,需要单独创建
es普通用户。
解决方法如下操作:
#>>> 创建用于启动ES的用户
$ useradd es#>>> 查看用户是否创建
$ id es
uid=1000(es) gid=1000(es) 组=1000(es)#>>> 修改elasticsearch属主和数组
$ chown  -R es.es /usr/local/elasticsearch-7.17.11/
#>>> 查看是否修改成功
$ ll /usr/local/elasticsearch-7.17.11/
#>>> 切换es工作目录
$ cd /usr/local/elasticsearch-7.17.11/config/#>>> 备份es配置文件
$ cp elasticsearch.yml{,.bak}#>>> 修改文件相关参数
$ egrep -v  "^(#|$)" elasticsearch.yml
···
# es集群名称
cluster.name: study-elk-cluster
# 节点名称(一般以当前主机名一致)
node.name: elk01	
# 监听地址,,配置IP则知允许基于IP地址进行访问
network.host: 0.0.0.0	
# 自动发现节点IP
discovery.seed_hosts: ["192.168.100.160"]	
# 初始化master节点
cluster.initial_master_nodes: ["192.168.100.160"] 
# 添加以下参数关闭geoip数据库的更新,减少对带宽和存储的消耗,禁用自动更新。GeoIP根据IP地址确定地理位置的技术,用于日志分析、用户活动追踪等场景。
ingest.geoip.downloader.enabled: false
···#>>> 测试启动ES
$ su -c "elasticsearch" es
虽然
es能够正常启动,但是启动日志会出下报错信息,需要我们修改启动的内核参数,第一个为修改系统最大的文件描述符,第二个修改系统的虚拟内存。

#>>> 修改es需要的limits参数(重新连接会话框才能成功加载参数)
$ cat >>/etc/security/limits.d/elk.conf <<EOF
* soft nofile 65535
* hard nofile 131070
EOF# 参数解释:soft nofile 65535表示将文件描述符的软限制设置为65535。hard nofile 131070表示将文件描述符的硬限制设置为131070。#>>> 查看limits参数是否加载
$ ulimit -Sn
65535
$ ulimit -Hn
131070#>>> 修改内核参数
$ cat >> /etc/sysctl.d/elk.conf <<'EOF'
vm.max_map_count = 262144
EOF# 参数解释vm.max_map_count = 262144 用于设置单个进程可以拥有的内存映射的最大数量。#>>> 加载内核参数
$ sysctl -f /etc/sysctl.d/elk.conf
vm.max_map_count = 262144
$ sysctl -q vm.max_map_count
vm.max_map_count = 262144#>>> 测试启动ES
$ su -c "elasticsearch" es
#>>> 或者使用一下的参数实现后台运行
$ su -c "elasticsearch -d" es#>>> 测试节点
$ curl  192.168.100.160:9200
{"name" : "elk01",  # 节点的名称,一般以当前主机名命名。"cluster_name" : "study-elk-cluster",		#  Elasticsearch集群的名称,多个节点可以组成一个集群,共同提供搜索和存储功能。"cluster_uuid" : "EyOTgfNNSHOvUsQNLQ9f7Q", # 集群的唯一标识符(UUID)。每个Elasticsearch集群都有一个唯一的cluster_uuid,用于在集群内识别和管理节点之间的关系。"version" : {"number" : "7.17.11",  # Elasticsearch 实例的版本"build_flavor" : "default",  # 示这个版本没有特殊的自定义修改或变化。"build_type" : "tar",  # Elasticsearch 是通过 tar 包安装的。"build_hash" : "eeedb98c60326ea3d46caef960fb4c77958fb885",  # Git提交哈希。"build_date" : "2023-06-23T05:33:12.261262042Z", # 版本的构建日期和时间。"build_snapshot" : false,  # 此版本适用于生产"lucene_version" : "8.11.1",  # Lucene版本"minimum_wire_compatibility_version" : "6.8.0", # elasticsearch实例能够与运行 6.8.0 及以上版本的其他节点进行通讯。"minimum_index_compatibility_version" : "6.0.0-beta1"  # Elasticsearch兼容的最低索引版本。},"tagline" : "You Know, for Search" #  Elasticsearch的标语或宣传口号。
}
端口解释:
- 9200对外暴露端口。使用的- HTTP协议,- Elasticsearch的- REST API服务的默认端口。开发者和应用程序通过- HTTP协议与- Elasticsearch进行交互,发送查询、索引文档、管理集群等操作。
- 9300集群内部通讯端口。使用- TCP协议。 用于集群内的节点间同步和协调。节点之间通过这个端口交换数据,包括集群管理信息、索引和搜索操作等。
3.2.2 ES JAVA调优:堆(heap)内存大小
 
#>>> 查看运行的JAVA程序
$ jps  
14917 Jps
14713 Elasticsearch#>>> 查看ES堆内存大小
$ jmap -heap 14713(pid)

#>>> 修改堆内存大小(最大设置为32G,要么内存的一半)
$ vim /usr/local/elasticsearch-7.17.11/config/jvm.options
···
-Xms256m
-Xmx256m
···#>>> 删除数据、日志
$ rm -rf /usr/local/es/{data,logs}/*
#>>> 删除缓存数据
$ rm -rf /tmp/*#>>> 准备ES启动脚本并启动(加入systemd)
$ cat >> /usr/lib/systemd/system/es.service <<EOF
[Unit]
# 描述服务的简短说明,这里描述为 "ELK"。
Description=ELK
# 指定服务的启动顺序。表示该服务在网络目标 (network.target) 之后启动,确保网络服务已启动并可用。
After=network.target[Service]
# 指定服务的启动类型为 forking,表示服务启动后会产生一个子进程,并且主进程会在启动完成后退出。这通常用于后台运行的守护进程。
Type=forking
# 指定启动服务的命令
ExecStart=/usr/local/es/bin/elasticsearch -d
# 指定服务在退出后不重启。可以根据需要将其更改为 always 或 on-failure,以确保服务在失败后自动重启。
Restart=no
# 指定以 es 用户的身份运行服务。
User=es
# 指定服务所属的组为 es。
Group=es
# 设置进程打开文件的最大数量(文件描述符限制)
LimitNOFILE=131070[Install]
# 指定服务的目标,表示该服务在多用户模式下可用。
WantedBy=multi-user.target
EOF#>>> 重新加载systemd配置文件
$ systemctl daemon-reload#>>> 启动es
$ systemctl enable --now es
3.2.3 ES使用自定义JAVA环境配置
 
由于默认情况启动ES时,使用的ES家目录下的JAVA环境。如果需要自定义JAVA环境启动ES。请配置一下参数:
# Step 1:切换至ES命令存放目录
$ cd /usr/local/es/bin/# Step 2:修改ES默认启动脚本
$ vim elasticsearch# 脚本内添加以下参数:# 设置 Java 路径export JAVA_HOME=/usr/local/javaexport PATH=$JAVA_HOME/bin:$PATH# Java 选项:配置堆栈大小export JVM_OPTIONS="-Xms1g -Xmx4g"
 
 
# Step 3:重启ES
$ systemctl restart es# Step 4:查看es进程信息
$ ps -aux | grep java

3.2.3 部署ES高可用集群
 
#>>> 创建用于启动ES的用户
$ useradd es 
$ id es
uid=1000(elasticsearch) gid=1000(elasticsearch) 组=1000(elasticsearch)#>>> 创建ES数据目录和日志目录存放目录
$ mkdir -p /opt/{data,logs}
$ install -d /opt/{data,logs}/es -o es -g es#>>> 解压es安装包到指定目录
$ tar xf elasticsearch-7.17.11-linux-x86_64.tar.gz  -C /opt/#>>> 更改目录名
$ cd /opt/ && mv  elasticsearch-7.17.11  es#>>> 创建ES环境变量
$ vim  >> /etc/profile.d/es.sh <<-EOF
#! /bin/bash
export ES_HOME=/opt/es
export PATH=\$PATH:\$ES_HOME/bin
EOF#>>> 重新加载环境变量
$ source /etc/profile.d/es.sh #>>> 修改elasticsearch属主和数组
$ chown -R  es,es  /opt/es#>>> 修改es需要的limits参数(重新连接会话框才能成功加载参数)
$ cat >> /etc/security/limits.d/elk.conf <<-EOF
* soft nofile 65535
* hard nofile 131070
EOF#>>> 查看limits参数是否加载
$ ulimit -Sn
65535
$ ulimit -Hn
131070#>>> 修改内核参数
$ cat > /etc/sysctl.d/elk.conf <<EOF
vm.max_map_count = 262144
EOF#>>> 加载内核参数
$ sysctl -f /etc/sysctl.d/elk.conf
vm.max_map_count = 262144
$ sysctl -q vm.max_map_count
vm.max_map_count = 262144#>>> 修改堆内存大小(最大设置为32G,要么内存的一半)
$ vim /opt/elasticsearch-7.17.11/config/jvm.options
···
-Xms256m
-Xmx256m
···
#>>> elk01修改配置文件
$ egrep -v "^(#|$)" /opt/es/config/elasticsearch.yml 
cluster.name: study-elk-cluster
node.name: elk01
path.data: /opt/data/es	# 指定数据目录
path.logs: /opt/logs/es	# 指定日志目录
network.host: 0.0.0.0
discovery.seed_hosts: ["192.168.100.160","192.168.100.161","192.168.100.162"]
cluster.initial_master_nodes: ["192.168.100.160","192.168.100.161","192.168.100.162"]
ingest.geoip.downloader.enabled: false
#>>> elk02修改配置文件
$ egrep -v "^(#|$)" /opt/es/config/elasticsearch.yml 
cluster.name: study-elk-cluster
node.name: elk02
path.data: /opt/data/es
path.logs: /opt/logs/es
network.host: 0.0.0.0
discovery.seed_hosts: ["192.168.100.160","192.168.100.161","192.168.100.162"]
cluster.initial_master_nodes: ["192.168.100.160","192.168.100.161","192.168.100.162"]
ingest.geoip.downloader.enabled: false
#>>> elk03修改配置文件
$ egrep -v "^#|^$" /opt/es/config/elasticsearch.yml 
cluster.name: study-elk-cluster
node.name: elk03
path.data: /opt/data/es
path.logs: /opt/logs/es
network.host: 0.0.0.0
discovery.seed_hosts: ["192.168.100.160","192.168.100.161","192.168.100.162"]
cluster.initial_master_nodes: ["192.168.100.160","192.168.100.161","192.168.100.162"]
ingest.geoip.downloader.enabled: false
#>>> 所有节点添加elk启动脚本
$ cat > /usr/lib/systemd/system/es.service <<EOF
[Unit]
Description=ELK
After=network.target
[Service]
Type=forking
ExecStart=/opt/es/bin/elasticsearch -d
Restart=no
User=es
Group=es
LimitNOFILE=131070
[Install]
WantedBy=multi-user.target
EOF#>>> 所有节点重新加载并启动
$ systemctl daemon-reload
$ systemctl restart es#>>> 测试集群
[root@elk01 ~]# curl 192.168.100.160:9200/_cat/nodes
192.168.100.160 40 59  8 0.14 0.10 0.06 cdfhilmrstw * elk01
192.168.100.161 54 30 11 0.55 0.29 0.11 cdfhilmrstw - elk02
192.168.100.162 48 28  8 0.29 0.16 0.06 cdfhilmrstw - elk03# 参数解释:第一列:每个节点的IP地址;第二列:每个节点的CPU使用率;第三列:每个节点的内存的使用率;第四列:每个节点的活跃分片数量;第五列:每个节点的1分钟、5分钟、15分钟平均负载;第六列:每个节点在集群中的角色;第七列:*代表主节点;主节点负责管理集群的元数据、分片分配和集群状态等任务。第八列:节点的名称
3.3 Kibana安装(elk03节点)
 
#>>> 安装kibana
$ yum localinstall -y kibana-7.17.11-x86_64.rpm #>>> 备份配置文件
$ cd /etc/kibana/
$ cp kibana.yml kibana.yml.bak#>>> 修改kibana配置文件
$ egrep -v "^(#|$)" kibana.yml 
# 服务端口
server.port: 5601
# 主机地址或者主机名
server.host: "0.0.0.0"
# 服务名称
server.name: "study-elk-kibana"
# ES主机组地址
elasticsearch.hosts: ["http://192.168.100.160:9200","http://192.168.100.161:9200","http://192.168.100.162:9200"]
# 修改语言
i18n.locale: "zh-CN"#>>> 启动Kibana
$ systemctl enable --now kibana#>>> 游览器IP+5601访问



3.4 Logstash安装(elk02)
 
#>>> 安装Logstash
$ yum localinstall -y logstash-7.17.11-x86_64.rpm #>>> 创建软连接
$ ln -s /usr/share/logstash/bin/logstash   /usr/bin/logstash#>>> 编写测试文件
$ mkdir conf-logstash
$ cat  conf-logstash/01-stdin-to-stdout.conf 
input {stdin {}
}
output{stdout {}
}#>>> 测试Logstash
$ logstash -f ~/conf-logstash/01-stdin-to-stdout.conf 

3.5 Filebeat安装(elk01)
 
边车代理,需要安装在每台服务器上,并且通过tail -f 读取指定目录的日志文件。“收割机” Beats
 帮助文档:https://www.elastic.co/guide/en/beats/filebeat/7.17/index.html
# Step 1:elk01主机安装Filebeat
$ yum localinstall -y filebeat-7.17.11-x86_64.rpm # Step 2:elk01主机测试Filebeat
$ mkdir /root/filebeat-config
$ cp /etc/filebeat/filebeat.yml  /root/filebeat-config/01-test.yml
$ cat /root/filebeat-config/filebeat-config/01-test.yml
filebeat.inputs:
- type: stdinenabled: trueoutput.console:pretty: true    # 启动美观格式输出# Step 2:elk01主机启动Filebeat实例
$ filebeat -e -c /opt/filebeat/filebeat-config/01-test.yml 
 
 
3.5.1 Filebeat架构
 
架构图:
 Input(数据源)收集方式:https://www.elastic.co/guide/en/beats/filebeat/7.17/configuration-filebeat-options.html
 Output(输出地)推送方式:https://www.elastic.co/guide/en/beats/filebeat/7.17/configuring-output.html
 
四、Filebeat日志收集
 
 Input方式:https://www.elastic.co/guide/en/beats/filebeat/7.17/filebeat-input-stdin.html
4.1 Filebeat标准输入和输出
 
 标准输入官方文档:https://www.elastic.co/guide/en/beats/filebeat/7.17/filebeat-input-stdin.html
  标准输出官方文档:https://www.elastic.co/guide/en/beats/filebeat/7.17/console-output.html
  简介:使用终端输入从标准输入读取事件。
# Step 1:elk01主机创建Filebeat配置文件测试目录
$ mkdir filebeat-config# Step 2:elk01主机创建配置文件
$ vim filebeat-config/01-stdin-on-stdout.yml 
# 指定输入类型
filebeat.inputs:# 终端标准输入
- type: stdin
# 标准输出类型:终端输出output.console:#如果设置为 true,则写入 stdout 的事件将采用良好的格式。默认值为 false。pretty: true# Step 3:elk01主机测试
$ filebeat  -e -c /root/filebeat-config/01-stdin-on-stdout.yml 
 
 
4.2.2 Filebeat 基于Log类型进行输入并且自定义Tags
 
$ vim  filebeat-config/02-log-on-stdout.yml 
filebeat.inputs:
- type: logpaths:- /var/log/messages# 对当前的输入日志文件,指定独有的标签tags: ["system logs"]fields:log_type: system- type: logpaths:- /var/log/nginx/access.log# 对当前的输入日志文件,指定独有的标签 tags: ["nginx access_log"]fields:log_type: nginx_accessoutput.console:pretty: true#>>> 清除偏移量
$ > /var/lib/filebeat/registry/filebeat/log.json #>>> 启动Filebeat实例
$ filebeat  -e -c /root/filebeat-config/02-log-on-stdout.yml 
 
 
对于
Tags的设定,将Filebeat收集到的日志转发至ES集群时,可以针对不同的Tag设置不同的索引,方便查阅。
4.2.8 Filebeat输出数据至Elasticsearch集群
 
 官方链接:https://www.elastic.co/guide/en/beats/filebeat/7.17/elasticsearch-output.html#elasticsearch-output
  简介:Elasticsearch输出使用Elasticsearch HTTP API将事件直接发送到Elasticearch。
#>>> 编写Filebeat配置文件
$ vim filebeat-config/06-log-nginx-access-es.yml 
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logtags: ["nginx_access"]fields:log_type: nginx_accessfields_under_root: true# 输出至ES集群
output.elasticsearch:# ES集群地址hosts: ["http://192.168.174.140:9200","http://192.168.174.141:9200","http://192.168.174.142:9200"]#>>> 启动Filebeat实例
$ filebeat  -e -c /root/filebeat-config/06-log-nginx-access-es.yml 

 Kibana查看ES数据
 
 
 
 
 
通过索引模式来匹配索引

4.2.12 FIlebeat 收集JSON格式的nginx访问日志
 
#>>> 安装nginx
$ vim /etc/yum.repos.d/nginx.repo
[nginx-stable]
name=nginx stable repo
baseurl=http://nginx.org/packages/centos/$releasever/$basearch/
gpgcheck=0
enabled=1
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true$ yum install -y nginx#>>> 启动nginx
$ systemctl enable --now nginx#>>> 修改nginx访问日志格式
$ vim /etc/nginx/nginx.conf 
···log_format  test_nginx_json '{"@timestamp":"$time_iso8601",''"host":"$server_addr",''"clientip":"$remote_addr",''"SendBytes":$body_bytes_sent,''"responsetime":$request_time,''"upstreamtime":"$upstream_response_time",''"upstreamhost":"$upstream_addr",''"http_host":"$host",''"uri":"$uri",''"domain":"$host",''"xff":"$http_x_forwarded_for",''"referer":"$http_referer",''"tcp_xff":"$proxy_protocol_addr",''"http_user_agent":"$http_user_agent",''"status":"$status"}';access_log  /var/log/nginx/access.log  test_nginx_json;
···#>>> 检查语法格式
$ nginx -t#>>> 重启nginx
$ systemctl reload nginx#>>> 测试
$ curl localhost
$ cat /var/log/nginx/access.log 

#>>> 修改Filebeat配置(识别json格式日志,生成相关字段)
$ vim filebeat-config/10-log-nginx-log-shard-json-es.yml 
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/nginx/access.log*tags: ["access"]
# 启动json格式  json.keys_under_root: trueoutput.elasticsearch:hosts: ["http://192.168.174.140:9200","http://192.168.174.141:9200","http://192.168.174.142:9200"]#>>> 清除filebeat的指针偏移,用于从第一行重新读取日志文件
$ rm -rf /var/lib/filebeat/*#>>> 清楚以前的ES nginx索引,防止出现脏数据
略#>>> 重启filebeat
$ filebeat -e -c ~/filebeat-config/10-log-nginx-log-shard-json-es.yml 
五、Logstash 日志收集
 
 官网:https://www.elastic.co/guide/en/logstash/7.17/
  简介:Logstash是一个具有实时流水线功能的开源数据收集引擎。Logstash可以动态地统一来自不同来源的数据,并将数据规范化为您选择的目的地。为各种高级下游分析和可视化用例清理和民主化您的所有数据。虽然Logstash最初推动了日志收集的创新,但其功能远远超出了该用例。任何类型的事件都可以通过广泛的输入、过滤和输出插件进行丰富和转换。
5.1 Logstash安装
 
#>>> 安装
$ yum install -y logstash-7.17.11-x86_64.rpm #>>> 声明软连接
$ ln -s /usr/share/logstash/bin/logstash   /sbin/#>>> 创建Logstash 测试文件目录
$ mkdir logstash-config
5.3 Logstash基于File的形式进行input
 
 官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-file.html
#>>> 编写配置文件
$ vim 02-file-input.conf 
input {file {# 指定日志文件收集的路径path => ["/tmp/*.txt"]# 从日志文件的第一行进行读取,只会在第一次启动时从头读取start_position => "beginning" # 从日志文件的尾行读取日志,且偏移指针文件对原日志文件偏移量未记录# start_position => "end" }
}output {stdout {}
}#>>> 准备测试日志文件
$ echo 1111 > /tmp/1.txt#>>> 启动Logstash实例
$ logstash -f /root/stduy-logstash-config/02_input_file.conf 
 
 
#>>> 查看偏移量
$ cat /usr/share/logstash/data/plugins/inputs/file/.sincedb_820ddbbd098cfece4b56f4fcbf67a9bb
16789540 0 64768 4 1717082577.147625 /tmp/1.txt#>>> 查看日志文件
$ ll -i /tmp/1.txt 
16789540 -rw-r--r-- 1 root root 4 5月  30 23:20 /tmp/1.txt
5.4 Logstash基于Filebeat形式进行input
 
 Filebeat output Logstash官网:https://www.elastic.co/guide/en/beats/filebeat/7.17/logstash-output.html
  Logstash input Fliebeat官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-beats.html
#>>> 修改Filebeat配置文件
$ vim /root/filebeat-config/output-logstash.yml filebeat.inputs:
- type: logpaths:- /tmp/test.logtags: ["test"]fields:log_type: testfields_under_root: trueoutput.logstash:# Logstash主机地址hosts: ["192.168.174.142:5044"]#>>> 启动Filebeat实例
$ filebeat -e -c /root/filebeat-conifg/output-logstash.yml 
#>>> 修改Logstash配置文件
$ vim input-beats.conf 
input {beats {# 通讯端口号,默认5044port => 5044}
}output {stdout {}
}#>>> 启动Logstash
$ logstash -rf /root/logstash-config/input-beats.conf
5.5 Logstash基于ES形式进行output
 官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-outputs-elasticsearch.html
#>>> 修改Logstash配置文件
$ cat 08_output_es.conf 
input {beats {# 通讯端口号,默认5044port => 5044}
}output {elasticsearch {# ES集群IPhosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"] # 索引模式名称index => "test-log-%{+yyyy.MM.dd}"}
}#>>> 启动Logstash
$ logstash -f /root/stduy-logstash-config/08_output_es.conf 
六、Logstash grok插件使用
 
 官网: https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html
  简介:解析任意文本并对其进行结构化。Grok 是将非结构化日志数据解析为结构化和可查询内容的好方法。该工具非常适合系统日志,apache和其他Web服务器日志,mysql 日志,以及通常为人类编写的任何日志格式。
6.1 Logstash 内置正则使用
 
 简介:将非结构化的日志格式通过Grok内置的正则转化为结构化。
#>>> 日志格式
192.168.174.1 - - [01/Jun/2024:10:37:16 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0" "-"
192.168.174.1 - - [01/Jun/2024:10:37:16 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0" "-"#>>> 编写Filebeat配置文件
$ vim filebeat-out-logstash.yml
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logtags: ["access"]
output.logstash:hosts: ["192.168.174.142:5044"]#>>> 清除偏移量
$ rm -rf /var/lib/filebeat/*
#>>> 启动Filebeat实例             
$ filebeat  -e -c ~/filebeat-out-logstash.yml #>>> 编写Logstash配置文件
$ vim filter_grok.yml 
input {beats {port => 5044}
}filter {grok {match => {"message" => "%{HTTPD_COMMONLOG}" }}
}output {stdout {}elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"] index => "test-%{+yyyy.MM.dd}"}
}#>>> 启动Logstash
$ logstash  -rf ~/filter_grok.yml 

6.4 Logstash删除指定字段
 
 简介:如果此筛选器成功,请从此事件中删除任意字段。
 未删除字段终端打印如下:
 
#>>> Filebeat配置文件编写
$ vim filebeat-out-logstash.yml
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logtags: ["access"]
output.logstash:hosts: ["192.168.174.142:5044"]#>>> 删除偏移量
$ rm -rf /var/lib/filebeat/*#>>> 启动Filebeat实例
$ filebeat  -e -c ~/filebeat-out-logstash.yml #>>> Logstash配置文件编写
$ vim filter_grok.yml 
input {beats {port => 5044}
}filter {grok {match => {"message" => "%{HTTPD_COMMONLOG}"}# You can also remove multiple fields at once: remove_field => [ "type","offset","ecs","input","@version","log","agent","tags" ]}
}output {stdout {}elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]index => "test-%{+yyyy.MM.dd}"}
}#>>> 启动Logstash实例
$ logstash -rf ~/filter_grok.yml 
删除指定字段后终端数据显示如下:
 
八、Logstash useragent插件分析用户客户端类型
 
 官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-useragent.html
#>>> Filebeat配置文件编写
$ vim filebeat-out-logstash.yml
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logtags: ["access"]json.keys_under_root: true  output.logstash:hosts: ["192.168.174.142:5044"]#>>> 删除偏移量
$ rm -rf /var/lib/filebeat/*#>>> 启动Filebeat实例
$ filebeat  -e -c ~/filebeat-out-logstash.yml #>>> Logstash配置文件编写
$ vim filter_grok.yml 
input {beats {port => 5044}
}filter {grok { remove_field => [ "type","offset","ecs","input","@version","log","agent","tags" ]}useragent {source => "http_user_agent"target => "test_user_agent"remove_field => [ "agent","@version","tags","ecs","log","offset","type" ]}
}output {stdout {}elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]index => "test-%{+yyyy.MM.dd}"}
}#>>> 启动Logstash实例
$ logstash -rf ~/filter_grok.yml 
九、Logstash多if分支
 
 官网:https://www.elastic.co/guide/en/logstash/7.17/event-dependent-configuration.html#metadata
  简介: 针对不同的日志类型。filter去做不同的处理
#>>> Filebeat配置文件
$ vim  filebeat-out-logstash.yml 
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logfields:log_type: nginx_accessfields_under_root: truejson.keys_under_root: true   - type: logpaths:- /var/log/nginx/error.logfields:log_type: nginx_errorfields_under_root: true- type: logpaths:- /var/log/messagesfields:log_type: system_logfields_under_root: trueoutput.logstash:hosts: ["192.168.174.142:5044"]#>>> 删除偏移量
$ rm -rf /var/lib/filebeat/*#>>> 启动Filebeat实例
$ filebeat  -e -c ~/filebeat-out-logstash.yml #>>> Logstash 配置文件准备
$ vim filter_grok.yml 
input {beats {port => 5044}
}filter {mutate {remove_field => ["agent","ephemeral_id","ecs","@version","tags","input","log","offest"]  }useragent {source => "http_user_agent" target => "study_user_agent"}
}output {stdout {}if [log_type] == "nginx_access" {  elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"] index => "web-access-%{+yyyy.MM.dd}"} } else if [log_type] == "nginx_error" {elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]index => "web-error-%{+yyyy.MM.dd}"  	 }}  else if [log_type] == "system_log" {elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]index => "system-log-%{+yyyy.MM.dd}"     } }
}#>>> 启动Logstash
$ logstash  -rf ~/filter_grok.yml 
十、Kibana操作使用
 
10.1 Kibana手动创建索引模板
 

 
 
 
{
"number_of_replicas":3,
"number_of_shards":3
}

九、kafka简介和使用
 
9.1kafka相关概念
 
 官网:https://kafka.apache.org/
9.1.1 Kafka介绍
 
 Apache Kafka是一个开源的分布式事件流平台,为数以千计的公司提供高性能数据管道、流分析、 数据集成和任务关键型应用程序。
-  高吞吐量:使用延迟低至 2ms的机器集群以网络有限的吞吐量传递消息。
-  可伸缩:将生产集群扩展到 1000个代理、每天数万亿条消息、PB的数据和数十万个分区。弹性扩展和收缩存储和处理。
-  永久存储器:将数据流安全地存储在分布式、持久、容错的集群中。 
-  高可用性:在可用性区域上高效地扩展集群,或者跨地理区域连接单独的集群。 
9.1.2 Kafka相关专业术语
 
-  主题( Topic):主题是Kafka中用于对消息进行分类的逻辑分组,每个主题可以看作是消息的分类器。主题是多订阅者模式,即一个主题可以有多个消费者订阅。
-  分区( Partition):每个主题被分成一个或多个分区。分区是消息存储的基本单元。分区内的消息是有序的,但不同分区之间无序。每个分区可以分布在不同的Kafka服务器上,从而实现水平扩展。- leader partition负责对- kafka集群的读写操作,和客户端进行交互。
- follower partition负责去- leader partition同步数据,不可以和客户端进行交互。
 
-  偏移量( Offset):偏移量是分区中每条消息的唯一标识符。它是一个递增的整数,记录了消息在分区中的位置。消费者使用偏移量来跟踪读取消息的位置。
-  生产者( Producer):生产者是负责向Kafka主题发布消息的客户端应用程序。生产者将消息发送到指定的主题和分区。
-  消费者( Consumer):消费者是负责从Kafka主题读取消息的客户端应用程序。消费者通过订阅一个或多个主题来读取消息,并使用偏移量来跟踪读取进度。
-  消费者组( Consumer Group):消费者组是一组消费者实例,共同消费一个或多个主题的消息。
-  代理( Broker):代理是Kafka集群中的一个服务器节点,负责存储和传输消息数据。一个Kafka集群由多个代理组成,每个代理可以处理多个分区。
-  复制( Replication):Kafka中的复制机制将分区数据复制到多个代理上,以确保数据的高可用性和容错性。每个分区有一个领导副本(Leader)和若干个跟随副本(Follower)。所有的读写操作都由领导副本处理,跟随副本只需同步领导副本的数据。但是Leader和Follwer都属于副本。创建时副本数不能为0。
9.2 Zookeeper使用和配置
 
 官网链接:https://dlcdn.apache.org/zookeeper/
 下载链接:https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
9.2.1 简介
 Zookeeper 是 Apache 旗下的一个开源分布式协调服务,用于管理和协调分布式应用程序中的各种服务和组件,它提供了一系列高效且可靠的分布式数据一致性和协调机制。
9.2.2 Zookeeper集群部署:三节点😀
 
#>>> 解压安装包到指定目录
$ tar xf apache-zookeeper-3.8.2-bin.tar.gz  -C /opt/ && cd /opt/#>>> 更改目录名称
$ mv apache-zookeeper-3.8.2-bin/   zookeeper#>>> 配置zk环境变量
$ cat >> /etc/profile.d/zookeeper.sh <<-EOF
#!/bin/bash
export ZK_HOME=/opt/zookeeper
export PATH=\$PATH:\$ZK_HOME/bin
EOF#>>> 重新加载环境变量 
$ source /etc/profile.d/zookeeper.sh# 创建zk数据存放目录
$ mkdir -p /opt/data/zk#>>> 修改zk的配置文件(三节点)
$ cp /opt/zookeeper/conf/zoo_sample.cfg  /opt/zookeeper/conf/zoo.cfg$ egrep -v "^(#|$)" /opt/zookeeper/conf/zoo.cfg # Zookeeper的心跳间隔时间。服务器和客户端会通过心跳包维持连接状态。
tickTime=2000# 从节点最多可以等待10 个tickTime(即 10 * 2000 毫秒 = 20 秒)来与领导者同步数据状态。否则它将被认为是不可用的。
initLimit=10# 定义Zookeeper领导者和从节点之间的心跳和同步请求的最大允许延迟时间。领导者与从节点之间的响应时间如果超过了这个限制,从节点将被认为失去同步状态。(即 5 * 2000 毫秒 = 10 秒)。
syncLimit=5# 定义zk数据目录
dataDir=/opt/data/zk# 定义Zookeeper集群的客户端连接端口
clientPort=2181
server.140=192.168.174.140:2888:3888
server.141=192.168.174.141:2888:3888
server.142=192.168.174.142:2888:3888# 参数解释:# dataDir		zk数据目录# clientPort	端口号# server.140	zk节点唯一标识# 192.168.100.160	zk节点主机地址# 2888				集群内部通讯端口# 3888				leader选举端口# 创建ID文件(位置存放在zk的数据存放路径下)101节点
$ echo "140" > /opt/data/zk/myid102节点
$ echo "141" > /opt/data/zk/myid103节点
$ echo "142" > /opt/data/zk/myid#>>> 所有节点启动zk
$ zkServer.sh start#>>> 查看zk状态
$ zkServer.sh status
myid文件:在Zookeeper集群中,myid文件是每个Zookeeper服务器节点的重要配置文件之一,用于唯一标识集群中的每个服务器。

9.2.3 Zookeeper角色划分
 
在Zookeeper集群中,不同的服务器节点可以承担不同的角色,以确保集群的高可用性、数据一致性和故障恢复能力。主要角色包括:server.140=192.168.174.140:2888:3888:角色
-  领导者( Leader)-  处理所有写操作(如创建、更新、删除节点)并确保数据的一致性。 
-  管理并协调集群中其他节点的活动。 
-  负责为客户端请求生成唯一的事务 ID(zxid)。
-  在集群启动或领导者失效时,会进行领导者选举,选出新的领导者。 
-  定期发送心跳给跟随者,确保自己处于活动状态。 
 
-  
-  跟随者( Follower)- 处理所有读取操作(如读取节点数据)。
- 接收并转发客户端的写请求给领导者进行处理。
- 将领导者的事务日志同步到本地,确保数据一致性。
- 参与领导者选举。
- 与领导者保持同步,接收并应用领导者的事务。
 
-  观察者( Observer)- 处理读取操作,减轻领导者和跟随者的负担。
- 不参与领导者选举和事务投票,只同步领导者的事务日志。
- 提高集群的读取扩展能力,适合需要高读取吞吐量的场景。
 
-  客户端( Client)- 连接到集群中的任一节点进行读取或写入操作。
- 自动处理节点故障并重新连接到其他可用节点。
- 通过客户端API与Zookeeper集群进行交互。
 
9.2.4 zookeeper配置内存堆栈
 
#>>> 查看zk进程
$ jps
1367 Elasticsearch
5400 QuorumPeerMain
5593 Jps#>>> 查看zk的堆栈大小
$ jmap -heap 5400

zookeeper默认堆内存大小为1GB,一般设置为2GB或者4GB
#>>> 调节zookeeper推内存大小为256MB
$ vim /opt/zookeeper/conf/java.env
#! /bin/bash # 指定JDK安装路径 
export JAVA_HOME=/usr/local/java # 指定zookeeper的堆内存大小 
export JVMFLAGS="-Xms256m -Xmx256m $JVMFLAGS" #>>> 同步文件
$ scp /opt/zookeeper/conf/java.env  elk02:/opt/zookeeper/conf/
$ scp /opt/zookeeper/conf/java.env  elk03:/opt/zookeeper/conf/#>>> 所有节点重启zk
$ zkServer.sh restart#>>> 验证堆内存
$ jmap -heap `jps | grep QuorumPeerMain | awk '{print $1}'`
9.3 Kafka集群安装:三节点😀
 
 下载链接:https://kafka.apache.org/downloads
  官网:https://kafka.apache.org/
#>>> 解压安装包到指定目录
$ tar xf kafka_2.13-3.2.1.tgz  -C /opt/ &&  cd /opt/#>>> 修改目录名称
$ mv kafka_2.13-3.2.1/  kafka#>>> 配置kafka环境变量
$ cat >> /etc/profile.d/kafka.sh <<-EOF
#!/bin/bash
export KAFKA_HOME=/opt/kafka
export PATH=\$PATH:\$KAFKA_HOME/bin
EOF#>>> 重新加载环境变量
$ source /etc/profile.d/kafka.sh #>>> 创建数据目录
$ mkdir /opt/data/kafka -p#>>> 修改kafka配置文件# 1.kafka101节点配置文件
$ egrep -v "^(#|$)" kafka/config/server.properties
broker.id=140
log.dirs=/opt/data/kafka/
zookeeper.connect=192.168.174.140:2181,192.168.174.141:2181,192.168.174.142:2181/kafka# 2.kafka102节点配置文件
$ egrep -v "^(#|$)" kafka/config/server.properties  
broker.id=141
log.dirs=/opt/data/kafka/
zookeeper.connect=192.168.174.140:2181,192.168.174.141:2181,192.168.174.142:2181/kafka	# /kafka为zk中的znode# 3.kafka103节点配置文件
$ egrep -v "^(#|$)" kafka/config/server.properties  
broker.id=142
log.dirs=/opt/data/kafka/
zookeeper.connect=192.168.174.140:2181,192.168.174.141:2181,192.168.174.142:2181/kafka#>>> 所有节点启动kafka
$ kafka-server-start.sh -daemon /opt/kafka/config/server.properties #>>> zk节点查看kafka注册信息
$ zkCli.sh ls /kafka/brokers/ids | grep  "^\["
[140, 141, 142                                    #>>> kafka停止脚本
$ kafka-server-stop.sh
9.4 Kafka Topic日常操作
 
9.4.1 查看Topic相关信息
 
#>>> 查看集群中所有Topic
$ kafka-topics.sh \--bootstrap-server  192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 \--list#>>> 查看所有Topic详细信息
$ kafka-topics.sh   \--bootstrap-server  192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 \--describe#>>> 查看某个Topic信息
$ kafka-topics.sh \--bootstrap-server  192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 \--describe  --topic  Topice名称
9.4.2 创建Topic
 
#>>> 创建test-elk Topic
$ kafka-topics.sh  \--bootstrap-server  192.168.174.140:9092 \--create --topic test-elk#>>> 创建Topic,并指定副本数量
$ kafka-topics.sh   \--bootstrap-server  192.168.174.140:9092 \--create --topic test-elk-02  \--partitions 10 \--replication-factor 1#>>> 查看Topic的详细信息
$ kafka-topics.sh  \--bootstrap-server  192.168.174.140:9092  \--describe --topic test-elk-02

#>>> 创建Topic,并指定副本数量
$ kafka-topics.sh \--bootstrap-server  192.168.174.140:909 \--create --topic test-elk-03 \--partitions 10 \--replication-factor 2 $ kafka-topics.sh   \--bootstrap-server  192.168.174.140:9092  \--describe --topic test-elk-03

注意:副本数量不能大于Broker的数量
9.5 Filebeat收集日志至Kafka
 
官网:https://www.elastic.co/guide/en/beats/filebeat/7.17/kafka-output.html
$ cat filbeat-out-kafka.yml 
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logjson.keys_under_root: true   #- type: log
#  paths:
#    - /var/log/nginx/error.logoutput.kafka:# initial brokers for reading cluster metadatahosts: ["192.168.174.140:9092", "192.168.174.141:9092", "192.168.174.142:9092"]# message topic selection + partitioningtopic: "test-kafka-topic"$ filebeat  -e -c ~/filbeat-out-kafka.yml #>>>  kafka测试拉取数据
$ kafka-console-consumer.sh   \--topic test-kafka-topic \--bootstrap-server 192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 \--from-beginning
9.6 Logstash收集Kafka Topic日志
 
 官网:https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-kafka.html
input {kafka {bootstrap_servers => "192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092"  # Kafka集群IPtopics => ["test-kafka-topic"]      # 指定Topic进行消费数据group_id => "test-kafka"     		# 指定消费者组codec => json {                     # 指定消费的数据是JSON格式。charset => "UTF-8"}}
}filter {mutate {remove_field => ["agent","ephemeral_id","ecs","@version","tags","input","log","offest"]  }if [log_type] == "nginx_access" {geoip {source => "clientip"}useragent {source => "http_user_agent" target => "study_user_agent"}
} 
}output {stdout {}#  if [log_type] == "nginx_access" {  
#    elasticsearch {
#      hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"] 
#      index => "web-access-%{+yyyy.MM.dd}"
#    } 
#  } else if [log_type] == "nginx_error" {
#        elasticsearch {
#          hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
#          index => "web-error-%{+yyyy.MM.dd}"   
#    }
#  }  else if [log_type] == "system_log" {
#       elasticsearch {
#         hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
#         index => "system-log-%{+yyyy.MM.dd}"     
#    } 
#   }
}#>>> 启动Logstash实例
$ logstash  -rf ~/filter_grok.yml 
十、Elasticsearch Restful风格API实战
 
10.1 ES集群状态
 
-  绿色( Green)- 含义:集群健康状态正常,所有的主分片和副本分片都已分配。
- 解释:绿色状态表示集群中的所有数据都可以访问,所有分片(包括主分片和副本分片)都分配到集群中的节点上。
- 示例:假设集群有3个主分片,每个主分片有1个副本分片,那么绿色状态下这6个分片都已成功分配且正常运行。
 
-  黄色( Yellow)- 含义:集群健康状态部分正常,所有的主分片都已分配,但有一个或多个副本分片未分配。
- 解释:黄色状态表示集群中的所有主分片都可以访问,但一些副本分片由于某种原因(例如节点故障或资源不足)未能分配。这意味着数据是安全的,但没有高可用性,因为如果某个节点失败,它可能会导致数据无法访问。
- 示例:假设集群有3个主分片和每个主分片1个副本分片,如果有3个主分片和2个副本分片已分配,但1个副本分片未能分配,则集群为黄色状态。
 
-  红色( Red)-  含义:集群健康状态不正常,有一个或多个主分片未分配。 
-  解释:红色状态表示集群中有一些数据不可访问,因为主分片未能分配。此时,可能存在数据丢失的风险,需要立即采取措施来修复问题。 
-  示例:假设集群有3个主分片和每个主分片1个副本分片,如果有2个主分片和所有副本分片未能分配,则集群为红色状态。 
 
-  
10.2 Elasticsearch术语
 
Document文档,用户存储在ES的数据,ES最小单元,文档不可被拆分。文档使用JSON的对象存储类型。filed相当于数据表的字段,对文档数据根据不同属性进行分类标识。index索引,一个索引就是拥有相似特征文档的集合。shard分片,存储数据的地方,每个底层对应的使一个Lucene库,一个索引至少有一个或多个分片。replica副本,一个分片可以有0个或者多个副本。作用是对数据进行备份,一旦副本数量不为0,就会引入主分片(primary shard)和副本分片(replica shard)的概念。主分片(primary shard)实现数据的读写操作。副本分片(replica shard)可以实现数据的读操作,需要主分片同步数据,当主分片挂掉时,副本分片会变为主分片。Allocation分配。将分片分配给某个节点的过程,包括主分片和副本分片。如果副本分片,还包含从主分片复制数据的过程,此过程由Master节点调度完成。
- 上家公司每天所产生的日志量(5GB左右)
- ELastic stack技术栈有哪些组件 (beats Logstash elasticsearch Kibana)
- elasticsearch 使用场景?(日志收集场景:elk技术栈。搜索引擎(IK分词器))
- elasticsearch 数据迁移?
