Skip to content

Commit

Permalink
pool: new pool interface, change to the API
Browse files Browse the repository at this point in the history
  • Loading branch information
fatih committed Jul 18, 2014
1 parent 90610e3 commit 4b32ef8
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 129 deletions.
127 changes: 127 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
package pool

import (
"errors"
"fmt"
"net"
"sync"
)

// Pool allows you to use a pool of net.Conn connections.
type ChannelPool struct {
// storage for our net.Conn connections
mu sync.Mutex
conns chan net.Conn

// net.Conn generator
factory Factory
}

// Factory is a function to create new connections.
type Factory func() (net.Conn, error)

// NewChannelPool returns a new pool based on buffered channels with an initial
// capacity and maximum capacity. Factory is used when initial capacity is
// greater than zero to fill the pool.
func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
if initialCap <= 0 || maxCap <= 0 || initialCap > maxCap {
return nil, errors.New("invalid capacity settings")
}

c := &ChannelPool{
conns: make(chan net.Conn, maxCap),
factory: factory,
}

// create initial connections, if something goes wrong,
// just close the pool error out.
for i := 0; i < initialCap; i++ {
conn, err := factory()
if err != nil {
c.Close()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
c.conns <- conn
}

return c, nil
}

func (c *ChannelPool) getConns() chan net.Conn {
c.mu.Lock()
conns := c.conns
c.mu.Unlock()
return conns
}

// Get returns a new connection from the pool. After using the connection it
// should be put back via the Put() method. If there is no new connection
// available in the pool, a new connection will be created via the Factory()
// method.
func (c *ChannelPool) Get() (net.Conn, error) {
conns := c.getConns()
if conns == nil {
return nil, ErrPoolClosed
}

select {
case conn := <-conns:
if conn == nil {
return nil, ErrPoolClosed
}
return conn, nil
default:
return c.factory()
}
}

// Put puts an existing connection into the pool. If the pool is full or
// closed, conn is simply closed. A nil conn will be rejected. Putting into a
// destroyed or full pool will be counted as an error.
func (c *ChannelPool) Put(conn net.Conn) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}

c.mu.Lock()
defer c.mu.Unlock()

if c.conns == nil {
conn.Close()
return ErrPoolClosed
}

select {
case c.conns <- conn:
return nil
default:
conn.Close()
return ErrPoolFull
}
}

// Close closes the pool and all its connections. After Close() the
// pool is no longer usable.
func (c *ChannelPool) Close() {
c.mu.Lock()
conns := c.conns
c.conns = nil
c.factory = nil
c.mu.Unlock()

if conns == nil {
return
}

close(conns)
for conn := range conns {
conn.Close()
}
}

// MaximumCapacity returns the maximum capacity of the pool
func (c *ChannelPool) MaximumCapacity() int { return cap(c.getConns()) }

// CurrentCapacity returns the current capacity of the pool.
func (c *ChannelPool) CurrentCapacity() int { return len(c.getConns()) }
34 changes: 19 additions & 15 deletions pool_test.go → channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ func init() {
}

func TestNew(t *testing.T) {
_, err := newPool()
_, err := newChannelPool()
if err != nil {
t.Errorf("New error: %s", err)
}
}

