Skip to content

Commit

Permalink
add support for some of the FourLetter words
Browse files Browse the repository at this point in the history
I have a need for polling the Zookeeper instance with some of the four-letter words. In this PR te following words are implemneted with tests:

* srvr
* cons
* ruok

`srvr` get some stats about the server (connections, latencies, leader/follower mode, etc.)
`cons` gets detailed information about the individual connections and their sessions
`ruok` is actually mostly useless, but specifies whether the server has returned `imok` when prompted with `ruok`

I'm not sold on the name of the public API functions, `FLWCons()`, and would happily change them to something better.
  • Loading branch information
theckman committed Mar 25, 2015
1 parent fa6674a commit c5438a7
Show file tree
Hide file tree
Showing 7 changed files with 784 additions and 0 deletions.
25 changes: 25 additions & 0 deletions zk/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,28 @@ func (t EventType) String() string {
}
return "Unknown"
}

// Mode is used to build custom server modes (leader|follower|standalone).
type Mode uint8

func (m Mode) String() string {
if name := modeNames[m]; name != "" {
return name
}
return "unknown"
}

const (
ModeUnknown Mode = iota
ModeLeader Mode = iota
ModeFollower Mode = iota
ModeStandalone Mode = iota
)

var (
modeNames = map[Mode]string{
ModeLeader: "leader",
ModeFollower: "follower",
ModeStandalone: "standalone",
}
)
24 changes: 24 additions & 0 deletions zk/constants_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package zk

import (
"fmt"
"testing"
)

func TestModeString(t *testing.T) {
if fmt.Sprintf("%v", ModeUnknown) != "unknown" {
t.Errorf("unknown value should be 'unknown'")
}

if fmt.Sprintf("%v", ModeLeader) != "leader" {
t.Errorf("leader value should be 'leader'")
}

if fmt.Sprintf("%v", ModeFollower) != "follower" {
t.Errorf("follower value should be 'follower'")
}

if fmt.Sprintf("%v", ModeStandalone) != "standalone" {
t.Errorf("standlone value should be 'standalone'")
}
}
288 changes: 288 additions & 0 deletions zk/flw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
package zk

import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"math/big"
"net"
"regexp"
"strconv"
"time"
)

// FLWSrvr is a FourLetterWord helper function. In particular, this function pulls the srvr output
// from the zookeeper instances and parses the output. A slice of *ServerStats structs are returned
// as well as a boolean value to indicate whether this function processed successfully.
//
// If the boolean value is false there was a problem. If the *ServerStats slice is empty or nil,
// then the error happened before we started to obtain 'srvr' values. Otherwise, one of the
// servers had an issue and the "Error" value in the struct should be inspected to determine
// which server had the issue.
func FLWSrvr(servers []string, timeout time.Duration) ([]*ServerStats, bool) {
// different parts of the regular expression that are required to parse the srvr output
var (
zrVer = `^Zookeeper version: ([A-Za-z0-9\.\-]+), built on (\d\d/\d\d/\d\d\d\d \d\d:\d\d [A-Za-z0-9:\+\-]+)`
zrLat = `^Latency min/avg/max: (\d+)/(\d+)/(\d+)`
zrNet = `^Received: (\d+).*\n^Sent: (\d+).*\n^Connections: (\d+).*\n^Outstanding: (\d+)`
zrState = `^Zxid: (0x[A-Za-z0-9]+).*\n^Mode: (\w+).*\n^Node count: (\d+)`
)

// build the regex from the pieces above
re, err := regexp.Compile(fmt.Sprintf(`(?m:\A%v.*\n%v.*\n%v.*\n%v)`, zrVer, zrLat, zrNet, zrState))

if err != nil {
return nil, false
}

imOk := true
servers = FormatServers(servers)
ss := make([]*ServerStats, len(servers))

for i := range ss {
response, err := fourLetterWord(servers[i], "srvr", timeout)

if err != nil {
ss[i] = &ServerStats{Error: err}
imOk = false
continue
}

match := re.FindAllStringSubmatch(string(response), -1)[0][1:]

if match == nil {
err := fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)")
ss[i] = &ServerStats{Error: err}
imOk = false
continue
}

// determine current server
var srvrMode Mode
switch match[10] {
case "leader":
srvrMode = ModeLeader
case "follower":
srvrMode = ModeFollower
case "standalone":
srvrMode = ModeStandalone
default:
srvrMode = ModeUnknown
}

buildTime, err := time.Parse("01/02/2006 15:04 MST", match[1])

if err != nil {
ss[i] = &ServerStats{Error: err}
imOk = false
continue
}

parsedInt, err := strconv.ParseInt(match[9], 0, 64)

if err != nil {
ss[i] = &ServerStats{Error: err}
imOk = false
continue
}

// the ZxID value is an int64 with two int32s packed inside
// the high int32 is the epoch (i.e., number of leader elections)
// the low int32 is the counter
epoch := int32(parsedInt >> 32)
counter := int32(parsedInt & 0xFFFFFFFF)

// within the regex above, these values must be numerical
// so we can avoid useless checking of the error return value
minLatency, _ := strconv.ParseInt(match[2], 0, 64)
avgLatency, _ := strconv.ParseInt(match[3], 0, 64)
maxLatency, _ := strconv.ParseInt(match[4], 0, 64)
recv, _ := strconv.ParseInt(match[5], 0, 64)
sent, _ := strconv.ParseInt(match[6], 0, 64)
cons, _ := strconv.ParseInt(match[7], 0, 64)
outs, _ := strconv.ParseInt(match[8], 0, 64)
ncnt, _ := strconv.ParseInt(match[11], 0, 64)

ss[i] = &ServerStats{
Sent: sent,
Received: recv,
NodeCount: ncnt,
MinLatency: minLatency,
AvgLatency: avgLatency,
MaxLatency: maxLatency,
Connections: cons,
Outstanding: outs,
Epoch: epoch,
Counter: counter,
BuildTime: buildTime,
Mode: srvrMode,
Version: match[0],
}
}

