Skip to content

Commit

Permalink
Write to the underlying conn from the stream directly, avoid the over…
Browse files Browse the repository at this point in the history
…head of the channel sync operations.
  • Loading branch information
zhuyie committed Jan 17, 2017
1 parent af4b7e9 commit 24dd35a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 61 deletions.
61 changes: 22 additions & 39 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
type Session struct {
conn io.ReadWriteCloser
yinOrYang bool
writeLock sync.Mutex

streamYin *stream
streamYang *stream
Expand All @@ -30,33 +31,29 @@ type Session struct {

// NewSession creates a new session.
func NewSession(conn io.ReadWriteCloser, yinOrYang bool) (*Session, error) {
closedC := make(chan struct{})
streamYin := newStream(streamTypeYin, closedC)
streamYang := newStream(streamTypeYang, closedC)
s := &Session{
conn: conn,
yinOrYang: yinOrYang,
closedC: make(chan struct{}),
}

s.streamYin = newStream(s, streamTypeYin)
s.streamYang = newStream(s, streamTypeYang)

var cliCodec *clientCodec
var svrCodec *serverCodec
if yinOrYang {
cliCodec = newClientCodec(streamYin)
svrCodec = newServerCodec(streamYang)
cliCodec = newClientCodec(s.streamYin)
svrCodec = newServerCodec(s.streamYang)
} else {
cliCodec = newClientCodec(streamYang)
svrCodec = newServerCodec(streamYin)
cliCodec = newClientCodec(s.streamYang)
svrCodec = newServerCodec(s.streamYin)
}
s.client = rpc.NewClientWithCodec(cliCodec)
s.server = rpc.NewServer()

s := &Session{
conn: conn,
yinOrYang: yinOrYang,
streamYin: streamYin,
streamYang: streamYang,
client: rpc.NewClientWithCodec(cliCodec),
server: rpc.NewServer(),
closedC: closedC,
}
go s.server.ServeCodec(svrCodec)

go s.readLoop()
go s.writeLoop()

return s, nil
}
Expand Down Expand Up @@ -142,29 +139,15 @@ loop:
}
}

func (s *Session) writeLoop() {
var err error
loop:
for {
select {
case <-s.closedC:
break loop
func (s *Session) write(bytes []byte) error {
s.writeLock.Lock()
defer s.writeLock.Unlock()

case bytes := <-s.streamYin.outC:
_, err = s.conn.Write(bytes)
if err != nil {
s.doClose(fmt.Errorf("write stream error: %v", err))
break loop
}

case bytes := <-s.streamYang.outC:
_, err = s.conn.Write(bytes)
if err != nil {
s.doClose(fmt.Errorf("write stream error: %v", err))
break loop
}
}
_, err := s.conn.Write(bytes)
if err != nil {
s.doClose(fmt.Errorf("write error: %v", err))
}
return err
}

func (s *Session) doClose(err error) {
Expand Down
38 changes: 16 additions & 22 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,26 @@ import (
)

type stream struct {
id byte
sessionClosedC <-chan struct{}
reader *bytes.Buffer
inC chan []byte
writer *bytes.Buffer
outC chan []byte
session *Session
id byte
reader *bytes.Buffer
inC chan []byte
writer bytes.Buffer
}

func newStream(id byte, sessionClosedC <-chan struct{}) *stream {
func newStream(session *Session, id byte) *stream {
s := &stream{
id: id,
sessionClosedC: sessionClosedC,
inC: make(chan []byte),
outC: make(chan []byte),
session: session,
id: id,
inC: make(chan []byte),
}
return s
}

func (s *stream) Read(p []byte) (n int, err error) {
if s.reader == nil || s.reader.Len() == 0 {
select {
case <-s.sessionClosedC:
case <-s.session.closedC:
return 0, errors.New("stream read from a closed session")
case data := <-s.inC: // only the message body
s.reader = bytes.NewBuffer(data)
Expand All @@ -37,24 +35,20 @@ func (s *stream) Read(p []byte) (n int, err error) {
}

func (s *stream) Write(p []byte) (n int, err error) {
if s.writer == nil {
s.writer = bytes.NewBuffer(nil)
if s.writer.Len() == 0 {
var dummyHeader [4]byte
s.writer.Write(dummyHeader[:])
}
return s.writer.Write(p)
}

func (s *stream) flush() error {
func (s *stream) flush() (err error) {
buffer := s.writer.Bytes()
bodyLen := s.writer.Len() - 4
encodeHeader(buffer, s.id, bodyLen)
select {
case <-s.sessionClosedC:
return errors.New("stream flush to a closed session")
case s.outC <- buffer:
}

s.writer = nil
return nil
err = s.session.write(buffer)

s.writer.Reset()
return
}

0 comments on commit 24dd35a

Please sign in to comment.