forked from pingcap/tidb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconcurrent_map.go
109 lines (96 loc) · 3.24 KB
/
concurrent_map.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"sync"
)
// ShardCount controls the shard maps within the concurrent map
var ShardCount = 320
// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (ShardCount) map shards.
type concurrentMap []*concurrentMapShared
// A "thread" safe string to anything map.
type concurrentMapShared struct {
items map[uint64]*entry
sync.RWMutex // Read Write mutex, guards access to internal map.
}
// newConcurrentMap creates a new concurrent map.
func newConcurrentMap() concurrentMap {
m := make(concurrentMap, ShardCount)
for i := 0; i < ShardCount; i++ {
m[i] = &concurrentMapShared{items: make(map[uint64]*entry)}
}
return m
}
// getShard returns shard under given key
func (m concurrentMap) getShard(hashKey uint64) *concurrentMapShared {
return m[hashKey%uint64(ShardCount)]
}
// Insert inserts a value in a shard safely
func (m concurrentMap) Insert(key uint64, value *entry) {
shard := m.getShard(key)
shard.Lock()
v, ok := shard.items[key]
if !ok {
shard.items[key] = value
} else {
value.next = v
shard.items[key] = value
}
shard.Unlock()
}
// UpsertCb : Callback to return new element to be inserted into the map
// It is called while lock is held, therefore it MUST NOT
// try to access other keys in same map, as it can lead to deadlock since
// Go sync.RWLock is not reentrant
type UpsertCb func(exist bool, valueInMap, newValue *entry) *entry
// Upsert: Insert or Update - updates existing element or inserts a new one using UpsertCb
func (m concurrentMap) Upsert(key uint64, value *entry, cb UpsertCb) (res *entry) {
shard := m.getShard(key)
shard.Lock()
v, ok := shard.items[key]
res = cb(ok, v, value)
shard.items[key] = res
shard.Unlock()
return res
}
// Get retrieves an element from map under given key.
// Note that in hash joins, reading proceeds after all writes, so we ignore RLock() here.
// Otherwise, we should use RLock() for concurrent reads and writes.
func (m concurrentMap) Get(key uint64) (*entry, bool) {
// Get shard
shard := m.getShard(key)
// shard.RLock()
// Get item from shard.
val, ok := shard.items[key]
// shard.RUnlock()
return val, ok
}
// IterCb :Iterator callback,called for every key,value found in
// maps. RLock is held for all calls for a given shard
// therefore callback sess consistent view of a shard,
// but not across the shards
type IterCb func(key uint64, e *entry)
// IterCb iterates the map using a callback, cheapest way to read
// all elements in a map.
func (m concurrentMap) IterCb(fn IterCb) {
for idx := range m {
shard := (m)[idx]
shard.RLock()
for key, value := range shard.items {
fn(key, value)
}
shard.RUnlock()
}
}