From b33bb3f8582da3dc789f60909d29547e065b87a4 Mon Sep 17 00:00:00 2001 From: Adam Williams Date: Fri, 27 Jul 2018 11:30:01 -0600 Subject: [PATCH] Extract Frame into own package (#11) Moves all frame related code into its own package (`frame`). This was done since `frame.go` was duplicated in the `puslartest` package. This removes that duplication, since now both `pulsar` and `pulsartest` packages can import `frame` with cyclic dependencies. --- client.go | 5 +- client_test.go | 4 +- conn.go | 13 +- conn_integration_test.go | 5 +- conn_test.go | 59 +-- connector_test.go | 9 +- consumer.go | 7 +- consumer_test.go | 13 +- discoverer_test.go | 7 +- frame/doc.go | 17 + frame.go => frame/frame.go | 26 +- frame_checksum.go => frame/frame_checksum.go | 2 +- .../frame_checksum_test.go | 2 +- frame_fuzz.go => frame/frame_fuzz.go | 2 +- frame_test.go => frame/frame_test.go | 4 +- frame/util_test.go | 74 ++++ framedispatcher.go | 22 +- framedispatcher_test.go | 21 +- fuzz/Makefile | 8 +- fuzz/README.md | 4 +- fuzz/corpus-gen/main.go | 4 +- managed_consumer_test.go | 9 +- managed_producer_test.go | 3 +- mocksender_test.go | 11 +- pinger_test.go | 3 +- producer.go | 3 +- producer_test.go | 9 +- pubsub_test.go | 9 +- pulsartest/frame.go | 359 ------------------ pulsartest/server.go | 23 +- subscriptions.go | 10 +- util_test.go | 53 --- 32 files changed, 251 insertions(+), 549 deletions(-) create mode 100644 frame/doc.go rename frame.go => frame/frame.go (94%) rename frame_checksum.go => frame/frame_checksum.go (98%) rename frame_checksum_test.go => frame/frame_checksum_test.go (98%) rename frame_fuzz.go => frame/frame_fuzz.go (99%) rename frame_test.go => frame/frame_test.go (98%) create mode 100644 frame/util_test.go delete mode 100644 pulsartest/frame.go diff --git a/client.go b/client.go index 0b4c20d..b3e6d4c 100644 --- a/client.go +++ b/client.go @@ -20,6 +20,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" ) const ( @@ -103,7 +104,7 @@ func NewClient(cfg ClientConfig) (*Client, error) { pubsub: newPubsub(cnx, dispatcher, subs, &reqID), } - handler := func(f Frame) { + handler := func(f frame.Frame) { // All message types can be handled in // parallel, since their ordering should not matter go c.handleFrame(f) @@ -242,7 +243,7 @@ func (c *Client) NewFailoverConsumer(ctx context.Context, topic, subscriptionNam // handleFrame is called by the underlaying conn with // all received Frames. -func (c *Client) handleFrame(f Frame) { +func (c *Client) handleFrame(f frame.Frame) { var err error msgType := f.BaseCmd.GetType() diff --git a/client_test.go b/client_test.go index 27b5e7a..1ca0163 100644 --- a/client_test.go +++ b/client_test.go @@ -18,6 +18,8 @@ import ( "io" "testing" "time" + + "github.com/Comcast/pulsar-client-go/frame" ) func TestClient_ServerInitiatedClose(t *testing.T) { @@ -98,7 +100,7 @@ func TestClient_ClientInitiatedClose(t *testing.T) { // send read errors to srvConnReadErr chan srvConnReadErr := make(chan error, 1) go func() { - srvConnReadErr <- srvConn.read(func(f Frame) {}) + srvConnReadErr <- srvConn.read(func(f frame.Frame) {}) }() // terminate the connection from the client's end diff --git a/conn.go b/conn.go index 7cd9846..0429905 100644 --- a/conn.go +++ b/conn.go @@ -23,6 +23,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" ) // newTCPConn creates a conn using a TCPv4 connection to the given @@ -112,9 +113,9 @@ func (c *conn) closed() <-chan struct{} { // will close the connection. Also if close() is called, // read() will unblock. Once read returns, the conn should // be considered unusable. -func (c *conn) read(frameHandler func(f Frame)) error { +func (c *conn) read(frameHandler func(f frame.Frame)) error { for { - var f Frame + var f frame.Frame if err := f.Decode(c.rc); err != nil { // It's very possible that the connection is already closed at this // point, since any connection closed errors would bubble up @@ -131,7 +132,7 @@ func (c *conn) read(frameHandler func(f Frame)) error { // sendSimpleCmd writes a "simple" frame to the wire. It // is safe to use concurrently. func (c *conn) sendSimpleCmd(cmd api.BaseCommand) error { - return c.writeFrame(&Frame{ + return c.writeFrame(&frame.Frame{ BaseCmd: &cmd, }) } @@ -139,7 +140,7 @@ func (c *conn) sendSimpleCmd(cmd api.BaseCommand) error { // sendPayloadCmd writes a "payload" frame to the wire. It // is safe to use concurrently. func (c *conn) sendPayloadCmd(cmd api.BaseCommand, metadata api.MessageMetadata, payload []byte) error { - return c.writeFrame(&Frame{ + return c.writeFrame(&frame.Frame{ BaseCmd: &cmd, Metadata: &metadata, Payload: payload, @@ -148,13 +149,13 @@ func (c *conn) sendPayloadCmd(cmd api.BaseCommand, metadata api.MessageMetadata, var bufPool = sync.Pool{ New: func() interface{} { - return bytes.NewBuffer(make([]byte, 0, maxFrameSize)) + return bytes.NewBuffer(make([]byte, 0, frame.MaxFrameSize)) }, } // writeFrame encodes the given frame and writes // it to the wire in a thread-safe manner. -func (c *conn) writeFrame(f *Frame) error { +func (c *conn) writeFrame(f *frame.Frame) error { b := bufPool.Get().(*bytes.Buffer) defer bufPool.Put(b) b.Reset() diff --git a/conn_integration_test.go b/conn_integration_test.go index 75fd096..2ae095a 100644 --- a/conn_integration_test.go +++ b/conn_integration_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" "github.com/golang/protobuf/proto" ) @@ -31,10 +32,10 @@ func TestConn_Int_Connect(t *testing.T) { t.Fatal(err) } - responses := make(chan Frame) + responses := make(chan frame.Frame) readErr := make(chan error, 1) go func() { - readErr <- c.read(func(f Frame) { + readErr <- c.read(func(f frame.Frame) { responses <- f }) }() diff --git a/conn_test.go b/conn_test.go index e94035a..f388f72 100644 --- a/conn_test.go +++ b/conn_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" "github.com/golang/protobuf/proto" ) @@ -43,7 +44,7 @@ func (m *mockReadCloser) Close() error { } func TestConn_Read(t *testing.T) { - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CONNECTED.Enum(), Connected: &api.CommandConnected{ @@ -65,8 +66,8 @@ func TestConn_Read(t *testing.T) { closedc: make(chan struct{}), } - var gotFrames []Frame - handler := func(f Frame) { gotFrames = append(gotFrames, f) } + var gotFrames []frame.Frame + handler := func(f frame.Frame) { gotFrames = append(gotFrames, f) } // read should read the frame, then reach // and return EOF if err := c.read(handler); err != io.EOF { @@ -93,7 +94,7 @@ func TestConn_Close(t *testing.T) { } // no-op - handler := func(f Frame) {} + handler := func(f frame.Frame) {} // read should reach and return EOF err := c.read(handler) @@ -119,8 +120,8 @@ func TestConn_GarbageInput(t *testing.T) { closedc: make(chan struct{}), } - var gotFrames []Frame - handler := func(f Frame) { + var gotFrames []frame.Frame + handler := func(f frame.Frame) { gotFrames = append(gotFrames, f) } @@ -142,7 +143,7 @@ func TestConn_GarbageInput(t *testing.T) { } func TestConn_TimeoutReader(t *testing.T) { - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CONNECTED.Enum(), Connected: &api.CommandConnected{ @@ -167,8 +168,8 @@ func TestConn_TimeoutReader(t *testing.T) { closedc: make(chan struct{}), } - var gotFrames []Frame - handler := func(f Frame) { + var gotFrames []frame.Frame + handler := func(f frame.Frame) { gotFrames = append(gotFrames, f) } @@ -186,7 +187,7 @@ func TestConn_TimeoutReader(t *testing.T) { } func TestConn_Read_SlowSrc(t *testing.T) { - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CONNECTED.Enum(), Connected: &api.CommandConnected{ @@ -210,8 +211,8 @@ func TestConn_Read_SlowSrc(t *testing.T) { closedc: make(chan struct{}), } - var gotFrames []Frame - handler := func(f Frame) { + var gotFrames []frame.Frame + handler := func(f frame.Frame) { gotFrames = append(gotFrames, f) } // read should read the frame, then reach @@ -235,9 +236,9 @@ func TestConn_Read_MutliFrame(t *testing.T) { N := 16 // create input frames - frames := make([]Frame, N) + frames := make([]frame.Frame, N) for i := range frames { - frames[i] = Frame{ + frames[i] = frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_MESSAGE.Enum(), Message: &api.CommandMessage{ @@ -272,8 +273,8 @@ func TestConn_Read_MutliFrame(t *testing.T) { closedc: make(chan struct{}), } - var gotFrames []Frame - handler := func(f Frame) { gotFrames = append(gotFrames, f) } + var gotFrames []frame.Frame + handler := func(f frame.Frame) { gotFrames = append(gotFrames, f) } // read should read the frames, then reach // and return EOF if err := c.read(handler); err != io.EOF { @@ -299,10 +300,10 @@ func TestConn_writeFrame(t *testing.T) { // mapping of frame payload to frame. // Since they will be written in an undetermined ordered, // this helps look them up and match them. - frames := make([]Frame, N) + frames := make([]frame.Frame, N) for i := 0; i < N; i++ { payload := fmt.Sprintf("%02d - test message", i) // test expects that payload as string sorts properly - frames[i] = Frame{ + frames[i] = frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_MESSAGE.Enum(), Message: &api.CommandMessage{ @@ -346,8 +347,8 @@ func TestConn_writeFrame(t *testing.T) { } }) - var gotFrames []Frame - handler := func(f Frame) { gotFrames = append(gotFrames, f) } + var gotFrames []frame.Frame + handler := func(f frame.Frame) { gotFrames = append(gotFrames, f) } // read the encoded frames, which the handler // will store in `gotFrames`. if err := c.read(handler); err != io.EOF { @@ -372,7 +373,7 @@ func TestConn_writeFrame(t *testing.T) { } func TestConn_TCP_Read(t *testing.T) { - testFrames := map[string]Frame{ + testFrames := map[string]frame.Frame{ "ping": { BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_PING.Enum(), @@ -430,10 +431,10 @@ func TestConn_TCP_Read(t *testing.T) { } // start reading frames off the conn - received := make(chan Frame, len(testFrames)) + received := make(chan frame.Frame, len(testFrames)) readErr := make(chan error, 1) go func() { - readErr <- c.read(func(f Frame) { received <- f }) + readErr <- c.read(func(f frame.Frame) { received <- f }) }() // send frames from the Pulsar server to the conn, and @@ -460,7 +461,7 @@ func TestConn_TCP_Read(t *testing.T) { } func TestConn_TCP_Write(t *testing.T) { - testFrames := map[string]Frame{ + testFrames := map[string]frame.Frame{ "ping": { BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_PING.Enum(), @@ -518,10 +519,10 @@ func TestConn_TCP_Write(t *testing.T) { t.Fatal("timeout waiting for server to receive connection") } - srvReceived := make(chan Frame) + srvReceived := make(chan frame.Frame) go func() { defer close(srvReceived) - srvConn.read(func(f Frame) { + srvConn.read(func(f frame.Frame) { srvReceived <- f }) }() @@ -586,14 +587,14 @@ func TestConn_TCP_ReadLocalClose(t *testing.T) { // send read errors to srvConnReadErr chan srvConnReadErr := make(chan error, 1) go func() { - srvConnReadErr <- srvConn.read(func(f Frame) {}) + srvConnReadErr <- srvConn.read(func(f frame.Frame) {}) }() // start reading from the conn. // send read errors to readErr chan readErr := make(chan error, 1) go func() { - readErr <- c.read(func(f Frame) {}) + readErr <- c.read(func(f frame.Frame) {}) }() // close the connection from the local conn's end @@ -650,7 +651,7 @@ func TestConn_TCP_ReadRemoteClose(t *testing.T) { // Send read errors to readErr chan readErr := make(chan error, 1) go func() { - readErr <- c.read(func(f Frame) {}) + readErr <- c.read(func(f frame.Frame) {}) }() // server initiated connection closure diff --git a/connector_test.go b/connector_test.go index fa62fa7..a990b53 100644 --- a/connector_test.go +++ b/connector_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" "github.com/golang/protobuf/proto" ) @@ -48,7 +49,7 @@ func TestConnector(t *testing.T) { connected := api.CommandConnected{ ServerVersion: proto.String("this is a test"), } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CONNECTED.Enum(), Connected: &connected, @@ -115,7 +116,7 @@ func TestConnector_Timeout(t *testing.T) { // There should be nothing in the dispatcher, // so `connected` should fail. - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CONNECTED.Enum(), Connected: &api.CommandConnected{ @@ -157,7 +158,7 @@ func TestConnector_Error(t *testing.T) { RequestId: proto.Uint64(undefRequestID), Message: proto.String("there was an error of sorts"), } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_ERROR.Enum(), Error: &errorMsg, @@ -183,7 +184,7 @@ func TestConnector_Error(t *testing.T) { // There should be nothing in the dispatcher, // so `connected` should fail. - f = Frame{ + f = frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CONNECTED.Enum(), Connected: &api.CommandConnected{ diff --git a/consumer.go b/consumer.go index 55b5805..8c76692 100644 --- a/consumer.go +++ b/consumer.go @@ -19,6 +19,7 @@ import ( "sync" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" "github.com/golang/protobuf/proto" ) @@ -198,7 +199,7 @@ func (c *Consumer) Unsubscribe(ctx context.Context) error { // handleCloseConsumer should be called when a CLOSE_CONSUMER message is received // associated with this consumer. -func (c *Consumer) handleCloseConsumer(f Frame) error { +func (c *Consumer) handleCloseConsumer(f frame.Frame) error { c.mu.Lock() defer c.mu.Unlock() @@ -220,7 +221,7 @@ func (c *Consumer) ReachedEndOfTopic() <-chan struct{} { // handleReachedEndOfTopic should be called for all received REACHED_END_OF_TOPIC messages // associated with this consumer. -func (c *Consumer) handleReachedEndOfTopic(f Frame) error { +func (c *Consumer) handleReachedEndOfTopic(f frame.Frame) error { c.mu.Lock() defer c.mu.Unlock() @@ -300,7 +301,7 @@ func (c *Consumer) RedeliverOverflow(ctx context.Context) (int, error) { // handleMessage should be called for all MESSAGE messages received for // this consumer. -func (c *Consumer) handleMessage(f Frame) error { +func (c *Consumer) handleMessage(f frame.Frame) error { m := Message{ Topic: c.topic, consumerID: c.consumerID, diff --git a/consumer_test.go b/consumer_test.go index 9e358af..c639c22 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" "github.com/golang/protobuf/proto" ) @@ -71,7 +72,7 @@ func TestConsumer_Close_Success(t *testing.T) { expected := api.CommandSuccess{ RequestId: proto.Uint64(id), } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_SUCCESS.Enum(), Success: &expected, @@ -107,7 +108,7 @@ func TestConsumer_handleMessage(t *testing.T) { c := newConsumer(&ms, dispatcher, "test", &reqID, consID, make(chan Message, 1)) - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_MESSAGE.Enum(), Message: &api.CommandMessage{ @@ -157,7 +158,7 @@ func TestConsumer_handleMessage_fullQueue(t *testing.T) { queueSize := 3 c := newConsumer(&ms, dispatcher, "test", &reqID, consID, make(chan Message, queueSize)) - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_MESSAGE.Enum(), Message: &api.CommandMessage{ @@ -228,7 +229,7 @@ func TestConsumer_handleCloseConsumer(t *testing.T) { t.Logf("Closed() blocked") } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CLOSE_CONSUMER.Enum(), CloseConsumer: &api.CommandCloseConsumer{ @@ -265,7 +266,7 @@ func TestConsumer_handleReachedEndOfTopic(t *testing.T) { t.Logf("ReachedEndOfTopic() blocked") } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_REACHED_END_OF_TOPIC.Enum(), ReachedEndOfTopic: &api.CommandReachedEndOfTopic{ @@ -301,7 +302,7 @@ func TestConsumer_RedeliverOverflow(t *testing.T) { // the MessageIdData must be unique for each message, // otherwise the consumer will consider them duplicates // and not store them in overflow - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_MESSAGE.Enum(), Message: &api.CommandMessage{ diff --git a/discoverer_test.go b/discoverer_test.go index 0d5215f..3a0fdc1 100644 --- a/discoverer_test.go +++ b/discoverer_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" "github.com/golang/protobuf/proto" ) @@ -52,7 +53,7 @@ func TestDiscoverer_PartitionedMetadata(t *testing.T) { RequestId: proto.Uint64(id), Message: proto.String("hi"), } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_PARTITIONED_METADATA_RESPONSE.Enum(), PartitionMetadataResponse: &expected, @@ -103,7 +104,7 @@ func TestDiscoverer_LookupTopic(t *testing.T) { RequestId: proto.Uint64(id), Message: proto.String("hi"), } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_LOOKUP_RESPONSE.Enum(), LookupTopicResponse: &expected, @@ -153,7 +154,7 @@ func TestDiscoverer_LookupTopic_BadRequestID(t *testing.T) { RequestId: proto.Uint64(id + 1), // incorrect RequestID Message: proto.String("hi"), } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_LOOKUP_RESPONSE.Enum(), LookupTopicResponse: &expected, diff --git a/frame/doc.go b/frame/doc.go new file mode 100644 index 0000000..6a1492f --- /dev/null +++ b/frame/doc.go @@ -0,0 +1,17 @@ +// Copyright 2018 Comcast Cable Communications Management, LLC +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package frame provides the ability to encode and decode +// to and from Pulsar's custom binary protocol. The protocol +// is a light wrapper around protobuf messages. +package frame diff --git a/frame.go b/frame/frame.go similarity index 94% rename from frame.go rename to frame/frame.go index 8c95581..42a8e42 100644 --- a/frame.go +++ b/frame/frame.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pulsar +package frame import ( "bytes" @@ -23,11 +23,11 @@ import ( "github.com/golang/protobuf/proto" ) -// maxFrameSize is defined by the Pulsar spec with a single +// MaxFrameSize is defined by the Pulsar spec with a single // sentence: "The maximum allowable size of a single frame is 5 MB." // // https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Framing-5l6bym -const maxFrameSize = 5 * 1024 * 1024 // 5mb +const MaxFrameSize = 5 * 1024 * 1024 // 5mb // magicNumber is a 2-byte byte array (0x0e01) // identifying an optional checksum in the message, @@ -113,8 +113,8 @@ func (f *Frame) Decode(r io.Reader) error { // is the size of all the _following_ bytes). frameSize := int(totalSize) + 4 // ensure reasonable frameSize - if frameSize > maxFrameSize { - return fmt.Errorf("frame size (%d) cannot be greater than max frame size (%d)", frameSize, maxFrameSize) + if frameSize > MaxFrameSize { + return fmt.Errorf("frame size (%d) cannot be greater than max frame size (%d)", frameSize, MaxFrameSize) } // Wrap our reader so that we can only read @@ -130,8 +130,8 @@ func (f *Frame) Decode(r io.Reader) error { } cmdSize := binary.BigEndian.Uint32(buf32) // guard against allocating large buffer - if cmdSize > maxFrameSize { - return fmt.Errorf("frame command size (%d) cannot b greater than max frame size (%d)", cmdSize, maxFrameSize) + if cmdSize > MaxFrameSize { + return fmt.Errorf("frame command size (%d) cannot b greater than max frame size (%d)", cmdSize, MaxFrameSize) } // Read protobuf encoded BaseCommand @@ -193,8 +193,8 @@ func (f *Frame) Decode(r io.Reader) error { // Read metadataSize metadataSize := binary.BigEndian.Uint32(buf32) // guard against allocating large buffer - if metadataSize > maxFrameSize { - return fmt.Errorf("frame metadata size (%d) cannot b greater than max frame size (%d)", metadataSize, maxFrameSize) + if metadataSize > MaxFrameSize { + return fmt.Errorf("frame metadata size (%d) cannot b greater than max frame size (%d)", metadataSize, MaxFrameSize) } // Read protobuf encoded metadata @@ -211,8 +211,8 @@ func (f *Frame) Decode(r io.Reader) error { // the payload and can be any sequence of bytes. if lr.N > 0 { // guard against allocating large buffer - if lr.N > maxFrameSize { - return fmt.Errorf("frame payload size (%d) cannot be greater than max frame size (%d)", lr.N, maxFrameSize) + if lr.N > MaxFrameSize { + return fmt.Errorf("frame payload size (%d) cannot be greater than max frame size (%d)", lr.N, MaxFrameSize) } f.Payload = make([]byte, lr.N) if _, err = io.ReadFull(lr, f.Payload); err != nil { @@ -256,8 +256,8 @@ func (f *Frame) Encode(w io.Writer) error { totalSize += 6 + metadataSize + 4 + uint32(len(f.Payload)) } - if frameSize := totalSize + 4; frameSize > maxFrameSize { - return fmt.Errorf("encoded frame size (%d bytes) is larger than max allowed frame size (%d bytes)", frameSize, maxFrameSize) + if frameSize := totalSize + 4; frameSize > MaxFrameSize { + return fmt.Errorf("encoded frame size (%d bytes) is larger than max allowed frame size (%d bytes)", frameSize, MaxFrameSize) } // write totalSize diff --git a/frame_checksum.go b/frame/frame_checksum.go similarity index 98% rename from frame_checksum.go rename to frame/frame_checksum.go index f62f0b9..d05688f 100644 --- a/frame_checksum.go +++ b/frame/frame_checksum.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pulsar +package frame import ( "hash" diff --git a/frame_checksum_test.go b/frame/frame_checksum_test.go similarity index 98% rename from frame_checksum_test.go rename to frame/frame_checksum_test.go index 6507b19..10c3ae4 100644 --- a/frame_checksum_test.go +++ b/frame/frame_checksum_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pulsar +package frame import ( "bytes" diff --git a/frame_fuzz.go b/frame/frame_fuzz.go similarity index 99% rename from frame_fuzz.go rename to frame/frame_fuzz.go index cf8da1f..42cb195 100644 --- a/frame_fuzz.go +++ b/frame/frame_fuzz.go @@ -13,7 +13,7 @@ // +build gofuzz -package pulsar +package frame import ( "bytes" diff --git a/frame_test.go b/frame/frame_test.go similarity index 98% rename from frame_test.go rename to frame/frame_test.go index be99bcc..59890fa 100644 --- a/frame_test.go +++ b/frame/frame_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pulsar +package frame import ( "bytes" @@ -266,7 +266,7 @@ func TestFrameEncode_MaxFrameSize(t *testing.T) { SequenceId: proto.Uint64(0), PublishTime: proto.Uint64(1513027321000), }, - Payload: make([]byte, maxFrameSize), // payload + metadata + baseCmd will be > maxFrameSize + Payload: make([]byte, MaxFrameSize), // payload + metadata + baseCmd will be > maxFrameSize } var out bytes.Buffer diff --git a/frame/util_test.go b/frame/util_test.go new file mode 100644 index 0000000..b8b7b3d --- /dev/null +++ b/frame/util_test.go @@ -0,0 +1,74 @@ +// Copyright 2018 Comcast Cable Communications Management, LLC +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package frame + +import ( + "bufio" + "encoding/hex" + "fmt" + "strings" +) + +// ################ +// helper functions +// ################ + +// hexUndump is the inverse of hex.Dump. It massages the output of hex.Dump +// into a hex string, and then runs hex.DecodeString over it. +// Example line: +// 00000000 00 00 00 19 00 00 00 15 08 03 1a 11 0a 0d 50 75 |..............Pu| +// 8<------------------------------------------------>58 +// It is also capable of decoding BSD's hexdump output. + +func hexUndump(h string) []byte { + var out string + + s := bufio.NewScanner(strings.NewReader(strings.TrimSpace(h))) + for s.Scan() { + line := s.Text() + if len(line) == 0 { + panic(fmt.Sprintf("invalid hex line: %q", line)) + } + // first starting delimiter, which is the first space which separates the + // offset from the hex encoded data + firstSpace := strings.IndexRune(line, ' ') + if firstSpace == -1 { + panic(fmt.Sprintf("invalid hex line: %q", line)) + } + line = line[firstSpace:] + // possible ending delimiter, which is the start of ASCII representation + // of the data + if ascii := strings.IndexRune(line, '|'); ascii > 0 { + line = line[:ascii] + } + + // remove spaces between hex numbers + line = strings.Replace(line, " ", "", -1) + + end := 32 + if len(line) < end { + end = len(line) + } + out += line[:end] + } + if err := s.Err(); err != nil { + panic(err) + } + + b, err := hex.DecodeString(out) + if err != nil { + panic(err) + } + return b +} diff --git a/framedispatcher.go b/framedispatcher.go index 1b9e8aa..ea01851 100644 --- a/framedispatcher.go +++ b/framedispatcher.go @@ -17,6 +17,8 @@ import ( "errors" "fmt" "sync" + + "github.com/Comcast/pulsar-client-go/frame" ) // newFrameDispatcher returns an instantiated frameDispatcher. @@ -57,7 +59,7 @@ type frameDispatcher struct { // then the `done` channel is closed, signaling to the response // side that the response is not expected/needed. type asyncResp struct { - resp chan<- Frame + resp chan<- frame.Frame done <-chan struct{} } @@ -73,7 +75,7 @@ type prodSeqKey struct { // id (Pong, Connected responses). Only one outstanding global request // is allowed at a time. Callers should always call cancel, specifically // when they're not interested in the response. -func (f *frameDispatcher) registerGlobal() (response <-chan Frame, cancel func(), err error) { +func (f *frameDispatcher) registerGlobal() (response <-chan frame.Frame, cancel func(), err error) { var mu sync.Mutex done := make(chan struct{}) cancel = func() { @@ -91,7 +93,7 @@ func (f *frameDispatcher) registerGlobal() (response <-chan Frame, cancel func() done = nil } - resp := make(chan Frame) + resp := make(chan frame.Frame) f.globalMu.Lock() if f.global != nil { @@ -109,7 +111,7 @@ func (f *frameDispatcher) registerGlobal() (response <-chan Frame, cancel func() // notifyGlobal should be called with response frames that have // no identifying id (Pong, Connected). -func (f *frameDispatcher) notifyGlobal(frame Frame) error { +func (f *frameDispatcher) notifyGlobal(frame frame.Frame) error { f.globalMu.Lock() a := f.global // ensure additional calls to notify @@ -134,7 +136,7 @@ func (f *frameDispatcher) notifyGlobal(frame Frame) error { // id tuples to correlate them to their request. Callers should always call cancel, // specifically when they're not interested in the response. It is an error // to have multiple outstanding requests with the same id tuple. -func (f *frameDispatcher) registerProdSeqIDs(producerID, sequenceID uint64) (response <-chan Frame, cancel func(), err error) { +func (f *frameDispatcher) registerProdSeqIDs(producerID, sequenceID uint64) (response <-chan frame.Frame, cancel func(), err error) { key := prodSeqKey{producerID, sequenceID} var mu sync.Mutex @@ -154,7 +156,7 @@ func (f *frameDispatcher) registerProdSeqIDs(producerID, sequenceID uint64) (res done = nil } - resp := make(chan Frame) + resp := make(chan frame.Frame) f.prodSeqIDsMu.Lock() if _, ok := f.prodSeqIDs[key]; ok { @@ -172,7 +174,7 @@ func (f *frameDispatcher) registerProdSeqIDs(producerID, sequenceID uint64) (res // notifyProdSeqIDs should be called with response frames that have // (producerID, sequenceID) id tuples to correlate them to their requests. -func (f *frameDispatcher) notifyProdSeqIDs(producerID, sequenceID uint64, frame Frame) error { +func (f *frameDispatcher) notifyProdSeqIDs(producerID, sequenceID uint64, frame frame.Frame) error { key := prodSeqKey{producerID, sequenceID} f.prodSeqIDsMu.Lock() @@ -200,7 +202,7 @@ func (f *frameDispatcher) notifyProdSeqIDs(producerID, sequenceID uint64, frame // id to correlate them to their request. Callers should always call cancel, // specifically when they're not interested in the response. It is an error // to have multiple outstanding requests with the id. -func (f *frameDispatcher) registerReqID(requestID uint64) (response <-chan Frame, cancel func(), err error) { +func (f *frameDispatcher) registerReqID(requestID uint64) (response <-chan frame.Frame, cancel func(), err error) { var mu sync.Mutex done := make(chan struct{}) cancel = func() { @@ -218,7 +220,7 @@ func (f *frameDispatcher) registerReqID(requestID uint64) (response <-chan Frame done = nil } - resp := make(chan Frame) + resp := make(chan frame.Frame) f.reqIDMu.Lock() if _, ok := f.reqIDs[requestID]; ok { @@ -236,7 +238,7 @@ func (f *frameDispatcher) registerReqID(requestID uint64) (response <-chan Frame // notifyReqID should be called with response frames that have // a requestID to correlate them to their requests. -func (f *frameDispatcher) notifyReqID(requestID uint64, frame Frame) error { +func (f *frameDispatcher) notifyReqID(requestID uint64, frame frame.Frame) error { f.reqIDMu.Lock() // fetch response channel from cubbyhole a, ok := f.reqIDs[requestID] diff --git a/framedispatcher_test.go b/framedispatcher_test.go index 3ed4444..a4db4a4 100644 --- a/framedispatcher_test.go +++ b/framedispatcher_test.go @@ -18,11 +18,12 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" ) type dispatcherTestCase struct { - register func() (response <-chan Frame, cancel func(), err error) - notify func(frame Frame) error + register func() (response <-chan frame.Frame, cancel func(), err error) + notify func(frame frame.Frame) error } // dispatcherTestCases allows for the 3 types of dispatchers @@ -37,12 +38,12 @@ func dispatcherTestCases() map[string]dispatcherTestCase { notify: fd.notifyGlobal, }, "prodSeqID": { - register: func() (<-chan Frame, func(), error) { return fd.registerProdSeqIDs(1, 2) }, - notify: func(f Frame) error { return fd.notifyProdSeqIDs(1, 2, f) }, + register: func() (<-chan frame.Frame, func(), error) { return fd.registerProdSeqIDs(1, 2) }, + notify: func(f frame.Frame) error { return fd.notifyProdSeqIDs(1, 2, f) }, }, "reqID": { - register: func() (<-chan Frame, func(), error) { return fd.registerReqID(42) }, - notify: func(f Frame) error { return fd.notifyReqID(42, f) }, + register: func() (<-chan frame.Frame, func(), error) { return fd.registerReqID(42) }, + notify: func(f frame.Frame) error { return fd.notifyReqID(42, f) }, }, } } @@ -61,7 +62,7 @@ func TestFrameDispatcher_Success(t *testing.T) { } // response frame. The type and contents are arbitrary - expected := Frame{ + expected := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CONNECTED.Enum(), Pong: &api.CommandPong{}, @@ -132,7 +133,7 @@ func TestFrameDispatcher_DupNotify(t *testing.T) { defer cancel() // response frame. The type and contents are arbitrary - expected := Frame{ + expected := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CONNECTED.Enum(), Pong: &api.CommandPong{}, @@ -185,7 +186,7 @@ func TestFrameDispatcher_Unexpected(t *testing.T) { // no register is called // response frame. The type and contents are arbitrary - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CONNECTED.Enum(), Pong: &api.CommandPong{}, @@ -218,7 +219,7 @@ func TestFrameDispatcher_Timeout(t *testing.T) { cancel() // response frame. The type and contents are arbitrary - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CONNECTED.Enum(), Pong: &api.CommandPong{}, diff --git a/fuzz/Makefile b/fuzz/Makefile index 9c49c41..8e735ca 100644 --- a/fuzz/Makefile +++ b/fuzz/Makefile @@ -26,8 +26,8 @@ fuzz: pulsar-fuzz.zip go-fuzz -bin=./pulsar-fuzz.zip -workdir=$@-workdir # Create fuzz binary/zip for Fuzz func -pulsar-fuzz.zip: ../frame_fuzz.go ../frame.go - go-fuzz-build -func Fuzz -o $@ $(shell go list -e ../) +pulsar-fuzz.zip: ../frame/frame_fuzz.go ../frame/frame.go + go-fuzz-build -func Fuzz -o $@ $(shell go list -e ../frame) # Start fuzzing (FuzzReEncode function)! .PHONY: fuzz-reencode @@ -37,8 +37,8 @@ fuzz-reencode: pulsar-fuzzreencode.zip go-fuzz -bin=./pulsar-fuzzreencode.zip -workdir=$@-workdir # Create fuzz binary/zip for FuzzReEncode func -pulsar-fuzzreencode.zip: ../frame_fuzz.go ../frame.go - go-fuzz-build -func FuzzReEncode -o $@ $(shell go list -e ../) +pulsar-fuzzreencode.zip: ../frame/frame_fuzz.go ../frame/frame.go + go-fuzz-build -func FuzzReEncode -o $@ $(shell go list -e ../frame) # Build corpus-gen program pulsar-corpus-gen: corpus-gen/main.go diff --git a/fuzz/README.md b/fuzz/README.md index 108e0b6..89b5171 100644 --- a/fuzz/README.md +++ b/fuzz/README.md @@ -1,7 +1,7 @@ fuzz ==== -The [`Frame.Decode(r io.Reader)`](../frame.go) method decodes data from a TCP connection and +The [`Frame.Decode(r io.Reader)`](../frame/frame.go) method decodes data from a TCP connection and converts it into a Pulsar frame. Since it accepts traffic from the network, it a good candidate for fuzz testing. From the `go-fuzz` README: @@ -9,7 +9,7 @@ From the `go-fuzz` README: > and is especially useful for hardening of systems that parse inputs from potentially > malicious users (e.g. anything accepted over a network). -The [`frame_fuzz.go`](../frame_fuzz.go) file contains the two entrypoints to the fuzzer. +The [`frame_fuzz.go`](../frame/frame_fuzz.go) file contains the two entrypoints to the fuzzer. ## Prerequisites diff --git a/fuzz/corpus-gen/main.go b/fuzz/corpus-gen/main.go index 964e278..0cd81a6 100644 --- a/fuzz/corpus-gen/main.go +++ b/fuzz/corpus-gen/main.go @@ -21,7 +21,7 @@ import ( "os" "path" - pulsar "github.com/Comcast/pulsar-client-go" + "github.com/Comcast/pulsar-client-go/frame" "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" @@ -63,7 +63,7 @@ func handlePacket(outputDir string, pkt gopacket.Packet) { return } - var f pulsar.Frame + var f frame.Frame if err := f.Decode(bytes.NewReader(data)); err != nil { fmt.Fprintln(os.Stderr, err, data) return diff --git a/managed_consumer_test.go b/managed_consumer_test.go index 514094a..c0dc511 100644 --- a/managed_consumer_test.go +++ b/managed_consumer_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" "github.com/Comcast/pulsar-client-go/pulsartest" "github.com/golang/protobuf/proto" ) @@ -69,7 +70,7 @@ func TestManagedConsumer(t *testing.T) { // Send message to consumer payload := []byte("hola mundo") - message := pulsartest.Frame{ + message := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_MESSAGE.Enum(), Message: &api.CommandMessage{ @@ -152,11 +153,11 @@ func TestManagedConsumer_ReceiveAsync(t *testing.T) { go mc.ReceiveAsync(ctx, received) // send messages to consumer - sent := make([]pulsartest.Frame, queueSize+1) + sent := make([]frame.Frame, queueSize+1) for i := range sent { // Send message to consumer payload := []byte(fmt.Sprintf("%d hola mundo", i)) - sent[i] = pulsartest.Frame{ + sent[i] = frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_MESSAGE.Enum(), Message: &api.CommandMessage{ @@ -321,7 +322,7 @@ func TestManagedConsumer_ConsumerClosed(t *testing.T) { } // This will be sent to the client, closing the Consumer. - closeConsumer := pulsartest.Frame{ + closeConsumer := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CLOSE_CONSUMER.Enum(), CloseConsumer: &api.CommandCloseConsumer{ diff --git a/managed_producer_test.go b/managed_producer_test.go index a422b1f..c58792a 100644 --- a/managed_producer_test.go +++ b/managed_producer_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" "github.com/Comcast/pulsar-client-go/pulsartest" "github.com/golang/protobuf/proto" ) @@ -237,7 +238,7 @@ func TestManagedProducer_ProducerClosed(t *testing.T) { } // This will be sent to the client, closing the Producer. - closeProducer := pulsartest.Frame{ + closeProducer := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CLOSE_PRODUCER.Enum(), CloseProducer: &api.CommandCloseProducer{ diff --git a/mocksender_test.go b/mocksender_test.go index 92edb0f..defe335 100644 --- a/mocksender_test.go +++ b/mocksender_test.go @@ -17,20 +17,21 @@ import ( "sync" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" ) // mockSender implements the sender interface type mockSender struct { mu sync.Mutex // protects following - frames []Frame + frames []frame.Frame closedc chan struct{} } -func (m *mockSender) getFrames() []Frame { +func (m *mockSender) getFrames() []frame.Frame { m.mu.Lock() defer m.mu.Unlock() - cp := make([]Frame, len(m.frames)) + cp := make([]frame.Frame, len(m.frames)) copy(cp, m.frames) return cp @@ -40,7 +41,7 @@ func (m *mockSender) sendSimpleCmd(cmd api.BaseCommand) error { m.mu.Lock() defer m.mu.Unlock() - m.frames = append(m.frames, Frame{ + m.frames = append(m.frames, frame.Frame{ BaseCmd: &cmd, }) @@ -51,7 +52,7 @@ func (m *mockSender) sendPayloadCmd(cmd api.BaseCommand, metadata api.MessageMet m.mu.Lock() defer m.mu.Unlock() - m.frames = append(m.frames, Frame{ + m.frames = append(m.frames, frame.Frame{ BaseCmd: &cmd, Metadata: &metadata, Payload: payload, diff --git a/pinger_test.go b/pinger_test.go index 4ba3733..e6a1123 100644 --- a/pinger_test.go +++ b/pinger_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" ) func TestPinger_HandlePing(t *testing.T) { @@ -56,7 +57,7 @@ func TestPinger_Ping(t *testing.T) { time.Sleep(100 * time.Millisecond) - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_PONG.Enum(), Pong: &api.CommandPong{}, diff --git a/producer.go b/producer.go index 3551c44..8191ff5 100644 --- a/producer.go +++ b/producer.go @@ -21,6 +21,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" "github.com/golang/protobuf/proto" ) @@ -188,7 +189,7 @@ func (p *Producer) Close(ctx context.Context) error { // // When receiving the CloseProducer, the client is expected to go through the service discovery lookup again and recreate the producer again. The TCP connection is not being affected. // https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#command-closeproducer -func (p *Producer) handleCloseProducer(f Frame) error { +func (p *Producer) handleCloseProducer(f frame.Frame) error { p.mu.Lock() defer p.mu.Unlock() diff --git a/producer_test.go b/producer_test.go index c5a543a..46bf62e 100644 --- a/producer_test.go +++ b/producer_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" "github.com/golang/protobuf/proto" ) @@ -54,7 +55,7 @@ func TestProducer_Send_Success(t *testing.T) { ProducerId: proto.Uint64(prodID), SequenceId: proto.Uint64(0), } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_SEND_RECEIPT.Enum(), SendReceipt: &expected, @@ -106,7 +107,7 @@ func TestProducer_Send_Error(t *testing.T) { // Allow goroutine time to complete time.Sleep(100 * time.Millisecond) - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_SEND_ERROR.Enum(), SendError: &api.CommandSendError{ @@ -160,7 +161,7 @@ func TestProducer_Close_Success(t *testing.T) { expected := api.CommandSuccess{ RequestId: proto.Uint64(id), } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_SUCCESS.Enum(), Success: &expected, @@ -203,7 +204,7 @@ func TestProducer_handleCloseProducer(t *testing.T) { t.Logf("Closed() blocked") } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CLOSE_PRODUCER.Enum(), CloseProducer: &api.CommandCloseProducer{ diff --git a/pubsub_test.go b/pubsub_test.go index b50c5a5..7133bcf 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" "github.com/golang/protobuf/proto" ) @@ -57,7 +58,7 @@ func TestPubsub_Subscribe_Success(t *testing.T) { success := api.CommandSuccess{ RequestId: proto.Uint64(id), } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_SUCCESS.Enum(), Success: &success, @@ -116,7 +117,7 @@ func TestPubsub_Subscribe_Error(t *testing.T) { RequestId: proto.Uint64(id), Message: proto.String("oh noo"), } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_ERROR.Enum(), Error: &cmdErr, @@ -175,7 +176,7 @@ func TestPubsub_Producer_Success(t *testing.T) { LastSequenceId: proto.Int64(-1), ProducerName: proto.String(prodName), } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_PRODUCER_SUCCESS.Enum(), ProducerSuccess: &success, @@ -241,7 +242,7 @@ func TestPubsub_Producer_Error(t *testing.T) { RequestId: proto.Uint64(id), Message: proto.String("oh noo"), } - f := Frame{ + f := frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_ERROR.Enum(), Error: &cmdErr, diff --git a/pulsartest/frame.go b/pulsartest/frame.go deleted file mode 100644 index e75482d..0000000 --- a/pulsartest/frame.go +++ /dev/null @@ -1,359 +0,0 @@ -// Copyright 2018 Comcast Cable Communications Management, LLC -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pulsartest - -import ( - "bytes" - "encoding/binary" - "fmt" - "hash" - "hash/crc32" - "io" - - "github.com/Comcast/pulsar-client-go/api" - "github.com/golang/protobuf/proto" -) - -// ============== -// NOTE: Frame is duplicated from the pulsar package. This avoids -// cyclical imports. -// ============== - -// maxFrameSize is defined by the Pulsar spec with a single -// sentence: "The maximum allowable size of a single frame is 5 MB." -// -// https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Framing-5l6bym -const maxFrameSize = 5 * 1024 * 1024 // 5mb - -// magicNumber is a 2-byte byte array (0x0e01) -// identifying an optional checksum in the message, -// as defined by the pulsar protocol -// https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Payloadcommands-kbk8xf -var magicNumber = [...]byte{0x0e, 0x01} - -// Frame represents a pulsar message frame. -// It can be used to encode and decode messages -// to and from the Pulsar binary wire format. -// -// The binary protocol is outlined here: -// https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/ -// But the Java source should be considered the canonical format. -// -// All sizes are passed as 4-byte unsigned big endian integers. -// -// "Simple" command frame format: -// -// +------------------------------------------------------------------------+ -// | totalSize (uint32) | commandSize (uint32) | message (protobuf encoded) | -// | 4 bytes | 4 bytes | var length | -// |====================|======================|============================| -// | size of everything | size of the message | | -// | following these 4 | | | -// | bytes | | | -// +------------------------------------------------------------------------+ -// -// "Payload" command frame format (It has the same 3 fields as a "simple" command, plus the following): -// -// +-------------------------------------------------------------------------------------------------------------------------------------------------+ -// | "Simple" fields | magicNumber (0x0e01) | checksum (CRC32-C) | metadataSize (uint32) | metadata (protobuf encoded) | payload (bytes) | -// | var length | 2 bytes | 4 bytes | 4 bytes | var length | totalSize - (SUM others) | -// |=================|======================|====================|=======================|=============================|=============================| -// | | OPTIONAL If present, | OPTIONAL Checksum | size of the metadata | | Any sequence of bytes, | -// | | indicates following | of the following | | | possibly compressed and | -// | | 4 bytes are checksum | bytes | | | or encrypted (see metadata) | -// +-------------------------------------------------------------------------------------------------------------------------------------------------+ -// -type Frame struct { - // BaseCmd is a required field - BaseCmd *api.BaseCommand - - // The following fields are optional. - // If present, the frame is a "Payload" - // command, as opposed to a "Simple" command - // if there's only the BaseCmd. - Metadata *api.MessageMetadata - Payload []byte -} - -// Equal returns true if the other Frame is -// equal to the receiver frame, false otherwise. -func (f *Frame) Equal(other Frame) bool { - if !proto.Equal(f.BaseCmd, other.BaseCmd) { - return false - } - - if !proto.Equal(f.Metadata, other.Metadata) { - return false - } - - return bytes.Equal(f.Payload, other.Payload) -} - -// Decode the pulsar binary protocol from r into -// the receiver frame. Returns any errors encountered. -func (f *Frame) Decode(r io.Reader) error { - var err error - - // reusable buffer for 4-byte uint32s - buf32 := make([]byte, 4) - - // Read totalSize - // totalSize: The size of the frame, - // counting everything that comes after it (in bytes) - if _, err = io.ReadFull(r, buf32); err != nil { - return err - } - totalSize := binary.BigEndian.Uint32(buf32) - - // frameSize is the total length of the frame (totalSize - // is the size of all the _following_ bytes). - frameSize := int(totalSize) + 4 - // ensure reasonable frameSize - if frameSize > maxFrameSize { - return fmt.Errorf("frame size (%d) cannot be greater than max frame size (%d)", frameSize, maxFrameSize) - } - - // Wrap our reader so that we can only read - // bytes from our frame - lr := &io.LimitedReader{ - N: int64(totalSize), - R: r, - } - - // Read cmdSize - if _, err = io.ReadFull(lr, buf32); err != nil { - return err - } - cmdSize := binary.BigEndian.Uint32(buf32) - // guard against allocating large buffer - if cmdSize > maxFrameSize { - return fmt.Errorf("frame command size (%d) cannot b greater than max frame size (%d)", cmdSize, maxFrameSize) - } - - // Read protobuf encoded BaseCommand - cmdBuf := make([]byte, cmdSize) - if _, err = io.ReadFull(lr, cmdBuf); err != nil { - return err - } - f.BaseCmd = new(api.BaseCommand) - if err = proto.Unmarshal(cmdBuf, f.BaseCmd); err != nil { - return err - } - - // There are 3 possibilities for the following fields: - // - EOF: If so, this is a "simple" command. No more parsing required. - // - 2-byte magic number: Indicates the following 4 bytes are a checksum - // - 4-byte metadata size - - // The message may optionally stop here. If so, - // this is a "simple" command. - if lr.N <= 0 { - return nil - } - - // Optionally, the next 2 bytes may be the magicNumber. If - // so, it indicates that the following 4 bytes are a checksum. - // If not, the following 2 bytes (plus the 2 bytes already read), - // are the metadataSize, which is why a 4 byte buffer is used. - if _, err = io.ReadFull(lr, buf32); err != nil { - return err - } - - // Check for magicNumber which indicates a checksum - var chksum frameChecksum - var expectedChksum []byte - if magicNumber[0] == buf32[0] && magicNumber[1] == buf32[1] { - expectedChksum = make([]byte, 4) - - // We already read the 2-byte magicNumber and the - // initial 2 bytes of the checksum - expectedChksum[0] = buf32[2] - expectedChksum[1] = buf32[3] - - // Read the remaining 2 bytes of the checksum - if _, err = io.ReadFull(lr, expectedChksum[2:]); err != nil { - return err - } - - // Use a tee reader to compute the checksum - // of everything consumed after this point - lr.R = io.TeeReader(lr.R, &chksum) - - // Fill buffer with metadata size, which is what it - // would already contain if there were no magic number / checksum - if _, err = io.ReadFull(lr, buf32); err != nil { - return err - } - } - - // Read metadataSize - metadataSize := binary.BigEndian.Uint32(buf32) - // guard against allocating large buffer - if metadataSize > maxFrameSize { - return fmt.Errorf("frame metadata size (%d) cannot b greater than max frame size (%d)", metadataSize, maxFrameSize) - } - - // Read protobuf encoded metadata - metaBuf := make([]byte, metadataSize) - if _, err = io.ReadFull(lr, metaBuf); err != nil { - return err - } - f.Metadata = new(api.MessageMetadata) - if err = proto.Unmarshal(metaBuf, f.Metadata); err != nil { - return err - } - - // Anything left in the frame is considered - // the payload and can be any sequence of bytes. - if lr.N > 0 { - // guard against allocating large buffer - if lr.N > maxFrameSize { - return fmt.Errorf("frame payload size (%d) cannot be greater than max frame size (%d)", lr.N, maxFrameSize) - } - f.Payload = make([]byte, lr.N) - if _, err = io.ReadFull(lr, f.Payload); err != nil { - return err - } - } - - if computed := chksum.compute(); !bytes.Equal(computed, expectedChksum) { - return fmt.Errorf("checksum mismatch: computed (0x%X) does not match given checksum (0x%X)", computed, expectedChksum) - } - - return nil -} - -// Encode writes the pulsar binary protocol encoded -// frame into w. -func (f *Frame) Encode(w io.Writer) error { - // encode baseCommand - encodedBaseCmd, err := proto.Marshal(f.BaseCmd) - if err != nil { - return err - } - cmdSize := uint32(len(encodedBaseCmd)) - - var metadataSize uint32 - var encodedMetadata []byte - // Check if this is a "simple" command, ie - // no metadata nor payload - if f.Metadata != nil { - if encodedMetadata, err = proto.Marshal(f.Metadata); err != nil { - return err - } - metadataSize = uint32(len(encodedMetadata)) - } - - // - // | totalSize (4) | cmdSize (4) | cmd (...) | magic+checksum (6) | metadataSize (4) | metadata (...) | payload (...) | - // - totalSize := cmdSize + 4 - if metadataSize > 0 { - totalSize += 6 + metadataSize + 4 + uint32(len(f.Payload)) - } - - if frameSize := totalSize + 4; frameSize > maxFrameSize { - return fmt.Errorf("encoded frame size (%d bytes) is larger than max allowed frame size (%d bytes)", frameSize, maxFrameSize) - } - - // write totalSize - if err = binary.Write(w, binary.BigEndian, totalSize); err != nil { - return err - } - - // write cmdSize - if err = binary.Write(w, binary.BigEndian, cmdSize); err != nil { - return err - } - - // write baseCommand - buf := bytes.NewReader(encodedBaseCmd) - if _, err = io.Copy(w, buf); err != nil { - return err - } - - if metadataSize == 0 { - // this is a "simple" command - // (no metadata, payload) - return nil - } - - // write magic number to indicate that a checksum follows - buf.Reset(magicNumber[:]) - if _, err = io.Copy(w, buf); err != nil { - return err - } - - // build checksum - var chksum frameChecksum - if err = binary.Write(&chksum, binary.BigEndian, metadataSize); err != nil { - return err - } - if _, err = chksum.Write(encodedMetadata); err != nil { - return err - } - if _, err = chksum.Write(f.Payload); err != nil { - return err - } - - // write checksum - buf.Reset(chksum.compute()) - if _, err = io.Copy(w, buf); err != nil { - return err - } - - // write metadataSize - if err = binary.Write(w, binary.BigEndian, metadataSize); err != nil { - return err - } - - // write metadata - buf.Reset(encodedMetadata) - if _, err = io.Copy(w, buf); err != nil { - return err - } - - // write payload - buf.Reset(f.Payload) - _, err = io.Copy(w, buf) - return err -} - -// crc32cTbl holds the precomputed crc32 hash table -// used by Pulsar (crc32c) -var crc32cTbl = crc32.MakeTable(crc32.Castagnoli) - -// frameChecksum handles computing the Frame checksum, both -// when decoding and encoding. The empty value is valid and -// represents no checksum. It is not thread-safe. -type frameChecksum struct { - h hash.Hash32 -} - -// Write updates the hash with given bytes. -func (f *frameChecksum) Write(p []byte) (int, error) { - if f.h == nil { - f.h = crc32.New(crc32cTbl) - } - return f.h.Write(p) -} - -// compute returns the computed checksum. If nothing -// was written to the checksum, nil is returned. -func (f *frameChecksum) compute() []byte { - if f.h == nil { - return nil - } - return f.h.Sum(nil) -} diff --git a/pulsartest/server.go b/pulsartest/server.go index b46f849..64f92aa 100644 --- a/pulsartest/server.go +++ b/pulsartest/server.go @@ -23,6 +23,7 @@ import ( "sync" "github.com/Comcast/pulsar-client-go/api" + "github.com/Comcast/pulsar-client-go/frame" "github.com/golang/protobuf/proto" ) @@ -45,7 +46,7 @@ func NewServer(ctx context.Context) (*Server, error) { // buffer a reasonable (and arbitrary) amount // so that all frames received during a single test // will be buffered. - received := make(chan Frame, 128) + received := make(chan frame.Frame, 128) srv := Server{ Addr: fmt.Sprintf("pulsar://%s", l.Addr().String()), @@ -87,7 +88,7 @@ func NewServer(ctx context.Context) (*Server, error) { }() for { - var f Frame + var f frame.Frame if err := f.Decode(c); err != nil { return } @@ -115,7 +116,7 @@ func NewServer(ctx context.Context) (*Server, error) { // Server emulates a Pulsar server type Server struct { Addr string - Received <-chan Frame + Received <-chan frame.Frame trmu sync.Mutex topicLookupResps map[string]topicLookupResp // map of topic -> topicLookupResp @@ -192,7 +193,7 @@ func (m *Server) TotalNumConns() int { } // Broadcast sends the given frame to all connected clients. -func (m *Server) Broadcast(f Frame) error { +func (m *Server) Broadcast(f frame.Frame) error { var b bytes.Buffer if err := f.Encode(&b); err != nil { return err @@ -226,7 +227,7 @@ func (m *Server) CloseAll() error { return nil } -func (m *Server) handleFrame(f Frame, remoteAddr string) *Frame { +func (m *Server) handleFrame(f frame.Frame, remoteAddr string) *frame.Frame { msgType := f.BaseCmd.GetType() switch msgType { @@ -240,7 +241,7 @@ func (m *Server) handleFrame(f Frame, remoteAddr string) *Frame { return nil } - return &Frame{ + return &frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_CONNECTED.Enum(), Connected: &api.CommandConnected{ @@ -259,7 +260,7 @@ func (m *Server) handleFrame(f Frame, remoteAddr string) *Frame { return nil } - return &Frame{ + return &frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_PONG.Enum(), Pong: &api.CommandPong{}, @@ -283,7 +284,7 @@ func (m *Server) handleFrame(f Frame, remoteAddr string) *Frame { } m.trmu.Unlock() - return &Frame{ + return &frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_LOOKUP_RESPONSE.Enum(), LookupTopicResponse: &api.CommandLookupTopicResponse{ @@ -304,7 +305,7 @@ func (m *Server) handleFrame(f Frame, remoteAddr string) *Frame { // allow Producers to be created case api.BaseCommand_PRODUCER: - return &Frame{ + return &frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_PRODUCER_SUCCESS.Enum(), ProducerSuccess: &api.CommandProducerSuccess{ @@ -316,7 +317,7 @@ func (m *Server) handleFrame(f Frame, remoteAddr string) *Frame { // allow Consumers to be created case api.BaseCommand_SUBSCRIBE: - return &Frame{ + return &frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_SUCCESS.Enum(), Success: &api.CommandSuccess{ @@ -326,7 +327,7 @@ func (m *Server) handleFrame(f Frame, remoteAddr string) *Frame { } case api.BaseCommand_SEND: - return &Frame{ + return &frame.Frame{ BaseCmd: &api.BaseCommand{ Type: api.BaseCommand_SEND_RECEIPT.Enum(), SendReceipt: &api.CommandSendReceipt{ diff --git a/subscriptions.go b/subscriptions.go index 94c0d22..f8ca2f7 100644 --- a/subscriptions.go +++ b/subscriptions.go @@ -15,6 +15,8 @@ package pulsar import ( "sync" + + "github.com/Comcast/pulsar-client-go/frame" ) // newSubscriptions returns a ready-to-use subscriptions. @@ -47,7 +49,7 @@ func (s *subscriptions) delConsumer(c *Consumer) { s.cmu.Unlock() } -func (s *subscriptions) handleCloseConsumer(consumerID uint64, f Frame) error { +func (s *subscriptions) handleCloseConsumer(consumerID uint64, f frame.Frame) error { s.cmu.Lock() defer s.cmu.Unlock() @@ -61,7 +63,7 @@ func (s *subscriptions) handleCloseConsumer(consumerID uint64, f Frame) error { return c.handleCloseConsumer(f) } -func (s *subscriptions) handleReachedEndOfTopic(consumerID uint64, f Frame) error { +func (s *subscriptions) handleReachedEndOfTopic(consumerID uint64, f frame.Frame) error { s.cmu.Lock() defer s.cmu.Unlock() @@ -73,7 +75,7 @@ func (s *subscriptions) handleReachedEndOfTopic(consumerID uint64, f Frame) erro return c.handleReachedEndOfTopic(f) } -func (s *subscriptions) handleMessage(consumerID uint64, f Frame) error { +func (s *subscriptions) handleMessage(consumerID uint64, f frame.Frame) error { s.cmu.RLock() c, ok := s.consumers[consumerID] s.cmu.RUnlock() @@ -97,7 +99,7 @@ func (s *subscriptions) delProducer(p *Producer) { s.pmu.Unlock() } -func (s *subscriptions) handleCloseProducer(producerID uint64, f Frame) error { +func (s *subscriptions) handleCloseProducer(producerID uint64, f frame.Frame) error { s.pmu.Lock() defer s.pmu.Unlock() diff --git a/util_test.go b/util_test.go index 69a597d..e645d9c 100644 --- a/util_test.go +++ b/util_test.go @@ -14,11 +14,7 @@ package pulsar import ( - "bufio" - "encoding/hex" - "fmt" "math/rand" - "strings" "sync" "time" ) @@ -27,55 +23,6 @@ import ( // helper functions // ################ -// hexUndump is the inverse of hex.Dump. It massages the output of hex.Dump -// into a hex string, and then runs hex.DecodeString over it. -// Example line: -// 00000000 00 00 00 19 00 00 00 15 08 03 1a 11 0a 0d 50 75 |..............Pu| -// 8<------------------------------------------------>58 -// It is also capable of decoding BSD's hexdump output. - -func hexUndump(h string) []byte { - var out string - - s := bufio.NewScanner(strings.NewReader(strings.TrimSpace(h))) - for s.Scan() { - line := s.Text() - if len(line) == 0 { - panic(fmt.Sprintf("invalid hex line: %q", line)) - } - // first starting delimiter, which is the first space which separates the - // offset from the hex encoded data - firstSpace := strings.IndexRune(line, ' ') - if firstSpace == -1 { - panic(fmt.Sprintf("invalid hex line: %q", line)) - } - line = line[firstSpace:] - // possible ending delimiter, which is the start of ASCII representation - // of the data - if ascii := strings.IndexRune(line, '|'); ascii > 0 { - line = line[:ascii] - } - - // remove spaces between hex numbers - line = strings.Replace(line, " ", "", -1) - - end := 32 - if len(line) < end { - end = len(line) - } - out += line[:end] - } - if err := s.Err(); err != nil { - panic(err) - } - - b, err := hex.DecodeString(out) - if err != nil { - panic(err) - } - return b -} - var ( randStringChars = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") randStringMu = new(sync.Mutex) //protects randStringRand, which isn't threadsafe