nginx日志实时分析

需求
目的:实时监控web访问状态,准确定位线上故障。
已有方案: 
1.ELK stack : 采集数据到es中,es对每条日志索引存储。通过kibana多维度展示web
            状态。但是若web站点日志比较多。随之监控系统本身存储也将增大。
2.一些日志处理命令 : 可以直接查看当前访问情况,比较方便。但是无数据存储。无法查看
            历史状态。
方案
自己解析log实时解析。像ops监控server一样统一接口单位时间指标(访问量,错误数,
后台处理时间,响应时间)。然后实时统一push到监控系统的存储。通过dashboard查看。
type EMeta struct {
    Recv      int    // 接受字节大小
    Sent      int    // 发送字节大小
    Inter     string    // api
    Time      string    // 用于区分分钟时间
    Status    int        // 状态码
    ReqTime   float64    // 响应时间
    UpReqTime float64    // 处理时间
}
实现
1. nignx日志实时更新。需要一个tail功能
2. 解析日志。 接口名称,响应时间,响应码等等。
3. 单位时间内的统计,并求平均
4. 上报监控系统
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (*Alog) Gather() error { // 处理入口
go TailLog() // github.com/hpcloud/tail
/**
for {
t, _ := tail.TailFile(LogPathOB, tail.Config{Follow: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}}) // 在一个for,防止日志切割
for line := range t.Lines {
LineCh <- line.Text // 通过一个有缓冲的ch传输到日志解析协程
}
fmt.Println("lf change!!!")
time.Sleep(10 * time.Second)
}
**/
go LogHandler()
/**
for {
line := <-LineCh
entity, _ := p.ParseString(line) // 解析日志到entity结构
EntityCh <- entity // 进入HandlerEveryMin
}
**/
go HandlerEveryMin()
/**
分钟内数据进行汇总统计
通过channel传入 SummaryGRT
**/
go SummaryGRT()
/**
sort() // 根据访问量取出topN
push() // 上报监控存储机
**/
select {}
}

图床域名挂了!

之前一个周末公司的图床域名不知道怎么回事挂了,直接造成App,Pc页面白了。唯一解决只能把
域名替换了。怎么替换?同事当时直接在框架处理完成之后,直接做字符串替换的。结果发现没用
。如下是index.php 框架处理入口。

1
2
3
4
5
ob_start();
$webApp->run();
$body = ob_get_clean();
changeImgDomain($body); // 临时处理,发现没用
echo $body;

最后,发现很多控制器里面,在输出数据之后直接die()结束处理流程了。也就是 $webApp()->run()
下面的没有执行了。幸亏当时使用lua直接在nginx里面直接替换响应数据里的域名了(运气好:1.找到
直接nginx里面处理的方案 2.线上有nignx+lua使用,环境OK),先把问题给解决了。

一些想法
1. 项目开发采用的框架,最好可以对直接杜绝die(),exit()来规范开发者。
2. 不能满足你的话。最好项目组可以自定义统一输出的方法。

golang构建异步处理队列

目标
使用golang+redis完成同resque的异步处理队列。完成后台任务的异步调用
主要功能: 
   1. 异步调用任务
   2. 队列状态查询
redis数据结构及操作方法
1. 队列集合(固定大小的set) 默认有default队列 (不同业务不同队列)
       数据:
       queueName

       操作方法:
       add
       del
       list
       isset

2. 队列meta(可扩展的hash) (每个队列一个meta描述,记录优先级等信息)
       数据:
       priority 

       操作方法:
       set
       get
       del

3. 任务队列
    任务队列:(redis的queue)
    操作方法:
       push
       pull
       clear

       状态数据记录:
       len             --- 待处理
       runnging        --- 运行中
       finish          --- 已完成
       error            --- 错误   ( 每个队列对应一个errorlog)
接入
1. 添加服务worker
       同worker/log/log.go,实现Tasker接口,
       并且通过init函数注册,
       通过listservice验证是否注册成功
2. 添加队列: addqueue, listqueue验证
3. 往新添加的队列中push任务(post:json)
        字段: Worker 对应添加的服务 
             Params 服务所需所有参数,服务自己解析
4. 每个队列会有一个日志/tmp/goqueue/queue.log.Ymd (方便debug)
实现难点
1.根据队列任务的不同worker调用对应方法(同telegraf插件实现)


