Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add log nsq backend and nsq publish supported #12

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add all test
  • Loading branch information
[email protected] committed Apr 18, 2018
commit 331d1b58889673e125835a383787390266549b46
9 changes: 9 additions & 0 deletions config/samples/app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ name: 'auxo'
debug: true
banner: true

global:
mq:
nsq:
nsqd_addr:
- "127.0.0.1:4150"
max_in_flight: 5
concurrent: 3
max_attempt: 2
channel_name: test.nsq
log:
loggers:
- level: info
Expand Down
256 changes: 256 additions & 0 deletions db/mq/mynsq/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package mynsq

import (
"errors"
"os"
"time"

"github.com/astaxie/beego/logs"
"github.com/cuigh/auxo/util/lazy"
gonsq "github.com/nsqio/go-nsq"
)

// max_retry=5
// 模型说明
// 系统中单一程序体只可以订阅一个通道,所有人都是同一个通道
// 但是可以对应不同的topic,topic(m)->channel(1)
// 所以channel定义在global config单一app为一个channel
// consumer 消费者结构体
// global:
// mq:
// nsq:
// nsqd_addr:
// - "127.0.0.1:4150"
// max_in_flight: 5
// concurrent: 3
// max_attempt: 2
// channel_name: test.nsq
type myConsumer struct {
isInit bool
Debug bool
channelName string
concurrent int
maxInFlight int
maxAttempt uint16
//addr 连接地址
nsqdAddr []string
// 各个topic的worker
topics map[string]*topicInfo
}

// topicInfo topic 信息结构体
type topicInfo struct {
topic string
maxInFlight int
concurrentNum int
config *gonsq.Config
handler gonsq.HandlerFunc
consumer *gonsq.Consumer
}

// 失败消息处理函数类型
type FailMessageFunc func(message FailMessage) (err error)

func (f FailMessageFunc) HandleFailMessage(message FailMessage) (err error) {
err = f(message)
return
}

// 失败消息处理接口,继承了该接口的接口都会调用该接口
type FailMessageHandler interface {
HandleFailMessage(message FailMessage) (err error)
}

type FailMessage struct {
Body []byte
Attempt uint16
Timestamp int64
MessageID string
FailMsg string
}

var (
mynsqConsumerValue = lazy.Value{New: consumerCreate}
)

// 不定义close
func MustGetConsumer() *myConsumer {
v, err := mynsqConsumerValue.Get()
if err != nil {
logs.Error("MustGetComsumer | must open comsumer failed")
os.Exit(-1)
}
return v.(*myConsumer)
}

func consumerCreate() (d interface{}, err error) {
options, err := loadOptions()
if err != nil {
logs.Error("consumerCreate | loadOptions| err=%v", err)
return nil, err
}
ret := &myConsumer{
nsqdAddr: make([]string, 0),
topics: make(map[string]*topicInfo),
}
err = ret.init(options, true)
if err != nil {
logs.Error("consumerCreate | ret.init | err=%v", err)
return nil, err
}
d = interface{}(ret)
return d, err
}

// Connect 连接
func (t *topicInfo) connect(channelName string, nsqdAddr []string, debug bool) {
if len(nsqdAddr) == 0 {
logs.Warn("nsqd地址为空,跳过连接,topic:", t.topic)
return
}
var (
retryNum = 0
sleepSeconds = 0
err error
)
t.consumer, err = gonsq.NewConsumer(t.topic, channelName, t.config)
if err != nil {
logs.Error("新建nsq consumer失败,err:%s,topic:%s,channel:%s", err.Error(), t.topic, channelName)
return
}
t.consumer.ChangeMaxInFlight(t.maxInFlight)
// t.consumer.AddConcurrentHandlers(gonsq.Handler(t.handler), t.concurrentNum)
t.consumer.AddHandler(gonsq.Handler(t.handler))
// 不断进行重连,直到连接成功
for {
// 只要连上了就不会退出的, 为空判断由入口保证
if len(nsqdAddr) == 1 {
err = t.consumer.ConnectToNSQD(nsqdAddr[0])
} else {
err = t.consumer.ConnectToNSQDs(nsqdAddr)
}
if err != nil {
logs.Warn("连接nsqd(addr:%v)失败,err:%s", nsqdAddr, err.Error())
retryNum++
sleepSeconds = 5
if retryNum%6 == 0 {
sleepSeconds = 30
}
time.Sleep(time.Duration(sleepSeconds) * time.Second)
continue
}
if debug {
t.consumer.SetLogger(logs.GetLogger(), gonsq.LogLevelDebug)
} else {
t.consumer.SetLogger(logs.GetLogger(), gonsq.LogLevelWarning)
}
logs.Info("连接nsqd(%v)成功", nsqdAddr)
break
}
<-t.consumer.StopChan
err = nil
return
}

