Skip to content

Commit

Permalink
Increase the speed of convergence (matrixorigin#1564)
Browse files Browse the repository at this point in the history
  • Loading branch information
nnsgmsone authored Dec 31, 2021
1 parent 671b938 commit 52e5b13
Showing 1 changed file with 58 additions and 0 deletions.
58 changes: 58 additions & 0 deletions pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"net"
"runtime"
"strings"
Expand Down Expand Up @@ -659,6 +660,9 @@ func (s *Scope) RunAQ(e engine.Engine) error {
ss[i].Proc.Id = s.Proc.Id
ss[i].Proc.Lim = s.Proc.Lim
}
for len(ss) > 3 {
ss = newMergeScope(ss, arg.Typ, s.Proc)
}
ctx, cancel := context.WithCancel(context.Background())
s.Magic = Merge
s.PreScopes = ss
Expand Down Expand Up @@ -838,6 +842,9 @@ func (s *Scope) RunCAQ(e engine.Engine) error {
},
}}
}
for len(ss) > 3 {
ss = newMergeScope(ss, arg.Arg.Typ, s.Proc)
}
rs := &Scope{
PreScopes: ss,
Magic: Merge,
Expand Down Expand Up @@ -875,3 +882,54 @@ func (s *Scope) RunCAQ(e engine.Engine) error {
}
return rs.MergeRun(e)
}

func newMergeScope(ss []*Scope, typ int, proc *process.Process) []*Scope {
step := int(math.Log2(float64(len(ss))))
n := len(ss) / step
rs := make([]*Scope, n)
for i := 0; i < n; i++ {
if i == n-1 {
rs[i] = &Scope{
PreScopes: ss[i*step:],
Magic: Merge,
}
} else {
rs[i] = &Scope{
PreScopes: ss[i*step : (i+1)*step],
Magic: Merge,
}
}
rs[i].Instructions = append(rs[i].Instructions, vm.Instruction{
Op: vm.Plus,
Arg: &plus.Argument{Typ: typ},
})
{
m := len(rs[i].PreScopes)
ctx, cancel := context.WithCancel(context.Background())
rs[i].Proc = process.New(mheap.New(guest.New(proc.Mp.Gm.Limit, proc.Mp.Gm.Mmu)))
rs[i].Proc.Cancel = cancel
rs[i].Proc.Cancel = cancel
rs[i].Proc.Id = proc.Id
rs[i].Proc.Lim = proc.Lim
rs[i].Proc.Reg.MergeReceivers = make([]*process.WaitRegister, m)
{
for j := 0; j < m; j++ {
rs[i].Proc.Reg.MergeReceivers[j] = &process.WaitRegister{
Ctx: ctx,
Ch: make(chan *batch.Batch, 1),
}
}
}
for j := 0; j < m; j++ {
ss[i*step+j].Instructions = append(ss[i*step+j].Instructions, vm.Instruction{
Op: vm.Connector,
Arg: &connector.Argument{
Mmu: rs[i].Proc.Mp.Gm,
Reg: rs[i].Proc.Reg.MergeReceivers[j],
},
})
}
}
}
return rs
}

0 comments on commit 52e5b13

Please sign in to comment.