任务处理实现目录结构:
worker/
|-- all
|   `-- all.go
|-- log
|   `-- log.go
`-- worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
worker.go 定义任务接口Tasker,并初始化全局map
##########################
package worker
type Tasker interface {
// Worker method
Worker(string)
}
type Creator func() Tasker
var WKMap = map[string]Creator{}
func Add(name string, creator Creator) {
WKMap[name] = creator
}
##########################
all.go 调用功能worker的初始化
##########################
package all
import(
_ "worker/log"
)
##########################
log.go: 简单一个example
##########################
package log
import (
w "worker"
"logger"
)
func init() {
w.Add("log", func() w.Tasker {
return &Log{}
})
}
type Log struct{}
func (this *Log) Worker(params string) {
logger.Infoln(params)
}
##########################
调用:
##########################
if f,ok := w.WKMap[m.Worker]; ok {
w := f()
w.Worker(m.Params)
}
##########################
2. 队列任务过多时,不能无限制创建goroutine.必须控制goroutine数量
#######
type RunningTask struct {
  Queue      string
  Goroutines int            // 每个任务队列一个goroutine记录
  GrtMux     sync.RWMutex
  Shutdown   chan int
}


// 开启新的goroutine,先判断限制,OK的话GRT加1,否则等待
if grt := this.GetGRTNum(); grt < c.TASK_QUEUE_GRT_LIMIT {
      go this.RunTask(stateCh)
      this.ChangeGRTNum(true)
}


func (this *RunningTask) RunTask(stateCh chan *TaskStateCount) {
  defer this.ChangeGRTNum(false)        // 每个任务结束  defer 对应的goroutine数量减1
#######

对比tick与open-falcon

架构图

open-falcon

open-falcon

tick

tick

费用

F: 开源免费
T: 部分免费,企业版特殊收费

扩展性

F: 大多组件可以水平扩展
T: 免费版 influxDB单点,不支持集群

Agent

F: agent留有api接口,用户可以直接上报数据到agent.这个设计也比较灵活
T: 所有收集项都预编进telegraf,根据配置启动

部署

F: 组件多扩展性好。部署的组件也就多。部署成本高一些
T: 免费版就四个组件。企业版目前不了解

易用性

F: 报警,用户管理,收集数据等比较通用
T: 对于agent不支持不方便接入。但是telegraf支持多个output

数据模型

数据模型tick优于open-falcon的。对于一个接口的数据可能有访问量,错误数,响应时间,上层处理时间等。tick一条数据直接包含所有指标。open-falcon需要多个。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
open-falcon
{
"endpoint": "test-endpoint",
"metric": "test-metric2",
"timestamp": ts,
"step": 60,
"value": 2,
"counterType": "GAUGE",
"tags": "idc=lg,loc=beijing",
}
tick
{
"metric" : "api",
"fileds" : visit=3,ok=2,err=1,reqtime=0.3
"tags" : idc=lg,api=/index/index
"timestamp" :
}

EK实时日志分析

需求

之前负责处理客户端错误日志处理工作,对结果进行汇总报表并做一下可视化展示。常规日志处理流程:客户端通过打点接口将原始日志提交到前端机上,然后日志收集agent将日志集中到中心节点,中心节点将日志输出到消息队列中,然后各种数据统计,数据分析程序从消息队列直接消费处理,并持久化存储。如图所示(图示不涉及一些负载均衡,HA方案):
client1     \
             (api) Server1 ( log collect agent1)     \                                                      HDFS
client2    /                                         \                                            /
                                         (log collect center) -> MQ  -> Mysql ->日志处理,数据分析
client3    \                                       /                                              \
            (api) Server1 ( log collect agent2)   /                                                       .........
client4    /
我们选择将原始日志消费到mysql做持久化,也是通过mysql分析数据。将原始日志解析,不必要信息去重,将重要的字段直接mysql存储并对其索引。通过group by聚合数据,并同时Echarts对其可视化展示。刚开始速度还可以。日志每天增加2G,过了一段时间,运营反映页面卡死了,发现不能实时聚合数据,便使用redis对其缓存,使用脚本定期刷新缓存(目前日志180G)。(大量group by导致mysql 出现多个Sending data,影响同库表的查询,故对错误日志相关做了一次迁库).

通过mysql分析日志方案不足:
      1.实时性:数据量大时,不能实时汇总
      2.扩展性:当需要对一个维度汇总时,又是加索引,改程序,改页面

引入ELK

这里便引入了EK,说到EK,就是大家说的没有logstash的ELK! ELK stack包括如下:

 ElasticSearch:  搜索引擎。能够达到实时搜索,稳定,可靠,快速,安装使用方便。支持通过HTTP使用JSON进行数据索引。

 Logstash: 对应用程序日志进行收集管理,提供 Web 接口用于查询和统计。日志收集使用flume,scribe

 Kibana: 基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因

使用组合

1. elasticsearch

2. elasticsearch + kibana

3. logstash

4. elasticsearch + logstash + kibana

ES实践

结合现有环境日志收集这些我们已经有解决方案,直接使用组合2:EK分析数据即可:

1.使用elasticsearch索引数据

2.kibana负责可视化展示

一:ElasticSearch关键部分

1.与关系型数据库对比:

            Index                <--->   Database
            Mapping              <--->   Schema
            Index.Type           <--->   Table
            Document             <--->   Table.Row
            Document.Field       <--->   Table.Column

2.基于json,快速索引文档,(开发者要做的只是插入数据,检索数据)

        导入数据示例:
1
2
3
4
5
6
7
8
9
10
11
$baseUri = 'http://XX.XX.XX.XX:9200' . '/'.self::ES_INDEX.'/'.self::ES_TYPE.'/' . $fields['id'];
$ci = curl_init();
curl_setopt($ci, CURLOPT_URL, $baseUri);
curl_setopt($ci, CURLOPT_TIMEOUT, 200);
curl_setopt($ci, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ci, CURLOPT_FORBID_REUSE, 0);
curl_setopt($ci, CURLOPT_CUSTOMREQUEST, 'PUT');
curl_setopt($ci, CURLOPT_POSTFIELDS, json_encode($fields));
$response = curl_exec($ci);
curl_close($ci);
3.聚合支持

         Metrics Aggregations

              min, max, avg,stats, extended_stats,  value_count

         Bucket Aggregations

              term,range, date_range,ipv4_range,histogram, date_histogram

      聚合示例:


 4.bulk批量操作

  之前以为es bulk操作直接将操作的放到二维数组json即可,结果不行,网上查了一下需要在每个json后添加"\n"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private function fmtBulkData($bulkItem) {
$bulkData = '';
foreach ( $bulkItem as $item) {
$bulkData .= json_encode(array(
'delete' => array(
'_index' => self::ES_INDEX,
'_type' => self::ES_TYPE,
'_id' => $item,
),
)) . "\n";
}
return $bulkData;
}
private function doDelBulk($bulkData) {
$baseUri = 'http://XX.XX.XX.XX:9200' . '/_bulk';
$ci = curl_init();
curl_setopt($ci, CURLOPT_URL, $baseUri);
curl_setopt($ci, CURLOPT_TIMEOUT, 200);
curl_setopt($ci, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ci, CURLOPT_FORBID_REUSE, 0);
curl_setopt($ci, CURLOPT_CUSTOMREQUEST, 'POST');
curl_setopt($ci, CURLOPT_POSTFIELDS, $bulkData);
$response = curl_exec($ci);
curl_close($ci);
}

注意: 在一个index中,若一个field被自动映射成一种类型,之后就不能导入其他类型的数据。

Kibana功能介绍:

1.Discover 提供对其连接的elasticsearch中的index的检索支持(关键字过滤,index选择,过滤条件保存等等)
discover

2.Visualize:提供对索引数据聚合处理,并提供扇形图,条形图等等多种展示
visualize

3.Dashboard:对建立的Visualize模块化管理(可以在Setting中导出或导入)
dashboard

4.Setting:关联elasticsearch中的索引,以及对 Discover,Visualize,Dashboard的管理设置功能

#####使用EK分析日志解决了实时性,可扩展性!以后多一个维度就一个Visualize即可!

参考:
http://es.xiaoleilu.com/030_Data/55_Bulk.html
http://kibana.logstash.es/content/kibana/v4/discover.html