telegraf插件化实现

tick监控栈

tick stack

Telegraf插件化实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
telegraf插件加载及调用: (nginx为例,别的插件类似)
一.功能插件
1. nginx.go
# // telegraf.go import : _ "github.com/influxdata/telegraf/plugins/inputs/all"
func init() {
inputs.Add("nginx", func() telegraf.Input {
return &Nginx{}
})
}
#
2. registry.go
#
package inputs
import "github.com/influxdata/telegraf"
type Creator func() telegraf.Input  //采集数据Input接口定义,主要方法:Gather (参考input.go)
注意: Creator 是函数类型,对应函数返回的是 满足Input接口的类型
var Inputs = map[string]Creator{} //插件集合映射, map["Nginx"]&Nginx{}
func Add(name string, creator Creator) {
Inputs[name] = creator
}
#
二.telegraf代理加载配置
1. 加载配置文件
#
c.LoadConfig(*fConfig) // telegraf.go
#
2.
#
c.LoadConfig() ---> c.addInput()
creator, ok := inputs.Inputs[name] // 全局Inputs
if !ok {
return fmt.Errorf("Undefined but requested input: %s", name)
}
input := creator()
rp := &internal_models.RunningInput{ //
Name: name,
Input: input,
Config: pluginConfig,
}
c.Inputs = append(c.Inputs, rp)
#
# running_input.go
type RunningInput struct {
Name string
Input telegraf.Input
Config *InputConfig
}
// InputConfig containing a name, interval, and filter
type InputConfig struct {
Name string
NameOverride string
MeasurementPrefix string
MeasurementSuffix string
Tags map[string]string
Filter Filter
Interval time.Duration
}
#
#
type Config struct {
Tags map[string]string
InputFilters []string
OutputFilters []string
Agent *AgentConfig
Inputs []*internal_models.RunningInput
Outputs []*internal_models.RunningOutput
}
#
三.telegraf调用插件
1.入口: telegraf.go
#
ag, err := agent.NewAgent(c) // c 全局配置
ag.Run(shutdown)
#
2. ag.Run() // agent.go
#
// 优先启动服务类插件
//
//
// 插件类 gatherSeparate // 每个插件对应一个goroutine 各走各的计时器
//
//
// 代理并发 gatherParallel
#
3. 插件类
#
func (a *Agent) gatherSeparate(
shutdown chan struct{},
input *internal_models.RunningInput,
metricC chan telegraf.Metric,
) {
input.Input.Gather(acc) ---> package nginx; Gather()
}
#
四.数据写入: (channel通信传递) 10000带缓冲的通道
#
for _, o := range a.Config.Outputs { // 与inputs初始化类似 NewRunningOutput()
o.AddMetric(m)
}
#
#
type RunningOutput struct { // 类型组合 RunningOutput 除了AddMetric()方法外 还有
// 具体output的实现方法
Name string
Output telegraf.Output // Output interface 参考: output.go
Config *OutputConfig
Quiet bool
MetricBufferLimit int
MetricBatchSize int
metrics *buffer.Buffer
failMetrics *buffer.Buffer
}
#
func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
err := ro.write(batch) --> influxdata
}
type Output interface {
// Connect to the Output
Connect() error
// Close any connections to the Output
Close() error
// Description returns a one-sentence description on the Output
Description() string
// SampleConfig returns the default configuration of the Output
SampleConfig() string
// Write takes in group of points to be written to the Output
Write(metrics []Metric) error
}

php接口优化:合并请求

1.客户端与服务端之间协商

 在应用开发中,服务端与客户端完成功能的同时也得注意应用性能,用户体验。记得在我刚进部门的时候接手一个内部应用的接口开发工作。就一个App首页,客户端不停的要求各部分的接口,我也是一个一个的快速响应。最后发现应用越来越不如意了。还准备加缓存呢。leader当时帮忙抓包分析,一眼看出问题所在。接口太多了,一个首页先拿到一个接口的数据再根据里面的数据请求下一个接口,直到所有准备就绪,客户端才正常展示。😓 后来和客户端一起协商合并了一些接口处理,App就ok了.
接口开发工作中应该与客户端协商好接口。具体是情况而定:
    1.首页,分类页,App一键切换的页面进可能使用较少的接口提供数据(通过不同字段区分各组件所需的数据)
    2.客户端有时请求的可能只是一个配置信息。但是却没有正确的请求时机。这个也可以考虑和别的接口进行合并处理

