Skip to content

Commit

Permalink
Merge pull request #2 from pdf/ng
Browse files Browse the repository at this point in the history
Breaking changes
  • Loading branch information
zhuyie authored Jan 22, 2017
2 parents 4e26228 + e21c426 commit a8fe896
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 92 deletions.
58 changes: 51 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,55 @@
# bidirpc
[![MIT licensed][1]][2] [![Build Status][3]][4] [![Coverage Statusd][5]][6]

[1]: https://img.shields.io/badge/license-MIT-blue.svg
[2]: LICENSE
[3]: https://travis-ci.org/zhuyie/bidirpc.svg?branch=master
[4]: https://travis-ci.org/zhuyie/bidirpc
[5]: https://codecov.io/gh/zhuyie/bidirpc/branch/master/graph/badge.svg
[6]: https://codecov.io/gh/zhuyie/bidirpc
[![GoDoc][1]][2] [![MIT licensed][3]][4] [![Build Status][5]][6] [![Coverage Statusd][7]][8]

[1]: https://godoc.org/github.com/zhuyie/bidirpc?status.svg
[2]: https://godoc.org/github.com/zhuyie/bidirpc
[3]: https://img.shields.io/badge/license-MIT-blue.svg
[4]: LICENSE
[5]: https://travis-ci.org/zhuyie/bidirpc.svg?branch=master
[6]: https://travis-ci.org/zhuyie/bidirpc
[7]: https://codecov.io/gh/zhuyie/bidirpc/branch/master/graph/badge.svg
[8]: https://codecov.io/gh/zhuyie/bidirpc

bidirpc is a simple bi-direction RPC library.

## Usage

```go

import (
"io"
"log"

"github.com/zhuyie/bidirpc"
)


var conn io.ReadWriteCloser

// Create a registry, and register your available services, Service follows
// net/rpc semantics
registry := bidirpc.NewRegistry()
registry.Register(&Service{})

// TODO: Establish your connection before passing it to the session

// Create a new session
session, err := bidirpc.NewSession(conn, Yin, registry, 0)
if err != nil {
log.Fatal(err)
}
// Clean up session resources
defer func() {
if err := session.Close(); err != nil {
log.Fatal(err)
}
}()

// Start the event loop, this is a blocking call, so place it in a goroutine
// if you need to move on. The call will return when the connection is
// terminated.
if err = session.Serve(); err != nil {
log.Fatal(err)
}
```
24 changes: 17 additions & 7 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,29 @@ func (s *BenchService) EchoString(args StringArgs, reply *StringReply) error {
}

var (
sessionYin *Session
sessionYang *Session
client *rpc.Client
server *rpc.Server
sessionYin *Session
sessionYang *Session
registryYin *Registry
registryYang *Registry
client *rpc.Client
server *rpc.Server
)

func init() {
service := &BenchService{}

connYin, connYang := net.Pipe()
sessionYin, _ = NewSession(connYin, true, 0)
sessionYang, _ = NewSession(connYang, false, 0)
sessionYin.Register(service)
registryYin = NewRegistry()
registryYin.Register(service)
registryYang = NewRegistry()
sessionYin, _ = NewSession(connYin, Yin, registryYin, 0)
sessionYang, _ = NewSession(connYang, Yang, registryYang, 0)
go func() {
_ = sessionYin.Serve()
}()
go func() {
_ = sessionYang.Serve()
}()

connServer, connClient := net.Pipe()
client = rpc.NewClient(connClient)
Expand Down
33 changes: 33 additions & 0 deletions registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package bidirpc

import "net/rpc"

// Registry provides the available RPC receivers
type Registry struct {
server *rpc.Server
}

// Register publishes in the server the set of methods of the
// receiver value that satisfy the following conditions:
// - exported method of exported type
// - two arguments, both of exported type
// - the second argument is a pointer
// - one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods. It also logs the error using package log.
// The client accesses each method using a string of the form "Type.Method",
// where Type is the receiver's concrete type.
func (r *Registry) Register(rcvr interface{}) error {
return r.server.Register(rcvr)
}

// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func (r *Registry) RegisterName(name string, rcvr interface{}) error {
return r.server.RegisterName(name, rcvr)
}

// NewRegistry instantiates a Registry
func NewRegistry() *Registry {
return &Registry{server: rpc.NewServer()}
}
99 changes: 49 additions & 50 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@ import (
"sync"
)

// YinYang is used to determine which side of the connection the client is
// handling
type YinYang byte

var (
streamTypeYin byte = 1
streamTypeYang byte = 2
// Yin connection identifier
Yin YinYang = 1
// Yang connection identifier
Yang YinYang = 2
)

