-
Notifications
You must be signed in to change notification settings - Fork 48
/
Copy pathconcurrent_map.go
95 lines (81 loc) · 2.69 KB
/
concurrent_map.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package concurrent_map
import (
"sync"
)
// ConcurrentMap is a thread-safe map whose entries are spread across several
// partitions.
// Each partition is an innerMap guarded by its own RWMutex, so operations on
// keys that fall into different partitions do not contend for the same lock.
type ConcurrentMap struct {
	// partitions holds one innerMap per partition; getPartition picks the
	// element that serves a given key.
	partitions []*innerMap
	// numOfBlockets is the divisor applied to a key's PartitionKey when the
	// partition index is computed. CreateConcurrentMap sets it to the
	// requested number of partitions.
	numOfBlockets int
}
// Partitionable is the interface that every key type must implement.
// It supplies the raw key used inside a partition's map and the number that
// decides which partition stores the entry.
type Partitionable interface {
	// Value returns the raw value of the key. It is used directly as the key
	// of the partition's map[interface{}]interface{}, so its dynamic type
	// must be hashable (comparable); Go maps panic on unhashable keys.
	Value() interface{}
	// PartitionKey returns the number used to select the partition that stores
	// the entry for this key. The key's hash, for example, can be used.
	// The partition for the key is partitions[(PartitionKey % m.numOfBlockets)].
	// Keys with equal Value must return the same PartitionKey, otherwise a
	// lookup is routed to a different partition than the one written to.
	//
	// Why is no default hash function provided for partitioning?
	// Because the partitioning scheme has a significant impact on performance.
	// A good scheme balances the access across the partitions and avoids hot
	// partitions, and the access pattern depends heavily on the workload.
	// The scheme is therefore best designed for the specific use case.
	PartitionKey() int64
}
// innerMap is a single partition of a ConcurrentMap: a plain Go map together
// with the RWMutex that guards it.
type innerMap struct {
	// m stores the entries of this partition, keyed by Partitionable.Value().
	m map[interface{}]interface{}
	// lock guards m; get takes it in read mode, set and del in write mode.
	lock sync.RWMutex
}
// createInnerMap returns a new, empty partition whose backing map is
// initialised and ready for writes. All other fields keep their zero value.
func createInnerMap() *innerMap {
	partition := new(innerMap)
	partition.m = map[interface{}]interface{}{}
	return partition
}
// get looks up the entry stored under key.Value() in this partition.
// It returns the stored value and true when the key is present, or nil and
// false when it is not. The lookup runs under the partition's read lock.
func (im *innerMap) get(key Partitionable) (interface{}, bool) {
	// Resolve the raw key before locking so that the key's own code never
	// runs inside the critical section.
	keyVal := key.Value()
	im.lock.RLock()
	// Release through defer: indexing a map[interface{}] with an unhashable
	// dynamic type (e.g. a slice) panics, and the lock must not stay held
	// when a caller recovers from that panic.
	defer im.lock.RUnlock()
	v, ok := im.m[keyVal]
	return v, ok
}
// set stores v under key.Value() in this partition, replacing any value that
// was stored for the same key. The write runs under the partition's write lock.
func (im *innerMap) set(key Partitionable, v interface{}) {
	// Resolve the raw key before locking so that the key's own code never
	// runs inside the critical section.
	keyVal := key.Value()
	im.lock.Lock()
	// Release through defer: assigning to a map[interface{}] with an
	// unhashable dynamic key type (e.g. a slice) panics, and the lock must not
	// stay held when a caller recovers from that panic.
	defer im.lock.Unlock()
	im.m[keyVal] = v
}
// del removes the entry stored under key.Value() from this partition.
// Deleting a key that is not present is a no-op. The removal runs under the
// partition's write lock.
func (im *innerMap) del(key Partitionable) {
	// Resolve the raw key before locking so that the key's own code never
	// runs inside the critical section.
	keyVal := key.Value()
	im.lock.Lock()
	// Release through defer: deleting from a map[interface{}] with an
	// unhashable dynamic key type (e.g. a slice) panics, and the lock must not
	// stay held when a caller recovers from that panic.
	defer im.lock.Unlock()
	delete(im.m, keyVal)
}
// CreateConcurrentMap creates a ConcurrentMap with numOfPartitions partitions,
// each backed by its own empty innerMap.
// A numOfPartitions below 1 is treated as 1, so the returned map is always
// usable.
func CreateConcurrentMap(numOfPartitions int) *ConcurrentMap {
	// A map without partitions cannot hold any entry, and a zero divisor would
	// make the partition lookup panic, so fall back to a single partition.
	if numOfPartitions < 1 {
		numOfPartitions = 1
	}
	// The final length is known, so allocate the slice once.
	partitions := make([]*innerMap, numOfPartitions)
	for i := range partitions {
		partitions[i] = createInnerMap()
	}
	return &ConcurrentMap{
		partitions:    partitions,
		numOfBlockets: numOfPartitions,
	}
}
// getPartition returns the partition responsible for key.
// The index is key.PartitionKey() modulo m.numOfBlockets, normalised into
// [0, numOfBlockets) so that negative partition keys are valid as well.
// It panics if m.numOfBlockets is 0, because the modulo divides by zero.
func (m *ConcurrentMap) getPartition(key Partitionable) *innerMap {
	n := int64(m.numOfBlockets)
	partitionID := key.PartitionKey() % n
	// Go's % keeps the sign of the dividend, so a negative PartitionKey yields
	// a negative remainder; shift it into the valid index range.
	if partitionID < 0 {
		partitionID += n
	}
	return m.partitions[partitionID]
}
// Get returns the value stored for key and reports whether the key was
// present. The lookup is delegated to the partition selected for key.
func (m *ConcurrentMap) Get(key Partitionable) (interface{}, bool) {
	im := m.getPartition(key)
	return im.get(key)
}
// Set stores v under key, replacing any value previously stored for it.
// The write is delegated to the partition selected for key.
func (m *ConcurrentMap) Set(key Partitionable, v interface{}) {
	m.getPartition(key).set(key, v)
}
// Del removes the entry stored for key, if there is one.
// The removal is delegated to the partition selected for key.
func (m *ConcurrentMap) Del(key Partitionable) {
	m.getPartition(key).del(key)
}