From de50c8bac642bd7693248cb68e0b5c552e55f5bb Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Mon, 12 Sep 2022 09:58:38 -0700 Subject: [PATCH] message: rename structs in preparation of proto modes (#1974) --- message/builder_test.go | 16 +-- message/codec.go | 16 +-- message/codec_test.go | 72 ++++++++--- message/creator.go | 4 +- message/inbound_msg_builder.go | 196 +++++++++++++++++------------ message/internal_msg_builder.go | 48 ++++--- message/messages.go | 31 +++-- message/messages_benchmark_test.go | 12 +- message/messages_test.go | 16 ++- message/outbound_msg_builder.go | 54 ++++---- 10 files changed, 280 insertions(+), 185 deletions(-) diff --git a/message/builder_test.go b/message/builder_test.go index e9de945413e3..c08fc0087c94 100644 --- a/message/builder_test.go +++ b/message/builder_test.go @@ -33,8 +33,8 @@ func init() { panic(err) } TestCodec = codec - UncompressingBuilder = NewOutboundBuilder(codec, false /*compress*/) - TestInboundMsgBuilder = NewInboundBuilder(codec) + UncompressingBuilder = NewOutboundBuilderWithPacker(codec, false /*compress*/) + TestInboundMsgBuilder = NewInboundBuilderWithPacker(codec) } func TestBuildVersion(t *testing.T) { @@ -190,7 +190,7 @@ func TestBuildPut(t *testing.T) { container := []byte{2} for _, compress := range []bool{false, true} { - builder := NewOutboundBuilder(TestCodec, compress) + builder := NewOutboundBuilderWithPacker(TestCodec, compress) msg, err := builder.Put(chainID, requestID, containerID, container) require.NoError(t, err) require.NotNil(t, msg) @@ -215,7 +215,7 @@ func TestBuildPushQuery(t *testing.T) { container := []byte{2} for _, compress := range []bool{false, true} { - builder := NewOutboundBuilder(TestCodec, compress) + builder := NewOutboundBuilderWithPacker(TestCodec, compress) msg, err := builder.PushQuery(chainID, requestID, time.Duration(deadline), containerID, container) require.NoError(t, err) require.NotNil(t, msg) @@ -311,7 +311,7 @@ func TestBuildAncestors(t *testing.T) { containers := [][]byte{container[:], container2[:]} for _, compress := range []bool{false, true} { - builder := NewOutboundBuilder(TestCodec, compress) + builder := NewOutboundBuilderWithPacker(TestCodec, compress) msg, err := builder.Ancestors(chainID, requestID, containers) require.NoError(t, err) require.NotNil(t, msg) @@ -335,7 +335,7 @@ func TestBuildAppRequestMsg(t *testing.T) { deadline := uint64(time.Now().Unix()) for _, compress := range []bool{false, true} { - builder := NewOutboundBuilder(TestCodec, compress) + builder := NewOutboundBuilderWithPacker(TestCodec, compress) msg, err := builder.AppRequest(chainID, 1, time.Duration(deadline), appRequestBytes) require.NoError(t, err) require.NotNil(t, msg) @@ -355,7 +355,7 @@ func TestBuildAppResponseMsg(t *testing.T) { appResponseBytes[len(appResponseBytes)-1] = 1 for _, compress := range []bool{false, true} { - builder := NewOutboundBuilder(TestCodec, compress) + builder := NewOutboundBuilderWithPacker(TestCodec, compress) msg, err := builder.AppResponse(chainID, 1, appResponseBytes) require.NoError(t, err) require.NotNil(t, msg) @@ -378,7 +378,7 @@ func TestBuildAppGossipMsg(t *testing.T) { appGossipBytes[len(appGossipBytes)-1] = 1 for _, compress := range []bool{false, true} { - testBuilder := NewOutboundBuilder(TestCodec, compress) + testBuilder := NewOutboundBuilderWithPacker(TestCodec, compress) msg, err := testBuilder.AppGossip(chainID, appGossipBytes) require.NoError(t, err) require.NotNil(t, msg) diff --git a/message/codec.go b/message/codec.go index d8b97b336ceb..b7cbd1aa643d 100644 --- a/message/codec.go +++ b/message/codec.go @@ -247,12 +247,14 @@ func (c *codec) Parse(bytes []byte, nodeID ids.NodeID, onFinishedHandling func() expirationTime = c.clock.Time().Add(deadlineDuration) } - return &inboundMessage{ - op: op, - fields: fieldValues, - bytesSavedCompression: bytesSaved, - nodeID: nodeID, - expirationTime: expirationTime, - onFinishedHandling: onFinishedHandling, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: op, + bytesSavedCompression: bytesSaved, + nodeID: nodeID, + expirationTime: expirationTime, + onFinishedHandling: onFinishedHandling, + }, + fields: fieldValues, }, nil } diff --git a/message/codec_test.go b/message/codec_test.go index ccc33f929b3c..219d70ef2cd7 100644 --- a/message/codec_test.go +++ b/message/codec_test.go @@ -65,8 +65,10 @@ func TestDeadlineOverride(t *testing.T) { require.NoError(t, err) id := ids.GenerateTestID() - m := inboundMessage{ - op: PushQuery, + m := inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: PushQuery, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(1337), @@ -82,7 +84,7 @@ func TestDeadlineOverride(t *testing.T) { unpackedIntf, err := c.Parse(packedIntf.Bytes(), dummyNodeID, dummyOnFinishedHandling) require.NoError(t, err, "failed to parse w/ compression on operation %s", m.op) - unpacked := unpackedIntf.(*inboundMessage) + unpacked := unpackedIntf.(*inboundMessageWithPacker) require.NotEqual(t, unpacked.ExpirationTime(), time.Now().Add(1337*time.Hour)) require.True(t, time.Since(unpacked.ExpirationTime()) <= 10*time.Second) } @@ -98,9 +100,11 @@ func TestCodecPackParseGzip(t *testing.T) { require.NoError(t, err) cert := tlsCert.Leaf - msgs := []inboundMessage{ + msgs := []inboundMessageWithPacker{ { - op: Version, + inboundMessage: inboundMessage{ + op: Version, + }, fields: map[Field]interface{}{ NetworkID: uint32(0), NodeID: uint32(1337), @@ -113,7 +117,9 @@ func TestCodecPackParseGzip(t *testing.T) { }, }, { - op: PeerList, + inboundMessage: inboundMessage{ + op: PeerList, + }, fields: map[Field]interface{}{ Peers: []ips.ClaimedIPPort{ { @@ -126,17 +132,23 @@ func TestCodecPackParseGzip(t *testing.T) { }, }, { - op: Ping, + inboundMessage: inboundMessage{ + op: Ping, + }, fields: map[Field]interface{}{}, }, { - op: Pong, + inboundMessage: inboundMessage{ + op: Pong, + }, fields: map[Field]interface{}{ Uptime: uint8(80), }, }, { - op: GetAcceptedFrontier, + inboundMessage: inboundMessage{ + op: GetAcceptedFrontier, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(1337), @@ -144,7 +156,9 @@ func TestCodecPackParseGzip(t *testing.T) { }, }, { - op: AcceptedFrontier, + inboundMessage: inboundMessage{ + op: AcceptedFrontier, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(1337), @@ -152,7 +166,9 @@ func TestCodecPackParseGzip(t *testing.T) { }, }, { - op: GetAccepted, + inboundMessage: inboundMessage{ + op: GetAccepted, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(1337), @@ -161,7 +177,9 @@ func TestCodecPackParseGzip(t *testing.T) { }, }, { - op: Accepted, + inboundMessage: inboundMessage{ + op: Accepted, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(1337), @@ -169,7 +187,9 @@ func TestCodecPackParseGzip(t *testing.T) { }, }, { - op: Ancestors, + inboundMessage: inboundMessage{ + op: Ancestors, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(1337), @@ -177,7 +197,9 @@ func TestCodecPackParseGzip(t *testing.T) { }, }, { - op: Get, + inboundMessage: inboundMessage{ + op: Get, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(1337), @@ -186,7 +208,9 @@ func TestCodecPackParseGzip(t *testing.T) { }, }, { - op: Put, + inboundMessage: inboundMessage{ + op: Put, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(1337), @@ -195,7 +219,9 @@ func TestCodecPackParseGzip(t *testing.T) { }, }, { - op: PushQuery, + inboundMessage: inboundMessage{ + op: PushQuery, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(1337), @@ -205,7 +231,9 @@ func TestCodecPackParseGzip(t *testing.T) { }, }, { - op: PullQuery, + inboundMessage: inboundMessage{ + op: PullQuery, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(1337), @@ -214,7 +242,9 @@ func TestCodecPackParseGzip(t *testing.T) { }, }, { - op: Chits, + inboundMessage: inboundMessage{ + op: Chits, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(1337), @@ -222,7 +252,9 @@ func TestCodecPackParseGzip(t *testing.T) { }, }, { - op: ChitsV2, + inboundMessage: inboundMessage{ + op: ChitsV2, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(1337), @@ -238,7 +270,7 @@ func TestCodecPackParseGzip(t *testing.T) { unpackedIntf, err := c.Parse(packedIntf.Bytes(), dummyNodeID, dummyOnFinishedHandling) require.NoError(t, err, "failed to parse w/ compression on operation %s", m.op) - unpacked := unpackedIntf.(*inboundMessage) + unpacked := unpackedIntf.(*inboundMessageWithPacker) require.EqualValues(t, len(m.fields), len(unpacked.fields)) } diff --git a/message/creator.go b/message/creator.go index d45ca26ce854..091a7b48218d 100644 --- a/message/creator.go +++ b/message/creator.go @@ -33,8 +33,8 @@ func NewCreator(metrics prometheus.Registerer, compressionEnabled bool, parentNa return nil, err } return &creator{ - OutboundMsgBuilder: NewOutboundBuilder(codec, compressionEnabled), - InboundMsgBuilder: NewInboundBuilder(codec), + OutboundMsgBuilder: NewOutboundBuilderWithPacker(codec, compressionEnabled), + InboundMsgBuilder: NewInboundBuilderWithPacker(codec), InternalMsgBuilder: NewInternalBuilder(), }, nil } diff --git a/message/inbound_msg_builder.go b/message/inbound_msg_builder.go index b531ff401909..b565ec807bb4 100644 --- a/message/inbound_msg_builder.go +++ b/message/inbound_msg_builder.go @@ -10,7 +10,7 @@ import ( "github.com/ava-labs/avalanchego/utils/timer/mockable" ) -var _ InboundMsgBuilder = &inMsgBuilder{} +var _ InboundMsgBuilder = &inMsgBuilderWithPacker{} type InboundMsgBuilder interface { Parser @@ -144,59 +144,63 @@ type InboundMsgBuilder interface { ) InboundMessage // used in UTs only } -type inMsgBuilder struct { +type inMsgBuilderWithPacker struct { Codec clock mockable.Clock } -func NewInboundBuilder(c Codec) InboundMsgBuilder { - return &inMsgBuilder{ +func NewInboundBuilderWithPacker(c Codec) InboundMsgBuilder { + return &inMsgBuilderWithPacker{ Codec: c, } } -func (b *inMsgBuilder) SetTime(t time.Time) { +func (b *inMsgBuilderWithPacker) SetTime(t time.Time) { b.clock.Set(t) b.Codec.SetTime(t) } -func (b *inMsgBuilder) InboundGetStateSummaryFrontier( +func (b *inMsgBuilderWithPacker) InboundGetStateSummaryFrontier( chainID ids.ID, requestID uint32, deadline time.Duration, nodeID ids.NodeID, ) InboundMessage { received := b.clock.Time() - return &inboundMessage{ - op: GetStateSummaryFrontier, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: GetStateSummaryFrontier, + nodeID: nodeID, + expirationTime: received.Add(deadline), + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, Deadline: uint64(deadline), }, - nodeID: nodeID, - expirationTime: received.Add(deadline), } } -func (b *inMsgBuilder) InboundStateSummaryFrontier( +func (b *inMsgBuilderWithPacker) InboundStateSummaryFrontier( chainID ids.ID, requestID uint32, summary []byte, nodeID ids.NodeID, ) InboundMessage { - return &inboundMessage{ - op: StateSummaryFrontier, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: StateSummaryFrontier, + nodeID: nodeID, + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, SummaryBytes: summary, }, - nodeID: nodeID, } } -func (b *inMsgBuilder) InboundGetAcceptedStateSummary( +func (b *inMsgBuilderWithPacker) InboundGetAcceptedStateSummary( chainID ids.ID, requestID uint32, heights []uint64, @@ -204,20 +208,22 @@ func (b *inMsgBuilder) InboundGetAcceptedStateSummary( nodeID ids.NodeID, ) InboundMessage { received := b.clock.Time() - return &inboundMessage{ - op: GetAcceptedStateSummary, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: GetAcceptedStateSummary, + nodeID: nodeID, + expirationTime: received.Add(deadline), + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, Deadline: uint64(deadline), SummaryHeights: heights, }, - nodeID: nodeID, - expirationTime: received.Add(deadline), } } -func (b *inMsgBuilder) InboundAcceptedStateSummary( +func (b *inMsgBuilderWithPacker) InboundAcceptedStateSummary( chainID ids.ID, requestID uint32, summaryIDs []ids.ID, @@ -225,37 +231,41 @@ func (b *inMsgBuilder) InboundAcceptedStateSummary( ) InboundMessage { summaryIDBytes := make([][]byte, len(summaryIDs)) encodeIDs(summaryIDs, summaryIDBytes) - return &inboundMessage{ - op: AcceptedStateSummary, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: AcceptedStateSummary, + nodeID: nodeID, + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, SummaryIDs: summaryIDBytes, }, - nodeID: nodeID, } } -func (b *inMsgBuilder) InboundGetAcceptedFrontier( +func (b *inMsgBuilderWithPacker) InboundGetAcceptedFrontier( chainID ids.ID, requestID uint32, deadline time.Duration, nodeID ids.NodeID, ) InboundMessage { received := b.clock.Time() - return &inboundMessage{ - op: GetAcceptedFrontier, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: GetAcceptedFrontier, + nodeID: nodeID, + expirationTime: received.Add(deadline), + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, Deadline: uint64(deadline), }, - nodeID: nodeID, - expirationTime: received.Add(deadline), } } -func (b *inMsgBuilder) InboundAcceptedFrontier( +func (b *inMsgBuilderWithPacker) InboundAcceptedFrontier( chainID ids.ID, requestID uint32, containerIDs []ids.ID, @@ -263,18 +273,20 @@ func (b *inMsgBuilder) InboundAcceptedFrontier( ) InboundMessage { containerIDBytes := make([][]byte, len(containerIDs)) encodeIDs(containerIDs, containerIDBytes) - return &inboundMessage{ - op: AcceptedFrontier, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: AcceptedFrontier, + nodeID: nodeID, + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, ContainerIDs: containerIDBytes, }, - nodeID: nodeID, } } -func (b *inMsgBuilder) InboundGetAccepted( +func (b *inMsgBuilderWithPacker) InboundGetAccepted( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -284,20 +296,22 @@ func (b *inMsgBuilder) InboundGetAccepted( received := b.clock.Time() containerIDBytes := make([][]byte, len(containerIDs)) encodeIDs(containerIDs, containerIDBytes) - return &inboundMessage{ - op: GetAccepted, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: GetAccepted, + nodeID: nodeID, + expirationTime: received.Add(deadline), + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, Deadline: uint64(deadline), ContainerIDs: containerIDBytes, }, - nodeID: nodeID, - expirationTime: received.Add(deadline), } } -func (b *inMsgBuilder) InboundAccepted( +func (b *inMsgBuilderWithPacker) InboundAccepted( chainID ids.ID, requestID uint32, containerIDs []ids.ID, @@ -305,18 +319,20 @@ func (b *inMsgBuilder) InboundAccepted( ) InboundMessage { containerIDBytes := make([][]byte, len(containerIDs)) encodeIDs(containerIDs, containerIDBytes) - return &inboundMessage{ - op: Accepted, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Accepted, + nodeID: nodeID, + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, ContainerIDs: containerIDBytes, }, - nodeID: nodeID, } } -func (b *inMsgBuilder) InboundPushQuery( +func (b *inMsgBuilderWithPacker) InboundPushQuery( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -325,8 +341,12 @@ func (b *inMsgBuilder) InboundPushQuery( nodeID ids.NodeID, ) InboundMessage { received := b.clock.Time() - return &inboundMessage{ - op: PushQuery, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: PushQuery, + nodeID: nodeID, + expirationTime: received.Add(deadline), + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, @@ -334,12 +354,10 @@ func (b *inMsgBuilder) InboundPushQuery( ContainerID: containerID[:], ContainerBytes: container, }, - nodeID: nodeID, - expirationTime: received.Add(deadline), } } -func (b *inMsgBuilder) InboundPullQuery( +func (b *inMsgBuilderWithPacker) InboundPullQuery( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -347,20 +365,22 @@ func (b *inMsgBuilder) InboundPullQuery( nodeID ids.NodeID, ) InboundMessage { received := b.clock.Time() - return &inboundMessage{ - op: PullQuery, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: PullQuery, + nodeID: nodeID, + expirationTime: received.Add(deadline), + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, Deadline: uint64(deadline), ContainerID: containerID[:], }, - nodeID: nodeID, - expirationTime: received.Add(deadline), } } -func (b *inMsgBuilder) InboundChits( +func (b *inMsgBuilderWithPacker) InboundChits( chainID ids.ID, requestID uint32, containerIDs []ids.ID, @@ -368,21 +388,23 @@ func (b *inMsgBuilder) InboundChits( ) InboundMessage { containerIDBytes := make([][]byte, len(containerIDs)) encodeIDs(containerIDs, containerIDBytes) - return &inboundMessage{ - op: Chits, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Chits, + nodeID: nodeID, + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, ContainerIDs: containerIDBytes, }, - nodeID: nodeID, } } // The first "containerIDs" is always populated for backward compatibilities with old "Chits" // Only after the DAG is linearized, the second "containerID" will be populated // with the new snowman chain containers. -func (b *inMsgBuilder) InboundChitsV2( +func (b *inMsgBuilderWithPacker) InboundChitsV2( chainID ids.ID, requestID uint32, containerIDs []ids.ID, @@ -392,19 +414,21 @@ func (b *inMsgBuilder) InboundChitsV2( containerIDBytes := make([][]byte, len(containerIDs)) encodeIDs(containerIDs, containerIDBytes) - return &inboundMessage{ - op: ChitsV2, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: ChitsV2, + nodeID: nodeID, + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, ContainerIDs: containerIDBytes, ContainerID: containerID[:], }, - nodeID: nodeID, } } -func (b *inMsgBuilder) InboundAppRequest( +func (b *inMsgBuilderWithPacker) InboundAppRequest( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -412,37 +436,41 @@ func (b *inMsgBuilder) InboundAppRequest( nodeID ids.NodeID, ) InboundMessage { received := b.clock.Time() - return &inboundMessage{ - op: AppRequest, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: AppRequest, + nodeID: nodeID, + expirationTime: received.Add(deadline), + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, Deadline: uint64(deadline), AppBytes: msg, }, - nodeID: nodeID, - expirationTime: received.Add(deadline), } } -func (b *inMsgBuilder) InboundAppResponse( +func (b *inMsgBuilderWithPacker) InboundAppResponse( chainID ids.ID, requestID uint32, msg []byte, nodeID ids.NodeID, ) InboundMessage { - return &inboundMessage{ - op: AppResponse, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: AppResponse, + nodeID: nodeID, + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, AppBytes: msg, }, - nodeID: nodeID, } } -func (b *inMsgBuilder) InboundGet( +func (b *inMsgBuilderWithPacker) InboundGet( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -450,52 +478,58 @@ func (b *inMsgBuilder) InboundGet( nodeID ids.NodeID, ) InboundMessage { // used in UTs only received := b.clock.Time() - return &inboundMessage{ - op: Put, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Put, + nodeID: nodeID, + expirationTime: received.Add(deadline), + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, Deadline: uint64(deadline), ContainerID: containerID[:], }, - nodeID: nodeID, - expirationTime: received.Add(deadline), } } -func (b *inMsgBuilder) InboundPut( +func (b *inMsgBuilderWithPacker) InboundPut( chainID ids.ID, requestID uint32, containerID ids.ID, container []byte, nodeID ids.NodeID, ) InboundMessage { // used in UTs only - return &inboundMessage{ - op: Put, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Put, + nodeID: nodeID, + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, ContainerID: containerID[:], ContainerBytes: container, }, - nodeID: nodeID, } } -func (b *inMsgBuilder) InboundAncestors( +func (b *inMsgBuilderWithPacker) InboundAncestors( chainID ids.ID, requestID uint32, containers [][]byte, nodeID ids.NodeID, ) InboundMessage { // used in UTs only - return &inboundMessage{ - op: Ancestors, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Ancestors, + nodeID: nodeID, + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, MultiContainerBytes: containers, }, - nodeID: nodeID, } } diff --git a/message/internal_msg_builder.go b/message/internal_msg_builder.go index 9f84a6f151c9..3f988eed865f 100644 --- a/message/internal_msg_builder.go +++ b/message/internal_msg_builder.go @@ -37,37 +37,45 @@ func (internalMsgBuilder) InternalFailedRequest( chainID ids.ID, requestID uint32, ) InboundMessage { - return &inboundMessage{ - op: op, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: op, + nodeID: nodeID, + }, fields: map[Field]interface{}{ ChainID: chainID[:], RequestID: requestID, }, - nodeID: nodeID, } } func (internalMsgBuilder) InternalTimeout(nodeID ids.NodeID) InboundMessage { - return &inboundMessage{ - op: Timeout, - nodeID: nodeID, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Timeout, + nodeID: nodeID, + }, } } func (internalMsgBuilder) InternalConnected(nodeID ids.NodeID, nodeVersion *version.Application) InboundMessage { - return &inboundMessage{ - op: Connected, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Connected, + nodeID: nodeID, + }, fields: map[Field]interface{}{ VersionStruct: nodeVersion, }, - nodeID: nodeID, } } func (internalMsgBuilder) InternalDisconnected(nodeID ids.NodeID) InboundMessage { - return &inboundMessage{ - op: Disconnected, - nodeID: nodeID, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Disconnected, + nodeID: nodeID, + }, } } @@ -75,20 +83,24 @@ func (internalMsgBuilder) InternalVMMessage( nodeID ids.NodeID, notification uint32, ) InboundMessage { - return &inboundMessage{ - op: Notify, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Notify, + nodeID: nodeID, + }, fields: map[Field]interface{}{ VMMessage: notification, }, - nodeID: nodeID, } } func (internalMsgBuilder) InternalGossipRequest( nodeID ids.NodeID, ) InboundMessage { - return &inboundMessage{ - op: GossipRequest, - nodeID: nodeID, + return &inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: GossipRequest, + nodeID: nodeID, + }, } } diff --git a/message/messages.go b/message/messages.go index 93464d8874c5..4df0c2a600fb 100644 --- a/message/messages.go +++ b/message/messages.go @@ -18,7 +18,7 @@ import ( ) var ( - _ InboundMessage = &inboundMessage{} + _ InboundMessage = &inboundMessageWithPacker{} _ OutboundMessage = &outboundMessageWithPacker{} _ OutboundMessage = &outboundMessageWithProto{} ) @@ -38,7 +38,6 @@ type InboundMessage interface { type inboundMessage struct { op Op bytesSavedCompression int - fields map[Field]interface{} nodeID ids.NodeID expirationTime time.Time onFinishedHandling func() @@ -51,10 +50,9 @@ func (inMsg *inboundMessage) Op() Op { return inMsg.op } // compression. That is, the number of bytes we did not receive over the // network due to the message being compressed. 0 for messages that were not // compressed. -func (inMsg *inboundMessage) BytesSavedCompression() int { return inMsg.bytesSavedCompression } - -// Field returns the value of the specified field in this message -func (inMsg *inboundMessage) Get(field Field) interface{} { return inMsg.fields[field] } +func (inMsg *inboundMessage) BytesSavedCompression() int { + return inMsg.bytesSavedCompression +} // NodeID returns the node that the msg was sent by. func (inMsg *inboundMessage) NodeID() ids.NodeID { return inMsg.nodeID } @@ -71,7 +69,16 @@ func (inMsg *inboundMessage) OnFinishedHandling() { } } -func (inMsg *inboundMessage) String() string { +type inboundMessageWithPacker struct { + inboundMessage + + fields map[Field]interface{} +} + +// Field returns the value of the specified field in this message +func (inMsg *inboundMessageWithPacker) Get(field Field) interface{} { return inMsg.fields[field] } + +func (inMsg *inboundMessageWithPacker) String() string { sb := strings.Builder{} sb.WriteString(fmt.Sprintf("(Op: %s, NodeID: %s", inMsg.op, inMsg.nodeID)) if requestIDIntf, exists := inMsg.fields[RequestID]; exists { @@ -162,12 +169,12 @@ func (outMsg *outboundMessageWithPacker) DecRef() { } // TODO: add other compression algorithms with extended interface -type msgCreatorProtobuf struct { +type msgBuilderProtobuf struct { gzipCompressor compression.Compressor } -func newMsgCreatorProtobuf(maxCompressSize int64) *msgCreatorProtobuf { - return &msgCreatorProtobuf{ +func newMsgBuilderProtobuf(maxCompressSize int64) *msgBuilderProtobuf { + return &msgBuilderProtobuf{ gzipCompressor: compression.NewGzipCompressor(maxCompressSize), } } @@ -179,7 +186,7 @@ func newMsgCreatorProtobuf(maxCompressSize int64) *msgCreatorProtobuf { // NOTE THAT the passed message will be modified if compression is enabled. // TODO: find a way to not in-place modify the message // TODO: implement parsing tests for inbound messages -func (mc *msgCreatorProtobuf) marshal(m *p2ppb.Message, gzipCompress bool) ([]byte, int, error) { +func (mc *msgBuilderProtobuf) marshal(m *p2ppb.Message, gzipCompress bool) ([]byte, int, error) { b, err := proto.Marshal(m) if err != nil { return nil, 0, err @@ -215,7 +222,7 @@ func (mc *msgCreatorProtobuf) marshal(m *p2ppb.Message, gzipCompress bool) ([]by // NOTE THAT the passed message will be updated if compression is enabled. // TODO: find a way to not in-place modify the message -func (mc *msgCreatorProtobuf) createOutbound(op Op, msg *p2ppb.Message, gzipCompress bool, bypassThrottling bool) (*outboundMessageWithProto, error) { +func (mc *msgBuilderProtobuf) createOutbound(op Op, msg *p2ppb.Message, gzipCompress bool, bypassThrottling bool) (*outboundMessageWithProto, error) { b, saved, err := mc.marshal(msg, gzipCompress) if err != nil { return nil, err diff --git a/message/messages_benchmark_test.go b/message/messages_benchmark_test.go index b91ba85a0ad8..9217ead5b07d 100644 --- a/message/messages_benchmark_test.go +++ b/message/messages_benchmark_test.go @@ -40,8 +40,10 @@ func BenchmarkMarshalVersion(b *testing.B) { b.StopTimer() id := ids.GenerateTestID() - inboundMsg := inboundMessage{ - op: Version, + inboundMsg := inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Version, + }, fields: map[Field]interface{}{ NetworkID: uint32(1337), NodeID: uint32(0), @@ -119,8 +121,10 @@ func BenchmarkUnmarshalVersion(b *testing.B) { b.StopTimer() id := ids.GenerateTestID() - inboundMsg := inboundMessage{ - op: Version, + inboundMsg := inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Version, + }, fields: map[Field]interface{}{ NetworkID: uint32(1337), NodeID: uint32(0), diff --git a/message/messages_test.go b/message/messages_test.go index 841900ab79f6..d76fc148a6c6 100644 --- a/message/messages_test.go +++ b/message/messages_test.go @@ -32,8 +32,10 @@ func TestProtoMarshalSizeVersion(t *testing.T) { require := require.New(t) id := ids.GenerateTestID() - inboundMsg := inboundMessage{ - op: Version, + inboundMsg := inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Version, + }, fields: map[Field]interface{}{ NetworkID: uint32(1337), NodeID: uint32(0), @@ -79,8 +81,10 @@ func TestProtoMarshalSizeAncestors(t *testing.T) { require := require.New(t) id := ids.GenerateTestID() - inboundMsg := inboundMessage{ - op: Ancestors, + inboundMsg := inboundMessageWithPacker{ + inboundMessage: inboundMessage{ + op: Ancestors, + }, fields: map[Field]interface{}{ ChainID: id[:], RequestID: uint32(12345), @@ -116,7 +120,7 @@ func TestProtoMarshalSizeAncestors(t *testing.T) { }, } - mc := newMsgCreatorProtobuf(2 * units.MiB) + mc := newMsgBuilderProtobuf(2 * units.MiB) b, _, err := mc.marshal(&protoMsg, compressible) require.NoError(err) @@ -130,7 +134,7 @@ func TestNewOutboundMessageWithProto(t *testing.T) { t.Parallel() require := require.New(t) - mc := newMsgCreatorProtobuf(math.MaxInt64) + mc := newMsgBuilderProtobuf(math.MaxInt64) id := ids.GenerateTestID() tt := []struct { diff --git a/message/outbound_msg_builder.go b/message/outbound_msg_builder.go index 5e26f2a870ce..c0cb9ac4c855 100644 --- a/message/outbound_msg_builder.go +++ b/message/outbound_msg_builder.go @@ -10,7 +10,7 @@ import ( "github.com/ava-labs/avalanchego/utils/ips" ) -var _ OutboundMsgBuilder = &outMsgBuilder{} +var _ OutboundMsgBuilder = &outMsgBuilderWithPacker{} // OutboundMsgBuilder builds outbound messages. Outbound messages are returned // with a reference count of 1. Once the reference count hits 0, the message @@ -159,19 +159,19 @@ type OutboundMsgBuilder interface { ) (OutboundMessage, error) } -type outMsgBuilder struct { +type outMsgBuilderWithPacker struct { c Codec compress bool } -func NewOutboundBuilder(c Codec, enableCompression bool) OutboundMsgBuilder { - return &outMsgBuilder{ +func NewOutboundBuilderWithPacker(c Codec, enableCompression bool) OutboundMsgBuilder { + return &outMsgBuilderWithPacker{ c: c, compress: enableCompression, } } -func (b *outMsgBuilder) Version( +func (b *outMsgBuilderWithPacker) Version( networkID uint32, myTime uint64, ip ips.IPPort, @@ -202,7 +202,7 @@ func (b *outMsgBuilder) Version( ) } -func (b *outMsgBuilder) PeerList(peers []ips.ClaimedIPPort, bypassThrottling bool) (OutboundMessage, error) { +func (b *outMsgBuilderWithPacker) PeerList(peers []ips.ClaimedIPPort, bypassThrottling bool) (OutboundMessage, error) { return b.c.Pack( PeerList, map[Field]interface{}{ @@ -213,7 +213,7 @@ func (b *outMsgBuilder) PeerList(peers []ips.ClaimedIPPort, bypassThrottling boo ) } -func (b *outMsgBuilder) Ping() (OutboundMessage, error) { +func (b *outMsgBuilderWithPacker) Ping() (OutboundMessage, error) { return b.c.Pack( Ping, nil, @@ -222,7 +222,7 @@ func (b *outMsgBuilder) Ping() (OutboundMessage, error) { ) } -func (b *outMsgBuilder) Pong(uptimePercentage uint8) (OutboundMessage, error) { +func (b *outMsgBuilderWithPacker) Pong(uptimePercentage uint8) (OutboundMessage, error) { return b.c.Pack( Pong, map[Field]interface{}{ @@ -233,7 +233,7 @@ func (b *outMsgBuilder) Pong(uptimePercentage uint8) (OutboundMessage, error) { ) } -func (b *outMsgBuilder) GetStateSummaryFrontier( +func (b *outMsgBuilderWithPacker) GetStateSummaryFrontier( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -250,7 +250,7 @@ func (b *outMsgBuilder) GetStateSummaryFrontier( ) } -func (b *outMsgBuilder) StateSummaryFrontier( +func (b *outMsgBuilderWithPacker) StateSummaryFrontier( chainID ids.ID, requestID uint32, summary []byte, @@ -267,7 +267,7 @@ func (b *outMsgBuilder) StateSummaryFrontier( ) } -func (b *outMsgBuilder) GetAcceptedStateSummary( +func (b *outMsgBuilderWithPacker) GetAcceptedStateSummary( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -286,7 +286,7 @@ func (b *outMsgBuilder) GetAcceptedStateSummary( ) } -func (b *outMsgBuilder) AcceptedStateSummary( +func (b *outMsgBuilderWithPacker) AcceptedStateSummary( chainID ids.ID, requestID uint32, summaryIDs []ids.ID, @@ -305,7 +305,7 @@ func (b *outMsgBuilder) AcceptedStateSummary( ) } -func (b *outMsgBuilder) GetAcceptedFrontier( +func (b *outMsgBuilderWithPacker) GetAcceptedFrontier( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -322,7 +322,7 @@ func (b *outMsgBuilder) GetAcceptedFrontier( ) } -func (b *outMsgBuilder) AcceptedFrontier( +func (b *outMsgBuilderWithPacker) AcceptedFrontier( chainID ids.ID, requestID uint32, containerIDs []ids.ID, @@ -341,7 +341,7 @@ func (b *outMsgBuilder) AcceptedFrontier( ) } -func (b *outMsgBuilder) GetAccepted( +func (b *outMsgBuilderWithPacker) GetAccepted( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -362,7 +362,7 @@ func (b *outMsgBuilder) GetAccepted( ) } -func (b *outMsgBuilder) Accepted( +func (b *outMsgBuilderWithPacker) Accepted( chainID ids.ID, requestID uint32, containerIDs []ids.ID, @@ -381,7 +381,7 @@ func (b *outMsgBuilder) Accepted( ) } -func (b *outMsgBuilder) GetAncestors( +func (b *outMsgBuilderWithPacker) GetAncestors( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -400,7 +400,7 @@ func (b *outMsgBuilder) GetAncestors( ) } -func (b *outMsgBuilder) Ancestors( +func (b *outMsgBuilderWithPacker) Ancestors( chainID ids.ID, requestID uint32, containers [][]byte, @@ -417,7 +417,7 @@ func (b *outMsgBuilder) Ancestors( ) } -func (b *outMsgBuilder) Get( +func (b *outMsgBuilderWithPacker) Get( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -436,7 +436,7 @@ func (b *outMsgBuilder) Get( ) } -func (b *outMsgBuilder) Put( +func (b *outMsgBuilderWithPacker) Put( chainID ids.ID, requestID uint32, containerID ids.ID, @@ -455,7 +455,7 @@ func (b *outMsgBuilder) Put( ) } -func (b *outMsgBuilder) PushQuery( +func (b *outMsgBuilderWithPacker) PushQuery( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -476,7 +476,7 @@ func (b *outMsgBuilder) PushQuery( ) } -func (b *outMsgBuilder) PullQuery( +func (b *outMsgBuilderWithPacker) PullQuery( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -495,7 +495,7 @@ func (b *outMsgBuilder) PullQuery( ) } -func (b *outMsgBuilder) Chits( +func (b *outMsgBuilderWithPacker) Chits( chainID ids.ID, requestID uint32, containerIDs []ids.ID, @@ -514,7 +514,7 @@ func (b *outMsgBuilder) Chits( ) } -func (b *outMsgBuilder) ChitsV2( +func (b *outMsgBuilderWithPacker) ChitsV2( chainID ids.ID, requestID uint32, containerIDs []ids.ID, @@ -537,7 +537,7 @@ func (b *outMsgBuilder) ChitsV2( } // Application level request -func (b *outMsgBuilder) AppRequest( +func (b *outMsgBuilderWithPacker) AppRequest( chainID ids.ID, requestID uint32, deadline time.Duration, @@ -557,7 +557,7 @@ func (b *outMsgBuilder) AppRequest( } // Application level response -func (b *outMsgBuilder) AppResponse(chainID ids.ID, requestID uint32, msg []byte) (OutboundMessage, error) { +func (b *outMsgBuilderWithPacker) AppResponse(chainID ids.ID, requestID uint32, msg []byte) (OutboundMessage, error) { return b.c.Pack( AppResponse, map[Field]interface{}{ @@ -571,7 +571,7 @@ func (b *outMsgBuilder) AppResponse(chainID ids.ID, requestID uint32, msg []byte } // Application level gossiped message -func (b *outMsgBuilder) AppGossip(chainID ids.ID, msg []byte) (OutboundMessage, error) { +func (b *outMsgBuilderWithPacker) AppGossip(chainID ids.ID, msg []byte) (OutboundMessage, error) { return b.c.Pack( AppGossip, map[Field]interface{}{