const (
Expand All @@ -20,23 +26,23 @@ const (
// Session is a bi-direction RPC connection.
type Session struct {
conn io.ReadWriteCloser
yinOrYang bool
yinOrYang YinYang
writeLock sync.Mutex
bp *bufferPool

streamYin *stream
streamYang *stream

client *rpc.Client
server *rpc.Server
client *rpc.Client
registry *Registry

closeLock sync.Mutex
closed bool
closedC chan struct{}
}

// NewSession creates a new session.
func NewSession(conn io.ReadWriteCloser, yinOrYang bool, bufferPoolSize int) (*Session, error) {
func NewSession(conn io.ReadWriteCloser, yinOrYang YinYang, registry *Registry, bufferPoolSize int) (*Session, error) {
if bufferPoolSize == 0 {
bufferPoolSize = defaultBufferPoolSize
}
Expand All @@ -47,45 +53,34 @@ func NewSession(conn io.ReadWriteCloser, yinOrYang bool, bufferPoolSize int) (*S
closedC: make(chan struct{}),
}

s.streamYin = newStream(s, streamTypeYin)
s.streamYang = newStream(s, streamTypeYang)
s.streamYin = newStream(s, byte(Yin))
s.streamYang = newStream(s, byte(Yang))

var cliCodec *clientCodec
var svrCodec *serverCodec
if yinOrYang {
if yinOrYang == Yin {
cliCodec = newClientCodec(s.streamYin)
svrCodec = newServerCodec(s.streamYang)
} else {
cliCodec = newClientCodec(s.streamYang)
svrCodec = newServerCodec(s.streamYin)
}
s.client = rpc.NewClientWithCodec(cliCodec)
s.server = rpc.NewServer()
s.registry = registry

go s.server.ServeCodec(svrCodec)
go s.readLoop()
go s.registry.server.ServeCodec(svrCodec)

return s, nil
}

// Register publishes in the server the set of methods of the
// receiver value that satisfy the following conditions:
// - exported method of exported type
// - two arguments, both of exported type
// - the second argument is a pointer
// - one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods. It also logs the error using package log.
// The client accesses each method using a string of the form "Type.Method",
// where Type is the receiver's concrete type.
func (s *Session) Register(rcvr interface{}) error {
return s.server.Register(rcvr)
}
// Serve starts the event loop, this is a blocking call.
func (s *Session) Serve() error {
err := s.readLoop()
if err != nil && err != io.ErrClosedPipe && err != io.EOF {
return err
}

// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func (s *Session) RegisterName(name string, rcvr interface{}) error {
return s.server.RegisterName(name, rcvr)
return nil
}

// Go invokes the function asynchronously. It returns the Call structure representing
Expand All @@ -103,29 +98,29 @@ func (s *Session) Call(serviceMethod string, args interface{}, reply interface{}

// Close closes the session.
func (s *Session) Close() error {
s.doClose(nil)
return nil
return s.doClose()
}

func (s *Session) readLoop() {
func (s *Session) readLoop() error {
var err error
var header [4]byte
var streamType byte
var bodyLen int
reader := io.LimitedReader{R: s.conn}
defer func() {
// Swallow the close error
_ = s.doClose()
}()

loop:
for {
_, err = io.ReadFull(s.conn, header[:])
if err != nil {
s.doClose(fmt.Errorf("read header error: %v", err))
break loop
return err
}

streamType, bodyLen = decodeHeader(header[:])
if (streamType != streamTypeYin && streamType != streamTypeYang) || (bodyLen <= 0) {
s.doClose(fmt.Errorf("read a invalid header"))
break loop
if (YinYang(streamType) != Yin && YinYang(streamType) != Yang) || (bodyLen <= 0) {
return fmt.Errorf("read a invalid header")
}

body := s.bp.Get()
Expand All @@ -134,20 +129,19 @@ loop:
_, err = io.Copy(body, &reader)
if err != nil {
s.bp.Put(body)
s.doClose(fmt.Errorf("read body error: %v", err))
break loop
return err
}

var inC *chan *bytes.Buffer
switch streamType {
case streamTypeYin:
switch YinYang(streamType) {
case Yin:
inC = &s.streamYin.inC
case streamTypeYang:
case Yang:
inC = &s.streamYang.inC
}
select {
case <-s.closedC:
break loop
return nil
case *inC <- body:
// do nothing
}
Expand All @@ -160,22 +154,27 @@ func (s *Session) write(bytes []byte) error {

_, err := s.conn.Write(bytes)
if err != nil {
s.doClose(fmt.Errorf("write error: %v", err))
if closeErr := s.doClose(); closeErr != nil {
return closeErr
}
}
return err
}

func (s *Session) doClose(err error) {
func (s *Session) doClose() error {
s.closeLock.Lock()
defer s.closeLock.Unlock()

if s.closed {
return
return nil
}
s.closed = true

//fmt.Printf("Session.doClose err=%v\n", err)
close(s.closedC)
s.conn.Close()
s.client.Close()
connErr := s.conn.Close()
err := s.client.Close()
if connErr != nil {
return connErr
}
return err
}
Loading

0 comments on commit a8fe896

Please sign in to comment.