telegraf插件化实现

TICK 监控栈(Telegraf、InfluxDB、Chronograf、Kapacitor)

tick stack

Telegraf插件化实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
Telegraf 插件的加载与调用(以 nginx 为例,其他插件类似)
一.功能插件
1. nginx.go
# // telegraf.go 通过匿名导入触发各插件的 init(): import _ "github.com/influxdata/telegraf/plugins/inputs/all"
// init registers the nginx input plugin: it passes inputs.Add the name
// "nginx" together with a factory function that returns a new *Nginx.
func init() {
inputs.Add("nginx", func() telegraf.Input {
return &Nginx{}
})
}
#
2. registry.go
#
package inputs
import "github.com/influxdata/telegraf"
// Creator is a function type; a Creator returns a value that satisfies the
// telegraf.Input interface.
type Creator func() telegraf.Input  // telegraf.Input is the data-collection interface; its main method is Gather (see input.go)
注意:Creator 是函数类型,该函数的返回值是满足 Input 接口的类型。
// Inputs is the registry of input plugins. It maps a plugin name to the
// Creator for that plugin (for example "nginx" -> a function returning
// &Nginx{}), not to a plugin instance.
var Inputs = map[string]Creator{}
// Add stores creator in Inputs under name, overwriting any entry already
// registered under the same name.
func Add(name string, creator Creator) {
Inputs[name] = creator
}
#
二.telegraf代理加载配置
1. 加载配置文件
#
c.LoadConfig(*fConfig) // telegraf.go
#
2. 根据配置创建 input 实例并加入 c.Inputs
#
// 调用链: c.LoadConfig() ---> c.addInput()
// Look up the factory registered for this plugin name in the package-level
// inputs.Inputs registry.
creator, ok := inputs.Inputs[name]
if !ok {
return fmt.Errorf("Undefined but requested input: %s", name)
}
// Build a fresh plugin instance from the factory.
input := creator()
// Wrap the instance together with its name and per-plugin config.
rp := &internal_models.RunningInput{
Name: name,
Input: input,
Config: pluginConfig,
}
// Record the wrapped input on the config.
c.Inputs = append(c.Inputs, rp)
#
# running_input.go
// RunningInput bundles an input plugin instance (Input) with its name and
// its per-plugin configuration.
type RunningInput struct {
Name string
Input telegraf.Input
Config *InputConfig
}
// InputConfig holds the per-input settings: a name, a name override, a
// measurement prefix and suffix, a set of tags, a filter and an interval.
type InputConfig struct {
Name string
NameOverride string
MeasurementPrefix string
MeasurementSuffix string
Tags map[string]string
Filter Filter
Interval time.Duration
}
#
#
// Config is the top-level configuration object. It carries Tags, the input
// and output filter lists, the agent settings, and the RunningInput and
// RunningOutput values for the configured plugins.
type Config struct {
Tags map[string]string
InputFilters []string
OutputFilters []string
Agent *AgentConfig
Inputs []*internal_models.RunningInput
Outputs []*internal_models.RunningOutput
}
#
三.telegraf调用插件
1.入口: telegraf.go
#
// Build the agent from c, the global configuration, then run it.
ag, err := agent.NewAgent(c)
ag.Run(shutdown)
#
2. ag.Run() // agent.go
#
// 优先启动服务类插件
//
//
// 插件类 gatherSeparate // 每个插件对应一个goroutine 各走各的计时器
//
//
// 代理并发 gatherParallel
#
3. 插件类
#
func (a *Agent) gatherSeparate(
shutdown chan struct{},
input *internal_models.RunningInput,
metricC chan telegraf.Metric,
) {
input.Input.Gather(acc) ---> package nginx; Gather()
}
#
四.数据写入:采集到的数据通过 channel 传递(容量为 10000 的带缓冲通道)
#
// Hand the metric to every configured output. Outputs are initialised in a
// similar way to inputs, via NewRunningOutput().
for _, o := range a.Config.Outputs {
o.AddMetric(m)
}
#
#
// RunningOutput wraps a concrete output plugin. Besides its own AddMetric()
// method, the concrete output's methods are reachable through the Output
// field.
type RunningOutput struct {
Name string
Output telegraf.Output // the Output interface; see output.go
Config *OutputConfig
Quiet bool
MetricBufferLimit int
MetricBatchSize int
metrics *buffer.Buffer
failMetrics *buffer.Buffer
}
#
func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
err := ro.write(batch) --> influxdata
}
// Output is the interface an output plugin implements: connection
// lifecycle (Connect, Close), self-description (Description, SampleConfig)
// and Write for a group of metrics.
type Output interface {
// Connect to the Output
Connect() error
// Close any connections to the Output
Close() error
// Description returns a one-sentence description on the Output
Description() string
// SampleConfig returns the default configuration of the Output
SampleConfig() string
// Write takes in group of points to be written to the Output
Write(metrics []Metric) error
}