Skip to content

Commit

Permalink
[feat] inject trace id to mq msg key
Browse files Browse the repository at this point in the history
  • Loading branch information
UnderTreeTech authored and sunqiang1 committed Oct 30, 2024
1 parent 07b12f6 commit 71582ed
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions pkg/broker/rocketmq/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/UnderTreeTech/waterdrop/pkg/log"
"github.com/UnderTreeTech/waterdrop/pkg/trace"

"github.com/UnderTreeTech/waterdrop/pkg/utils/xstring"

Expand Down Expand Up @@ -95,7 +96,7 @@ func (p *Producer) Shutdown() error {

// SendSyncMsg send message sync
func (p *Producer) SendSyncMsg(ctx context.Context, content string, tags ...string) error {
msgs := getSendMsgs(p.config.Topic, []string{content}, tags...)
msgs := getSendMsgs(ctx, p.config.Topic, []string{content}, tags...)
_, err := p.producer.SendSync(ctx, msgs...)
if err != nil {
log.Error(ctx, "send msg fail", log.String("content", content), log.Any("tags", tags),
Expand All @@ -107,7 +108,7 @@ func (p *Producer) SendSyncMsg(ctx context.Context, content string, tags ...stri

// BatchSendSyncMsg batch send message sync
func (p *Producer) BatchSendSyncMsg(ctx context.Context, contents []string, tags ...string) error {
msgs := getSendMsgs(p.config.Topic, contents, tags...)
msgs := getSendMsgs(ctx, p.config.Topic, contents, tags...)
_, err := p.producer.SendSync(ctx, msgs...)
if err != nil {
log.Error(ctx, "send msg fail", log.Any("content", contents), log.Any("tags", tags),
Expand All @@ -119,7 +120,7 @@ func (p *Producer) BatchSendSyncMsg(ctx context.Context, contents []string, tags

// SendAsyncMsg send message async
func (p *Producer) SendAsyncMsg(ctx context.Context, content string, callback func(context.Context, *primitive.SendResult, error), tags ...string) error {
msgs := getSendMsgs(p.config.Topic, []string{content}, tags...)
msgs := getSendMsgs(ctx, p.config.Topic, []string{content}, tags...)
err := p.producer.SendAsync(ctx, callback, msgs...)
if err != nil {
log.Error(ctx, "async send msg fail", log.String("content", content), log.String("error", err.Error()))
Expand All @@ -129,12 +130,13 @@ func (p *Producer) SendAsyncMsg(ctx context.Context, content string, callback fu
}

// getSendMsgs format send message to primitive.Message
func getSendMsgs(topic string, contents []string, tags ...string) []*primitive.Message {
func getSendMsgs(ctx context.Context, topic string, contents []string, tags ...string) []*primitive.Message {
msgs := make([]*primitive.Message, 0, len(contents))
traceId := trace.TraceID(ctx)
for _, content := range contents {
bs := xstring.StringToBytes(content)
msg := primitive.NewMessage(topic, bs).
WithKeys([]string{xstring.RandomString(16)})
WithKeys([]string{xstring.RandomString(16) + traceId})
if len(tags) > 0 {
msg = msg.WithTag(tags[0])
}
Expand Down

0 comments on commit 71582ed

Please sign in to comment.