forked from ava-labs/avalanchego
-
Notifications
You must be signed in to change notification settings - Fork 0
/
messages.go
173 lines (143 loc) · 5.33 KB
/
messages.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Copyright (C) 2019-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package message
import (
"fmt"
"strings"
"sync"
"time"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/constants"
)
// Compile-time checks that the concrete message types implement their
// corresponding interfaces.
var (
	_ InboundMessage  = (*inboundMessage)(nil)
	_ OutboundMessage = (*outboundMessage)(nil)
)
// InboundMessage represents a set of fields for an inbound message that can be
// serialized into a byte stream.
type InboundMessage interface {
// String provides a printable summary of the message.
fmt.Stringer
// BytesSavedCompression returns the number of bytes saved due to
// compression; 0 for messages that were not compressed.
BytesSavedCompression() int
// Op returns the operation of this message.
Op() Op
// Get returns the value of the specified field in this message.
Get(Field) interface{}
// NodeID returns the node that the message was sent by.
NodeID() ids.ShortID
// ExpirationTime returns the time after which this message doesn't need to
// be responded to. A zero time means the message does not expire.
ExpirationTime() time.Time
// OnFinishedHandling is to be called once handling of the message is
// complete.
OnFinishedHandling()
}
// inboundMessage is the concrete InboundMessage implementation declared in
// this file.
type inboundMessage struct {
// op is the operation reported by Op.
op Op
// bytesSavedCompression is the value reported by BytesSavedCompression.
bytesSavedCompression int
// fields holds the message's values keyed by Field; Get reads from it.
fields map[Field]interface{}
// nodeID is the node that the message was sent by.
nodeID ids.ShortID
// expirationTime is reported by ExpirationTime; the zero value means the
// message does not expire.
expirationTime time.Time
// onFinishedHandling, if non-nil, is invoked by OnFinishedHandling.
onFinishedHandling func()
}
// Op reports the operation carried by this inbound message.
func (inMsg *inboundMessage) Op() Op {
	return inMsg.op
}
// BytesSavedCompression reports how many bytes compression saved for this
// message, i.e. the number of bytes that did not have to be received over the
// network. It is 0 for messages that were not compressed.
func (inMsg *inboundMessage) BytesSavedCompression() int {
	return inMsg.bytesSavedCompression
}
// Get returns the value stored for the specified field in this message. The
// result is nil when the message holds no value for that field.
func (inMsg *inboundMessage) Get(field Field) interface{} {
	return inMsg.fields[field]
}
// NodeID reports the ID of the node that sent this message.
func (inMsg *inboundMessage) NodeID() ids.ShortID {
	return inMsg.nodeID
}
// ExpirationTime reports the time after which this message no longer needs a
// response. The zero time means the message never expires.
func (inMsg *inboundMessage) ExpirationTime() time.Time {
	return inMsg.expirationTime
}
// OnFinishedHandling runs the completion callback attached to this message.
// It does nothing when no callback was set.
func (inMsg *inboundMessage) OnFinishedHandling() {
	callback := inMsg.onFinishedHandling
	if callback == nil {
		return
	}
	callback()
}
// String returns a single-line, human-readable summary of the message.
//
// The summary always contains the op and the sender's node ID. It also
// contains the request ID when that field is present, the deadline (as a Unix
// timestamp) when the message has a non-zero expiration time, and one
// op-specific detail such as a container ID or a count.
//
// NOTE: the op-specific type assertions are unchecked. This method panics if
// a field required by the op is missing or has an unexpected dynamic type.
func (inMsg *inboundMessage) String() string {
	var sb strings.Builder
	fmt.Fprintf(&sb, "(Op: %s, NodeID: %s%s", inMsg.op, constants.NodeIDPrefix, inMsg.nodeID)
	if requestIDIntf, exists := inMsg.fields[RequestID]; exists {
		fmt.Fprintf(&sb, ", RequestID: %d", requestIDIntf.(uint32))
	}
	if !inMsg.expirationTime.IsZero() {
		fmt.Fprintf(&sb, ", Deadline: %d", inMsg.expirationTime.Unix())
	}
	switch inMsg.op {
	case GetAccepted, Accepted, Chits, AcceptedFrontier:
		fmt.Fprintf(&sb, ", NumContainerIDs: %d)", len(inMsg.fields[ContainerIDs].([][]byte)))
	case Get, GetAncestors, Put, PushQuery, PullQuery:
		fmt.Fprintf(&sb, ", ContainerID: 0x%x)", inMsg.fields[ContainerID].([]byte))
	case Ancestors:
		fmt.Fprintf(&sb, ", NumContainers: %d)", len(inMsg.fields[MultiContainerBytes].([][]byte)))
	case Notify:
		fmt.Fprintf(&sb, ", Notification: %d)", inMsg.fields[VMMessage].(uint32))
	case AppRequest, AppResponse, AppGossip:
		// Print the payload length, as the label says, not the payload
		// itself: %d applied to a []byte would print every byte.
		fmt.Fprintf(&sb, ", len(AppMsg): %d)", len(inMsg.fields[AppBytes].([]byte)))
	default:
		sb.WriteString(")")
	}
	return sb.String()
}
// OutboundMessage represents a set of fields for an outbound message that can
// be serialized into a byte stream
type OutboundMessage interface {
// BytesSavedCompression returns the number of bytes saved due to
// compression; 0 for messages that were not compressed.
BytesSavedCompression() int
// Bytes returns this message in bytes.
Bytes() []byte
// Op returns the operation of this message.
Op() Op
// BypassThrottling reports whether throttling should be bypassed when
// attempting to send this message.
BypassThrottling() bool
// AddRef increments the message's reference count.
AddRef()
// DecRef decrements the message's reference count. Once the count goes to
// 0, the byte slice should not be inspected.
DecRef()
}
// outboundMessage is the concrete OutboundMessage implementation declared in
// this file.
type outboundMessage struct {
// bytes is the byte slice returned by Bytes.
bytes []byte
// bytesSavedCompression is the value reported by BytesSavedCompression.
bytesSavedCompression int
// op is the operation reported by Op.
op Op
// bypassThrottling is the value reported by BypassThrottling.
bypassThrottling bool
// refLock is held by AddRef and DecRef while they modify refs.
refLock sync.Mutex
// refs is the reference count maintained by AddRef and DecRef.
refs int
// c is the codec whose byteSlicePool receives bytes when refs reaches 0.
c *codec
}
// Op reports the operation carried by this outbound message.
func (outMsg *outboundMessage) Op() Op {
	return outMsg.op
}
// Bytes returns the byte-slice form of this message.
func (outMsg *outboundMessage) Bytes() []byte {
	return outMsg.bytes
}
// BytesSavedCompression reports how many bytes compression saved for this
// message, i.e. the number of bytes that did not have to be sent over the
// network. It is 0 for messages that were not compressed.
func (outMsg *outboundMessage) BytesSavedCompression() int {
	return outMsg.bytesSavedCompression
}
// AddRef increments this message's reference count while holding refLock.
func (outMsg *outboundMessage) AddRef() {
	outMsg.refLock.Lock()
	outMsg.refs++
	outMsg.refLock.Unlock()
}
// DecRef decrements this message's reference count while holding refLock.
// When the count reaches 0, the message's byte slice is handed back to the
// codec's byte slice pool; from then on the byte slice should not be
// inspected.
func (outMsg *outboundMessage) DecRef() {
	outMsg.refLock.Lock()
	defer outMsg.refLock.Unlock()

	if outMsg.refs--; outMsg.refs != 0 {
		return
	}
	outMsg.c.byteSlicePool.Put(outMsg.bytes)
}
// BypassThrottling reports whether throttling should be bypassed when
// attempting to send this message.
func (outMsg *outboundMessage) BypassThrottling() bool {
	return outMsg.bypassThrottling
}
// TestMsg is a minimal message value holding only an op, a byte slice and a
// throttling flag. Its AddRef and DecRef methods are no-ops.
// NOTE(review): presumably intended for use in tests, judging by the name —
// confirm against callers.
type TestMsg struct {
// op is the operation reported by Op.
op Op
// bytes is the byte slice returned by Bytes.
bytes []byte
// bypassThrottling is the value reported by BypassThrottling.
bypassThrottling bool
}
// NewTestMsg builds a TestMsg that reports the given op, bytes and
// bypassThrottling values from its accessor methods. The byte slice is stored
// as-is, without copying.
func NewTestMsg(op Op, bytes []byte, bypassThrottling bool) *TestMsg {
	msg := new(TestMsg)
	msg.op = op
	msg.bytes = bytes
	msg.bypassThrottling = bypassThrottling
	return msg
}
// Op returns the op this message was constructed with.
func (m *TestMsg) Op() Op {
	return m.op
}
// Get always returns nil; TestMsg stores no fields.
func (*TestMsg) Get(Field) interface{} {
	return nil
}
// Bytes returns the byte slice this message was constructed with.
func (m *TestMsg) Bytes() []byte {
	return m.bytes
}
// BytesSavedCompression always returns 0 for a TestMsg.
func (*TestMsg) BytesSavedCompression() int {
	return 0
}
// AddRef is a no-op; TestMsg keeps no reference count.
func (*TestMsg) AddRef() {
	// Intentionally empty.
}
// DecRef is a no-op; TestMsg keeps no reference count.
func (*TestMsg) DecRef() {
	// Intentionally empty.
}
// BypassThrottling returns the throttling flag this message was constructed
// with.
func (m *TestMsg) BypassThrottling() bool {
	return m.bypassThrottling
}