Skip to content

Commit

Permalink
flush_all support
Browse files Browse the repository at this point in the history
  • Loading branch information
craigmj committed Jan 29, 2014
1 parent 82fef77 commit 806ff76
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
32 changes: 30 additions & 2 deletions memcache/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ var (
resultNotFound = []byte("NOT_FOUND\r\n")
resultDeleted = []byte("DELETED\r\n")
resultEnd = []byte("END\r\n")
resultTouched = []byte("TOUCHED\r\n")
resultOk = []byte("OK\r\n")
resultTouched = []byte("TOUCHED\r\n")

resultClientErrorPrefix = []byte("CLIENT_ERROR ")
)
Expand Down Expand Up @@ -299,6 +300,10 @@ func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) e
return nil
}

func (c *Client) FlushAll() error {
return c.selector.Each(c.flushAllFromAddr)
}

// Get gets the item for the given key. ErrCacheMiss is returned for a
// memcache cache miss. The key must be at most 250 bytes in length.
func (c *Client) Get(key string) (item *Item, err error) {
Expand All @@ -318,7 +323,7 @@ func (c *Client) Get(key string) (item *Item, err error) {
func (c *Client) Touch(key string, seconds int32) (err error) {
return c.withKeyAddr(key, func(addr net.Addr) error {
return c.touchFromAddr(addr, []string{key}, seconds)
})
})
}

func (c *Client) withKeyAddr(key string, fn func(net.Addr) error) (err error) {
Expand Down Expand Up @@ -362,6 +367,29 @@ func (c *Client) getFromAddr(addr net.Addr, keys []string, cb func(*Item)) error
})
}

// flushAllFromAddr send the flush_all command to the given addr
func (c *Client) flushAllFromAddr(addr net.Addr) error {
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
if _, err := fmt.Fprintf(rw, "flush_all\r\n"); err != nil {
return err
}
if err := rw.Flush(); err != nil {
return err
}
line, err := rw.ReadSlice('\n')
if err != nil {
return err
}
switch {
case bytes.Equal(line, resultOk):
break
default:
return fmt.Errorf("memcache: unexpected response line from flush_all: %q", string(line))
}
return nil
})
}

func (c *Client) touchFromAddr(addr net.Addr, keys []string, expiration int32) error {
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
for _, key := range keys {
Expand Down
13 changes: 13 additions & 0 deletions memcache/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ServerSelector interface {
// PickServer returns the server address that a given item
// should be shared onto.
PickServer(key string) (net.Addr, error)
Each(func(net.Addr) error) error
}

// ServerList is a simple ServerSelector. Its zero value is usable.
Expand Down Expand Up @@ -72,6 +73,18 @@ func (ss *ServerList) SetServers(servers ...string) error {
return nil
}

// Each iterates over each server calling the given function
func (ss *ServerList) Each(f func(net.Addr) error) error {
ss.lk.RLock()
defer ss.lk.RUnlock()
for _, a := range ss.addrs {
if err := f(a); nil != err {
return err
}
}
return nil
}

func (ss *ServerList) PickServer(key string) (net.Addr, error) {
ss.lk.RLock()
defer ss.lk.RUnlock()
Expand Down

0 comments on commit 806ff76

Please sign in to comment.