func TestPool_Get(t *testing.T) {
p, _ := newPool()
p, _ := newChannelPool()
defer p.Close()

_, err := p.Get()
Expand Down Expand Up @@ -70,11 +70,11 @@ func TestPool_Get(t *testing.T) {
}

func TestPool_Put(t *testing.T) {
p, _ := newPool()
p, _ := newChannelPool()
defer p.Close()

for i := 0; i < MaximumCap; i++ {
conn, _ := p.factory()
conn, _ := factory()
p.Put(conn)
}

Expand All @@ -88,7 +88,7 @@ func TestPool_Put(t *testing.T) {
t.Errorf("Put error. A nil conn should be rejected")
}

conn, _ := p.factory()
conn, _ := factory()
err = p.Put(conn) // try to put into a full pool
if err == nil {
t.Errorf("Put error. Put into a full pool should return an error")
Expand All @@ -97,7 +97,7 @@ func TestPool_Put(t *testing.T) {
}

func TestPool_MaximumCapacity(t *testing.T) {
p, _ := newPool()
p, _ := newChannelPool()
defer p.Close()

if p.MaximumCapacity() != MaximumCap {
Expand All @@ -107,7 +107,7 @@ func TestPool_MaximumCapacity(t *testing.T) {
}

func TestPool_UsedCapacity(t *testing.T) {
p, _ := newPool()
p, _ := newChannelPool()
defer p.Close()

if p.CurrentCapacity() != InitialCap {
Expand All @@ -117,17 +117,19 @@ func TestPool_UsedCapacity(t *testing.T) {
}

func TestPool_Close(t *testing.T) {
p, _ := newPool()
conn, _ := p.factory() // to be used with put
p, _ := newChannelPool()
conn, _ := factory() // to be used with put

// now close it and test all cases we are expecting.
p.Close()

if p.conns != nil {
c := p.(*ChannelPool)

if c.conns != nil {
t.Errorf("Close error, conns channel should be nil")
}

if p.factory != nil {
if c.factory != nil {
t.Errorf("Close error, factory should be nil")
}

Expand All @@ -151,7 +153,7 @@ func TestPool_Close(t *testing.T) {
}

func TestPoolConcurrent(t *testing.T) {
p, _ := newPool()
p, _ := newChannelPool()
pipe := make(chan net.Conn, 0)

go func() {
Expand All @@ -173,10 +175,10 @@ func TestPoolConcurrent(t *testing.T) {
}

func TestPoolConcurrent2(t *testing.T) {
p, _ := newPool()
p, _ := newChannelPool()

for i := 0; i < MaximumCap; i++ {
conn, _ := p.factory()
conn, _ := factory()
p.Put(conn)
}

Expand All @@ -189,7 +191,9 @@ func TestPoolConcurrent2(t *testing.T) {
p.Close()
}

func newPool() (*Pool, error) { return New(InitialCap, MaximumCap, factory) }
func newChannelPool() (Pool, error) {
return NewChannelPool(InitialCap, MaximumCap, factory)
}

func simpleTCPServer() {
l, err := net.Listen(network, address)
Expand Down
132 changes: 18 additions & 114 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package pool

import (
"errors"
"fmt"
"net"
"sync"
)

var (
Expand All @@ -17,120 +15,26 @@ var (
ErrPoolFull = errors.New("pool is full")
)

// Factory is a function to create new connections.
type Factory func() (net.Conn, error)
// Pool interface describes a pool implementation.
type Pool interface {
// Get returns a new connection from the pool. After using the connection it
// should be put back via the Put() method. If there is no new connection
// available in the pool, a new connection will be created via the Factory()
// method.
Get() (net.Conn, error)

// Pool allows you to use a pool of net.Conn connections.
type Pool struct {
// storage for our net.Conn connections
mu sync.Mutex
conns chan net.Conn
// Put puts an existing connection into the pool. If the pool is full or
// closed, conn is simply closed. A nil conn will be rejected. Putting into a
// destroyed or full pool will be counted as an error.
Put(conn net.Conn) error

// net.Conn generator
factory Factory
}

// New returns a new pool with an initial capacity and maximum capacity.
// Factory is used when initial capacity is greater than zero to fill the
// pool.
func New(initialCap, maxCap int, factory Factory) (*Pool, error) {
if initialCap <= 0 || maxCap <= 0 || initialCap > maxCap {
return nil, errors.New("invalid capacity settings")
}

p := &Pool{
conns: make(chan net.Conn, maxCap),
factory: factory,
}

// create initial connections, if something goes wrong,
// just close the pool error out.
for i := 0; i < initialCap; i++ {
conn, err := factory()
if err != nil {
p.Close()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
p.conns <- conn
}

return p, nil
}

func (p *Pool) getConns() chan net.Conn {
p.mu.Lock()
conns := p.conns
p.mu.Unlock()
return conns
}
// Close closes the pool and all its connections. After Close() the
// pool is no longer usable.
Close()

// Get returns a new connection from the pool. After using the connection it
// should be put back via the Put() method. If there is no new connection
// available in the pool, a new connection will be created via the Factory()
// method.
func (p *Pool) Get() (net.Conn, error) {
conns := p.getConns()
if conns == nil {
return nil, ErrPoolClosed
}
// MaximumCapacity returns the maximum capacity of the pool
MaximumCapacity() int

select {
case conn := <-conns:
if conn == nil {
return nil, ErrPoolClosed
}
return conn, nil
default:
return p.factory()
}
// CurrentCapacity returns the current capacity of the pool.
CurrentCapacity() int
}

// Put puts an existing connection into the pool. If the pool is full or
// closed, conn is simply closed. A nil conn will be rejected. Putting into a
// destroyed or full pool will be counted as an error.
func (p *Pool) Put(conn net.Conn) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}

p.mu.Lock()
defer p.mu.Unlock()

if p.conns == nil {
conn.Close()
return ErrPoolClosed
}

select {
case p.conns <- conn:
return nil
default:
conn.Close()
return ErrPoolFull
}
}

// Close closes the pool and all its connections. After Close() the
// pool is no longer usable.
func (p *Pool) Close() {
p.mu.Lock()
conns := p.conns
p.conns = nil
p.factory = nil
p.mu.Unlock()

if conns == nil {
return
}

close(conns)
for conn := range conns {
conn.Close()
}
}

// MaximumCapacity returns the maximum capacity of the pool
func (p *Pool) MaximumCapacity() int { return cap(p.getConns()) }

// CurrentCapacity returns the current capacity of the pool.
func (p *Pool) CurrentCapacity() int { return len(p.getConns()) }

0 comments on commit 4b32ef8

Please sign in to comment.