Skip to content

Commit

Permalink
http: scatter table API for table handler (pingcap#6378)
Browse files Browse the repository at this point in the history
* http: scatter table API for table handler
  • Loading branch information
nolouch authored May 3, 2018
1 parent e215e93 commit e5ab00e
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 10 deletions.
131 changes: 121 additions & 10 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,10 @@ type valueHandler struct {
}

const (
opTableRegions = "regions"
opTableDiskUsage = "disk-usage"
opTableRegions = "regions"
opTableDiskUsage = "disk-usage"
opTableScatter = "scatter-table"
opStopTableScatter = "stop-scatter-table"
)

// mvccTxnHandler is the handler for txn debugger
Expand Down Expand Up @@ -603,6 +605,10 @@ func (h tableHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.handleRegionRequest(schema, tableVal, w, req)
case opTableDiskUsage:
h.handleDiskUsageRequest(schema, tableVal, w, req)
case opTableScatter:
h.handleScatterTableRequest(schema, tableVal, w, req)
case opStopTableScatter:
h.handleStopScatterTableRequest(schema, tableVal, w, req)
default:
writeError(w, errors.New("method not found"))
}
Expand Down Expand Up @@ -647,6 +653,115 @@ func (h ddlHistoryJobHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
return
}

func (h tableHandler) getPDAddr() ([]string, error) {
var pdAddrs []string
etcd, ok := h.store.(domain.EtcdBackend)
if !ok {
return nil, errors.New("not implemented")
}
pdAddrs = etcd.EtcdAddrs()
if len(pdAddrs) < 0 {
return nil, errors.New("pd unavailable")
}
return pdAddrs, nil
}

func (h tableHandler) addScatterSchedule(startKey, endKey []byte, name string) error {
pdAddrs, err := h.getPDAddr()
if err != nil {
return err
}
input := map[string]string{
"name": "scatter-range",
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
"range_name": name,
}
v, err := json.Marshal(input)
if err != nil {
return err
}
scheduleURL := fmt.Sprintf("http://%s/pd/api/v1/schedulers", pdAddrs[0])
resp, err := http.Post(scheduleURL, "application/json", bytes.NewBuffer(v))
if err != nil {
return err
}
if err := resp.Body.Close(); err != nil {
log.Error(err)
}
return nil
}

func (h tableHandler) deleteScatterSchedule(name string) error {
pdAddrs, err := h.getPDAddr()
if err != nil {
return err
}
scheduleURL := fmt.Sprintf("http://%s/pd/api/v1/schedulers/scatter-range-%s", pdAddrs[0], name)
req, err := http.NewRequest(http.MethodDelete, scheduleURL, nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if err := resp.Body.Close(); err != nil {
log.Error(err)
}
return nil
}

func (h tableHandler) handleScatterTableRequest(schema infoschema.InfoSchema, tbl table.Table, w http.ResponseWriter, req *http.Request) {
// for record
tableID := tbl.Meta().ID
startKey, endKey := tablecodec.GetTableHandleKeyRange(tableID)
startKey = codec.EncodeBytes([]byte{}, startKey)
endKey = codec.EncodeBytes([]byte{}, endKey)
tableName := tbl.Meta().Name.String()
err := h.addScatterSchedule(startKey, endKey, tableName)
if err != nil {
writeError(w, errors.Annotate(err, "scatter record error"))
return
}
// for indices
for _, index := range tbl.Indices() {
indexID := index.Meta().ID
indexName := index.Meta().Name.String()
startKey, endKey := tablecodec.GetTableIndexKeyRange(tableID, indexID)
startKey = codec.EncodeBytes([]byte{}, startKey)
endKey = codec.EncodeBytes([]byte{}, endKey)
name := tableName + "-" + indexName
err := h.addScatterSchedule(startKey, endKey, name)
if err != nil {
writeError(w, errors.Annotatef(err, "scatter index(%s) error", name))
return
}
}
writeData(w, "success!")
}

func (h tableHandler) handleStopScatterTableRequest(schema infoschema.InfoSchema, tbl table.Table, w http.ResponseWriter, req *http.Request) {
// for record
tableName := tbl.Meta().Name.String()
err := h.deleteScatterSchedule(tableName)
if err != nil {
writeError(w, errors.Annotate(err, "stop scatter record error"))
return
}
// for indices
for _, index := range tbl.Indices() {
indexName := index.Meta().Name.String()
name := tableName + "-" + indexName
err := h.deleteScatterSchedule(name)
if err != nil {
writeError(w, errors.Annotatef(err, "delete scatter index(%s) error", name))
return
}
}
writeData(w, "success!")
}

func (h tableHandler) handleRegionRequest(schema infoschema.InfoSchema, tbl table.Table, w http.ResponseWriter, req *http.Request) {
tableID := tbl.Meta().ID
// for record
Expand Down Expand Up @@ -704,14 +819,10 @@ type pdRegionStats struct {

func (h tableHandler) handleDiskUsageRequest(schema infoschema.InfoSchema, tbl table.Table, w http.ResponseWriter, req *http.Request) {
tableID := tbl.Meta().ID
var pdAddrs []string
etcd, ok := h.store.(domain.EtcdBackend)
if !ok {
writeError(w, errors.New("not implemented"))
}
pdAddrs = etcd.EtcdAddrs()
if len(pdAddrs) < 0 {
writeError(w, errors.New("pd unavailable"))
pdAddrs, err := h.getPDAddr()
if err != nil {
writeError(w, err)
return
}

// Include table and index data, because their range located in tableID_i tableID_r
Expand Down
2 changes: 2 additions & 0 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func (s *Server) startHTTPServer() {
if s.cfg.Store == "tikv" {
// HTTP path for tikv
router.Handle("/tables/{db}/{table}/regions", tableHandler{tikvHandlerTool, opTableRegions})
router.Handle("/tables/{db}/{table}/scatter", tableHandler{tikvHandlerTool, opTableScatter})
router.Handle("/tables/{db}/{table}/stop-scatter", tableHandler{tikvHandlerTool, opStopTableScatter})
router.Handle("/tables/{db}/{table}/disk-usage", tableHandler{tikvHandlerTool, opTableDiskUsage})
router.Handle("/regions/meta", regionHandler{tikvHandlerTool})
router.Handle("/regions/{regionID}", regionHandler{tikvHandlerTool})
Expand Down

0 comments on commit e5ab00e

Please sign in to comment.