2.使用curl_multi请求第三方服务

今天看到同事接手的一个项目出问题了。上去一看,一个接口依赖7,8个接口。每步都是串行处理,更6的是还有一个foreach里面不知道有多少个curl访问。php-fpm不支持fork并发处理,在使用没有支持异步处理的框架的时候。我们可以使用curl_multi并发请求依赖服务。之前串行处理1s,可能现在只需要100ms就可以响应了。像下面直接传入多个url,得到Arr[url_key]=Response 的数组。自己合并就行。也可以适当加入重试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private function getMultiUrlContents($urls, $timeout = 1) {
$mh = curl_multi_init();
$chArray = array();
foreach ($urls as $i => $url) {
$chArray[$i] = curl_init($url);
curl_setopt($chArray[$i], CURLOPT_CONNECTTIMEOUT, $timeout);
curl_setopt($chArray[$i], CURLOPT_RETURNTRANSFER, true);
curl_multi_add_handle($mh, $chArray[$i]);
}
do {
curl_multi_exec($mh, $running);
curl_multi_select($mh);
} while ($running > 0);
$res = array();
foreach ($urls as $i => $url) {
$res[$url] = curl_multi_getcontent($chArray[$i]);
}
foreach ($urls as $i => $url) {
curl_multi_remove_handle($mh, $chArray[$i]);
}
curl_multi_close($mh);
return $res;
}

3.使用redis管道

同样在请求redis的时候也可以使用管道合并请求。

未合并前:请求数量由count($keys)决定

1
2
3
foreach ($keys as $v) {
$all[$v] = $redis->hgetall($v);
}

合并后:只有一个请求

1
2
3
4
5
$redis->multi(Redis::PIPELINE);
foreach ($keys as $v) {
$redis->hgetall($v);
}
$all = $redis->exec();

4.尽量使用join

有时要同两个不同表里面获取关联数据。直接join就行
1
select B.id from A left join A on A.id=B.Aid;

并非下面的操作:

1
2
3
4
5
list = select id from A;
for id in list
do
select id from b where Aid=id
done

总结

> 接口开发中合并请求直接带来的是效率,响应速度。但是在实现上开发者必须多一下处理。像提到的第一点中不光在技术上处理,更在设计上多思考。 

《高扩展性网站的50条原则》读书笔记

一.化简方程

1.不要过度设计
2.设计时就考虑扩展性
3.把方案一简再简
4.减少dns查找
5.尽可能减少对象
6.使用同一品牌的网络设备

二.分布工作

7.横向复制(x轴原则)
8.拆分不同的东西(y轴原则)  动词拆分操作,名词拆分资源
9.拆分相近的东西 (数据切片)

三.横向扩展设计

10.设计横向扩展方案
11.采用经济型系统
12.横向扩展数据中心
13.利用云计算进行设计

四.使用正确的工具

14.合理使用数据库
15.防火墙,到处都是防火墙
16.积极利用日志文件

五.不要重复工作

17.不要立即检查刚做过的工作
18.停止重定向
19.放松时序约束

六.积极使用缓存

20.利用cdn
21.使用过期头
22.缓存ajax调用
23.利用页面缓存
24.利用应用缓存
25.利用对象缓存
26.把对象缓存到自己的层上

七.从错误中吸取教训

27.积极地学习
28.不要依靠qa发现失误
29.没有回退功能的设计是失败的设计
30.讨论失败并从中吸取教训

八.数据库原则

31.注意代价高的关系
32.使用类型正确的数据库锁
33.不要使用多阶段提交
34.不要使用select for update
35.不要选择所有数据

九.容错设计与故障控制

36.采取隔离故障的泳道
37.绝对不要信任单点鼓掌
38.避免系统串联
39.确保能够启用禁用功能

十.避免或分发状态

40.努力实现无状态
41.尽量在浏览器端维护会话
42.利用分布式缓存存放状态

十一.异步通信与总线

43.尽可能使用异步通信
44.确保消息总线能够扩展
45.避免消息总线过度拥挤

十二.其他原则

46.慎用第三方解决方案扩展
47.清除 归档 成本合理的存储
48.删除事务处理中的商业智能
49.设计能够监控的应用
50.要能胜任