Skip to content

Commit

Permalink
Cleanup byte slice pool implementation. (lsds#122)
Browse files Browse the repository at this point in the history
* better structure.

* code cleanup.

* remove unused c pool.

* further code cleanup.
  • Loading branch information
luomai authored Sep 30, 2019
1 parent 8699efd commit f894336
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 193 deletions.
44 changes: 29 additions & 15 deletions srcs/go/rchannel/byte_slice_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,55 @@ import (
"sync"
)

// Reuse pool: chunk size -> pool.
var buffers = map[uint32]*sync.Pool{}
var mu sync.Mutex
// ByteSlicePool reuses byte slices, bucketed by chunk size: each
// distinct capacity is served by its own sync.Pool, created lazily
// on first request for that size.
//
// NOTE(review): embedding sync.Mutex exports Lock/Unlock as part of
// the type's API; an unexported `mu sync.Mutex` field would keep the
// locking internal — confirm no external caller relies on it.
type ByteSlicePool struct {
	sync.Mutex
	buffers map[uint32]*sync.Pool
}

// Minimum chunk size that is reused; pooling chunks smaller than this
// costs more in bookkeeping than the allocation it saves.
const minBufSize uint32 = 512

// defaultPool is the package-wide pool. GetBuf and PutBuf are exported
// aliases of its methods so callers can use the pool without holding a
// reference to it.
var (
	defaultPool = newByteSlicePool()
	GetBuf      = defaultPool.GetBuf
	PutBuf      = defaultPool.PutBuf
)

// newByteSlicePool returns an empty ByteSlicePool ready for use.
func newByteSlicePool() *ByteSlicePool {
	p := ByteSlicePool{buffers: map[uint32]*sync.Pool{}}
	return &p
}

// PutBuf puts a chunk to reuse pool if it can be reused.
func PutBuf(buf []byte) {
func (p *ByteSlicePool) PutBuf(buf []byte) {
size := uint32(cap(buf))
if size < minBufSize {
return
}
mu.Lock()
defer mu.Unlock()
if c := buffers[size]; c != nil {
p.Lock()
defer p.Unlock()
if c := p.buffers[size]; c != nil {
c.Put(buf)
}
}

// GetBuf gets a chunk from reuse pool or creates a new one if reuse failed.
func GetBuf(size uint32) []byte {
func (p *ByteSlicePool) GetBuf(size uint32) []byte {
if size < minBufSize {
return make([]byte, size)
}

mu.Lock()
c := buffers[size]
if c == nil {
p.Lock()
c, ok := p.buffers[size]
if !ok {
c = new(sync.Pool)
buffers[size] = c
p.buffers[size] = c
}
mu.Unlock()
p.Unlock()

v := c.Get()
if v != nil {
if v := c.Get(); v != nil {
return v.([]byte)
}

Expand Down
2 changes: 1 addition & 1 deletion srcs/go/rchannel/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (m *Message) ReadFrom(r io.Reader) error {
return err
}
// m.Data = make([]byte, m.Length)
m.Data = GetBuf(m.Length) // Use leaky pool
m.Data = GetBuf(m.Length) // Use memory pool
if err := readN(r, m.Data, int(m.Length)); err != nil {
return err
}
Expand Down
78 changes: 0 additions & 78 deletions srcs/go/rchannel/pool.cpp

This file was deleted.

49 changes: 0 additions & 49 deletions srcs/go/rchannel/pool.go

This file was deleted.

15 changes: 0 additions & 15 deletions srcs/go/rchannel/pool.h

This file was deleted.

72 changes: 37 additions & 35 deletions srcs/go/rchannel/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Router struct {
monitor monitor.Monitor

store *store.VersionedStore
localStore *LocalStore // FIXME: deprecated
localStore *LocalStore // TODO: to be replaced by the version store

reqMu sync.Mutex
}
Expand All @@ -37,7 +37,7 @@ func NewRouter(self plan.PeerSpec, store *store.VersionedStore) *Router {
connPool: newConnectionPool(), // out-going connections
monitor: monitor.GetMonitor(),
store: store,
localStore: newLocalStore(), // FIXME: deprecated
localStore: newLocalStore(), // TODO: to be replaced by the version store
}
}

Expand Down Expand Up @@ -166,7 +166,40 @@ func (r *Router) handleCollective(name string, msg *Message, conn net.Conn, remo

func (r *Router) handlePeerToPeerConn(name string, msg *Message, conn net.Conn, remote plan.NetAddr) {
version := string(msg.Data)
if len(version) > 0 {
if len(version) == 0 {
// TODO: Always use the versioned tensor store.
r.localStore.RLock()
defer r.localStore.RUnlock()

modelBuffer := r.localStore.data[name]
if modelBuffer == nil {
utils.ExitErr(fmt.Errorf("Model buffer[%s] is nil", name))
}

bs := []byte(name)
mh := messageHeader{
NameLength: uint32(len(bs)),
Name: bs,
}

if err := mh.WriteTo(conn); err != nil {
log.Errorf("Could not write variable from store to connection: %s", name)
utils.ExitErr(err)
}

m := Message{
Length: uint32(modelBuffer.Count * modelBuffer.Type.Size()),
Data: modelBuffer.Data,
}

if err := m.WriteTo(conn); err != nil {
log.Errorf("Could not write variable from store to connection: %s", name)
utils.ExitErr(err)
}

r.monitor.Egress(int64(m.Length), remote)
} else {
// NOTE: This part is currently not used by any optimizer.
var blob *store.Blob // FIXME: copy elision
if err := r.store.Get(version, name, &blob); err != nil {
utils.ExitErr(fmt.Errorf("Router.store.Get(%s, %s): %v", version, name, err))
Expand All @@ -188,40 +221,9 @@ func (r *Router) handlePeerToPeerConn(name string, msg *Message, conn net.Conn,
log.Errorf("Could not write variable from store to connection: %s", name)
utils.ExitErr(err)
}
r.monitor.Egress(int64(m.Length), remote)
return
}

// FIXME: deprecated
r.localStore.RLock()
defer r.localStore.RUnlock()

modelBuffer := r.localStore.data[name]
if modelBuffer == nil {
utils.ExitErr(fmt.Errorf("Model buffer[%s] is nil", name))
}

bs := []byte(name)
mh := messageHeader{
NameLength: uint32(len(bs)),
Name: bs,
}

if err := mh.WriteTo(conn); err != nil {
log.Errorf("Could not write variable from store to connection: %s", name)
utils.ExitErr(err)
}

m := Message{
Length: uint32(modelBuffer.Count * modelBuffer.Type.Size()),
Data: modelBuffer.Data,
}

if err := m.WriteTo(conn); err != nil {
log.Errorf("Could not write variable from store to connection: %s", name)
utils.ExitErr(err)
r.monitor.Egress(int64(m.Length), remote)
}
r.monitor.Egress(int64(m.Length), remote)
}

func (r *Router) Save(name string, model *kb.Buffer) error {
Expand Down

0 comments on commit f894336

Please sign in to comment.