目标
使用golang+redis完成同resque的异步处理队列。完成后台任务的异步调用
主要功能:
1. 异步调用任务
2. 队列状态查询
redis数据结构及操作方法
1. 队列集合(固定大小的set) 默认有default队列 (不同业务不同队列)
数据:
queueName
操作方法:
add
del
list
isset
2. 队列meta(可扩展的hash) (每个队列一个meta描述,记录优先级等信息)
数据:
priority
操作方法:
set
get
del
3. 任务队列
任务队列:(redis的queue)
操作方法:
push
pull
clear
状态数据记录:
len --- 待处理
runnging --- 运行中
finish --- 已完成
error --- 错误 ( 每个队列对应一个errorlog)
接入
1. 添加服务worker
同worker/log/log.go,实现Tasker接口,
并且通过init函数注册,
通过listservice验证是否注册成功
2. 添加队列: addqueue, listqueue验证
3. 往新添加的队列中push任务(post:json)
字段: Worker 对应添加的服务
Params 服务所需所有参数,服务自己解析
4. 每个队列会有一个日志/tmp/goqueue/queue.log.Ymd (方便debug)
实现难点
1.根据队列任务的不同worker调用对应方法(同telegraf插件实现)
任务处理实现目录结构:
worker/
|-- all
| `-- all.go
|-- log
| `-- log.go
`-- worker.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| worker.go 定义任务接口Tasker,并初始化全局map ########################## package worker type Tasker interface { // Worker method Worker(string) } type Creator func() Tasker var WKMap = map[string]Creator{} func Add(name string, creator Creator) { WKMap[name] = creator } ########################## all.go 调用功能worker的初始化 ########################## package all import( _ "worker/log" ) ########################## log.go: 简单一个example ########################## package log import ( w "worker" "logger" ) func init() { w.Add("log", func() w.Tasker { return &Log{} }) } type Log struct{} func (this *Log) Worker(params string) { logger.Infoln(params) } ########################## 调用: ########################## if f,ok := w.WKMap[m.Worker]; ok { w := f() w.Worker(m.Params) } ##########################
|
2. 队列任务过多时,不能无限制创建goroutine.必须控制goroutine数量
#######
type RunningTask struct {
Queue string
Goroutines int // 每个任务队列一个goroutine记录
GrtMux sync.RWMutex
Shutdown chan int
}
// 开启新的goroutine,先判断限制,OK的话GRT加1,否则等待
if grt := this.GetGRTNum(); grt < c.TASK_QUEUE_GRT_LIMIT {
go this.RunTask(stateCh)
this.ChangeGRTNum(true)
}
func (this *RunningTask) RunTask(stateCh chan *TaskStateCount) {
defer this.ChangeGRTNum(false) // 每个任务结束 defer 对应的goroutine数量减1
#######