Skip to content

Commit

Permalink
使用全局状态保存 trace 对应的信息
Browse files Browse the repository at this point in the history
  • Loading branch information
lvan100 committed Aug 27, 2021
1 parent c006ab7 commit 5b7b959
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 12 deletions.
43 changes: 35 additions & 8 deletions replayer-agent/logic/outbound/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"net"
"regexp"
"strconv"
"sync"
"time"

"github.com/didi/sharingan/replayer-agent/common/handlers/conf"
"github.com/didi/sharingan/replayer-agent/common/handlers/tlog"
"github.com/didi/sharingan/replayer-agent/logic/outbound/match"
"github.com/didi/sharingan/replayer-agent/model/pool"
"github.com/didi/sharingan/replayer-agent/model/recording"
"github.com/didi/sharingan/replayer-agent/model/replaying"
"github.com/didi/sharingan/replayer-agent/utils/helper"
"go.uber.org/zap/zapcore"
Expand All @@ -29,9 +29,36 @@ var (
errMissMatchTalk = errors.New("MISS match")
)

var traceMap sync.Map

type SessionInfo struct {
sync.Mutex
LastMatchedIndex int
}

// ClearSession 回放结束后掉一下这个方法。
func ClearSession(traceID string) {
traceMap.Delete(traceID)
}

func getLastMatchedIndex(traceID string) int {
actual, _ := traceMap.LoadOrStore(traceID, &SessionInfo{LastMatchedIndex: -1})
session := actual.(*SessionInfo)
session.Lock()
defer session.Unlock()
return session.LastMatchedIndex
}

func setLastMatchedIndex(traceID string, matchedIndex int) {
actual, _ := traceMap.Load(traceID)
session := actual.(*SessionInfo)
session.Lock()
defer session.Unlock()
session.LastMatchedIndex = matchedIndex
}

// ConnState 连接管理
type ConnState struct {
LastMatchedIndex int

// 原始连接信息
conn *net.TCPConn
Expand Down Expand Up @@ -168,7 +195,7 @@ func (cs *ConnState) rmTrafixPrefix(ctx context.Context, request []byte) []byte
func (cs *ConnState) match(ctx context.Context, request []byte) error {
quotedRequest := strconv.Quote(helper.BytesToString(request))

cs.Handler = loadHandler(ctx, string(cs.traceID))
cs.Handler = loadHandler(ctx, cs.traceID)
if cs.Handler == nil {
tlog.Handler.Warnf(ctx, tlog.DebugTag, "errmsg=find Handler failed||request=%s||traceID=%s", quotedRequest, string(cs.traceID))
return nil
Expand All @@ -194,12 +221,11 @@ func (cs *ConnState) match(ctx context.Context, request []byte) error {
return err
}

var matchedTalk *recording.CallOutbound
var mark float64
cs.LastMatchedIndex, mark, matchedTalk = cs.Handler.Matcher.MatchOutboundTalk(ctx, cs.Handler.ReplayingSession, cs.LastMatchedIndex, request)
lastMatchedIndex := getLastMatchedIndex(cs.traceID)
matchedIndex, mark, matchedTalk := cs.Handler.Matcher.MatchOutboundTalk(ctx, cs.Handler.ReplayingSession, lastMatchedIndex, request)
if callOutbound.MatchedActionIndex != fakeIndexSimulated {
if matchedTalk == nil && cs.LastMatchedIndex != 0 {
cs.LastMatchedIndex, mark, matchedTalk = cs.Handler.Matcher.MatchOutboundTalk(ctx, cs.Handler.ReplayingSession, -1, request)
if matchedTalk == nil && matchedIndex != 0 {
matchedIndex, mark, matchedTalk = cs.Handler.Matcher.MatchOutboundTalk(ctx, cs.Handler.ReplayingSession, -1, request)
}
if matchedTalk == nil {
callOutbound.MatchedRequest = nil
Expand Down Expand Up @@ -254,6 +280,7 @@ func (cs *ConnState) match(ctx context.Context, request []byte) error {
callOutbound.MatchedActionIndex,
strconv.Quote(helper.BytesToString(callOutbound.MatchedResponse)))

setLastMatchedIndex(cs.traceID, matchedIndex)
return nil
}

Expand Down
7 changes: 3 additions & 4 deletions replayer-agent/logic/outbound/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,9 @@ func handleOutbound(serverAddr *net.TCPAddr, conn *net.TCPConn) {
tlog.Handler.Debugf(ctx, tlog.DebugTag, "new outbound||addr=%s||begin", tcpAddr.String())

cs := &ConnState{
LastMatchedIndex: -1,
conn: conn,
tcpAddr: tcpAddr,
proxyer: NewProxyer(conn),
conn: conn,
tcpAddr: tcpAddr,
proxyer: NewProxyer(conn),
}

for i := 0; ; i++ {
Expand Down

0 comments on commit 5b7b959

Please sign in to comment.