Skip to content

Commit

Permalink
*: fix data race in join. (pingcap#3159)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Apr 27, 2017
1 parent 883ea49 commit c646b59
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 36 deletions.
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,10 @@ func (b *executorBuilder) buildHashJoin(v *plan.PhysicalHashJoin) Executor {
for i := 0; i < e.concurrency; i++ {
ctx := &hashJoinCtx{}
if e.bigFilter != nil {
ctx.bigFilter = e.bigFilter
ctx.bigFilter = e.bigFilter.Clone()
}
if e.otherFilter != nil {
ctx.otherFilter = e.otherFilter
ctx.otherFilter = e.otherFilter.Clone()
}
ctx.datumBuffer = make([]types.Datum, len(e.bigHashKey))
ctx.hashKeyBuffer = make([]byte, 0, 10000)
Expand Down
40 changes: 19 additions & 21 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ type HashJoinExec struct {
bigExec Executor
prepared bool
ctx context.Context
smallFilter []expression.Expression
bigFilter []expression.Expression
otherFilter []expression.Expression
smallFilter expression.CNFExprs
bigFilter expression.CNFExprs
otherFilter expression.CNFExprs
schema *expression.Schema
outer bool
leftSmall bool
Expand Down Expand Up @@ -75,8 +75,8 @@ type HashJoinExec struct {

// hashJoinCtx holds the variables needed to do a hash join in one of many concurrent goroutines.
type hashJoinCtx struct {
bigFilter []expression.Expression
otherFilter []expression.Expression
bigFilter expression.CNFExprs
otherFilter expression.CNFExprs
// Buffer used for encode hash keys.
datumBuffer []types.Datum
hashKeyBuffer []byte
Expand Down Expand Up @@ -153,7 +153,7 @@ func (e *HashJoinExec) fetchBigExec() {
e.wg.Done()
}()
curBatchSize := 1
result := &execResult{rows: make([]*Row, 0, batchSize)}
result := &execResult{rows: make([]*Row, 0, curBatchSize)}
txnCtx := e.ctx.GoCtx()
for {
done := false
Expand All @@ -174,18 +174,18 @@ func (e *HashJoinExec) fetchBigExec() {
break
}
result.rows = append(result.rows, row)
if len(result.rows) >= batchSize {
if len(result.rows) >= curBatchSize {
select {
case <-txnCtx.Done():
return
case e.bigTableResultCh[idx] <- result:
result = &execResult{rows: make([]*Row, 0, batchSize)}
result = &execResult{rows: make([]*Row, 0, curBatchSize)}
}
}
}
cnt++
if done {
if len(result.rows) > 0 && len(result.rows) < batchSize {
if len(result.rows) > 0 {
select {
case <-txnCtx.Done():
return
Expand Down Expand Up @@ -368,12 +368,10 @@ func (e *HashJoinExec) joinOneBigRow(ctx *hashJoinCtx, bigRow *Row, result *exec
err error
)
bigMatched := true
if e.bigFilter != nil {
bigMatched, err = expression.EvalBool(ctx.bigFilter, bigRow.Data, e.ctx)
if err != nil {
result.err = errors.Trace(err)
return false
}
bigMatched, err = expression.EvalBool(ctx.bigFilter, bigRow.Data, e.ctx)
if err != nil {
result.err = errors.Trace(err)
return false
}
if bigMatched {
matchedRows, err = e.constructMatchedRows(ctx, bigRow)
Expand Down Expand Up @@ -503,9 +501,9 @@ type NestedLoopJoinExec struct {
leftSmall bool
prepared bool
Ctx context.Context
SmallFilter []expression.Expression
BigFilter []expression.Expression
OtherFilter []expression.Expression
SmallFilter expression.CNFExprs
BigFilter expression.CNFExprs
OtherFilter expression.CNFExprs
schema *expression.Schema
outer bool
defaultValues []types.Datum
Expand Down Expand Up @@ -656,9 +654,9 @@ type HashSemiJoinExec struct {
bigExec Executor
prepared bool
ctx context.Context
smallFilter []expression.Expression
bigFilter []expression.Expression
otherFilter []expression.Expression
smallFilter expression.CNFExprs
bigFilter expression.CNFExprs
otherFilter expression.CNFExprs
schema *expression.Schema
resultRows []*Row
// In auxMode, the result row always returns with an extra column which stores a boolean
Expand Down
3 changes: 3 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ func (s *testSuite) TestJoin(c *C) {
tk.MustExec("insert into t1 values (1),(2),(3),(4),(5),(6),(7)")
result = tk.MustQuery("select a.c1 from t a , t1 b where a.c1 = b.c1 order by a.c1;")
result.Check(testkit.Rows("1", "2", "3", "4", "5", "6", "7"))
// Test race.
result = tk.MustQuery("select a.c1 from t a , t1 b where a.c1 = b.c1 and a.c1 + b.c1 > 5 order by b.c1")
result.Check(testkit.Rows("3", "4", "5", "6", "7"))
result = tk.MustQuery("select a.c1 from t a , (select * from t1 limit 3) b where a.c1 = b.c1 order by b.c1;")
result.Check(testkit.Rows("1", "2", "3"))

Expand Down
14 changes: 13 additions & 1 deletion expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,20 @@ type Expression interface {
ResolveIndices(schema *Schema)
}

// CNFExprs stands for a CNF expression.
type CNFExprs []Expression

// Clone clones itself.
func (e CNFExprs) Clone() CNFExprs {
cnf := make(CNFExprs, 0, len(e))
for _, expr := range e {
cnf = append(cnf, expr.Clone())
}
return cnf
}

// EvalBool evaluates expression list to a boolean value.
func EvalBool(exprList []Expression, row []types.Datum, ctx context.Context) (bool, error) {
func EvalBool(exprList CNFExprs, row []types.Datum, ctx context.Context) (bool, error) {
for _, expr := range exprList {
data, err := expr.Eval(row)
if err != nil {
Expand Down
33 changes: 33 additions & 0 deletions util/mvmap/fnv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mvmap

const (
offset64 uint64 = 14695981039346656037
prime64 = 1099511628211
)

// fnvHash64 is ported from go library, which is thread-safe.
func fnvHash64(data []byte) uint64 {
hash := offset64
for _, c := range data {
hash *= prime64
hash ^= uint64(c)
}
return hash
}
14 changes: 2 additions & 12 deletions util/mvmap/mvmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ package mvmap

import (
"bytes"
"hash"
"hash/fnv"
)

type entry struct {
Expand Down Expand Up @@ -121,15 +119,13 @@ type MVMap struct {
entryStore entryStore
dataStore dataStore
hashTable map[uint64]entryAddr
hashFunc hash.Hash64
length int
}

// NewMVMap creates a new multi-value map.
func NewMVMap() *MVMap {
m := new(MVMap)
m.hashTable = make(map[uint64]entryAddr)
m.hashFunc = fnv.New64()
m.entryStore.slices = [][]entry{make([]entry, 0, 64)}
// Append the first empty entry, so the zero entryAddr can represent null.
m.entryStore.put(entry{})
Expand All @@ -140,7 +136,7 @@ func NewMVMap() *MVMap {
// Put puts the key/value pairs to the MVMap, if the key already exists, old value will not be overwritten,
// values are stored in a list.
func (m *MVMap) Put(key, value []byte) {
hashKey := m.hash(key)
hashKey := fnvHash64(key)
oldEntryAddr := m.hashTable[hashKey]
dataAddr := m.dataStore.put(key, value)
e := entry{
Expand All @@ -157,7 +153,7 @@ func (m *MVMap) Put(key, value []byte) {
// Get gets the values of the key.
func (m *MVMap) Get(key []byte) [][]byte {
var values [][]byte
hashKey := m.hash(key)
hashKey := fnvHash64(key)
entryAddr := m.hashTable[hashKey]
for entryAddr != nullEntryAddr {
e := m.entryStore.get(entryAddr)
Expand All @@ -176,12 +172,6 @@ func (m *MVMap) Get(key []byte) [][]byte {
return values
}

func (m *MVMap) hash(key []byte) uint64 {
m.hashFunc.Reset()
m.hashFunc.Write(key)
return m.hashFunc.Sum64()
}

// Len returns the number of values in th mv map, the number of keys may be less than Len
// if the same key is put more than once.
func (m *MVMap) Len() int {
Expand Down
13 changes: 13 additions & 0 deletions util/mvmap/mvmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"hash/fnv"
"testing"
)

Expand Down Expand Up @@ -78,3 +79,15 @@ func BenchmarkMVMapGet(b *testing.B) {
}
}
}

func TestFNVHash(t *testing.T) {
b := []byte{0xcb, 0xf2, 0x9c, 0xe4, 0x84, 0x22, 0x23, 0x25}
sum1 := fnvHash64(b)
hash := fnv.New64()
hash.Reset()
hash.Write(b)
sum2 := hash.Sum64()
if sum1 != sum2 {
t.FailNow()
}
}

0 comments on commit c646b59

Please sign in to comment.