Skip to content

Commit

Permalink
[ENH] Add rendezvous hashing to go and python. Add Assignment policy …
Browse files Browse the repository at this point in the history
…to go so it can be used in future commits (chroma-core#1360)

## Description of changes

*Summarize the changes made by this PR.*
Adds basic rendezvous hashing to the python and go codebases so it can
be used

## Test plan
*How are these changes tested?*
Basic unit tests. Manually tested against k8s.

## Documentation Changes
None required
  • Loading branch information
HammadB authored Nov 17, 2023
1 parent 680c555 commit 9191799
Show file tree
Hide file tree
Showing 13 changed files with 271 additions and 13 deletions.
30 changes: 30 additions & 0 deletions chromadb/test/segment/distributed/test_rendezvous_hash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from chromadb.utils.rendezvous_hash import assign, murmur3hasher


def test_rendezvous_hash() -> None:
# Tests the assign works as expected
members = ["a", "b", "c"]
key = "key"

def mock_hasher(member: str, key: str) -> int:
return members.index(member) # Highest index wins

assert assign(key, members, mock_hasher) == "c"


def test_even_distribution() -> None:
member_count = 10
tolerance = 25
nodes = [str(i) for i in range(member_count)]

# Test if keys are evenly distributed across nodes
key_distribution = {node: 0 for node in nodes}
num_keys = 1000
for i in range(num_keys):
key = f"key_{i}"
node = assign(key, nodes, murmur3hasher)
key_distribution[node] += 1

# Check if keys are somewhat evenly distributed
for node in nodes:
assert abs(key_distribution[node] - num_keys / len(nodes)) < tolerance
50 changes: 50 additions & 0 deletions chromadb/utils/rendezvous_hash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# An implementation of https://en.wikipedia.org/wiki/Rendezvous_hashing
from typing import Callable, List, cast
import mmh3

Hasher = Callable[[str, str], int]
Member = str
Members = List[str]
Key = str


def assign(key: Key, members: Members, hasher: Hasher) -> Member:
"""Assigns a key to a member using the rendezvous hashing algorithm"""
if len(members) == 0:
raise ValueError("Cannot assign key to empty memberlist")
if len(members) == 1:
return members[0]
if key == "":
raise ValueError("Cannot assign empty key")

max_score = -1
max_member = None

for member in members:
score = hasher(member, key)
if score > max_score:
max_score = score
max_member = member

max_member = cast(Member, max_member)
return max_member


def merge_hashes(x: int, y: int) -> int:
"""murmurhash3 mix 64-bit"""
acc = x ^ y
acc ^= acc >> 33
acc = (
acc * 0xFF51AFD7ED558CCD
) % 2**64 # We need to mod here to prevent python from using arbitrary size int
acc ^= acc >> 33
acc = (acc * 0xC4CEB9FE1A85EC53) % 2**64
acc ^= acc >> 33
return acc


def murmur3hasher(member: Member, key: Key) -> int:
"""Hashes the key and member using the murmur3 hashing algorithm"""
member_hash = mmh3.hash64(member, signed=False)[0]
key_hash = mmh3.hash64(key, signed=False)[0]
return merge_hashes(member_hash, key_hash)
3 changes: 2 additions & 1 deletion go/coordinator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
Expand All @@ -51,6 +51,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/spaolacci/murmur3 v1.1.0
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go/coordinator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A=
github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
Expand Down
5 changes: 4 additions & 1 deletion go/coordinator/internal/coordinator/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ func (s *Coordinator) GetTenant(ctx context.Context, getTenant *model.GetTenant)
}

func (s *Coordinator) CreateCollection(ctx context.Context, createCollection *model.CreateCollection) (*model.Collection, error) {
collectionTopic := s.assignCollection(createCollection.ID)
collectionTopic, err := s.assignCollection(createCollection.ID)
if err != nil {
return nil, err
}
createCollection.Topic = collectionTopic
log.Info("apis create collection", zap.Any("collection", createCollection))
collection, err := s.meta.AddCollection(ctx, createCollection)
Expand Down
6 changes: 3 additions & 3 deletions go/coordinator/internal/coordinator/apis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,13 @@ func NewMockAssignmentPolicy(collecions []*model.Collection) *MockAssignmentPoli
}
}