return ss, imOk
}

// FLWRuok is a FourLetterWord helper function. In particular, this function
// pulls the ruok output from each server.
func FLWRuok(servers []string, timeout time.Duration) []bool {
servers = FormatServers(servers)
oks := make([]bool, len(servers))

for i := range oks {
response, err := fourLetterWord(servers[i], "ruok", timeout)

if err != nil {
continue
}

if bytes.Equal(response[:4], []byte("imok")) {
oks[i] = true
}
}
return oks
}

// FLWCons is a FourLetterWord helper function. In particular, this function
// pulls the ruok output from each server.
//
// As with FLWSrvr, the boolean value indicates whether one of the requests had
// an issue. The Clients struct has an Error value that can be checked.
func FLWCons(servers []string, timeout time.Duration) ([]*ServerClients, bool) {
var (
zrAddr = `^ /((?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?):(?:\d+))\[\d+\]`
zrPac = `\(queued=(\d+),recved=(\d+),sent=(\d+),sid=(0x[A-Za-z0-9]+),lop=(\w+),est=(\d+),to=(\d+),`
zrSesh = `lcxid=(0x[A-Za-z0-9]+),lzxid=(0x[A-Za-z0-9]+),lresp=(\d+),llat=(\d+),minlat=(\d+),avglat=(\d+),maxlat=(\d+)\)`
)

re, err := regexp.Compile(fmt.Sprintf("%v%v%v", zrAddr, zrPac, zrSesh))

if err != nil {
return nil, false
}

servers = FormatServers(servers)
sc := make([]*ServerClients, len(servers))
imOk := true

for i := range sc {
response, err := fourLetterWord(servers[i], "cons", timeout)

if err != nil {
sc[i] = &ServerClients{Error: err}
imOk = false
continue
}

scan := bufio.NewScanner(bytes.NewReader(response))

var clients []*ServerClient

for scan.Scan() {
line := scan.Bytes()

if len(line) == 0 {
continue
}

m := re.FindAllStringSubmatch(string(line), -1)

if m == nil {
err := fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)")
sc[i] = &ServerClients{Error: err}
imOk = false
continue
}

match := m[0][1:]

queued, _ := strconv.ParseInt(match[1], 0, 64)
recvd, _ := strconv.ParseInt(match[2], 0, 64)
sent, _ := strconv.ParseInt(match[3], 0, 64)
sid, _ := strconv.ParseInt(match[4], 0, 64)
est, _ := strconv.ParseInt(match[6], 0, 64)
timeout, _ := strconv.ParseInt(match[7], 0, 32)
lresp, _ := strconv.ParseInt(match[10], 0, 64)
llat, _ := strconv.ParseInt(match[11], 0, 32)
minlat, _ := strconv.ParseInt(match[12], 0, 32)
avglat, _ := strconv.ParseInt(match[13], 0, 32)
maxlat, _ := strconv.ParseInt(match[14], 0, 32)

// zookeeper returns a value, '0xffffffffffffffff', as the
// Lzxid for PING requests in the 'cons' output.
// unfortunately, in Go that is an invalid int64 and is not represented
// as -1.
// However, converting the string value to a big.Int and then back to
// and int64 properly sets the value to -1
lzxid, ok := new(big.Int).SetString(match[9], 0)

var errVal error

if !ok {
errVal = fmt.Errorf("failed to convert lzxid value to big.Int")
imOk = false
}

lcxid, ok := new(big.Int).SetString(match[8], 0)

if !ok && errVal == nil {
errVal = fmt.Errorf("failed to convert lcxid value to big.Int")
imOk = false
}

clients = append(clients, &ServerClient{
Queued: queued,
Received: recvd,
Sent: sent,
SessionID: sid,
Lcxid: lcxid.Int64(),
Lzxid: lzxid.Int64(),
Timeout: int32(timeout),
LastLatency: int32(llat),
MinLatency: int32(minlat),
AvgLatency: int32(avglat),
MaxLatency: int32(maxlat),
Established: time.Unix(est, 0),
LastResponse: time.Unix(lresp, 0),
Addr: match[0],
LastOperation: match[5],
Error: errVal,
})
}

sc[i] = &ServerClients{Clients: clients}
}

return sc, imOk
}

func fourLetterWord(server, command string, timeout time.Duration) ([]byte, error) {
conn, err := net.DialTimeout("tcp", server, timeout)

if err != nil {
return nil, err
}

// the zookeeper server should automatically close this socket
// once the command has been processed, but better safe than sorry
defer conn.Close()

conn.SetWriteDeadline(time.Now().Add(timeout))

_, err = conn.Write([]byte(command))

if err != nil {
return nil, err
}

conn.SetReadDeadline(time.Now().Add(timeout))

resp, err := ioutil.ReadAll(conn)

if err != nil {
return nil, err
}

return resp, nil
}
Loading

0 comments on commit c5438a7

Please sign in to comment.