telegraf插件加载及调用: (nginx为例,别的插件类似)
一.功能插件
1. nginx.go
# // telegraf.go import : _ "github.com/influxdata/telegraf/plugins/inputs/all"
func init() {
inputs.Add("nginx", func() telegraf.Input {
return &Nginx{}
})
}
#
2. registry.go
#
package inputs
import "github.com/influxdata/telegraf"
type Creator func() telegraf.Input //采集数据Input接口定义,主要方法:Gather (参考input.go)
注意: Creator 是函数类型,对应函数返回的是 满足Input接口的类型
var Inputs = map[string]Creator{} //插件集合映射, map["Nginx"]&Nginx{}
func Add(name string, creator Creator) {
Inputs[name] = creator
}
#
二.telegraf代理加载配置
1. 加载配置文件
#
c.LoadConfig(*fConfig) // telegraf.go
#
2.
#
c.LoadConfig() ---> c.addInput()
creator, ok := inputs.Inputs[name] // 全局Inputs
if !ok {
return fmt.Errorf("Undefined but requested input: %s", name)
}
input := creator()
rp := &internal_models.RunningInput{ //
Name: name,
Input: input,
Config: pluginConfig,
}
c.Inputs = append(c.Inputs, rp)
#
# running_input.go
type RunningInput struct {
Name string
Input telegraf.Input
Config *InputConfig
}
// InputConfig containing a name, interval, and filter
type InputConfig struct {
Name string
NameOverride string
MeasurementPrefix string
MeasurementSuffix string
Tags map[string]string
Filter Filter
Interval time.Duration
}
#
#
type Config struct {
Tags map[string]string
InputFilters []string
OutputFilters []string
Agent *AgentConfig
Inputs []*internal_models.RunningInput
Outputs []*internal_models.RunningOutput
}
#
三.telegraf调用插件
1.入口: telegraf.go
#
ag, err := agent.NewAgent(c) // c 全局配置
ag.Run(shutdown)
#
2. ag.Run() // agent.go
#
// 优先启动服务类插件
//
//
// 插件类 gatherSeparate // 每个插件对应一个goroutine 各走各的计时器
//
//
// 代理并发 gatherParallel
#
3. 插件类
#
func (a *Agent) gatherSeparate(
shutdown chan struct{},
input *internal_models.RunningInput,
metricC chan telegraf.Metric,
) {
input.Input.Gather(acc) ---> package nginx; Gather()
}
#
四.数据写入: (channel通信传递) 10000带缓冲的通道
#
for _, o := range a.Config.Outputs { // 与inputs初始化类似 NewRunningOutput()
o.AddMetric(m)
}
#
#
type RunningOutput struct { // 类型组合 RunningOutput 除了AddMetric()方法外 还有
// 具体output的实现方法
Name string
Output telegraf.Output // Output interface 参考: output.go
Config *OutputConfig
Quiet bool
MetricBufferLimit int
MetricBatchSize int
metrics *buffer.Buffer
failMetrics *buffer.Buffer
}
#
func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
err := ro.write(batch) --> influxdata
}
type Output interface {
// Connect to the Output
Connect() error
// Close any connections to the Output
Close() error
// Description returns a one-sentence description on the Output
Description() string
// SampleConfig returns the default configuration of the Output
SampleConfig() string
// Write takes in group of points to be written to the Output
Write(metrics []Metric) error
}