Skip to content

Commit

Permalink
Fixed iphash algorithm, implemented naive consistent iphash1; added t…
Browse files Browse the repository at this point in the history
…ests yyyar#150
  • Loading branch information
illarion committed Aug 12, 2018
1 parent 80e5219 commit 785a3d8
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ run: build
./bin/$(NAME) -c ./config/${NAME}.toml

test:
@go test test/*.go
@go test -v test/*.go

install: build
install -d ${DESTDIR}/usr/local/bin/
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* **Weight** - select backend from pool based relative weights of backends
* **Roundrobin** - simple elect backend from pool in circular order
* **Iphash** - route client to the same backend based on client ip hash
* **Iphash1** - same as iphash but backend removal consistent (clients remain connecting to the same backend, even if some other backends down)
* **Leastconn** - select backend with least active connections
* **Leastbandwidth** - backends with least bandwidth

Expand Down
2 changes: 1 addition & 1 deletion config/gobetween.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protocol = "udp"
#
#bind = "localhost:3000" # (required) "<host>:<port>"
#protocol = "tcp" # (required) "tcp" | "tls" | "udp"
#balance = "weight" # (optional [weight]) "weight" | "leastconn" | "roundrobin" | "iphash" | "leastbandwidth"
#balance = "weight" # (optional [weight]) "weight" | "leastconn" | "roundrobin" | "iphash" | "iphash1" | "leastbandwidth"
#
#max_connections = 0
#client_idle_timeout = "10m"
Expand Down
22 changes: 7 additions & 15 deletions src/balance/iphash.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
package balance

import (
"errors"
"math"

"../core"
"errors"
"hash/fnv"
)

/**
Expand All @@ -20,8 +19,8 @@ type IphashBalancer struct{}

/**
* Elect backend using iphash strategy
* It's naive impl (most possibly with bad performance) using
* FNV-1a hash (https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function)
* Using fnv1a for speed
*
* TODO: Improve as needed
*/
func (b *IphashBalancer) Elect(context core.Context, backends []*core.Backend) (*core.Backend, error) {
Expand All @@ -30,16 +29,9 @@ func (b *IphashBalancer) Elect(context core.Context, backends []*core.Backend) (
return nil, errors.New("Can't elect backend, Backends empty")
}

ip := context.Ip()

hash := 11
for _, b := range ip {
hash = (hash ^ int(b)) * 13
}

hash = int(math.Floor(math.Mod(float64(hash), float64(len(backends)))))

backend := backends[hash]
hash := fnv.New32a()
hash.Write(context.Ip())
backend := backends[hash.Sum32()%uint32(len(backends))]

return backend, nil
}
51 changes: 51 additions & 0 deletions src/balance/iphash1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* iphash1.go - semi-consistent iphash balance impl
*
* @author Illarion Kovalchuk <[email protected]>
*/

package balance

import (
"../core"
"errors"
"hash/fnv"
)

/**
* Iphash balancer
*/
type Iphash1Balancer struct {
}

/**
* Elect backend using semi-consistent iphash strategy. This is naive implementation
* using Key+Node Hash Algorithm for stable sharding described at http://kennethxu.blogspot.com/2012/11/sharding-algorithm.html
* It survives removing nodes (removing stability), so that clients connected to backends that have not been removed stay
* untouched.
*
*/
func (b *Iphash1Balancer) Elect(context core.Context, backends []*core.Backend) (*core.Backend, error) {

if len(backends) == 0 {
return nil, errors.New("Can't elect backend, Backends empty")
}

var result *core.Backend
{
var bestHash uint32

for i, backend := range backends {
hasher := fnv.New32a()
hasher.Write(context.Ip())
hasher.Write([]byte(backend.Address()))
s32 := hasher.Sum32()
if s32 > bestHash {
bestHash = s32
result = backends[i]
}
}
}

return result, nil
}
1 change: 1 addition & 0 deletions src/balance/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func init() {
typeRegistry["roundrobin"] = reflect.TypeOf(RoundrobinBalancer{})
typeRegistry["weight"] = reflect.TypeOf(WeightBalancer{})
typeRegistry["iphash"] = reflect.TypeOf(IphashBalancer{})
typeRegistry["iphash1"] = reflect.TypeOf(Iphash1Balancer{})
typeRegistry["leastbandwidth"] = reflect.TypeOf(LeastbandwidthBalancer{})
}

