Skip to content

Commit

Permalink
Extract Frame into own package (Comcast#11)
Browse files Browse the repository at this point in the history
Moves all frame related code into its own package (`frame`).

This was done since `frame.go` was duplicated in the `puslartest` package.
This removes that duplication, since now both `pulsar` and `pulsartest` packages can import `frame` with cyclic dependencies.
  • Loading branch information
awilliams authored Jul 27, 2018
1 parent 6501730 commit b33bb3f
Show file tree
Hide file tree
Showing 32 changed files with 251 additions and 549 deletions.
5 changes: 3 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/Comcast/pulsar-client-go/api"
"github.com/Comcast/pulsar-client-go/frame"
)

const (
Expand Down Expand Up @@ -103,7 +104,7 @@ func NewClient(cfg ClientConfig) (*Client, error) {
pubsub: newPubsub(cnx, dispatcher, subs, &reqID),
}

handler := func(f Frame) {
handler := func(f frame.Frame) {
// All message types can be handled in
// parallel, since their ordering should not matter
go c.handleFrame(f)
Expand Down Expand Up @@ -242,7 +243,7 @@ func (c *Client) NewFailoverConsumer(ctx context.Context, topic, subscriptionNam

// handleFrame is called by the underlaying conn with
// all received Frames.
func (c *Client) handleFrame(f Frame) {
func (c *Client) handleFrame(f frame.Frame) {
var err error

msgType := f.BaseCmd.GetType()
Expand Down
4 changes: 3 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"io"
"testing"
"time"

"github.com/Comcast/pulsar-client-go/frame"
)

func TestClient_ServerInitiatedClose(t *testing.T) {
Expand Down Expand Up @@ -98,7 +100,7 @@ func TestClient_ClientInitiatedClose(t *testing.T) {
// send read errors to srvConnReadErr chan
srvConnReadErr := make(chan error, 1)
go func() {
srvConnReadErr <- srvConn.read(func(f Frame) {})
srvConnReadErr <- srvConn.read(func(f frame.Frame) {})
}()

// terminate the connection from the client's end
Expand Down
13 changes: 7 additions & 6 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/Comcast/pulsar-client-go/api"
"github.com/Comcast/pulsar-client-go/frame"
)

// newTCPConn creates a conn using a TCPv4 connection to the given
Expand Down Expand Up @@ -112,9 +113,9 @@ func (c *conn) closed() <-chan struct{} {
// will close the connection. Also if close() is called,
// read() will unblock. Once read returns, the conn should
// be considered unusable.
func (c *conn) read(frameHandler func(f Frame)) error {
func (c *conn) read(frameHandler func(f frame.Frame)) error {
for {
var f Frame
var f frame.Frame
if err := f.Decode(c.rc); err != nil {
// It's very possible that the connection is already closed at this
// point, since any connection closed errors would bubble up
Expand All @@ -131,15 +132,15 @@ func (c *conn) read(frameHandler func(f Frame)) error {
// sendSimpleCmd writes a "simple" frame to the wire. It
// is safe to use concurrently.
func (c *conn) sendSimpleCmd(cmd api.BaseCommand) error {
return c.writeFrame(&Frame{
return c.writeFrame(&frame.Frame{
BaseCmd: &cmd,
})
}

// sendPayloadCmd writes a "payload" frame to the wire. It
// is safe to use concurrently.
func (c *conn) sendPayloadCmd(cmd api.BaseCommand, metadata api.MessageMetadata, payload []byte) error {
return c.writeFrame(&Frame{
return c.writeFrame(&frame.Frame{
BaseCmd: &cmd,
Metadata: &metadata,
Payload: payload,
Expand All @@ -148,13 +149,13 @@ func (c *conn) sendPayloadCmd(cmd api.BaseCommand, metadata api.MessageMetadata,

var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, maxFrameSize))
return bytes.NewBuffer(make([]byte, 0, frame.MaxFrameSize))
},
}

// writeFrame encodes the given frame and writes
// it to the wire in a thread-safe manner.
func (c *conn) writeFrame(f *Frame) error {
func (c *conn) writeFrame(f *frame.Frame) error {
b := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(b)
b.Reset()
Expand Down
5 changes: 3 additions & 2 deletions conn_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/Comcast/pulsar-client-go/api"
"github.com/Comcast/pulsar-client-go/frame"
"github.com/golang/protobuf/proto"
)

Expand All @@ -31,10 +32,10 @@ func TestConn_Int_Connect(t *testing.T) {
t.Fatal(err)
}

responses := make(chan Frame)
responses := make(chan frame.Frame)
readErr := make(chan error, 1)
go func() {
readErr <- c.read(func(f Frame) {
readErr <- c.read(func(f frame.Frame) {
responses <- f
})
}()
Expand Down
59 changes: 30 additions & 29 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/Comcast/pulsar-client-go/api"
"github.com/Comcast/pulsar-client-go/frame"
"github.com/golang/protobuf/proto"
)

Expand All @@ -43,7 +44,7 @@ func (m *mockReadCloser) Close() error {
}

func TestConn_Read(t *testing.T) {
f := Frame{
f := frame.Frame{
BaseCmd: &api.BaseCommand{
Type: api.BaseCommand_CONNECTED.Enum(),
Connected: &api.CommandConnected{
Expand All @@ -65,8 +66,8 @@ func TestConn_Read(t *testing.T) {
closedc: make(chan struct{}),
}

var gotFrames []Frame
handler := func(f Frame) { gotFrames = append(gotFrames, f) }
var gotFrames []frame.Frame
handler := func(f frame.Frame) { gotFrames = append(gotFrames, f) }
// read should read the frame, then reach
// and return EOF
if err := c.read(handler); err != io.EOF {
Expand All @@ -93,7 +94,7 @@ func TestConn_Close(t *testing.T) {
}

// no-op
handler := func(f Frame) {}
handler := func(f frame.Frame) {}

// read should reach and return EOF
err := c.read(handler)
Expand All @@ -119,8 +120,8 @@ func TestConn_GarbageInput(t *testing.T) {
closedc: make(chan struct{}),
}

var gotFrames []Frame
handler := func(f Frame) {
var gotFrames []frame.Frame
handler := func(f frame.Frame) {
gotFrames = append(gotFrames, f)
}

Expand All @@ -142,7 +143,7 @@ func TestConn_GarbageInput(t *testing.T) {
}

func TestConn_TimeoutReader(t *testing.T) {
f := Frame{
f := frame.Frame{
BaseCmd: &api.BaseCommand{
Type: api.BaseCommand_CONNECTED.Enum(),
Connected: &api.CommandConnected{
Expand All @@ -167,8 +168,8 @@ func TestConn_TimeoutReader(t *testing.T) {
closedc: make(chan struct{}),
}

var gotFrames []Frame
handler := func(f Frame) {
var gotFrames []frame.Frame
handler := func(f frame.Frame) {
gotFrames = append(gotFrames, f)
}

Expand All @@ -186,7 +187,7 @@ func TestConn_TimeoutReader(t *testing.T) {
}

func TestConn_Read_SlowSrc(t *testing.T) {
f := Frame{
f := frame.Frame{
BaseCmd: &api.BaseCommand{
Type: api.BaseCommand_CONNECTED.Enum(),
Connected: &api.CommandConnected{
Expand All @@ -210,8 +211,8 @@ func TestConn_Read_SlowSrc(t *testing.T) {
closedc: make(chan struct{}),
}

var gotFrames []Frame
handler := func(f Frame) {
var gotFrames []frame.Frame
handler := func(f frame.Frame) {
gotFrames = append(gotFrames, f)
}
// read should read the frame, then reach
Expand All @@ -235,9 +236,9 @@ func TestConn_Read_MutliFrame(t *testing.T) {
N := 16

// create input frames
frames := make([]Frame, N)
frames := make([]frame.Frame, N)
for i := range frames {
frames[i] = Frame{
frames[i] = frame.Frame{
BaseCmd: &api.BaseCommand{
Type: api.BaseCommand_MESSAGE.Enum(),
Message: &api.CommandMessage{
Expand Down Expand Up @@ -272,8 +273,8 @@ func TestConn_Read_MutliFrame(t *testing.T) {
closedc: make(chan struct{}),
}

var gotFrames []Frame
handler := func(f Frame) { gotFrames = append(gotFrames, f) }
var gotFrames []frame.Frame
handler := func(f frame.Frame) { gotFrames = append(gotFrames, f) }
// read should read the frames, then reach
// and return EOF
if err := c.read(handler); err != io.EOF {
Expand All @@ -299,10 +300,10 @@ func TestConn_writeFrame(t *testing.T) {
// mapping of frame payload to frame.
// Since they will be written in an undetermined ordered,
// this helps look them up and match them.
frames := make([]Frame, N)
frames := make([]frame.Frame, N)
for i := 0; i < N; i++ {
payload := fmt.Sprintf("%02d - test message", i) // test expects that payload as string sorts properly
frames[i] = Frame{
frames[i] = frame.Frame{
BaseCmd: &api.BaseCommand{
Type: api.BaseCommand_MESSAGE.Enum(),
Message: &api.CommandMessage{
Expand Down Expand Up @@ -346,8 +347,8 @@ func TestConn_writeFrame(t *testing.T) {
}
})

var gotFrames []Frame
handler := func(f Frame) { gotFrames = append(gotFrames, f) }
var gotFrames []frame.Frame
handler := func(f frame.Frame) { gotFrames = append(gotFrames, f) }
// read the encoded frames, which the handler
// will store in `gotFrames`.
if err := c.read(handler); err != io.EOF {
Expand All @@ -372,7 +373,7 @@ func TestConn_writeFrame(t *testing.T) {
}

func TestConn_TCP_Read(t *testing.T) {
testFrames := map[string]Frame{
testFrames := map[string]frame.Frame{
"ping": {
BaseCmd: &api.BaseCommand{
Type: api.BaseCommand_PING.Enum(),
Expand Down Expand Up @@ -430,10 +431,10 @@ func TestConn_TCP_Read(t *testing.T) {
}

// start reading frames off the conn
received := make(chan Frame, len(testFrames))
received := make(chan frame.Frame, len(testFrames))
readErr := make(chan error, 1)
go func() {
readErr <- c.read(func(f Frame) { received <- f })
readErr <- c.read(func(f frame.Frame) { received <- f })
}()

// send frames from the Pulsar server to the conn, and
Expand All @@ -460,7 +461,7 @@ func TestConn_TCP_Read(t *testing.T) {
}

func TestConn_TCP_Write(t *testing.T) {
testFrames := map[string]Frame{
testFrames := map[string]frame.Frame{
"ping": {
BaseCmd: &api.BaseCommand{
Type: api.BaseCommand_PING.Enum(),
Expand Down Expand Up @@ -518,10 +519,10 @@ func TestConn_TCP_Write(t *testing.T) {
t.Fatal("timeout waiting for server to receive connection")
}

srvReceived := make(chan Frame)
srvReceived := make(chan frame.Frame)
go func() {
defer close(srvReceived)
srvConn.read(func(f Frame) {
srvConn.read(func(f frame.Frame) {
srvReceived <- f
})
}()
Expand Down Expand Up @@ -586,14 +587,14 @@ func TestConn_TCP_ReadLocalClose(t *testing.T) {
// send read errors to srvConnReadErr chan
srvConnReadErr := make(chan error, 1)
go func() {
srvConnReadErr <- srvConn.read(func(f Frame) {})
srvConnReadErr <- srvConn.read(func(f frame.Frame) {})
}()

// start reading from the conn.
// send read errors to readErr chan
readErr := make(chan error, 1)
go func() {
readErr <- c.read(func(f Frame) {})
readErr <- c.read(func(f frame.Frame) {})
}()

// close the connection from the local conn's end
Expand Down Expand Up @@ -650,7 +651,7 @@ func TestConn_TCP_ReadRemoteClose(t *testing.T) {
// Send read errors to readErr chan
readErr := make(chan error, 1)
go func() {
readErr <- c.read(func(f Frame) {})
readErr <- c.read(func(f frame.Frame) {})
}()

// server initiated connection closure
Expand Down
9 changes: 5 additions & 4 deletions connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/Comcast/pulsar-client-go/api"
"github.com/Comcast/pulsar-client-go/frame"
"github.com/golang/protobuf/proto"
)

Expand Down Expand Up @@ -48,7 +49,7 @@ func TestConnector(t *testing.T) {
connected := api.CommandConnected{
ServerVersion: proto.String("this is a test"),
}
f := Frame{
f := frame.Frame{
BaseCmd: &api.BaseCommand{
Type: api.BaseCommand_CONNECTED.Enum(),
Connected: &connected,
Expand Down Expand Up @@ -115,7 +116,7 @@ func TestConnector_Timeout(t *testing.T) {

// There should be nothing in the dispatcher,
// so `connected` should fail.
f := Frame{
f := frame.Frame{
BaseCmd: &api.BaseCommand{
Type: api.BaseCommand_CONNECTED.Enum(),
Connected: &api.CommandConnected{
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestConnector_Error(t *testing.T) {
RequestId: proto.Uint64(undefRequestID),
Message: proto.String("there was an error of sorts"),
}
f := Frame{
f := frame.Frame{
BaseCmd: &api.BaseCommand{
Type: api.BaseCommand_ERROR.Enum(),
Error: &errorMsg,
Expand All @@ -183,7 +184,7 @@ func TestConnector_Error(t *testing.T) {

// There should be nothing in the dispatcher,
// so `connected` should fail.
f = Frame{
f = frame.Frame{
BaseCmd: &api.BaseCommand{
Type: api.BaseCommand_CONNECTED.Enum(),
Connected: &api.CommandConnected{
Expand Down
Loading

0 comments on commit b33bb3f

Please sign in to comment.