Skip to content

Commit

Permalink
executor: support Chunk in LimitExec (pingcap#5200)
Browse files Browse the repository at this point in the history
  • Loading branch information
zz-jason authored and XuHuaiyu committed Nov 28, 2017
1 parent 9ab0a2c commit 5b73623
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 27 deletions.
19 changes: 15 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,17 @@ func (b *executorBuilder) buildSelectLock(v *plan.SelectLock) Executor {
}

func (b *executorBuilder) buildLimit(v *plan.Limit) Executor {
childExec := b.build(v.Children()[0])
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
e := &LimitExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, b.build(v.Children()[0])),
Offset: v.Offset,
Count: v.Count,
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, childExec),
begin: v.Offset,
end: v.Offset + v.Count,
}
e.supportChk = true
return e
}

Expand Down Expand Up @@ -631,8 +637,13 @@ func (b *executorBuilder) buildSelection(v *plan.PhysicalSelection) Executor {
}

func (b *executorBuilder) buildProjection(v *plan.Projection) Executor {
childExec := b.build(v.Children()[0])
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
e := &ProjectionExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, b.build(v.Children()[0])),
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, childExec),
exprs: v.Exprs,
}
e.baseExecutor.supportChk = true
Expand Down
70 changes: 61 additions & 9 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,24 +344,27 @@ func (e *SelectLockExec) Next(goCtx goctx.Context) (Row, error) {
type LimitExec struct {
baseExecutor

Offset uint64
Count uint64
Idx uint64
begin uint64
end uint64
cursor uint64

// meetFirstBatch represents whether we have met the first valid Chunk from child.
meetFirstBatch bool
}

// Next implements the Executor Next interface.
func (e *LimitExec) Next(goCtx goctx.Context) (Row, error) {
for e.Idx < e.Offset {
for e.cursor < e.begin {
srcRow, err := e.children[0].Next(goCtx)
if err != nil {
return nil, errors.Trace(err)
}
if srcRow == nil {
return nil, nil
}
e.Idx++
e.cursor++
}
if e.Idx >= e.Count+e.Offset {
if e.cursor >= e.end {
return nil, nil
}
srcRow, err := e.children[0].Next(goCtx)
Expand All @@ -371,14 +374,63 @@ func (e *LimitExec) Next(goCtx goctx.Context) (Row, error) {
if srcRow == nil {
return nil, nil
}
e.Idx++
e.cursor++
return srcRow, nil
}

// NextChunk implements the Executor NextChunk interface.
func (e *LimitExec) NextChunk(chk *chunk.Chunk) error {
chk.Reset()
if e.cursor >= e.end {
return nil
}
for !e.meetFirstBatch {
err := e.children[0].NextChunk(e.childrenResults[0])
if err != nil {
return errors.Trace(err)
}
batchSize := uint64(e.childrenResults[0].NumRows())
// no more data.
if batchSize == 0 {
return nil
}
if newCursor := e.cursor + batchSize; newCursor >= e.begin {
e.meetFirstBatch = true
begin, end := e.begin-e.cursor, batchSize
if newCursor > e.end {
end = e.end - e.cursor
}
chk.Append(e.childrenResults[0], int(begin), int(end))
e.cursor += end
return nil
}
e.cursor += batchSize
}
err := e.children[0].NextChunk(chk)
if err != nil {
return errors.Trace(err)
}
batchSize := uint64(chk.NumRows())
// no more data.
if batchSize == 0 {
return nil
}
if e.cursor+batchSize > e.end {
chk.TruncateTo(int(e.end - e.cursor))
batchSize = e.end - e.cursor
}
e.cursor += batchSize
return nil
}

// Open implements the Executor Open interface.
func (e *LimitExec) Open(goCtx goctx.Context) error {
e.Idx = 0
return errors.Trace(e.children[0].Open(goCtx))
if err := e.baseExecutor.Open(goCtx); err != nil {
return errors.Trace(err)
}
e.cursor = 0
e.meetFirstBatch = e.begin == 0
return nil
}

func init() {
Expand Down
74 changes: 61 additions & 13 deletions util/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,26 @@ func NewChunk(fields []*types.FieldType) *Chunk {
// addFixedLenColumn adds a fixed length column with elemLen and initial data capacity.
func (c *Chunk) addFixedLenColumn(elemLen, initCap int) {
c.columns = append(c.columns, &column{
elemBuf: make([]byte, elemLen),
data: make([]byte, 0, initCap),
elemBuf: make([]byte, elemLen),
data: make([]byte, 0, initCap*elemLen),
nullBitmap: make([]byte, 0, initCap>>3),
})
}

// addVarLenColumn adds a variable length column with initial data capacity.
func (c *Chunk) addVarLenColumn(initCap int) {
c.columns = append(c.columns, &column{
offsets: []int32{0},
data: make([]byte, 0, initCap),
offsets: make([]int32, 1, initCap+1),
data: make([]byte, 0, initCap*4),
nullBitmap: make([]byte, 0, initCap>>3),
})
}

// addInterfaceColumn adds an interface column which holds element as interface.
func (c *Chunk) addInterfaceColumn() {
func (c *Chunk) addInterfaceColumn(initCap int) {
c.columns = append(c.columns, &column{
ifaces: []interface{}{},
ifaces: make([]interface{}, 0, initCap),
nullBitmap: make([]byte, 0, initCap>>3),
})
}

Expand All @@ -83,7 +86,7 @@ func (c *Chunk) addColumnByFieldType(fieldTp *types.FieldType, initCap int) {
case mysql.TypeNewDecimal:
c.addFixedLenColumn(types.MyDecimalStructSize, initCap)
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp, mysql.TypeJSON:
c.addInterfaceColumn()
c.addInterfaceColumn(initCap)
default:
c.addVarLenColumn(initCap)
}
Expand Down Expand Up @@ -129,7 +132,7 @@ func (c *Chunk) End() Row {
func (c *Chunk) AppendRow(colIdx int, row Row) {
for i, rowCol := range row.c.columns {
chkCol := c.columns[colIdx+i]
chkCol.setNullBitmap(!rowCol.isNull(row.idx))
chkCol.appendNullBitmap(!rowCol.isNull(row.idx))
if rowCol.isFixed() {
elemLen := len(rowCol.elemBuf)
offset := row.idx * elemLen
Expand All @@ -145,6 +148,51 @@ func (c *Chunk) AppendRow(colIdx int, row Row) {
}
}

// Append appends rows in [begin, end) in another Chunk to a Chunk.
func (c *Chunk) Append(other *Chunk, begin, end int) {
for colID, src := range other.columns {
dst := c.columns[colID]
if src.isFixed() {
elemLen := len(src.elemBuf)
dst.data = append(dst.data, src.data[begin*elemLen:end*elemLen]...)
} else if src.isVarlen() {
beginOffset, endOffset := src.offsets[begin], src.offsets[end]
dst.data = append(dst.data, src.data[beginOffset:endOffset]...)
for i := begin; i < end; i++ {
dst.offsets = append(dst.offsets, dst.offsets[len(dst.offsets)-1]+src.offsets[i+1]-src.offsets[i])
}
} else {
dst.ifaces = append(dst.ifaces, src.ifaces[begin:end]...)
}
for i := begin; i < end; i++ {
dst.appendNullBitmap(!src.isNull(i))
dst.length++
}
}
}

// TruncateTo truncates rows from tail to head in a Chunk to "numRows" rows.
func (c *Chunk) TruncateTo(numRows int) {
for _, col := range c.columns {
if col.isFixed() {
elemLen := len(col.elemBuf)
col.data = col.data[:numRows*elemLen]
} else if col.isVarlen() {
col.data = col.data[:col.offsets[numRows]]
col.offsets = col.offsets[:numRows+1]
} else {
col.ifaces = col.ifaces[:numRows]
}
for i := numRows; i < col.length; i++ {
if col.isNull(i) {
col.nullCount--
}
}
col.length = numRows
col.nullBitmap = col.nullBitmap[:(col.length>>3)+1]
}
}

// AppendNull appends a null value to the chunk.
func (c *Chunk) AppendNull(colIdx int) {
c.columns[colIdx].appendNull()
Expand Down Expand Up @@ -250,7 +298,7 @@ func (c *column) isNull(rowIdx int) bool {
return nullByte&(1<<(uint(rowIdx)&7)) == 0
}

func (c *column) setNullBitmap(on bool) {
func (c *column) appendNullBitmap(on bool) {
idx := c.length >> 3
if idx >= len(c.nullBitmap) {
c.nullBitmap = append(c.nullBitmap, 0)
Expand All @@ -264,7 +312,7 @@ func (c *column) setNullBitmap(on bool) {
}

func (c *column) appendNull() {
c.setNullBitmap(false)
c.appendNullBitmap(false)
if c.isFixed() {
c.data = append(c.data, c.elemBuf...)
} else if c.isVarlen() {
Expand All @@ -277,7 +325,7 @@ func (c *column) appendNull() {

func (c *column) finishAppendFixed() {
c.data = append(c.data, c.elemBuf...)
c.setNullBitmap(true)
c.appendNullBitmap(true)
c.length++
}

Expand All @@ -302,7 +350,7 @@ func (c *column) appendFloat64(f float64) {
}

func (c *column) finishAppendVar() {
c.setNullBitmap(true)
c.appendNullBitmap(true)
c.offsets = append(c.offsets, int32(len(c.data)))
c.length++
}
Expand All @@ -319,7 +367,7 @@ func (c *column) appendBytes(b []byte) {

func (c *column) appendInterface(o interface{}) {
c.ifaces = append(c.ifaces, o)
c.setNullBitmap(true)
c.appendNullBitmap(true)
c.length++
}

Expand Down
Loading

0 comments on commit 5b73623

Please sign in to comment.