Skip to content

Commit

Permalink
添加osc支持,添加osc进度信息
Browse files Browse the repository at this point in the history
  • Loading branch information
hanchuanchuan committed Feb 12, 2019
1 parent eeaab11 commit 217f55a
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 23 deletions.
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ type Osc struct {
OscCriticalConnected int `toml:"osc_critical_connected" json:"osc_critical_connected"`

// 对应参数pt-online-schema-change中的参数--chunk-time。默认值:1
OscChunkTime int `toml:"osc_chunk_time" json:"osc_chunk_time"`
OscChunkTime float32 `toml:"osc_chunk_time" json:"osc_chunk_time"`

// 对应参数pt-online-schema-change中的参数--chunk-size-limit。默认值:4
OscChunkSizeLimit int `toml:"osc_chunk_size_limit" json:"osc_chunk_size_limit"`
Expand Down
1 change: 1 addition & 0 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ sql_safe_updates = 0
[osc]
osc_on = true
osc_min_table_size = 0
osc_print_none = true


[log]
Expand Down
15 changes: 15 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ type Server struct {
// So we just stop the listener and store to force clients to chose other TiDB servers.
stopListenerCh chan struct{}
statusServer *http.Server

// osc进程列表
oscProcessList map[string]*util.OscProcessInfo
}

// ConnectionCount gets current connection count.
Expand Down Expand Up @@ -142,6 +145,8 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
rwlock: &sync.RWMutex{},
clients: make(map[uint32]*clientConn),
stopListenerCh: make(chan struct{}, 1),

oscProcessList: make(map[string]*util.OscProcessInfo),
}
s.loadTLSCertificates()

Expand Down Expand Up @@ -402,6 +407,16 @@ func (s *Server) kickIdleConnection() {
}
}

// AddOscProcess 添加osc进程
func (s *Server) AddOscProcess(p *util.OscProcessInfo) {
s.oscProcessList[p.Sqlsha1] = p
}

// ShowOscProcessList 返回osc进程列表
func (s *Server) ShowOscProcessList() map[string]*util.OscProcessInfo {
return s.oscProcessList
}

