利用ELK+Kafka解决方案,搭建企业级实时日志分析平台

Source
版权声明: https://blog.csdn.net/Scirhh/article/details/83016456

   

    ELK是一整套软件的组合。是一整套完整的解决方案。分别是由Logstash(收集+分析)、ElasticSearch(搜索+存储)、Kibana(可视化展示)三款软件。ELK主要是为了在海量的日志系统里面实现分布式日志数据集中式管理和查询,便于监控以及排查故障。


Elasticsearch

    ElasticSearch 是一个基于 Lucene 的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于 RESTful API 的 web 接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。

安装


[-> ~# opt] wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.2.rpm    # 官网下载软件包
[-> ~# opt] yum -y install java-1.8.0     # 相关依赖包
[-> ~# opt] yum -y install epel-release
[-> ~# opt] yum localinstall -y elasticsearch-6.4.2.rpm    

    安装完成之后,我们修改ElasticSearch的配置文件,然后启动ElasticSearch服务

[-> ~#] vim /etc/elasticsearch/elasticsearch.yml
        # 将如下的内容的注释取消,并将相关信息改成你在配置时的实际参数
        ***注意:配置文件的参数,在冒号后都有一个空格,如没有空格,服务会启动失败,并且无任何提示
            
          cluster.name: ES-cluster   # 集群名称,可更改
          node.name: es1             # 为本机的主机名
          network.host: 192.168.4.1  # 为本机的IP
          http.port: 9200            # elasticsearch服务端口,可更改
          discovery.zen.ping.unicast.hosts: ["es1", "es2","es3"]    # 集群内的主机(主机名,不一定全部都要写)
          node.master: true                                # 节点是否被选举为master
          node.data: true                                  # 节点是否存储数据
          index.number_of_shards: 4                        # 索引分片的个数
          index.number_of_replicas: 1                      # 分片的副本个数
          path.conf: /etc/elasticsearch/                   # 配置文件的路径
          path.data: /data/es/data                         # 数据目录路径
          path.work: /data/es/elasticsearch                # 工作目录路径
          path.logs:  /data/es/logs/elasticsearch/logs/       # 日志文件路径
          path.plugins:  /data/es/plugins                  # 插件路径
          bootstrap.mlockall: true                         # 内存不向swap交换
          http.enabled: true                               # 启用http

         # 如果目录路径不存在的,需要先创建        

[-> ~#] /etc/init.d/elasticsearch start    # 启动服务
[-> ~#] systemctl enable elasticsearch     # 开机自启

    一个简单的ElasticSearch就搭建好了,如果我们要部署一个ES集群,那么我们只需要在所有主机上部署好Java环境,以及在所有主机上的/etc/hosts解析主机,如下:

    # 此操作是在ES集群主机上必做

[-> ~#] yum -y install java-1.8.0
[-> ~#] vim /etc/hosts     
        192.168.4.1 es1
        192.168.4.2 es2
        192.168.4.3 es3

# 将已经完成elasticsearch服务安装的主机,将、hosts文件、elasticsearch.yml配置文件拷贝一份到集群主机上,只要修改node.name: 为本机的主机名即可,然后将服务启动起来

    最后我们来查看下Es集群是否部署好,不成功则number_of_nodes永远为1,成功会有下图信息,:

[-> ~#] curl -i http://192.168.4.1:9200/_cluster/health?pretty
# 返回的信息包括集群名称、集群数量等,如果number_of_nodes显示的是实际得集群数量,则说明集群部署成功
{
  "cluster_name" : "ES-cluster",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "active_primary_shards" : 26,
  "active_shards" : 52,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

ElasticSearch插件安装


# 采用本地安装,也可以采用远程安装
[-> ~#] /usr/share/elasticsearch/bin/plugin install file:///data/es/plugins/elasticsearch-head-master.zip    
[-> ~#] /usr/share/elasticsearch/bin/plugin install file:///data/es/plugins/elasticsearch-kopf-master.zip
[-> ~#] /usr/share/elasticsearch/bin/plugin install file:///data/es/plugins/bigdesk-master.zip
[-> ~#] /usr/share/elasticsearch/bin/plugin list     # 查看已安装的插件 

插件安装完成之后,访问对应插件的Url,如果访问出现如下图,则成功:

head插件展示图(http://192.168.4.1:9200/_plugin/head

kopf插件展示图(http://192.168.4.1:9200/_plugin/kopf

Bigdesk展示图(http://192.168.4.1:9200/_plugin/bigdesk


Logstash

   logstash是一个数据采集、加工处理以及传输的工具,能够将所有的数据集中处理,使不同模式和不同数据的正常化,而且数据源能够随意轻松的添加。

      安装logstash软件包

[-> ~#] wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.2.rpm

      安装完成编写logstash.con日志收集文件测试:

[-> ~#] vim /etc/logstash/logstash.conf
    # 编写简单的收集日志文件

    input{
    stdin{}
}

output{
    elasticsearch{
      hosts => ["192.168.4.1:9200","192.168.4.2:9200","192.168.4.3:9200"]
    }
}

     编写完后,使用命令查看下logstash.conf是否无误 “ /opt/logstash/bin/logstash -f /etc/logstash/logstash.conf --configtest --verbose  ”,需要注意的是,yum安装的Logstash默认在/opt目录下。源码安装可以自行指定。

[-> ~#] /opt/logstash/bin/logstash -f /etc/logstash/logstash.conf --configtest --verbose

    # 如果出现 Configuration OK字样,则说明文件准确无误

    在确认logstash.conf文件无误后,我们利用logstash.conf里面的语句进行数据的收集,然后在屏幕上输入你想测试的数据。

详细将下图:

     在输入后,我们再回到Es查看是否有刚刚在屏幕上输入的数据。

    下图我们可以看到,数据已经分别写入到了刚刚搭建好的Es集群中。

    此外,我们再来看看刚刚收集的数据的详细信息:

  通过简单测试,可以得出我们的Es集群是处于健康状态并且可以收集到数据的。那么自动的收集日志数据,等我们搭建完Kafka时一同实现。


Kafka

Apache kafka是消息中间件的一种,是一种分布式的,基于发布/订阅的消息系统。能实现一个为处理实时数据提供一个统一、高吞吐、低延迟的平台,且拥有分布式的,可划分的,冗余备份的持久性的日志服务等特点。

    将下载的kafka tar包解压到指定的位置,并修改zookeeper集群文件以及kafka的server文件,如下:

[-> ~#] wget http://mirror.bit.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
[-> ~#] tar -xvf kafka_2.11-2.0.0.tgz -C /opt/
[-> ~#] cd /opt/kafka_2.11-2.0.0/

# zookeeper集群
[-> ~# kafka_2.11-2.0.0] vim config/zookeeper.properties
    dataDir=/data/zookeeper            # 指定zookeeper的目录,如果不存在则需要创建
    tickTime=2000                      # 客户端与服务器之间维持心跳的时间间隔
    initLimit=20    
    syncLimit=10
    server.1=192.168.4.1:2888:3888     # 2888表示的是zookeeper集群中Leader的端口
    server.2=192.168.4.2:2888:3888     # 3888表示的是如果Leader宕掉了,那么从3888端口中重新选举新的Leader
    server.3=192.168.4.3:2888:3888      

# kafka服务配置
[-> ~# kafka_2.11-2.0.0] vim config/server.properties
        broker.id=0        # 唯一的
        port=9092
        host.name=192.168.4.1
        log.dirs=/data/kafka-logs    # 注意这里的是kafka的存储目录。并不是存放日志的目录,当你启动kafka的时候,会自动在当前目录下生成一个log专门存放kafka的日志文件。如果没有需创建
        zookeeper.connect=192.168.4.1:2181,192.168.4.2:2181,192.168.4.3:2181    #这里可以是主机名,也可以是ip,但是填写的是主机名,就必须在/etc/hosts文件下进行解析才可以生效

    上述的操作是单机的kafka,如果我们需要搭建kafka集群,那么将修改好的kafka整个目录scp到集群服务器上即可。

    另外需要注意的是:修改每台主机配置文件下的个别参数,我们主要修改如下操作,以下操作均在server.properties上操作:

broker.id = x     # x:是任意数,只要不重复即可
host.name=xxx.xxx.xxx.xxx     这里填写的应为本机的IP地址

# 创建相关文件夹以及文件,如在创建了/data/zookeeper后,下面应该还得再执行
    echo 1 > /data/zookeeper/myid    每台机器操作的数字也不能一样

    当我们在集群主机上都修改好配置文件参数后,随后就是将zookeeper集群启动,以及将kafka服务启动起来。如下:

# 后台启动zookeeper,在集群主机上进行操作即可

[-> ~# ] /opt/kafka_2.11-2.0.0/bin/zookeeper-server-start.sh -daemon /opt/kafka_2.11-2.0.0/config/zookeeper.properties    

# 后台启动kafka服务,在集群主机上进行操作即可
[-> ~# ] /opt/kafka_2.11-2.0.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.11-2.0.0/config/server.properties

# 查看2888、3888、2181、9092端口是否都已经启动,如果2888、3888端口不存在,那么在/opt/kafka_2.11-2.0.0/log/下查看zookeeper.out以及server.log报错信息

[-> ~# ] netstat -antulp | grep -E '2181|2888|3888|9092'

    配置了zookeeper集群以及kafka集群,启动了服务如下图为成功,缺一则为失败:

然后,我们来测试下,kafka是否可用:

新增一个topic,然后scon为他分配一个分区,设置副本
[-> ~# ] /opt/kafka_2.11-2.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic scon

    # 这里需要非常注意,就是factor数不能超过broker数。broker数就是集群机器的总数。如,编者的集群机器数为3台,那么factor的数就不能大于3,只能<=3.

# 查看新创建的主题scon
[-> ~# ] /opt/kafka_2.11-2.0.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
[-> ~# ] /opt/kafka_2.11-2.0.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic scon

# 修改创建的scon主题,这里将副本数5,修改为6
[-> ~# ] /opt/kafka_2.11-2.0.0/bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 6 --topic scon

# 删除创建的scon主题
[-> ~# ] /opt/kafka_2.11-2.0.0/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic scon

#注意:是否开启topic的删除功能:默认为false
    delete.topic.enable 在conf/server.properties中delete.topic.enable=true

   测试效果如下图:

    那么,Kafka的集群环境已经搭建完成,那么我们需要测试下,logstash是否能够将收集到的日志数据传递到kafka上,最后经过kafka, 再将数据传到我们的Es集群上。


    这里或许就有疑问了,Redis同样可以,为什么要选择kafka?

    我们都知道Redis是以key的hash方式来分散对列存储数据的,且Redis作为集群使用时,对应的应用对应一个Redis,在某种程度上会造成数据的倾斜性,从而导致数据的丢失。

    而从之前我们部署Kafka集群来看,kafka的一个topic(主题),可以有多个partition(副本),而且是均匀的分布在Kafka集群上,这就不会出现redis那样的数据倾斜性。Kafka同时也具备Redis的冗余机制,像Redis集群如果有一台机器宕掉是很有可能造成数据丢失,而Kafka因为是均匀的分布在集群主机上,即使宕掉一台机器,是不会影响使用。同时Kafka作为一个订阅消息系统,还具备每秒百万级别的高吞吐量,持久性的、分布式的特点等。

    接下来我们将编写logstash.conf文件,将收集日志文件并到kafka。如下:

[-> #~] vim /etc/logstash/logstash.conf

input{
  file {
    type => "weblog"            # 定义日志文件类型
    path => "/opt/access.log"    # 日志文件路径,这里收集web的日志文件
    start_position => "beginning"     # 从第一行开始读取数据
  }
}

output {
    kafka {
      bootstrap_servers => "192.168.4.1:9092,192.168.4.2:9092,192.168.4.3:9092"    # kafka集群主机
      topic_id => "web-log"     # 主题名称,会自动创建
      compression_type => "snappy"     # 压缩方式
    }
}


# 查看kafka,logstash是否成功将数据传入到kafka上

[-> #~] /opt/kafka_2.11-2.0.0/bin/kafka-topics.sh --list --zookeeper localhost:2181 

    如出现如下图,则数据写入成功:


    这里可能就会有疑问了,收集时的分区数,副本数都是系统默认生成的,那么我该如何实现指定分区数,副本数?我们可以在收集时现行创建一个主题,然后再收集的时候就使用这个主题即可。如下:

[-> #~]  /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.22:2181 
--replication-factor 3 --partitions 7 --topic Topic_Name    # 3个副本,7个分区

    好了,logstash将数据写入到kafka已经是没有问题,那么接下来就是:如何将kafka上的数据信息写入到我们部署好的Es集群上。当然,linux可是" 一切皆文件 "。所以,我们还是修改logstash.conf的配置文件。

    之所以使用kafka来进行数据存储,是为了减少Es集群前端的压力,所以加上了消息队列Kafka作为一个过渡。

   编写配置文件,如下:

[-> #~] vim /etc/logstash/logstash.conf

input {
   kafka {
     zk_connect => "192.168.4.1:2181,192.168.4.2:2181,192.168.4.3:2181"
     topic_id => "web-log"
     reset_beginning => false
     consumer_threads => 5
     decorate_events => true
    }
}

output {
    elasticsearch {
       hosts => ["192.168.4.2:9200","192.168.4.3:9200"]
       index => "sc-%{type}-%{+YYYY.MM.dd}"
       workers => 1
       flush_size => 20000
        idle_flush_time => 10
        template_overwrite => true
    }
}

    数据成功写入到Es集群,如下图所示:

   另外我们来看看详细的json数据信息:

 


Kibana

    Kibana是一个开源的分析与可视化平台,可以使用各种不同的图表、表格、地图等kibana能够很轻易地展示高级数据分析与可视化。

    安装并启动Kibana:

[-> #~ ] wget https://artifacts.elastic.co/downloads/kibana/kibana-6.4.2-x86_64.rpm
[-> #~ ] yum -y localinstall kibana-6.4.2-x86_64.rpm

    # 在启动Kibana之前,我们先修改kibana的配置文件,修改完成后再启动kibana服务
[-> #~ ] vim /opt/kibana/config/kibana.yml
    server.port: 5601         # kibana的端口,切记一定不能修改端口,此端口是被写死在文件内的
    server.host: "0.0.0.0"    #kibana的服务地址,可以是本地IP地址
    elasticsearch.url: "http://192.168.1.11:9200"   "    #ES的地址
    kibana.index: ".kibana"        #kibana的索引名称
    kibana.defaultAppId: "discover"
    elasticsearch.pingTimeout: 1500        #连通ES的超时时间
    elasticsearch.requestTimeoumeout: 30000    #ES请求超时时间
    elasticsearch.startupTimeoumeout: 5000

[-> #~ ] systemctl start kibana    # 启动kibana服务
[-> #~ ] systemctl enable kiban    # 开机自启动

   如果出现了如下图,则说明kibana服务跟Es集群已连通:

    接下来我们来访问kibana “ http://192.168.4.1:5601 ”,如下图:

    既然,Kibana已经搭建成功,那么接下来就是如何的使用Kibana,我们都知道Kibana是一个可视化的工具,它能制作不同的仪表盘以直观的方式展示。下面我们就来看看kibana是如何使用的。

    一、填写需要制作的index名称:

    二、选择时间段查看数据:

    三、选择图形类型制作仪表盘

    四、制作仪表盘

五、最后查看保存的仪表盘

    相对比杂乱无章的日志数据,以仪表盘的形式显现是不是更加清晰明了,且直击重点。在现如今每年全球的数据都会呈2倍的趋势增长的大数据时代。能够在海量的数据中能够实时且快速找到想要的重点,并且还能实时进行分布式的监控。或许这才是最为被大众所接受的。

  至此,ELK+Kafka架构已经搭建完成!

 本文旨在提供参考,如有错误,欢迎大家指正。帮助编者不断的改进!