Skip to content

Commit

Permalink
Merge pull request #123 from hintjens/master
Browse files Browse the repository at this point in the history
Small incremental improvements
  • Loading branch information
sappo committed Oct 12, 2014
2 parents 2d2d829 + 746dff4 commit fcbebd2
Show file tree
Hide file tree
Showing 19 changed files with 1,548 additions and 44 deletions.
176 changes: 176 additions & 0 deletions include/zproto/example/binary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package example

import (
"bytes"
"encoding/binary"
"errors"
"fmt"

zmq "github.com/pebbe/zmq4"
)

// Deliver a multi-part message.
type Binary struct {
routingId []byte
sequence uint16
Flags [4]byte
PublicKey []byte
Identifier []byte
Address []byte
Content []byte
}

// New creates new Binary message.
func NewBinary() *Binary {
binary := &Binary{}
return binary
}

// String returns print friendly name.
func (b *Binary) String() string {
str := "ZPROTO_EXAMPLE_BINARY:\n"
str += fmt.Sprintf(" sequence = %v\n", b.sequence)
str += fmt.Sprintf(" Flags = %v\n", b.Flags)
str += fmt.Sprintf(" PublicKey = %v\n", b.PublicKey)
str += fmt.Sprintf(" Identifier = %v\n", b.Identifier)
str += fmt.Sprintf(" Address = %v\n", b.Address)
str += fmt.Sprintf(" Content = %v\n", b.Content)
return str
}

// Marshal serializes the message.
func (b *Binary) Marshal() ([]byte, error) {
// Calculate size of serialized data
bufferSize := 2 + 1 // Signature and message ID

// sequence is a 2-byte integer
bufferSize += 2

// Flags is a block of [4]byte
bufferSize += 4

// PublicKey is a block of []byte with one byte length
bufferSize += 1 + len(b.PublicKey)

// Identifier is a block of []byte with one byte length
bufferSize += 1 + len(b.Identifier)

// Now serialize the message
tmpBuf := make([]byte, bufferSize)
tmpBuf = tmpBuf[:0]
buffer := bytes.NewBuffer(tmpBuf)
binary.Write(buffer, binary.BigEndian, Signature)
binary.Write(buffer, binary.BigEndian, BinaryId)

// sequence
binary.Write(buffer, binary.BigEndian, b.sequence)

// Flags
binary.Write(buffer, binary.BigEndian, b.Flags)

// PublicKey
putBytes(buffer, b.PublicKey)

// Identifier
putBytes(buffer, b.Identifier)

return buffer.Bytes(), nil
}

// Unmarshals the message.
func (b *Binary) Unmarshal(frames ...[]byte) error {
if frames == nil {
return errors.New("Can't unmarshal empty message")
}

frame := frames[0]
frames = frames[1:]

buffer := bytes.NewBuffer(frame)

// Get and check protocol signature
var signature uint16
binary.Read(buffer, binary.BigEndian, &signature)
if signature != Signature {
return errors.New("invalid signature")
}

// Get message id and parse per message type
var id uint8
binary.Read(buffer, binary.BigEndian, &id)
if id != BinaryId {
return errors.New("malformed Binary message")
}
// sequence
binary.Read(buffer, binary.BigEndian, &b.sequence)
// Flags
binary.Read(buffer, binary.BigEndian, &b.Flags)
// PublicKey
b.PublicKey = getBytes(buffer)
// Identifier
b.Identifier = getBytes(buffer)
// Address
if 0 <= len(frames)-1 {
b.Address = frames[0]
}
// Content
if 1 <= len(frames)-1 {
b.Content = frames[1]
}

return nil
}

