forked from thrasher-corp/gocryptotrader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdispatch_types.go
80 lines (63 loc) · 2.08 KB
/
dispatch_types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package dispatch
import (
"sync"
"time"
"github.com/gofrs/uuid"
)
const (
	// DefaultJobsLimit is the default maximum number of jobs allowed in the
	// jobs channel.
	DefaultJobsLimit = 100

	// DefaultMaxWorkers is the package default ceiling on the worker count.
	DefaultMaxWorkers = 10

	// DefaultHandshakeTimeout is the default maximum length of time a worker
	// waits for a receiver on an unbuffered channel before moving on to the
	// next route.
	// NOTE(review): 200 nanoseconds is a very short wait — confirm that the
	// unit is intentional.
	DefaultHandshakeTimeout = 200 * time.Nanosecond
)
// dispatcher is the package's main in-memory Dispatcher instance.
// NOTE(review): the original comment mentions a stop/start mutex "below", but
// no standalone mutex is declared next to this variable — presumably this
// refers to a mutex field on Dispatcher; confirm.
var dispatcher *Dispatcher
// Dispatcher is the internal publisher used to communicate state changes
// between subsystems.
type Dispatcher struct {
	// routes maps a subsystem's uuid ticket to the publish channels registered
	// for it. A relayer is handed an id through its job channel and then
	// publishes the data to every channel registered under that uuid; see the
	// relayer method.
	routes map[uuid.UUID][]chan interface{}

	// rMtx guards routes, ensuring acceptable read/write access.
	rMtx sync.RWMutex

	// jobs is the persistent buffered job queue for relayers.
	jobs chan job

	// outbound is a dynamic channel pool; it returns an unbuffered channel for
	// use in the routes map.
	outbound sync.Pool

	// maxWorkers is the ceiling on the number of workers.
	maxWorkers int

	// running holds the dispatch status.
	running bool

	// shutdown is the unbuffered shutdown channel. It is used together with wg
	// when only a single relayer routine is being dropped.
	shutdown chan struct{}

	// wg tracks relayer shutdown.
	wg sync.WaitGroup

	// m provides write protection for the dispatcher.
	m sync.RWMutex
}
// job is a single relaying task: a payload paired with the ticket that routes
// it to the routines requiring that specific data.
type job struct {
	// Data is the payload to relay.
	Data interface{}
	// ID is the ticket the payload is associated with.
	ID uuid.UUID
}
// Mux is a multiplexer for the dispatch system; one is generated per
// subsystem.
type Mux struct {
	// d references the main running dispatch service.
	d *Dispatcher
}
// Pipe is the outbound object handed to the desired routine.
type Pipe struct {
	// C is the receive-only channel on which information is delivered.
	C <-chan interface{}

	// id is the ticket of the tracked system.
	id uuid.UUID

	// m references the multiplexer.
	m *Mux
}