EK实时日志分析

需求

之前负责处理客户端错误日志处理工作,对结果进行汇总报表并做一下可视化展示。常规日志处理流程:客户端通过打点接口将原始日志提交到前端机上,然后日志收集agent将日志集中到中心节点,中心节点将日志输出到消息队列中,然后各种数据统计,数据分析程序从消息队列直接消费处理,并持久化存储。如图所示(图示不涉及一些负载均衡,HA方案):
client1     \
             (api) Server1 ( log collect agent1)     \                                                      HDFS
client2    /                                         \                                            /
                                         (log collect center) -> MQ  -> Mysql ->日志处理,数据分析
client3    \                                       /                                              \
            (api) Server1 ( log collect agent2)   /                                                       .........
client4    /
我们选择将原始日志消费到mysql做持久化,也是通过mysql分析数据。将原始日志解析,不必要信息去重,将重要的字段直接mysql存储并对其索引。通过group by聚合数据,并同时Echarts对其可视化展示。刚开始速度还可以。日志每天增加2G,过了一段时间,运营反映页面卡死了,发现不能实时聚合数据,便使用redis对其缓存,使用脚本定期刷新缓存(目前日志180G)。(大量group by导致mysql 出现多个Sending data,影响同库表的查询,故对错误日志相关做了一次迁库).

通过mysql分析日志方案不足:
      1.实时性:数据量大时,不能实时汇总
      2.扩展性:当需要对一个维度汇总时,又是加索引,改程序,改页面

引入ELK

这里便引入了EK,说到EK,就是大家说的没有logstash的ELK! ELK stack包括如下:

 ElasticSearch:  搜索引擎。能够达到实时搜索,稳定,可靠,快速,安装使用方便。支持通过HTTP使用JSON进行数据索引。

 Logstash: 对应用程序日志进行收集管理,提供 Web 接口用于查询和统计。日志收集使用flume,scribe

 Kibana: 基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因

使用组合

1. elasticsearch

2. elasticsearch + kibana

3. logstash

4. elasticsearch + logstash + kibana

ES实践

结合现有环境日志收集这些我们已经有解决方案,直接使用组合2:EK分析数据即可:

1.使用elasticsearch索引数据

2.kibana负责可视化展示

一:ElasticSearch关键部分

1.与关系型数据库对比:

            Index                <--->   Database
            Mapping              <--->   Schema
            Index.Type           <--->   Table
            Document             <--->   Table.Row
            Document.Field       <--->   Table.Column

2.基于json,快速索引文档,(开发者要做的只是插入数据,检索数据)

        导入数据示例:
1
2
3
4
5
6
7
8
9
10
11
$baseUri = 'http://XX.XX.XX.XX:9200' . '/'.self::ES_INDEX.'/'.self::ES_TYPE.'/' . $fields['id'];
$ci = curl_init();
curl_setopt($ci, CURLOPT_URL, $baseUri);
curl_setopt($ci, CURLOPT_TIMEOUT, 200);
curl_setopt($ci, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ci, CURLOPT_FORBID_REUSE, 0);
curl_setopt($ci, CURLOPT_CUSTOMREQUEST, 'PUT');
curl_setopt($ci, CURLOPT_POSTFIELDS, json_encode($fields));
$response = curl_exec($ci);
curl_close($ci);
3.聚合支持

         Metrics Aggregations

              min, max, avg,stats, extended_stats,  value_count

         Bucket Aggregations

              term,range, date_range,ipv4_range,histogram, date_histogram

      聚合示例:


 4.bulk批量操作

  之前以为es bulk操作直接将操作的放到二维数组json即可,结果不行,网上查了一下需要在每个json后添加"\n"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private function fmtBulkData($bulkItem) {
$bulkData = '';
foreach ( $bulkItem as $item) {
$bulkData .= json_encode(array(
'delete' => array(
'_index' => self::ES_INDEX,
'_type' => self::ES_TYPE,
'_id' => $item,
),
)) . "\n";
}
return $bulkData;
}
private function doDelBulk($bulkData) {
$baseUri = 'http://XX.XX.XX.XX:9200' . '/_bulk';
$ci = curl_init();
curl_setopt($ci, CURLOPT_URL, $baseUri);
curl_setopt($ci, CURLOPT_TIMEOUT, 200);
curl_setopt($ci, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ci, CURLOPT_FORBID_REUSE, 0);
curl_setopt($ci, CURLOPT_CUSTOMREQUEST, 'POST');
curl_setopt($ci, CURLOPT_POSTFIELDS, $bulkData);
$response = curl_exec($ci);
curl_close($ci);
}

注意: 在一个index中,若一个field被自动映射成一种类型,之后就不能导入其他类型的数据。

Kibana功能介绍:

1.Discover 提供对其连接的elasticsearch中的index的检索支持(关键字过滤,index选择,过滤条件保存等等)
discover

2.Visualize:提供对索引数据聚合处理,并提供扇形图,条形图等等多种展示
visualize

3.Dashboard:对建立的Visualize模块化管理(可以在Setting中导出或导入)
dashboard

4.Setting:关联elasticsearch中的索引,以及对 Discover,Visualize,Dashboard的管理设置功能

#####使用EK分析日志解决了实时性,可扩展性!以后多一个维度就一个Visualize即可!

参考:
http://es.xiaoleilu.com/030_Data/55_Bulk.html
http://kibana.logstash.es/content/kibana/v4/discover.html