forked from pingcap/tidb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sysvar_cache.go
227 lines (212 loc) · 7.66 KB
/
sysvar_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package domain
import (
"context"
"fmt"
"strconv"
"sync"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
storekv "github.com/tikv/client-go/v2/kv"
"go.uber.org/zap"
)
// The sysvar cache replaces the GlobalVariableCache.
// It is an improvement because it operates similar to privilege cache:
// - it caches for 30s instead of 2s
// - the cache is invalidated on update
// - an etcd notification is sent to other tidb servers.
// sysVarCache represents the cache of system variables broken up into session and global scope.
type sysVarCache struct {
sync.RWMutex // protects global and session maps
global map[string]string
session map[string]string
rebuildLock sync.Mutex // protects concurrent rebuild
}
func (do *Domain) rebuildSysVarCacheIfNeeded() (err error) {
do.sysVarCache.RLock()
cacheNeedsRebuild := len(do.sysVarCache.session) == 0 || len(do.sysVarCache.global) == 0
do.sysVarCache.RUnlock()
if cacheNeedsRebuild {
logutil.BgLogger().Warn("sysvar cache is empty, triggering rebuild")
if err = do.rebuildSysVarCache(nil); err != nil {
logutil.BgLogger().Error("rebuilding sysvar cache failed", zap.Error(err))
}
}
return err
}
// GetSessionCache gets a copy of the session sysvar cache.
// The intention is to copy it directly to the systems[] map
// on creating a new session.
func (do *Domain) GetSessionCache() (map[string]string, error) {
if err := do.rebuildSysVarCacheIfNeeded(); err != nil {
return nil, err
}
do.sysVarCache.RLock()
defer do.sysVarCache.RUnlock()
// Perform a deep copy since this will be assigned directly to the session
newMap := make(map[string]string, len(do.sysVarCache.session))
for k, v := range do.sysVarCache.session {
newMap[k] = v
}
return newMap, nil
}
// GetGlobalVar gets an individual global var from the sysvar cache.
func (do *Domain) GetGlobalVar(name string) (string, error) {
if err := do.rebuildSysVarCacheIfNeeded(); err != nil {
return "", err
}
do.sysVarCache.RLock()
defer do.sysVarCache.RUnlock()
if val, ok := do.sysVarCache.global[name]; ok {
return val, nil
}
logutil.BgLogger().Warn("could not find key in global cache", zap.String("name", name))
return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name)
}
func (do *Domain) fetchTableValues(ctx sessionctx.Context) (map[string]string, error) {
tableContents := make(map[string]string)
// Copy all variables from the table to tableContents
exec := ctx.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.Background(), `SELECT variable_name, variable_value FROM mysql.global_variables`)
if err != nil {
return tableContents, err
}
rows, _, err := exec.ExecRestrictedStmt(context.TODO(), stmt)
if err != nil {
return nil, err
}
for _, row := range rows {
name := row.GetString(0)
val := row.GetString(1)
tableContents[name] = val
}
return tableContents, nil
}
// rebuildSysVarCache rebuilds the sysvar cache both globally and for session vars.
// It needs to be called when sysvars are added or removed.
func (do *Domain) rebuildSysVarCache(ctx sessionctx.Context) error {
newSessionCache := make(map[string]string)
newGlobalCache := make(map[string]string)
if ctx == nil {
sysSessionPool := do.SysSessionPool()
res, err := sysSessionPool.Get()
if err != nil {
return err
}
defer sysSessionPool.Put(res)
ctx = res.(sessionctx.Context)
}
// Only one rebuild can be in progress at a time, this prevents a lost update race
// where an earlier fetchTableValues() finishes last.
do.sysVarCache.rebuildLock.Lock()
defer do.sysVarCache.rebuildLock.Unlock()
tableContents, err := do.fetchTableValues(ctx)
if err != nil {
return err
}
for _, sv := range variable.GetSysVars() {
sVal := sv.Value
if _, ok := tableContents[sv.Name]; ok {
sVal = tableContents[sv.Name]
}
// session cache stores non-skippable variables, which essentially means session scope.
// for historical purposes there are some globals, but these should eventually be removed.
if !sv.SkipInit() {
newSessionCache[sv.Name] = sVal
}
if sv.HasGlobalScope() {
newGlobalCache[sv.Name] = sVal
}
// Propagate any changes to the server scoped variables
checkEnableServerGlobalVar(sv.Name, sVal)
}
logutil.BgLogger().Debug("rebuilding sysvar cache")
do.sysVarCache.Lock()
defer do.sysVarCache.Unlock()
do.sysVarCache.session = newSessionCache
do.sysVarCache.global = newGlobalCache
return nil
}
// checkEnableServerGlobalVar processes variables that acts in server and global level.
// This is required because the SetGlobal function on the sysvar struct only executes on
// the initiating tidb-server. There is no current method to say "run this function on all
// tidb servers when the value of this variable changes". If you do not require changes to
// be applied on all servers, use a getter/setter instead! You don't need to add to this list.
func checkEnableServerGlobalVar(name, sVal string) {
var err error
switch name {
case variable.TiDBEnableLocalTxn:
variable.EnableLocalTxn.Store(variable.TiDBOptOn(sVal))
case variable.TiDBEnableStmtSummary:
err = stmtsummary.StmtSummaryByDigestMap.SetEnabled(sVal, false)
case variable.TiDBStmtSummaryInternalQuery:
err = stmtsummary.StmtSummaryByDigestMap.SetEnabledInternalQuery(sVal, false)
case variable.TiDBStmtSummaryRefreshInterval:
err = stmtsummary.StmtSummaryByDigestMap.SetRefreshInterval(sVal, false)
case variable.TiDBStmtSummaryHistorySize:
err = stmtsummary.StmtSummaryByDigestMap.SetHistorySize(sVal, false)
case variable.TiDBStmtSummaryMaxStmtCount:
err = stmtsummary.StmtSummaryByDigestMap.SetMaxStmtCount(sVal, false)
case variable.TiDBStmtSummaryMaxSQLLength:
err = stmtsummary.StmtSummaryByDigestMap.SetMaxSQLLength(sVal, false)
case variable.TiDBCapturePlanBaseline:
variable.CapturePlanBaseline.Set(sVal, false)
case variable.TiDBEnableTopSQL:
variable.TopSQLVariable.Enable.Store(variable.TiDBOptOn(sVal))
case variable.TiDBTopSQLPrecisionSeconds:
var val int64
val, err = strconv.ParseInt(sVal, 10, 64)
if err != nil {
break
}
variable.TopSQLVariable.PrecisionSeconds.Store(val)
case variable.TiDBTopSQLMaxStatementCount:
var val int64
val, err = strconv.ParseInt(sVal, 10, 64)
if err != nil {
break
}
variable.TopSQLVariable.MaxStatementCount.Store(val)
case variable.TiDBTopSQLMaxCollect:
var val int64
val, err = strconv.ParseInt(sVal, 10, 64)
if err != nil {
break
}
variable.TopSQLVariable.MaxCollect.Store(val)
case variable.TiDBTopSQLReportIntervalSeconds:
var val int64
val, err = strconv.ParseInt(sVal, 10, 64)
if err != nil {
break
}
variable.TopSQLVariable.ReportIntervalSeconds.Store(val)
case variable.TiDBRestrictedReadOnly:
variable.RestrictedReadOnly.Store(variable.TiDBOptOn(sVal))
case variable.TiDBStoreLimit:
var val int64
val, err = strconv.ParseInt(sVal, 10, 64)
if err != nil {
break
}
storekv.StoreLimit.Store(val)
}
if err != nil {
logutil.BgLogger().Error(fmt.Sprintf("load global variable %s error", name), zap.Error(err))
}
}