Expand Down
1 change: 1 addition & 0 deletions src/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ func prepareConfig(name string, server config.Server, defaults config.Connection
"leastconn",
"roundrobin",
"leastbandwidth",
"iphash1",
"iphash":
case "":
server.Balance = "weight"
Expand Down
3 changes: 1 addition & 2 deletions src/server/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ func (this *Scheduler) HandleBackendsUpdate(backends []core.Backend) {
updated := map[core.Target]*core.Backend{}
updatedList := make([]*core.Backend, len(backends))

for i := range backends {
b := backends[i]
for i, b := range backends {
oldB, ok := this.backends[b.Target]

if ok {
Expand Down
30 changes: 30 additions & 0 deletions test/dummy_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package test

import (
"fmt"
"net"
)

type DummyContext struct {
ip net.IP
port int
}

func (d DummyContext) String() string {
return fmt.Sprintf("%v:%v", d.Ip(), d.Port())
}

func (d DummyContext) Ip() net.IP {
if d.ip == nil {
d.ip = make(net.IP, 1)
}
return d.ip
}

func (d DummyContext) Port() int {
return d.port
}

func (d DummyContext) Sni() string {
return ""
}
164 changes: 164 additions & 0 deletions test/iphash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package test

import (
"../src/balance"
"../src/core"
"errors"
"fmt"
"math/rand"
"testing"
)

func makeDistribution(balancer core.Balancer, backends []*core.Backend, clients []DummyContext) (map[string]*core.Backend, error) {

result := make(map[string]*core.Backend)

for _, client := range clients {
electedBackend, err := balancer.Elect(client, backends)

if err != nil {
return nil, err
}

if electedBackend == nil {
return nil, errors.New("Elected nil backend!")
}

result[client.ip.String()] = electedBackend
}

return result, nil

}

// Prepare list of backends, for testing purposes they end with .1, .2, .3 etc
// It will be easier to print them if needed
func prepareBackends(base string, n int) []*core.Backend {
backends := make([]*core.Backend, n)

for i := 0; i < n; i++ {
backends[i] = &core.Backend{
Target: core.Target{
Host: fmt.Sprintf("%s.%d", base, i+1),
Port: fmt.Sprintf("%d", 1000+i),
},
}
}

return backends
}

// Prepare random list of clients
func prepareClients(n int) []DummyContext {

clients := make([]DummyContext, n)

for i := 0; i < n; i++ {

ip := make([]byte, 4)
rand.Read(ip)

clients[i] = DummyContext{
ip: ip,
}
}

return clients

}

//TODO enable test when real consisten hashing will be implemented
/*
func TestIPHash2AddingBackendsRedistribution(t *testing.T) {
rand.Seed(time.Now().Unix())
balancer := &balance.Iphash2Balancer{}
N := 50 // initial number of backends
M := 1 // added number of backends
C := 1000 // number of clients
backends := prepareBackends("127.0.0", N)
clients := prepareClients(C)
// Perform balancing for on a given balancer, for clients versus backends
d1, err := makeDistribution(balancer, backends, clients)
if err != nil {
t.Error(err)
t.Fail()
}
extendedBackends := append(backends, prepareBackends("192.168.1", M)...)
// Perform balancing for on a given balancer, for clients versus extended list of backends
d2, err := makeDistribution(balancer, extendedBackends, clients)
if err != nil {
t.Error(err)
t.Fail()
}
Q := 0 // number of rehashed clients
// Q should not be bigger than C/ M+N
// values should differ
for k, v1 := range d1 {
v2 := d2[k]
if v1 != v2 {
Q++
}
}
if Q > C/(M+N) {
t.Fail()
}
}
*/

func TestIPHash1RemovingBackendsStability(t *testing.T) {

balancer := &balance.Iphash1Balancer{}

backends := prepareBackends("127.0.0", 4)
clients := prepareClients(100)

// Perform balancing for on a given balancer, for clients versus backends
d1, err := makeDistribution(balancer, backends, clients)
if err != nil {
t.Error(err)
t.Fail()
}

// Remove a backend from a list(second one)
removedBackend := backends[1]
backends = append(backends[:1], backends[2:]...)

// Perform balancing on the same balancer, same clients, but backends missing one.
d2, err := makeDistribution(balancer, backends, clients)
if err != nil {
t.Error(err)
t.Fail()
}

// check the results
for k, v1 := range d1 {

// in the second try (d2) removed backend will be obviously changed to something else,
// skipping it
if v1 == removedBackend {
continue
}

v2 := d2[k]

// the second try (d2) should not have other changes, so that if some backend (not removed) was
// elected previously, it should be elected now
if v1 != v2 {
t.Fail()
break
}

}

}
19 changes: 0 additions & 19 deletions test/weight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,13 @@ package test
import (
"math"
"math/rand"
"net"
"testing"
"time"

"../src/balance"
"../src/core"
)

type DummyContext struct{}

func (d DummyContext) String() string {
return "123"
}

func (d DummyContext) Ip() net.IP {
return make(net.IP, 1)
}

func (d DummyContext) Port() int {
return 0
}

func (d DummyContext) Sni() string {
return ""
}

func TestWeightDistribution(t *testing.T) {
rand.Seed(time.Now().Unix())
balancer := &balance.WeightBalancer{}
Expand Down

0 comments on commit 785a3d8

Please sign in to comment.