Skip to content

Commit

Permalink
update for inner
Browse files Browse the repository at this point in the history
  • Loading branch information
yangjing committed Jun 11, 2020
1 parent 4f39493 commit 1594a02
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 195 deletions.
3 changes: 3 additions & 0 deletions replayer-agent/common/handlers/conf/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
)

var Handler *viper.Viper
var HandlerInfo *viper.Viper
var newConfs map[string]*viper.Viper

// Root current dir
Expand All @@ -26,6 +27,7 @@ func Init(confPath string) {
}

Handler = LoadConfig(confPath)
HandlerInfo = LoadConfig(Root + "/conf/info.toml")
}

func LoadConfig(confPath string) *viper.Viper {
Expand All @@ -48,6 +50,7 @@ func LoadConfig(confPath string) *viper.Viper {
// FreshHandler 从磁盘读取配置文件,方便本地修改配置的测试,无需重启服务
func FreshHandler() {
Handler.ReadInConfig()
HandlerInfo.ReadInConfig()
}

// NewConf 实例化读取配置文件
Expand Down
40 changes: 14 additions & 26 deletions replayer-agent/common/handlers/ignore/ignore.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package ignore

import (
"github.com/didi/sharingan/replayer-agent/common/handlers/conf"
)

const (
NoiseMatch = iota
NoisePrefix
Expand All @@ -13,42 +17,26 @@ type NoiseMeta struct {
}

// 常规请求级别噪音
var Noise map[string]bool
var Noise = map[string]bool{}

// 定制请求级别噪音
var SeniorNoise map[string]NoiseMeta
var SeniorNoise = map[string]NoiseMeta{}

// 接口级别噪音
var OutboundNoise map[string]NoiseMeta
var OutboundNoise = map[string]NoiseMeta{}

// not matched接口级噪音,目前仅用于go模块
var NotMatchedNoise map[string]NoiseMeta
var NotMatchedNoise = map[string]NoiseMeta{}

func Init() {
Noise = map[string]bool{
// global field
"createTime": true,
"req_flag": true,
"request_id": true,
"sign": true,
"time_stamp": true,
"timeOffset": true,
"timeStamp": true,
"timestamp": true,
"token": true,

// http method field

// thrift method field
"mget.field_2.field_3": true,
"mset.field_2.field_3": true,

// controller field
noises := conf.HandlerInfo.GetStringSlice("ignore.noise")
for _, noise := range noises {
Noise[noise] = true
}

SeniorNoise = map[string]NoiseMeta{
".cyborg_sub_id": NoiseMeta{NoiseSuffix, ""},
".sample.code": NoiseMeta{NoiseSuffix, ""},
seniorNoises := conf.HandlerInfo.GetStringSlice("ignore.seniorNoise")
for _, seniorNoise := range seniorNoises {
SeniorNoise[seniorNoise] = NoiseMeta{NoiseSuffix, ""}
}

OutboundNoise = map[string]NoiseMeta{
Expand Down
33 changes: 33 additions & 0 deletions replayer-agent/conf/info.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[http]
header = [
"Content-Type",
"Origin",
"Access-Control-Allow-Origin",
"Access-Control-Allow-Credentials",
"Access-Control-Allow-Methods",
"Access-Control-Allow-Headers",
]

[ignore]
noise = [
# global field
"createTime",
"req_flag",
"request_id",
"sign",
"time_stamp",
"timeOffset",
"timeStamp",
"timestamp",
"token",
# http method field
#thrift method field
"mget.field_2.field_3",
"mset.field_2.field_3",
#controller field
]

seniorNoise = [
".cyborg_sub_id",
".sample.code",
]
172 changes: 112 additions & 60 deletions replayer-agent/controller/sharingan.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/didi/sharingan/replayer-agent/logic/worker"
"github.com/didi/sharingan/replayer-agent/model/esmodel"
"github.com/didi/sharingan/replayer-agent/model/nuwaplt"
"github.com/didi/sharingan/replayer-agent/model/replaying"
"github.com/didi/sharingan/replayer-agent/utils/helper"

jsoniter "github.com/json-iterator/go"
Expand Down Expand Up @@ -57,25 +58,32 @@ func (srg ShaRinGan) Index(w http.ResponseWriter, r *http.Request) {
* 流量搜索
*/
func (srg ShaRinGan) Search(w http.ResponseWriter, r *http.Request) {
stat, resp, ctx, req := SearchPreHandle(w, r)
if stat != 0 {
srg.EchoJSON(w, r, resp)
return
}

data := search.Search(ctx, &req)
srg.EchoJSON(w, r, idl.SearchResp{Results: data, Parallel: global.FlagHandler.Parallel})
}

func SearchPreHandle(w http.ResponseWriter, r *http.Request) (int, idl.SearchResp, context.Context, idl.SearchReq) {
conf.FreshHandler()
var req idl.SearchReq
if err := bind.Bind(r, &req); err != nil {
srg.EchoJSON(w, r, idl.SearchResp{Errmsg: "parse params failed"})
return
return 1, idl.SearchResp{Errmsg: "parse params failed"}, nil, req
}

ctx := r.Context()

parallel := global.FlagHandler.Parallel
_, ok := nuwaplt.Module2Host[req.Project]
if ok {
// get department info
depart := nuwaplt.GetValueByKey(req.Project, nuwaplt.KDepartment, nuwaplt.DefaultDepartment)
ctx = context.WithValue(ctx, nuwaplt.KDepartment, depart)
}

data := search.Search(ctx, &req)
srg.EchoJSON(w, r, idl.SearchResp{Results: data, Parallel: parallel})
return 0, idl.SearchResp{}, ctx, req
}

/**
Expand All @@ -97,44 +105,68 @@ func (srg ShaRinGan) Replay(w http.ResponseWriter, r *http.Request, ps httproute
* @Return ajax返回
*/
func (srg ShaRinGan) Replayed(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
// before fetch session
stat, sid, ctx, req, resp := ReplayedBeforeFetchSession(r, ps)
if stat != 0 {
srg.EchoJSON(w, r, resp)
return
}

// fetch sessions
stat, resp, session, replayer := ReplayedFetchSession(ctx, sid, req)
if stat != 0 {
srg.EchoJSON(w, r, resp)
return
}

// after fetch session
stat, resp = ReplayedAfterFetchSession(ctx, sid, session, req, replayer)

srg.EchoJSON(w, r, resp)
}

func ReplayedBeforeFetchSession(r *http.Request, ps httprouter.Params) (int, string, context.Context, idl.ReplayedReq, idl.ReplayedResp) {
conf.FreshHandler()

var req idl.ReplayedReq
if err := bind.Bind(r, &req); err != nil {
srg.EchoJSON(w, r, idl.ReplayedResp{Success: false, Errmsg: "parse params failed"})
return
return 1, "", nil, req, idl.ReplayedResp{Success: false, Errmsg: "parse params failed"}
}

// set department information for choosing the corresponding es
sid := ps.ByName("sessionId")
depart := nuwaplt.GetValueWithProject(req.Project, nuwaplt.KDepartment, nuwaplt.DefaultDepartment)
ctx := context.WithValue(r.Context(), nuwaplt.KDepartment, depart)

// fetch sessions
sessions := worker.FetchSessions(ctx, sid, req.Project)
if len(sessions) <= 0 {
srg.EchoJSON(w, r, idl.ReplayedResp{Success: false, Errmsg: "search session failed"})
return
}
session := sessions[0]

// listenAddr
listenAddr := nuwaplt.GetValueByKey(req.Project, nuwaplt.KListenAddr, global.ListenAddr)
if listenAddr == "" {
srg.EchoJSON(w, r, idl.ReplayedResp{Success: false, Errmsg: "get listen addr of SUT failed"})
return
return 1, sid, ctx, req, idl.ReplayedResp{Success: false, Errmsg: "get listen addr of SUT failed"}
}

// to replay
return 0, sid, ctx, req, idl.ReplayedResp{}
}

func ReplayedFetchSession(ctx context.Context, sid string, req idl.ReplayedReq) (int, idl.ReplayedResp, *replaying.Session, *worker.Replayer) {
// fetch sessions
sessions := worker.FetchSessions(ctx, sid, req.Project)
if len(sessions) <= 0 {
return 1, idl.ReplayedResp{Success: false, Errmsg: "search session failed"}, nil, nil
}
replayer := &worker.Replayer{BasePort: outbound.BasePort}

return 0, idl.ReplayedResp{}, sessions[0], replayer
}

func ReplayedAfterFetchSession(ctx context.Context, sid string, session *replaying.Session, req idl.ReplayedReq, replayer *worker.Replayer) (int, idl.ReplayedResp) {
replayer.Protocol = nuwaplt.GetValueByKey(req.Project, nuwaplt.KProtocol, nuwaplt.PHttp)
replayer.Language = nuwaplt.GetValueByKey(req.Project, nuwaplt.KLanguage, nuwaplt.LGO)
replayer.ReplayAddr = listenAddr
replayer.ReplayAddr = nuwaplt.GetValueByKey(req.Project, nuwaplt.KListenAddr, global.ListenAddr)

// begin replay【ShaRinGan -> sut -> mock server】
// to replay
err := replayer.ReplaySession(ctx, session, req.Project)
if err != nil {
srg.EchoJSON(w, r, idl.ReplayedResp{Success: false, Errmsg: err.Error()})
return
return 1, idl.ReplayedResp{Success: false, Errmsg: err.Error()}
}

diffs := replayed.DiffReplayed(ctx, replayer.ReplayedSession, req.Project)
Expand All @@ -150,7 +182,8 @@ func (srg ShaRinGan) Replayed(w http.ResponseWriter, r *http.Request, ps httprou
// return all the diffs
resp.Diffs = diffs
}
srg.EchoJSON(w, r, resp)

return 0, resp
}

/**
Expand Down Expand Up @@ -199,46 +232,19 @@ func (srg ShaRinGan) DelNoise(w http.ResponseWriter, r *http.Request) {
* 查看session详情
*/
func (srg ShaRinGan) Session(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
conf.FreshHandler()
r.ParseForm()
project := r.Form.Get("project")
depart := nuwaplt.GetValueWithProject(project, nuwaplt.KDepartment, nuwaplt.DefaultDepartment)
ctx := context.WithValue(r.Context(), nuwaplt.KDepartment, depart)
sid := ps.ByName("sessionId")
ctx, sid, project := SessionPreHandle(w, r, ps)

var bodyByte []byte
var err error
var stat int
//优先读取es地址
if conf.Handler.GetString("es_url.default") != "" {
cond := &idl.SearchReq{SessionId: sid, Size: 1}
bodyByte, err = search.Query(ctx, cond, 0)
if err != nil {
srg.Echo(w, r, bodyByte)
return
}
stat, bodyByte = SessionFromES(ctx, sid)
} else {
//读取配置文件conf/traffic/{project}
contents, err := helper.ReadLines(conf.Root + "/conf/traffic/" + project)
if err != nil {
tlog.Handler.Errorf(ctx, tlog.DLTagUndefined, "Failed to read /conf/traffic/"+project+", err="+err.Error())
srg.Echo(w, r, bodyByte)
return
}
var json = jsoniter.ConfigCompatibleWithStandardLibrary

//原始流量格式
for _, flow := range contents {
traffic := &esmodel.SessionId{}
err := json.Unmarshal([]byte(flow), traffic)
if err != nil {
tlog.Handler.Warnf(r.Context(), tlog.DLTagUndefined, "errmsg= Failed at UmMarshal origin traffic||err=%s", err.Error())
continue
}
if traffic.Id == sid {
bodyByte = []byte(flow)
break
}
}
stat, bodyByte = SessionFromLocal(ctx, sid, project, r)
}
if stat != 0 {
srg.Echo(w, r, bodyByte)
return
}

w.Header().Set("Content-Type", "application/json; charset=utf-8")
Expand All @@ -247,7 +253,53 @@ func (srg ShaRinGan) Session(w http.ResponseWriter, r *http.Request, ps httprout
} else {
srg.Echo(w, r, []byte("No Result, please change another sessionId~"))
}
}

func SessionPreHandle(w http.ResponseWriter, r *http.Request, ps httprouter.Params) (context.Context, string, string) {
conf.FreshHandler()
r.ParseForm()
project := r.Form.Get("project")
depart := nuwaplt.GetValueWithProject(project, nuwaplt.KDepartment, nuwaplt.DefaultDepartment)
ctx := context.WithValue(r.Context(), nuwaplt.KDepartment, depart)
sid := ps.ByName("sessionId")

return ctx, sid, project
}

func SessionFromES(ctx context.Context, sid string) (int, []byte) {
cond := &idl.SearchReq{SessionId: sid, Size: 1}
bodyByte, err := search.Query(ctx, cond, 0)
if err != nil {
return 1, bodyByte
}

return 0, bodyByte
}

func SessionFromLocal(ctx context.Context, sid string, project string, r *http.Request) (int, []byte) {
var bodyByte []byte
//读取配置文件conf/traffic/{project}
stat, contents := search.GetTrafficFromLocal(ctx, &idl.SearchReq{Project: project})
if stat != 0 {
return 1, bodyByte
}

var json = jsoniter.ConfigCompatibleWithStandardLibrary
//原始流量格式
for _, flow := range contents {
traffic := &esmodel.SessionId{}
err := json.Unmarshal([]byte(flow), traffic)
if err != nil {
tlog.Handler.Warnf(r.Context(), tlog.DLTagUndefined, "errmsg= Failed at UmMarshal origin traffic||err=%s", err.Error())
continue
}
if traffic.Id == sid {
bodyByte = []byte(flow)
break
}
}

return 0, bodyByte
}

// base64 decode and bianry
Expand Down Expand Up @@ -589,7 +641,7 @@ func (srg ShaRinGan) CodeCoverage(w http.ResponseWriter, r *http.Request) {
cmd.Stderr = &stderr
err = cmd.Run()
if err != nil {
tlog.Handler.Errorf(r.Context(), tlog.DLTagUndefined, "Failed to execute"+covCmd+"! err="+err.Error() + stderr.String())
tlog.Handler.Errorf(r.Context(), tlog.DLTagUndefined, "Failed to execute"+covCmd+"! err="+err.Error()+stderr.String())
resErr["errmsg"] = "Failed to execute " + covCmd + "!"
srg.EchoJSON(w, r, resErr)
return
Expand Down
Loading

0 comments on commit 1594a02

Please sign in to comment.