// Server error codes.
const (
codeUnknownFieldType = 1
Expand Down
238 changes: 216 additions & 22 deletions session/osc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,33 @@
package session

import (
"fmt"
// "strings"
"bufio"
"bytes"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"strconv"
"strings"

// "github.com/hanchuanchuan/goInception/ast"
// "github.com/hanchuanchuan/goInception/server"
"github.com/hanchuanchuan/goInception/util"
"github.com/hanchuanchuan/goInception/util/auth"
// "github.com/hanchuanchuan/goInception/mysql"
// "github.com/pingcap/errors"
// log "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)

type ChanOscData struct {
out string
p *util.OscProcessInfo
}

// Copying `test`.`t1`: 99% 00:00 remain
// 匹配osc执行进度
var regOscPercent *regexp.Regexp = regexp.MustCompile(`^Copying .*? (\d+)% (\d+:\d+) remain`)

func (s *session) checkAlterUseOsc(t *TableInfo) {
if s.Osc.OscOn && (s.Osc.OscMinTableSize == 0 || t.TableSize >= s.Osc.OscMinTableSize) {
s.myRecord.useOsc = true
Expand Down Expand Up @@ -57,39 +72,218 @@ func (s *session) mysqlComputeSqlSha1(r *Record) {

func (s *session) mysqlExecuteAlterTableOsc(r *Record) {

buf := bytes.NewBufferString(fmt.Sprintf("PATH=%s:$PATH && ", s.Osc.OscBinDir))
err := os.Setenv("PATH", fmt.Sprintf("%s%s%s",
s.Osc.OscBinDir, os.PathListSeparator, os.Getenv("PATH")))
if err != nil {
log.Error(err)
return
}

buf.WriteString("pt-online-schema-change ")
buf.WriteString("--alter ")
buf.WriteString(r.Sql)
buf.WriteString(" ")
if _, err := exec.LookPath("pt-online-schema-change"); err != nil {
log.Error(err)
return
}

buf := bytes.NewBufferString("pt-online-schema-change")

buf.WriteString(" --alter \"")
// buf.WriteString(r.Sql)
buf.WriteString("modify ROW_FORMAT varchar(11)")
buf.WriteString("\" ")
if s.Osc.OscPrintSql {
buf.WriteString("--print ")
buf.WriteString(" --print ")
}
buf.WriteString("--charset=utf8 ")
buf.WriteString("--chunk-time ")
buf.WriteString(strconv.Itoa(s.Osc.OscChunkTime))
buf.WriteString(" --charset=utf8 ")
buf.WriteString(" --chunk-time ")
buf.WriteString(fmt.Sprintf("%g ", s.Osc.OscChunkTime))

buf.WriteString("--critical-load ")
buf.WriteString(" --critical-load ")
buf.WriteString("Threads_connected:")
buf.WriteString(strconv.Itoa(s.Osc.OscCriticalConnected))
buf.WriteString("Threads_running:")
buf.WriteString(",Threads_running:")
buf.WriteString(strconv.Itoa(s.Osc.OscCriticalRunning))
buf.WriteString(" ")

buf.WriteString("--max-load ")
buf.WriteString(" --max-load ")
buf.WriteString("Threads_connected:")
buf.WriteString(strconv.Itoa(s.Osc.OscMaxConnected))
buf.WriteString("Threads_running:")
buf.WriteString(",Threads_running:")
buf.WriteString(strconv.Itoa(s.Osc.OscMaxRunning))
buf.WriteString(" ")

buf.WriteString(" --recurse=1 ")

buf.WriteString(" --check-interval ")
buf.WriteString(fmt.Sprintf("%d ", s.Osc.OscCheckInterval))

buf.WriteString("--recurse=1")
if !s.Osc.OscDropNewTable {
buf.WriteString(" --no-drop-new-table ")
}

if !s.Osc.OscDropOldTable {
buf.WriteString(" --no-drop-old-table ")
}

if !s.Osc.OscCheckReplicationFilters {
buf.WriteString(" --no-check-replication-filters ")
}

buf.WriteString("--check-interval")
if !s.Osc.OscCheckAlter {
buf.WriteString(" --no-check-alter ")
}

buf.WriteString(" --alter-foreign-keys-method=")
buf.WriteString(s.Osc.OscAlterForeignKeysMethod)

if s.Osc.OscAlterForeignKeysMethod == "none" {
buf.WriteString(" --force ")
}

buf.WriteString(" --execute ")
buf.WriteString(" --statistics ")
buf.WriteString(" --max-lag=")
buf.WriteString(fmt.Sprintf("%d", s.Osc.OscMaxLag))

buf.WriteString(" --no-version-check ")
buf.WriteString(" --recursion-method=")
buf.WriteString(s.Osc.OscRecursionMethod)

buf.WriteString(" --progress ")
buf.WriteString("percentage,1 ")

buf.WriteString(" --user=")
buf.WriteString(s.opt.user)
buf.WriteString(" --password=")
buf.WriteString(s.opt.password)
buf.WriteString(" --host=")
buf.WriteString(s.opt.host)
buf.WriteString(" --port=")
buf.WriteString(strconv.Itoa(s.opt.port))
buf.WriteString(strconv.Itoa(r.SeqNo))
buf.WriteString(r.Sql)

r.Sqlsha1 = auth.EncodePassword(buf.String())
buf.WriteString(" D=")
buf.WriteString(r.TableInfo.Schema)
buf.WriteString(",t=")
buf.WriteString(r.TableInfo.Name)

str := buf.String()
// log.Info(str)

s.execCommand(r, "bash", []string{"-c", str})
}

func (s *session) execCommand(r *Record, commandName string, params []string) bool {
//函数返回一个*Cmd,用于使用给出的参数执行name指定的程序
cmd := exec.Command(commandName, params...)

//显示运行的命令
// log.Info(cmd.Args)
//StdoutPipe方法返回一个在命令Start后与命令标准输出关联的管道。Wait方法获知命令结束后会关闭这个管道,一般不需要显式的关闭该管道。
stdout, err := cmd.StdoutPipe()
if err != nil {
s.AppendErrorMessage(err.Error())
return false
}
stderr, err := cmd.StderrPipe()
if err != nil {
s.AppendErrorMessage(err.Error())
return false
}

// 保证关闭输出流
defer stdout.Close()
defer stderr.Close()

// 运行命令
if err := cmd.Start(); err != nil {
s.AppendErrorMessage(err.Error())
return false
}

p := &util.OscProcessInfo{
Schema: r.TableInfo.Schema,
Table: r.TableInfo.Name,
Sqlsha1: r.Sqlsha1,
Percent: 0,
RemainTime: "",
Info: "",
}
s.sessionManager.AddOscProcess(p)

// 消息
reader := bufio.NewReader(stdout)

// 进度
reader2 := bufio.NewReader(stderr)

buf := bytes.NewBufferString("")

//实时循环读取输出流中的一行内容
go func() {
for {
line, err2 := reader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
break
}
buf.WriteString(line)
buf.WriteString("\n")
s.mysqlAnalyzeOscOutput(line, p)
}
}()

go func() {
for {
line, err2 := reader2.ReadString('\n')
if err2 != nil || io.EOF == err2 {
break
}
buf.WriteString(line)
buf.WriteString("\n")
s.mysqlAnalyzeOscOutput(line, p)
}
}()

//阻塞直到该命令执行完成,该命令必须是被Start方法开始执行的
err = cmd.Wait()
if err != nil {
s.AppendErrorMessage(err.Error())
}
if s.hasError() {
r.StageStatus = StatusExecFail

} else {
r.StageStatus = StatusExecOK
r.ExecComplete = true
}

if p.Percent < 100 || s.Osc.OscPrintNone {
r.Buf.WriteString(buf.String())
r.Buf.WriteString("\n")
}

return true
}

func (s *session) mysqlAnalyzeOscOutput(out string, p *util.OscProcessInfo) {
firsts := regOscPercent.FindStringSubmatch(out)

if len(firsts) < 3 {
if strings.HasPrefix(out, "Successfully altered") {

p.Percent = 100
p.RemainTime = ""
p.Info = out
}
return
}

pct, _ := strconv.Atoi(firsts[1])
remain := firsts[2]
p.Info = out

p.Percent = pct
p.RemainTime = remain

// for _, p := range s.sessionManager.ShowOscProcessList() {
// log.Infof("%v", p)
// }
}
3 changes: 3 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ type session struct {
// 总的操作行数,当备份时用以计算备份进度
TotalChangeRows int
BackupTotalRows int

// // osc进程解析通道
// chanOsc chan *ChanOscData
}

// DDLOwnerChecker returns s.ddlOwnerChecker.
Expand Down
5 changes: 5 additions & 0 deletions session/session_inception.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ func init() {

func (s *session) ExecuteInc(ctx context.Context, sql string) (recordSets []ast.RecordSet, err error) {

// log.Infof("%#v", ctx)
// log.Infof("%#v", s.sessionManager)

s.DBName = ""
s.haveBegin = false
s.haveCommit = false
Expand Down Expand Up @@ -991,6 +994,8 @@ func (s *session) executeRemoteStatement(record *Record) {

if record.useOsc {
s.mysqlExecuteAlterTableOsc(record)
record.ExecTimestamp = time.Now().Unix()
record.ThreadId = s.fetchThreadID()
record.ExecTime = fmt.Sprintf("%.3f", time.Since(start).Seconds())
} else {
res := s.db.Exec(sql)
Expand Down
15 changes: 15 additions & 0 deletions util/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,19 @@ type SessionManager interface {
// ShowProcessList returns map[connectionID]ProcessInfo
ShowProcessList() map[uint64]ProcessInfo
Kill(connectionID uint64, query bool)

// 添加osc进程
AddOscProcess(p *OscProcessInfo)
// 返回osc进程列表
ShowOscProcessList() map[string]*OscProcessInfo
}

// OscProcessInfo is a struct used for show osc processlist statement.
type OscProcessInfo struct {
Schema string
Table string
Sqlsha1 string
Percent int
RemainTime string
Info string
}

0 comments on commit 217f55a

Please sign in to comment.