Skip to content

Commit

Permalink
*: refactor distsql and executor. (pingcap#2942)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored and hanfei1991 committed Mar 29, 2017
1 parent 3bfc820 commit 78f7eab
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 159 deletions.
135 changes: 18 additions & 117 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,14 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tipb/go-tipb"
goctx "golang.org/x/net/context"
)

var (
errInvalidResp = terror.ClassXEval.New(codeInvalidResp, "invalid response")
errNilResp = terror.ClassXEval.New(codeNilResp, "client returns nil response")
)

var (
Expand All @@ -42,34 +39,27 @@ var (
type SelectResult interface {
// Next gets the next partial result.
Next() (PartialResult, error)
// SetFields sets the expected result type.
SetFields(fields []*types.FieldType)
// Close closes the iterator.
Close() error
// Fetch fetches partial results from client.
// The caller should call SetFields() before call Fetch().
Fetch(ctx goctx.Context)
// IgnoreData sets ignore data attr to true.
// For index double scan, we do not need row data when scanning index.
IgnoreData()
}

// PartialResult is the result from a single region server.
type PartialResult interface {
// Next returns the next row of the sub result.
// If no more row to return, data would be nil.
Next() (handle int64, data []types.Datum, err error)
// Next returns the next rowData of the sub result.
// If no more row to return, rowData would be nil.
Next() (handle int64, rowData []byte, err error)
// Close closes the partial result.
Close() error
}

// SelectResult is used to get response rows from SelectRequest.
type selectResult struct {
index bool
aggregate bool
fields []*types.FieldType
resp kv.Response
ignoreData bool
label string
aggregate bool
resp kv.Response

results chan resultWithErr
closed chan struct{}
Expand All @@ -89,13 +79,7 @@ func (r *selectResult) fetch(ctx goctx.Context) {
defer func() {
close(r.results)
duration := time.Since(startTime)
var label string
if r.index {
label = "index"
} else {
label = "table"
}
queryHistgram.WithLabelValues(label).Observe(duration.Seconds())
queryHistgram.WithLabelValues(r.label).Observe(duration.Seconds())
}()
for {
resultSubset, err := r.resp.Next()
Expand All @@ -106,12 +90,7 @@ func (r *selectResult) fetch(ctx goctx.Context) {
if resultSubset == nil {
return
}
pr := &partialResult{
index: r.index,
fields: r.fields,
aggregate: r.aggregate,
ignoreData: r.ignoreData,
}
pr := &partialResult{}
pr.unmarshal(resultSubset)

select {
Expand All @@ -131,15 +110,6 @@ func (r *selectResult) Next() (PartialResult, error) {
return re.result, errors.Trace(re.err)
}

// SetFields sets select result field types.
func (r *selectResult) SetFields(fields []*types.FieldType) {
r.fields = fields
}

func (r *selectResult) IgnoreData() {
r.ignoreData = true
}

// Close closes SelectResult.
func (r *selectResult) Close() error {
// close this channel tell fetch goroutine to exit
Expand All @@ -149,14 +119,10 @@ func (r *selectResult) Close() error {

// partialResult represents a subset of select result.
type partialResult struct {
index bool
aggregate bool
fields []*types.FieldType
resp *tipb.SelectResponse
chunkIdx int
cursor int
dataOffset int64
ignoreData bool
}

func (pr *partialResult) unmarshal(resultSubset []byte) error {
Expand All @@ -173,60 +139,17 @@ func (pr *partialResult) unmarshal(resultSubset []byte) error {
return nil
}

var dummyData = make([]types.Datum, 0)

// Next returns the next row of the sub result.
// If no more row to return, data would be nil.
func (pr *partialResult) Next() (handle int64, data []types.Datum, err error) {
if len(pr.resp.Chunks) > 0 {
// For new resp rows structure.
chunk := pr.getChunk()
if chunk == nil {
return 0, nil, nil
}
rowMeta := chunk.RowsMeta[pr.cursor]
if !pr.ignoreData {
rowData := chunk.RowsData[pr.dataOffset : pr.dataOffset+rowMeta.Length]
data, err = tablecodec.DecodeValues(rowData, pr.fields, pr.index)
if err != nil {
return 0, nil, errors.Trace(err)
}
pr.dataOffset += rowMeta.Length
}
if data == nil {
data = dummyData
}
if !pr.aggregate {
handle = rowMeta.Handle
}
pr.cursor++
return
}
if pr.cursor >= len(pr.resp.Rows) {
func (pr *partialResult) Next() (handle int64, data []byte, err error) {
chunk := pr.getChunk()
if chunk == nil {
return 0, nil, nil
}
row := pr.resp.Rows[pr.cursor]
if !pr.ignoreData {
data, err = tablecodec.DecodeValues(row.Data, pr.fields, pr.index)
if err != nil {
return 0, nil, errors.Trace(err)
}
}
if data == nil {
// When no column is referenced, the data may be nil, like 'select count(*) from t'.
// In this case, we need to create a zero length datum slice,
// as caller will check if data is nil to finish iteration.
// data = make([]types.Datum, 0)
data = dummyData
}
if !pr.aggregate {
handleBytes := row.GetHandle()
_, datum, err := codec.DecodeOne(handleBytes)
if err != nil {
return 0, nil, errors.Trace(err)
}
handle = datum.GetInt64()
}
rowMeta := chunk.RowsMeta[pr.cursor]
data = chunk.RowsData[pr.dataOffset : pr.dataOffset+rowMeta.Length]
pr.dataOffset += rowMeta.Length
handle = rowMeta.Handle
pr.cursor++
return
}
Expand Down Expand Up @@ -286,18 +209,12 @@ func Select(client kv.Client, ctx goctx.Context, req *tipb.SelectRequest, keyRan
// If Aggregates is not nil, we should set result fields latter.
if len(req.Aggregates) == 0 && len(req.GroupBy) == 0 {
if req.TableInfo != nil {
result.fields = ProtoColumnsToFieldTypes(req.TableInfo.Columns)
result.label = "table"
} else {
result.fields = ProtoColumnsToFieldTypes(req.IndexInfo.Columns)
length := len(req.IndexInfo.Columns)
if req.IndexInfo.Columns[length-1].GetPkHandle() {
// Returned index row do not contains extra PKHandle column.
result.fields = result.fields[:length-1]
}
result.index = true
result.label = "index"
}
} else {
result.aggregate = true
result.label = "aggregate"
}
return result, nil
}
Expand Down Expand Up @@ -378,22 +295,6 @@ func ColumnsToProto(columns []*model.ColumnInfo, pkIsHandle bool) []*tipb.Column
return cols
}

// ProtoColumnsToFieldTypes converts tipb column info slice to FieldTyps slice.
func ProtoColumnsToFieldTypes(pColumns []*tipb.ColumnInfo) []*types.FieldType {
fields := make([]*types.FieldType, len(pColumns))
for i, v := range pColumns {
field := new(types.FieldType)
field.Tp = byte(v.GetTp())
field.Collate = mysql.Collations[byte(v.GetCollation())]
field.Decimal = int(v.GetDecimal())
field.Flen = int(v.GetColumnLen())
field.Flag = uint(v.GetFlag())
field.Elems = v.GetElems()
fields[i] = field
}
return fields
}

// IndexToProto converts a model.IndexInfo to a tipb.IndexInfo.
func IndexToProto(t *model.TableInfo, idx *model.IndexInfo) *tipb.IndexInfo {
pi := &tipb.IndexInfo{
Expand Down
23 changes: 17 additions & 6 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func (b *executorBuilder) buildTableScan(v *plan.PhysicalTableScan) Executor {
table, _ := b.is.TableByID(v.Table.ID)
client := b.ctx.GetClient()
supportDesc := client.SupportRequestType(kv.ReqTypeSelect, kv.ReqSubTypeDesc)
st := &XSelectTableExec{
e := &XSelectTableExec{
tableInfo: v.Table,
ctx: b.ctx,
startTS: startTS,
Expand All @@ -569,11 +569,10 @@ func (b *executorBuilder) buildTableScan(v *plan.PhysicalTableScan) Executor {
where: v.TableConditionPBExpr,
aggregate: v.Aggregated,
aggFuncs: v.AggFuncsPB,
aggFields: v.AggFields,
byItems: v.GbyItemsPB,
orderByList: v.SortItemsPB,
}
return st
return e
}

func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan) Executor {
Expand All @@ -584,7 +583,7 @@ func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan) Executor {
table, _ := b.is.TableByID(v.Table.ID)
client := b.ctx.GetClient()
supportDesc := client.SupportRequestType(kv.ReqTypeIndex, kv.ReqSubTypeDesc)
st := &XSelectIndexExec{
e := &XSelectIndexExec{
tableInfo: v.Table,
ctx: b.ctx,
supportDesc: supportDesc,
Expand All @@ -596,10 +595,22 @@ func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan) Executor {
where: v.TableConditionPBExpr,
aggregate: v.Aggregated,
aggFuncs: v.AggFuncsPB,
aggFields: v.AggFields,
byItems: v.GbyItemsPB,
}
return st
if !e.aggregate && e.singleReadMode {
// Single read index result has the schema of full index columns.
schemaColumns := make([]*expression.Column, len(e.indexPlan.Index.Columns))
for i, col := range e.indexPlan.Index.Columns {
colInfo := e.indexPlan.Table.Columns[col.Offset]
schemaColumns[i] = &expression.Column{
Index: i,
ColName: col.Name,
RetType: &colInfo.FieldType,
}
}
e.idxColsSchema = expression.NewSchema(schemaColumns...)
}
return e
}

func (b *executorBuilder) buildSort(v *plan.Sort) Executor {
Expand Down
Loading

0 comments on commit 78f7eab

Please sign in to comment.