// Sends marshaled data through 0mq socket.
func (b *Binary) Send(socket *zmq.Socket) (err error) {
frame, err := b.Marshal()
if err != nil {
return err
}

socType, err := socket.GetType()
if err != nil {
return err
}

// If we're sending to a ROUTER, we send the routingId first
if socType == zmq.ROUTER {
_, err = socket.SendBytes(b.routingId, zmq.SNDMORE)
if err != nil {
return err
}
}

// Now send the data frame
_, err = socket.SendBytes(frame, zmq.SNDMORE)
if err != nil {
return err
}
// Now send any frame fields, in order
_, err = socket.SendBytes(b.Address, zmq.SNDMORE)
_, err = socket.SendBytes(b.Content, 0)

return err
}

// RoutingId returns the routingId for this message, routingId should be set
// whenever talking to a ROUTER.
func (b *Binary) RoutingId() []byte {
return b.routingId
}

// SetRoutingId sets the routingId for this message, routingId should be set
// whenever talking to a ROUTER.
func (b *Binary) SetRoutingId(routingId []byte) {
b.routingId = routingId
}

// Setsequence sets the sequence.
func (b *Binary) SetSequence(sequence uint16) {
b.sequence = sequence
}

// sequence returns the sequence.
func (b *Binary) Sequence() uint16 {
return b.sequence
}
110 changes: 110 additions & 0 deletions include/zproto/example/binary_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package example

import (
"testing"

zmq "github.com/pebbe/zmq4"
)

// Yay! Test function.
func TestBinary(t *testing.T) {

// Create pair of sockets we can send through

// Output socket
output, err := zmq.NewSocket(zmq.DEALER)
if err != nil {
t.Fatal(err)
}
defer output.Close()

routingId := "Shout"
output.SetIdentity(routingId)
err = output.Bind("inproc://selftest-binary")
if err != nil {
t.Fatal(err)
}
defer output.Unbind("inproc://selftest-binary")

// Input socket
input, err := zmq.NewSocket(zmq.ROUTER)
if err != nil {
t.Fatal(err)
}
defer input.Close()

err = input.Connect("inproc://selftest-binary")
if err != nil {
t.Fatal(err)
}
defer input.Disconnect("inproc://selftest-binary")

// Create a Binary message and send it through the wire
binary := NewBinary()

binary.sequence = 123

binary.Flags = [4]byte{'z', 'z', 'z', 'z'}

binary.PublicKey = []byte("Captcha Diem")

binary.Identifier = []byte("Captcha Diem")

binary.Address = []byte("Captcha Diem")

binary.Content = []byte("Captcha Diem")

err = binary.Send(output)
if err != nil {
t.Fatal(err)
}
transit, err := Recv(input)
if err != nil {
t.Fatal(err)
}

tr := transit.(*Binary)

if tr.sequence != 123 {
t.Fatalf("expected %d, got %d", 123, tr.sequence)
}

if len(tr.Flags) != 4 {
t.Fatalf("mismatch octets size for %s", "Flags")
}
for idx, b := range [4]byte{'z', 'z', 'z', 'z'} {
if tr.Flags[idx] != b {
t.Fatalf("mismatch octets value for %s", "Flags")
}
}

if string(tr.PublicKey) != "Captcha Diem" {
t.Fatalf("expected %s, got %s", "Captcha Diem", tr.PublicKey)
}

if string(tr.Identifier) != "Captcha Diem" {
t.Fatalf("expected %s, got %s", "Captcha Diem", tr.Identifier)
}

if string(tr.Address) != "Captcha Diem" {
t.Fatalf("expected %s, got %s", "Captcha Diem", tr.Address)
}

if string(tr.Content) != "Captcha Diem" {
t.Fatalf("expected %s, got %s", "Captcha Diem", tr.Content)
}

err = tr.Send(input)
if err != nil {
t.Fatal(err)
}

transit, err = Recv(output)
if err != nil {
t.Fatal(err)
}

if routingId != string(tr.RoutingId()) {
t.Fatalf("expected %s, got %s", routingId, string(tr.RoutingId()))
}
}
Loading

0 comments on commit fcbebd2

Please sign in to comment.