// AddHandler 添加handler
func (c *myConsumer) AddHandler(topic string, handler gonsq.HandlerFunc, failHandler FailMessageFunc) *myConsumer {
var (
t = &topicInfo{}
ok bool
)
if t, ok = c.topics[topic]; !ok {
t = &topicInfo{}
t.concurrentNum = c.concurrent
t.maxInFlight = c.maxInFlight
t.config = gonsq.NewConfig()
t.config.MaxAttempts = c.maxAttempt
}

t.topic = topic
// 自定义 handler
t.handler = func(nm *gonsq.Message) (err error) {
err = handler(nm)
if err != nil && c.topics[topic].config.MaxAttempts > 0 && c.topics[topic].config.MaxAttempts == nm.Attempts && failHandler != nil {
messageID := make([]byte, 0)
for _, v := range nm.ID {
messageID = append(messageID, v)
}
failHandler(FailMessage{
MessageID: string(messageID),
Body: nm.Body,
Timestamp: nm.Timestamp,
FailMsg: err.Error(),
})
err = nil
}
return
}
c.topics[topic] = t
return c
}

// StopAll 停止
func (c *myConsumer) stop() {
for k := range c.topics {
c.topics[k].consumer.Stop()
}
}

// Run 运行
func (c *myConsumer) Run() (err error) {
defer c.stop()
if !c.isInit {
err = errors.New("consumer not init")
return
}
if len(c.nsqdAddr) == 0 {
err = errors.New("nsqd addr address required")
return
}
for _, topicInfo := range c.topics {
topicInfo.config.MaxAttempts = c.maxAttempt
topicInfo.config.MaxInFlight = c.maxInFlight
go topicInfo.connect(c.channelName, c.nsqdAddr, c.Debug)
}
neverBack := make(chan int)
<-neverBack
return
}

// Init 初始化
func (c *myConsumer) init(configSection *Options, debug bool) (err error) {
if len(configSection.NsqdAddr) > 0 {
c.nsqdAddr = configSection.NsqdAddr
}
if configSection.MaxInFlight > 0 {
c.maxInFlight = configSection.MaxInFlight
}
if configSection.Concurrent > 0 {
c.concurrent = configSection.Concurrent
}
if configSection.ChannelName != "" {
c.channelName = configSection.ChannelName
}
if c.channelName == "" {
err = errors.New("config channelName not found")
return
}
if configSection.MaxAttempt > 0 {
c.maxAttempt = uint16(configSection.MaxAttempt)
}

if c.maxInFlight < 1 {
c.maxInFlight = 1
}
if c.concurrent < 1 {
c.concurrent = 1
}

if c.maxInFlight < c.concurrent {
err = errors.New("max_in_flight should exceed than concurrent")
return
}
c.isInit = true

return
}
31 changes: 31 additions & 0 deletions db/mq/mynsq/nsq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package mynsq

import (
"github.com/cuigh/auxo/config"
"github.com/cuigh/auxo/errors"
)

const PkgName = "auxo.mq.nsq"

type Options struct {
NsqdAddr []string
NsqlookupdAddr []string
MaxInFlight int
Concurrent int
MaxAttempt int
ChannelName string
}

func loadOptions() (*Options, error) {
key := "global.mq.nsq"
if !config.Exist(key) {
return nil, errors.Format("can't find nsq config for [%s]", key)
}

opts := &Options{}
err := config.UnmarshalOption(key, opts)
if err != nil {
return nil, err
}
return opts, nil
}
38 changes: 38 additions & 0 deletions db/mq/mynsq/nsq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package mynsq_test

import (
"testing"

"github.com/astaxie/beego/logs"
"github.com/cuigh/auxo/config"
"github.com/cuigh/auxo/db/mq/mynsq"
gonsq "github.com/nsqio/go-nsq"
)

func init() {
config.AddFolder("../../../config/samples")
}

func Test_Pulish(t *testing.T) {
producer := mynsq.MustGetProducer()
producer.Publish("test_mynsq", "hello world!")
}

func Test_Consumer(t *testing.T) {
mynsq.MustGetConsumer().AddHandler("test_mynsq", testHandler(), testFailHandler()).Run()
}

func testHandler() gonsq.HandlerFunc {
return func(nm *gonsq.Message) error {
logs.Info(string(nm.Body))
return nil
}
}

func testFailHandler() mynsq.FailMessageFunc {
return func(message mynsq.FailMessage) (err error) {
logs.Error("error msg trigger,msg:", string(message.Body), ",messageid:", message.MessageID)
err = nil
return
}
}
Loading