golang构建异步处理队列

目标
使用golang+redis完成同resque的异步处理队列。完成后台任务的异步调用
主要功能: 
   1. 异步调用任务
   2. 队列状态查询
redis数据结构及操作方法
1. 队列集合(固定大小的set) 默认有default队列 (不同业务不同队列)
       数据:
       queueName

       操作方法:
       add
       del
       list
       isset

2. 队列meta(可扩展的hash) (每个队列一个meta描述,记录优先级等信息)
       数据:
       priority 

       操作方法:
       set
       get
       del

3. 任务队列
    任务队列:(redis的queue)
    操作方法:
       push
       pull
       clear

       状态数据记录:
       len             --- 待处理
       runnging        --- 运行中
       finish          --- 已完成
       error            --- 错误   ( 每个队列对应一个errorlog)
接入
1. 添加服务worker
       同worker/log/log.go,实现Tasker接口,
       并且通过init函数注册,
       通过listservice验证是否注册成功
2. 添加队列: addqueue, listqueue验证
3. 往新添加的队列中push任务(post:json)
        字段: Worker 对应添加的服务 
             Params 服务所需所有参数,服务自己解析
4. 每个队列会有一个日志/tmp/goqueue/queue.log.Ymd (方便debug)
实现难点
1.根据队列任务的不同worker调用对应方法(同telegraf插件实现)


任务处理实现目录结构:
worker/
|-- all
|   `-- all.go
|-- log
|   `-- log.go
`-- worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
worker.go 定义任务接口Tasker,并初始化全局map
##########################
package worker
type Tasker interface {
// Worker method
Worker(string)
}
type Creator func() Tasker
var WKMap = map[string]Creator{}
func Add(name string, creator Creator) {
WKMap[name] = creator
}
##########################
all.go 调用功能worker的初始化
##########################
package all
import(
_ "worker/log"
)
##########################
log.go: 简单一个example
##########################
package log
import (
w "worker"
"logger"
)
func init() {
w.Add("log", func() w.Tasker {
return &Log{}
})
}
type Log struct{}
func (this *Log) Worker(params string) {
logger.Infoln(params)
}
##########################
调用:
##########################
if f,ok := w.WKMap[m.Worker]; ok {
w := f()
w.Worker(m.Params)
}
##########################
2. 队列任务过多时,不能无限制创建goroutine.必须控制goroutine数量
#######
type RunningTask struct {
  Queue      string
  Goroutines int            // 每个任务队列一个goroutine记录
  GrtMux     sync.RWMutex
  Shutdown   chan int
}


// 开启新的goroutine,先判断限制,OK的话GRT加1,否则等待
if grt := this.GetGRTNum(); grt < c.TASK_QUEUE_GRT_LIMIT {
      go this.RunTask(stateCh)
      this.ChangeGRTNum(true)
}


func (this *RunningTask) RunTask(stateCh chan *TaskStateCount) {
  defer this.ChangeGRTNum(false)        // 每个任务结束  defer 对应的goroutine数量减1
#######