From 919179947affafd50615c650104133a22ca96521 Mon Sep 17 00:00:00 2001
From: Hammad Bashir
Date: Thu, 16 Nov 2023 16:35:47 -0800
Subject: [PATCH] [ENH] Add rendezvous hashing to go and python. Add Assignment policy to go so it can be used in future commits (#1360)

## Description of changes

*Summarize the changes made by this PR.*

Adds basic rendezvous hashing to the python and go codebases so it can be used
in future commits.

## Test plan

*How are these changes tested?*

Basic unit tests. Manually tested against k8s.

## Documentation Changes

None required
---
 .../distributed/test_rendezvous_hash.py       | 30 +++++++++
 chromadb/utils/rendezvous_hash.py             | 50 +++++++++++++++
 go/coordinator/go.mod                         |  3 +-
 go/coordinator/go.sum                         |  2 +
 go/coordinator/internal/coordinator/apis.go   |  5 +-
 .../internal/coordinator/apis_test.go         |  6 +-
 .../internal/coordinator/assignment_policy.go | 54 ++++++++++++++--
 .../internal/coordinator/coordinator.go       |  2 +-
 .../internal/grpccoordinator/server.go        |  4 +-
 .../internal/utils/rendezvous_hash.go         | 62 +++++++++++++++++++
 .../internal/utils/rendezvous_hash_test.go    | 62 +++++++++++++++++++
 pyproject.toml                                |  1 +
 requirements.txt                              |  3 +-
 13 files changed, 271 insertions(+), 13 deletions(-)
 create mode 100644 chromadb/test/segment/distributed/test_rendezvous_hash.py
 create mode 100644 chromadb/utils/rendezvous_hash.py
 create mode 100644 go/coordinator/internal/utils/rendezvous_hash.go
 create mode 100644 go/coordinator/internal/utils/rendezvous_hash_test.go

diff --git a/chromadb/test/segment/distributed/test_rendezvous_hash.py b/chromadb/test/segment/distributed/test_rendezvous_hash.py
new file mode 100644
index 00000000000..922ff61c97b
--- /dev/null
+++ b/chromadb/test/segment/distributed/test_rendezvous_hash.py
@@ -0,0 +1,30 @@
+from chromadb.utils.rendezvous_hash import assign, murmur3hasher
+
+
+def test_rendezvous_hash() -> None:
+    # Tests that assign works as expected
+    members = ["a", "b", "c"]
+    key = "key"
+
+    def mock_hasher(member: str, key: str) -> int:
+        return members.index(member)  # Highest index wins
+
+    assert assign(key, members, mock_hasher) == "c"
+
+
+def test_even_distribution() -> None:
+    member_count = 10
+    tolerance = 25
+    nodes = [str(i) for i in range(member_count)]
+
+    # Test if keys are evenly distributed across nodes
+    key_distribution = {node: 0 for node in nodes}
+    num_keys = 1000
+    for i in range(num_keys):
+        key = f"key_{i}"
+        node = assign(key, nodes, murmur3hasher)
+        key_distribution[node] += 1
+
+    # Check if keys are somewhat evenly distributed
+    for node in nodes:
+        assert abs(key_distribution[node] - num_keys / len(nodes)) < tolerance
diff --git a/chromadb/utils/rendezvous_hash.py b/chromadb/utils/rendezvous_hash.py
new file mode 100644
index 00000000000..0db248f93ac
--- /dev/null
+++ b/chromadb/utils/rendezvous_hash.py
@@ -0,0 +1,50 @@
+# An implementation of https://en.wikipedia.org/wiki/Rendezvous_hashing
+from typing import Callable, List
+import mmh3
+
+Hasher = Callable[[str, str], int]
+Member = str
+Members = List[str]
+Key = str
+
+
+def assign(key: Key, members: Members, hasher: Hasher) -> Member:
+    """Assign `key` to one of `members` using rendezvous hashing.
+
+    Each member is scored with `hasher(member, key)` and the highest score
+    wins; ties keep the member that appears first in `members`.
+
+    Raises:
+        ValueError: if `members` is empty, or if `key` is empty while there
+            is more than one member to choose from.
+    """
+    if not members:
+        raise ValueError("Cannot assign key to empty memberlist")
+    if len(members) == 1:
+        return members[0]
+    if key == "":
+        raise ValueError("Cannot assign empty key")
+
+    # max() keeps the first maximal member (same tie-break as a strict `>`
+    # scan) and, unlike a -1 sentinel, still picks one when all scores are < 0.
+    return max(members, key=lambda member: hasher(member, key))
+
+
+def merge_hashes(x: int, y: int) -> int:
+    """Combine two 64-bit hashes with the 64-bit murmurhash3 finalizer mix."""
+    # Python ints are arbitrary precision, so each multiply is reduced mod
+    # 2**64 to emulate the wrap-around of unsigned 64-bit arithmetic.
+    acc = x ^ y
+    acc ^= acc >> 33
+    acc = (acc * 0xFF51AFD7ED558CCD) % 2**64
+    acc ^= acc >> 33
+    acc = (acc * 0xC4CEB9FE1A85EC53) % 2**64
+    acc ^= acc >> 33
+    return acc
+
+
+def murmur3hasher(member: Member, key: Key) -> int:
+    """Hashes the key and member using the murmur3 hashing algorithm"""
+    member_hash = mmh3.hash64(member, signed=False)[0]
+    key_hash = mmh3.hash64(key, signed=False)[0]
+    return merge_hashes(member_hash, key_hash)
diff --git a/go/coordinator/go.mod b/go/coordinator/go.mod
index 2361668a104..1412079e69e 100644
--- a/go/coordinator/go.mod
+++ b/go/coordinator/go.mod
@@ -22,13 +22,13 @@ require (
 
 require (
 	github.com/davecgh/go-spew v1.1.1 // indirect
-	github.com/go-sql-driver/mysql v1.7.1 // indirect
 	github.com/emicklei/go-restful/v3 v3.9.0 // indirect
 	github.com/evanphx/json-patch v4.12.0+incompatible // indirect
 	github.com/go-logr/logr v1.2.4 // indirect
 	github.com/go-openapi/jsonpointer v0.19.6 // indirect
 	github.com/go-openapi/jsonreference v0.20.2 // indirect
 	github.com/go-openapi/swag v0.22.3 // indirect
+	github.com/go-sql-driver/mysql v1.7.1 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
 	github.com/google/gnostic-models v0.6.8 // indirect
@@ -51,6 +51,7 @@ require (
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/rogpeppe/go-internal v1.11.0 // indirect
+	github.com/spaolacci/murmur3 v1.1.0
 	github.com/spf13/pflag v1.0.5 // indirect
 	github.com/stretchr/objx v0.5.1 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
diff --git a/go/coordinator/go.sum b/go/coordinator/go.sum
index f0186b52f34..953031f3e8e 100644
--- a/go/coordinator/go.sum
+++ b/go/coordinator/go.sum
@@ -97,6 +97,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
 github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A=
 github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
+github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
 github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
 github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
diff --git a/go/coordinator/internal/coordinator/apis.go b/go/coordinator/internal/coordinator/apis.go
index 11a80bdb26b..24cb1a5ee13 100644
--- a/go/coordinator/internal/coordinator/apis.go
+++ b/go/coordinator/internal/coordinator/apis.go
@@ -68,7 +68,10 @@ func (s *Coordinator) GetTenant(ctx context.Context, getTenant *model.GetTenant)
 }
 
 func (s *Coordinator) CreateCollection(ctx context.Context, createCollection *model.CreateCollection) (*model.Collection, error) {
-	collectionTopic := s.assignCollection(createCollection.ID)
+	collectionTopic, err := s.assignCollection(createCollection.ID)
+	if err != nil {
+		return nil, err
+	}
 	createCollection.Topic = collectionTopic
 	log.Info("apis create collection", zap.Any("collection", createCollection))
 	collection, err := s.meta.AddCollection(ctx, createCollection)
diff --git a/go/coordinator/internal/coordinator/apis_test.go b/go/coordinator/internal/coordinator/apis_test.go
index 93fe8fbe3b4..465bdd504aa 100644
--- a/go/coordinator/internal/coordinator/apis_test.go
+++ b/go/coordinator/internal/coordinator/apis_test.go
@@ -214,13 +214,13 @@ func NewMockAssignmentPolicy(collecions []*model.Collection) *MockAssignmentPoli
 	}
 }
 
-func (m *MockAssignmentPolicy) AssignCollection(collectionID types.UniqueID) string {
+func (m *MockAssignmentPolicy) AssignCollection(collectionID types.UniqueID) (string, error) {
 	for _, collection := range m.collections {
 		if collection.ID == collectionID {
-			return collection.Topic
+			return collection.Topic, nil
 		}
 	}
-	return ""
+	return "", common.ErrCollectionNotFound
 }
 
 func TestCreateGetDeleteCollections(t *testing.T) {
diff --git a/go/coordinator/internal/coordinator/assignment_policy.go b/go/coordinator/internal/coordinator/assignment_policy.go
index 0654d4df128..2a8b22604cf 100644
--- a/go/coordinator/internal/coordinator/assignment_policy.go
+++ b/go/coordinator/internal/coordinator/assignment_policy.go
@@ -4,10 +4,11 @@ import (
 	"fmt"
 
 	"github.com/chroma/chroma-coordinator/internal/types"
+	"github.com/chroma/chroma-coordinator/internal/utils"
 )
 
 type CollectionAssignmentPolicy interface {
-	AssignCollection(collectionID types.UniqueID) string
+	AssignCollection(collectionID types.UniqueID) (string, error)
 }
 
 // SimpleAssignmentPolicy is a simple assignment policy that assigns a 1 collection to 1
@@ -24,10 +25,53 @@ func NewSimpleAssignmentPolicy(tenantID string, topicNS string) *SimpleAssignmen
 	}
 }
 
-func (s *SimpleAssignmentPolicy) AssignCollection(collectionID types.UniqueID) string {
-	return createTopicName(s.tenantID, s.topicNS, collectionID.String())
+func (s *SimpleAssignmentPolicy) AssignCollection(collectionID types.UniqueID) (string, error) {
+	return createTopicName(s.tenantID, s.topicNS, collectionID.String()), nil
 }
 
-func createTopicName(tenantID string, topicNS string, collectionID string) string {
-	return fmt.Sprintf("persistent://%s/%s/%s", tenantID, topicNS, collectionID)
+func createTopicName(tenantID string, topicNS string, logName string) string {
+	return fmt.Sprintf("persistent://%s/%s/%s", tenantID, topicNS, logName)
+}
+
+// RendezvousAssignmentPolicy is an assignment policy that assigns a collection to a topic
+// For now it assumes there are 16 topics and uses the rendezvous hashing algorithm to
+// assign a collection to a topic.
+
+var topics = [16]string{
+	"chroma_log_0",
+	"chroma_log_1",
+	"chroma_log_2",
+	"chroma_log_3",
+	"chroma_log_4",
+	"chroma_log_5",
+	"chroma_log_6",
+	"chroma_log_7",
+	"chroma_log_8",
+	"chroma_log_9",
+	"chroma_log_10",
+	"chroma_log_11",
+	"chroma_log_12",
+	"chroma_log_13",
+	"chroma_log_14",
+	"chroma_log_15",
+}
+
+type RendezvousAssignmentPolicy struct {
+	tenantID string
+	topicNS  string
+}
+
+func NewRendezvousAssignmentPolicy(tenantID string, topicNS string) *RendezvousAssignmentPolicy {
+	return &RendezvousAssignmentPolicy{
+		tenantID: tenantID,
+		topicNS:  topicNS,
+	}
+}
+
+func (r *RendezvousAssignmentPolicy) AssignCollection(collectionID types.UniqueID) (string, error) {
+	assignment, err := utils.Assign(collectionID.String(), topics[:], utils.Murmur3Hasher)
+	if err != nil {
+		return "", err
+	}
+	return createTopicName(r.tenantID, r.topicNS, assignment), nil
 }
diff --git a/go/coordinator/internal/coordinator/coordinator.go b/go/coordinator/internal/coordinator/coordinator.go
index bb71457fa3d..1e61b8c0f39 100644
--- a/go/coordinator/internal/coordinator/coordinator.go
+++ b/go/coordinator/internal/coordinator/coordinator.go
@@ -43,6 +43,6 @@ func (s *Coordinator) Stop() error {
 	return nil
 }
 
-func (c *Coordinator) assignCollection(collectionID types.UniqueID) string {
+func (c *Coordinator) assignCollection(collectionID types.UniqueID) (string, error) {
 	return c.collectionAssignmentPolicy.AssignCollection(collectionID)
 }
diff --git a/go/coordinator/internal/grpccoordinator/server.go b/go/coordinator/internal/grpccoordinator/server.go
index 82c15136f8d..e26bdb976ba 100644
--- a/go/coordinator/internal/grpccoordinator/server.go
+++ b/go/coordinator/internal/grpccoordinator/server.go
@@ -71,7 +71,9 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gor
 	s := &Server{
 		healthServer: health.NewServer(),
 	}
-	assignmentPolicy := coordinator.NewSimpleAssignmentPolicy("test-tenant", "test-topic")
+	// TODO: make the assignment policy configurable (NewSimpleAssignmentPolicy is the
+	// alternative to the one used below), and make the pulsar tenant configurable too
+	assignmentPolicy := coordinator.NewRendezvousAssignmentPolicy("test-tenant", "test-topic")
 	coordinator, err := coordinator.NewCoordinator(ctx, assignmentPolicy, db)
 	if err != nil {
 		return nil, err
diff --git a/go/coordinator/internal/utils/rendezvous_hash.go b/go/coordinator/internal/utils/rendezvous_hash.go
new file mode 100644
index 00000000000..374d78594cb
--- /dev/null
+++ b/go/coordinator/internal/utils/rendezvous_hash.go
@@ -0,0 +1,62 @@
+package utils
+
+import (
+	"errors"
+
+	"github.com/spaolacci/murmur3"
+)
+
+type Hasher = func(member string, key string) uint64
+type Member = string
+type Members = []Member
+type Key = string
+
+// Assign assigns a key to a member using the rendezvous hashing algorithm: the
+// member with the highest hasher(member, key) score wins (the first one on ties).
+// It errors on an empty member list, or an empty key with more than one member.
+func Assign(key Key, members Members, hasher Hasher) (Member, error) {
+	if len(members) == 0 {
+		return "", errors.New("cannot assign key to empty member list")
+	}
+	if len(members) == 1 {
+		return members[0], nil
+	}
+	if key == "" {
+		return "", errors.New("cannot assign empty key")
+	}
+
+	// Seed with the first member (not a zero score) so all-zero scores still pick one.
+	maxMember := members[0]
+	maxScore := hasher(maxMember, key)
+	for _, member := range members[1:] {
+		if score := hasher(member, key); score > maxScore {
+			maxScore, maxMember = score, member
+		}
+	}
+	return maxMember, nil
+}
+
+// mergeHashes combines two hashes using the 64-bit murmurhash3 finalizer mix.
+func mergeHashes(a uint64, b uint64) uint64 {
+	acc := a ^ b
+	acc ^= acc >> 33
+	acc *= 0xff51afd7ed558ccd
+	acc ^= acc >> 33
+	acc *= 0xc4ceb9fe1a85ec53
+	acc ^= acc >> 33
+	return acc
+}
+
+// Murmur3Hasher hashes member and key with murmur3 and mixes the two hashes.
+// NOTE: The python implementation of murmur3 may differ from the golang one. For now
+// this is fine since go and python don't need to agree on any hashing scheme, but if
+// they ever do, we should verify that the implementations are the same.
+func Murmur3Hasher(member string, key string) uint64 {
+	hasher := murmur3.New64()
+	hasher.Write([]byte(member))
+	memberHash := hasher.Sum64()
+	hasher.Reset()
+	hasher.Write([]byte(key))
+	keyHash := hasher.Sum64()
+	return mergeHashes(memberHash, keyHash)
+}
diff --git a/go/coordinator/internal/utils/rendezvous_hash_test.go b/go/coordinator/internal/utils/rendezvous_hash_test.go
new file mode 100644
index 00000000000..282e7ca286b
--- /dev/null
+++ b/go/coordinator/internal/utils/rendezvous_hash_test.go
@@ -0,0 +1,62 @@
+package utils
+
+import (
+	"fmt"
+	"math"
+	"testing"
+)
+
+func mockHasher(member string, key string) uint64 {
+	members := []string{"a", "b", "c"}
+	for i, m := range members {
+		if m == member {
+			return uint64(i)
+		}
+	}
+	return 0
+}
+
+func TestRendezvousHash(t *testing.T) {
+	members := []string{"a", "b", "c"}
+	key := "key"
+
+	// Test that the Assign function returns the expected result
+	node, err := Assign(key, members, mockHasher)
+
+	if err != nil {
+		t.Errorf("Assign() returned an error: %v", err)
+	}
+
+	if node != "c" {
+		t.Errorf("Assign() = %v, want %v", node, "c")
+	}
+}
+
+func TestEvenDistribution(t *testing.T) {
+	memberCount := 10
+	tolerance := 25
+	var nodes []string
+	for i := 0; i < memberCount; i++ {
+		nodes = append(nodes, fmt.Sprint(i)) // Node names "0".."9"
+	}
+
+	keyDistribution := make(map[string]int)
+	numKeys := 1000
+
+	// Test if keys are evenly distributed across nodes
+	for i := 0; i < numKeys; i++ {
+		key := "key_" + fmt.Sprint(i)
+		node, err := Assign(key, nodes, Murmur3Hasher)
+		if err != nil {
+			t.Errorf("Assign() returned an error: %v", err)
+		}
+		keyDistribution[node]++
+	}
+
+	// Check every node (not just the map's keys) so a node with no keys also fails
+	for _, node := range nodes {
+		if math.Abs(float64(keyDistribution[node]-numKeys/memberCount)) > float64(tolerance) {
+			t.Errorf("Key distribution is uneven: %v", keyDistribution)
+		}
+	}
+}
diff --git a/pyproject.toml b/pyproject.toml
index 21a795e936d..f5579c62332 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,6 +42,7 @@ dependencies = [
     'kubernetes>=28.1.0',
     'tenacity>=8.2.3',
     'PyYAML>=6.0.0',
+    'mmh3>=4.0.1',
 ]
 
 [tool.black]
diff --git a/requirements.txt b/requirements.txt
index c23ddcc412e..3ba61555fa5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,6 +5,7 @@ graphlib_backport==1.0.3; python_version < '3.9'
 grpcio>=1.58.0
 importlib-resources
 kubernetes>=28.1.0
+mmh3>=4.0.1
 numpy==1.21.6; python_version < '3.8'
 numpy>=1.22.4; python_version >= '3.8'
 onnxruntime>=1.14.1
@@ -17,6 +18,7 @@ posthog==2.4.0
 pulsar-client==3.1.0
 pydantic>=1.9
 pypika==0.48.9
+PyYAML>=6.0.0
 requests==2.28.1
 tenacity>=8.2.3
 tokenizers==0.13.2
@@ -24,4 +26,3 @@ tqdm>=4.65.0
 typer>=0.9.0
 typing_extensions>=4.5.0
 uvicorn[standard]==0.18.3
-PyYAML>=6.0.0