forked from italolelis/outboxer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
outboxer.go
157 lines (135 loc) · 4.25 KB
/
outboxer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Package outboxer is an implementation of the outbox pattern.
// The producer of messages can durably store those messages in a local outbox before sending to a Message Endpoint.
// The durable local storage may be implemented in the Message Channel directly, especially when combined
// with Idempotent Messages.
package outboxer
import (
"context"
"database/sql"
"errors"
"time"
)
// Sentinel errors returned by New when a mandatory dependency has not been configured.
var (
	// ErrMissingDataStore is returned by New when the Outboxer has no data store.
	ErrMissingDataStore = errors.New("a data store is required for the outboxer to work")

	// ErrMissingEventStream is returned by New when the Outboxer has no event stream.
	ErrMissingEventStream = errors.New("an event stream is required for the outboxer to work")
)
// ExecerContext defines the exec context method that is used within a transaction.
// It is the argument type of the callback accepted by DataStore.AddWithinTx and
// Outboxer.SendWithinTx. The method signature matches ExecContext on database/sql's
// *sql.Tx, so a standard transaction satisfies this interface.
type ExecerContext interface {
// ExecContext executes query with the given args and returns the result or an error.
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
// DataStore defines the data store methods the Outboxer relies on to persist,
// read back, flag and clean up outbox messages.
type DataStore interface {
// GetEvents returns a batch of outbox messages. The dispatcher calls it on every
// tick with its configured message batch size.
// NOTE(review): presumably returns at most batchSize messages that have not been
// dispatched yet — confirm against the concrete implementations.
GetEvents(ctx context.Context, batchSize int32) ([]*OutboxMessage, error)
// Add stores m in the outbox. Outboxer.Send delegates to it.
Add(ctx context.Context, m *OutboxMessage) error
// AddWithinTx stores m and is given fn, a callback that receives an ExecerContext.
// Outboxer.SendWithinTx delegates to it.
// NOTE(review): the intent appears to be that fn and the outbox write share one
// transaction; the actual guarantee is up to the implementation — confirm.
AddWithinTx(ctx context.Context, m *OutboxMessage, fn func(ExecerContext) error) error
// SetAsDispatched flags the message with the given id as dispatched. The dispatcher
// calls it after the event stream accepted the message.
SetAsDispatched(ctx context.Context, id int64) error
// Remove is called by the cleanup loop with the configured clean-up time and batch size.
// NOTE(review): presumably deletes up to batchSize messages older than since —
// confirm whether only dispatched messages are eligible.
Remove(ctx context.Context, since time.Time, batchSize int32) error
}
// EventStream defines the event stream methods. It is the destination the dispatcher
// forwards outbox messages to.
type EventStream interface {
// Send delivers the message to the stream. The dispatcher treats a nil error as a
// successful delivery and then marks the message as dispatched in the data store.
Send(context.Context, *OutboxMessage) error
}
// Outboxer implements the outbox pattern: messages are added to a DataStore and a
// background dispatcher forwards them to an EventStream, while a cleanup loop asks
// the store to remove old messages.
type Outboxer struct {
// ds is the data store messages are added to, fetched from and removed from.
ds DataStore
// es is the event stream the dispatcher sends fetched messages to.
es EventStream
// checkInterval is the period of the dispatcher's ticker.
checkInterval time.Duration
// cleanUpInterval is the period of the cleanup loop's ticker.
cleanUpInterval time.Duration
// cleanUpBefore is passed unchanged as the "since" argument of DataStore.Remove
// on every cleanup tick.
cleanUpBefore time.Time
// cleanUpBatchSize is the batch size passed to DataStore.Remove; New defaults it to 100.
cleanUpBatchSize int32
// messageBatchSize is the batch size passed to DataStore.GetEvents; New defaults it to 100.
messageBatchSize int32
// errChan carries errors raised by the dispatcher and cleanup loops; see ErrChan.
errChan chan error
// okChan receives one value per message that was sent and marked as dispatched; see OkChan.
okChan chan struct{}
}
// New builds an Outboxer. It starts from the defaults (unbuffered error and ok
// channels, message and clean-up batch sizes of 100), applies every option in the
// order given and then validates the result.
//
// It returns ErrMissingDataStore when no data store was configured and
// ErrMissingEventStream when no event stream was configured; the data store is
// checked first.
func New(opts ...Option) (*Outboxer, error) {
	outboxer := &Outboxer{
		errChan:          make(chan error),
		okChan:           make(chan struct{}),
		messageBatchSize: 100,
		cleanUpBatchSize: 100,
	}

	for i := range opts {
		opts[i](outboxer)
	}

	switch {
	case outboxer.ds == nil:
		return nil, ErrMissingDataStore
	case outboxer.es == nil:
		return nil, ErrMissingEventStream
	}

	return outboxer, nil
}
// ErrChan exposes, as a receive-only channel, the channel on which the dispatcher
// and cleanup loops report their errors.
func (o *Outboxer) ErrChan() <-chan error {
	var errs <-chan error = o.errChan
	return errs
}
// OkChan exposes, as a receive-only channel, the channel that gets one value for
// every message the dispatcher delivered and marked as dispatched.
func (o *Outboxer) OkChan() <-chan struct{} {
	var oks <-chan struct{} = o.okChan
	return oks
}
// Send adds m to the data store and returns the store's error, if any.
// Nothing is published to the event stream here; that is the dispatcher's job.
func (o *Outboxer) Send(ctx context.Context, m *OutboxMessage) error {
	if err := o.ds.Add(ctx, m); err != nil {
		return err
	}

	return nil
}
// SendWithinTx adds evt through the data store's AddWithinTx, handing it fn so the
// caller's own database calls can be encapsulated within a transaction.
// It returns whatever error AddWithinTx reports.
func (o *Outboxer) SendWithinTx(ctx context.Context, evt *OutboxMessage, fn func(ExecerContext) error) error {
	if err := o.ds.AddWithinTx(ctx, evt, fn); err != nil {
		return err
	}

	return nil
}
// Start launches the two background loops, each in its own goroutine, and returns
// immediately:
//   - the dispatcher, which reads messages from the data store and sends them to
//     the event stream;
//   - the cleanup process, which asks the data store to remove old messages.
//
// Both loops return once ctx is done.
func (o *Outboxer) Start(ctx context.Context) {
	go func() {
		o.StartDispatcher(ctx)
	}()

	go func() {
		o.StartCleanup(ctx)
	}()
}
// StartDispatcher starts the dispatcher, which is responsible for getting the messages
// from the data store and sending them to the event stream. It blocks until ctx is done.
//
// On every tick of checkInterval it fetches up to messageBatchSize events, sends each
// one to the event stream and, when that succeeds, marks it as dispatched. Every
// failure is reported on the error channel and every fully dispatched message is
// signalled on the ok channel. A report waits for a receiver, but gives up and makes
// the dispatcher return as soon as ctx is cancelled, so the goroutine cannot be
// stranded by a consumer that has stopped reading.
//
// time.NewTicker panics on a non-positive interval, so checkInterval must have been
// configured with a positive value. Stop closes the channels this loop sends on;
// make sure the loop has returned before calling Stop.
func (o *Outboxer) StartDispatcher(ctx context.Context) {
	ticker := time.NewTicker(o.checkInterval)
	// Stop the ticker when the loop exits so it does not outlive the dispatcher.
	defer ticker.Stop()

	// reportErr hands err to the error channel. It returns false when ctx was
	// cancelled before a receiver took the error, meaning the loop must exit.
	reportErr := func(err error) bool {
		select {
		case o.errChan <- err:
			return true
		case <-ctx.Done():
			return false
		}
	}

	for {
		select {
		case <-ticker.C:
			evts, err := o.ds.GetEvents(ctx, o.messageBatchSize)
			if err != nil {
				if !reportErr(err) {
					return
				}
				// Nothing to dispatch on this tick; wait for the next one.
				continue
			}

			for _, e := range evts {
				if err := o.es.Send(ctx, e); err != nil {
					if !reportErr(err) {
						return
					}
					continue
				}

				// Only flag the message once the event stream accepted it.
				if err := o.ds.SetAsDispatched(ctx, e.ID); err != nil {
					if !reportErr(err) {
						return
					}
					continue
				}

				select {
				case o.okChan <- struct{}{}:
				case <-ctx.Done():
					return
				}
			}
		case <-ctx.Done():
			return
		}
	}
}
// StartCleanup starts the cleanup process, that makes sure old messages are removed
// from the data store. It blocks until ctx is done.
//
// On every tick of cleanUpInterval it calls the data store's Remove with
// cleanUpBefore and cleanUpBatchSize. A failure is reported on the error channel;
// the report waits for a receiver but gives up, and the loop returns, as soon as
// ctx is cancelled.
//
// time.NewTicker panics on a non-positive interval, so cleanUpInterval must have been
// configured with a positive value. Stop closes the error channel this loop sends on;
// make sure the loop has returned before calling Stop.
//
// NOTE(review): cleanUpBefore is passed unchanged on every tick, so the cut-off does
// not move forward while the loop runs — confirm this is intended.
func (o *Outboxer) StartCleanup(ctx context.Context) {
	ticker := time.NewTicker(o.cleanUpInterval)
	// Stop the ticker when the loop exits so it does not outlive the cleanup process.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			err := o.ds.Remove(ctx, o.cleanUpBefore, o.cleanUpBatchSize)
			if err == nil {
				continue
			}

			// Do not block forever on a consumer that stopped reading after cancellation.
			select {
			case o.errChan <- err:
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}
// Stop closes all channels: first the error channel, then the ok channel.
//
// NOTE(review): Stop neither cancels nor waits for the dispatcher and cleanup loops.
// Those loops send on these channels, and sending on a closed channel panics, so
// cancel the context given to Start and let both loops return before calling Stop.
// Calling Stop more than once also panics, because closing a closed channel panics.
func (o *Outboxer) Stop() {
close(o.errChan)
close(o.okChan)
}