func (m *MockAssignmentPolicy) AssignCollection(collectionID types.UniqueID) string {
func (m *MockAssignmentPolicy) AssignCollection(collectionID types.UniqueID) (string, error) {
for _, collection := range m.collections {
if collection.ID == collectionID {
return collection.Topic
return collection.Topic, nil
}
}
return ""
return "", common.ErrCollectionNotFound
}

func TestCreateGetDeleteCollections(t *testing.T) {
Expand Down
54 changes: 49 additions & 5 deletions go/coordinator/internal/coordinator/assignment_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"fmt"

"github.com/chroma/chroma-coordinator/internal/types"
"github.com/chroma/chroma-coordinator/internal/utils"
)

type CollectionAssignmentPolicy interface {
AssignCollection(collectionID types.UniqueID) string
AssignCollection(collectionID types.UniqueID) (string, error)
}

// SimpleAssignmentPolicy is a simple assignment policy that assigns a 1 collection to 1
Expand All @@ -24,10 +25,53 @@ func NewSimpleAssignmentPolicy(tenantID string, topicNS string) *SimpleAssignmen
}
}

func (s *SimpleAssignmentPolicy) AssignCollection(collectionID types.UniqueID) string {
return createTopicName(s.tenantID, s.topicNS, collectionID.String())
func (s *SimpleAssignmentPolicy) AssignCollection(collectionID types.UniqueID) (string, error) {
return createTopicName(s.tenantID, s.topicNS, collectionID.String()), nil
}

func createTopicName(tenantID string, topicNS string, collectionID string) string {
return fmt.Sprintf("persistent://%s/%s/%s", tenantID, topicNS, collectionID)
func createTopicName(tenantID string, topicNS string, log_name string) string {
return fmt.Sprintf("persistent://%s/%s/%s", tenantID, topicNS, log_name)
}

// RendezvousAssignmentPolicy is an assignment policy that assigns a collection to a topic
// For now it assumes there are 16 topics and uses the rendezvous hashing algorithm to
// assign a collection to a topic.

var topics = [16]string{
"chroma_log_0",
"chroma_log_1",
"chroma_log_2",
"chroma_log_3",
"chroma_log_4",
"chroma_log_5",
"chroma_log_6",
"chroma_log_7",
"chroma_log_8",
"chroma_log_9",
"chroma_log_10",
"chroma_log_11",
"chroma_log_12",
"chroma_log_13",
"chroma_log_14",
"chroma_log_15",
}

type RendezvousAssignmentPolicy struct {
tenantID string
topicNS string
}

func NewRendezvousAssignmentPolicy(tenantID string, topicNS string) *RendezvousAssignmentPolicy {
return &RendezvousAssignmentPolicy{
tenantID: tenantID,
topicNS: topicNS,
}
}

func (r *RendezvousAssignmentPolicy) AssignCollection(collectionID types.UniqueID) (string, error) {
assignment, error := utils.Assign(collectionID.String(), topics[:], utils.Murmur3Hasher)
if error != nil {
return "", error
}
return createTopicName(r.tenantID, r.topicNS, assignment), nil
}
2 changes: 1 addition & 1 deletion go/coordinator/internal/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ func (s *Coordinator) Stop() error {
return nil
}

func (c *Coordinator) assignCollection(collectionID types.UniqueID) string {
func (c *Coordinator) assignCollection(collectionID types.UniqueID) (string, error) {
return c.collectionAssignmentPolicy.AssignCollection(collectionID)
}
4 changes: 3 additions & 1 deletion go/coordinator/internal/grpccoordinator/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gor
s := &Server{
healthServer: health.NewServer(),
}
assignmentPolicy := coordinator.NewSimpleAssignmentPolicy("test-tenant", "test-topic")
// assignmentPolicy := coordinator.NewSimpleAssignmentPolicy("test-tenant", "test-topic")
// TODO: make this configuration, and make the pulsar tenant configuration too
assignmentPolicy := coordinator.NewRendezvousAssignmentPolicy("test-tenant", "test-topic")
coordinator, err := coordinator.NewCoordinator(ctx, assignmentPolicy, db)
if err != nil {
return nil, err
Expand Down
62 changes: 62 additions & 0 deletions go/coordinator/internal/utils/rendezvous_hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package utils

import (
"errors"

"github.com/spaolacci/murmur3"
)

type Hasher = func(member string, key string) uint64
type Member = string
type Members = []Member
type Key = string

// assign assigns a key to a member using the rendezvous hashing algorithm.
func Assign(key Key, members Members, hasher Hasher) (Member, error) {
if len(members) == 0 {
return "", errors.New("cannot assign key to empty member list")
}
if len(members) == 1 {
return members[0], nil
}
if key == "" {
return "", errors.New("cannot assign empty key")
}

maxScore := uint64(0)
var maxMember Member

for _, member := range members {
score := hasher(string(member), string(key))
if score > maxScore {
maxScore = score
maxMember = member
}
}

return maxMember, nil
}

func mergeHashes(a uint64, b uint64) uint64 {
acc := a ^ b
acc ^= acc >> 33
acc *= 0xff51afd7ed558ccd
acc ^= acc >> 33
acc *= 0xc4ceb9fe1a85ec53
acc ^= acc >> 33
return acc
}

// NOTE: The python implementation of murmur3 may differ from the golang implementation.
// For now, this is fine since go and python don't need to agree on any hashing schemes
// but if we ever need to agree on a hashing scheme, we should verify that the implementations
// are the same.
func Murmur3Hasher(member string, key string) uint64 {
hasher := murmur3.New64()
hasher.Write([]byte(member))
memberHash := hasher.Sum64()
hasher.Reset()
hasher.Write([]byte(key))
keyHash := hasher.Sum64()
return mergeHashes(memberHash, keyHash)
}
62 changes: 62 additions & 0 deletions go/coordinator/internal/utils/rendezvous_hash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package utils

import (
"fmt"
"math"
"testing"
)

func mockHasher(member string, key string) uint64 {
members := []string{"a", "b", "c"}
for i, m := range members {
if m == member {
return uint64(i)
}
}
return 0
}

func TestRendezvousHash(t *testing.T) {
members := []string{"a", "b", "c"}
key := "key"

// Test that the assign function returns the expected result
node, error := Assign(key, members, mockHasher)

if error != nil {
t.Errorf("Assign() returned an error: %v", error)
}

if node != "c" {
t.Errorf("Assign() = %v, want %v", node, "c")
}
}

func TestEvenDistribution(t *testing.T) {
memberCount := 10
tolerance := 25
var nodes []string
for i := 0; i < memberCount; i++ {
nodes = append(nodes, fmt.Sprint(i+'0')) // Convert int to string
}

keyDistribution := make(map[string]int)
numKeys := 1000

// Test if keys are evenly distributed across nodes
for i := 0; i < numKeys; i++ {
key := "key_" + fmt.Sprint(i)
node, err := Assign(key, nodes, Murmur3Hasher)
if err != nil {
t.Errorf("Assign() returned an error: %v", err)
}
keyDistribution[node]++
}

// Check if keys are somewhat evenly distributed
for _, count := range keyDistribution {
if math.Abs(float64(count-numKeys/memberCount)) > float64(tolerance) {
t.Errorf("Key distribution is uneven: %v", keyDistribution)
}
}
}
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies = [
'kubernetes>=28.1.0',
'tenacity>=8.2.3',
'PyYAML>=6.0.0',
'mmh3>=4.0.1',
]

[tool.black]
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ graphlib_backport==1.0.3; python_version < '3.9'
grpcio>=1.58.0
importlib-resources
kubernetes>=28.1.0
mmh3>=4.0.1
numpy==1.21.6; python_version < '3.8'
numpy>=1.22.4; python_version >= '3.8'
onnxruntime>=1.14.1
Expand All @@ -17,11 +18,11 @@ posthog==2.4.0
pulsar-client==3.1.0
pydantic>=1.9
pypika==0.48.9
PyYAML>=6.0.0
requests==2.28.1
tenacity>=8.2.3
tokenizers==0.13.2
tqdm>=4.65.0
typer>=0.9.0
typing_extensions>=4.5.0
uvicorn[standard]==0.18.3
PyYAML>=6.0.0

0 comments on commit 9191799

Please sign in to comment.