Skip to content

Commit

Permalink
Refactor handshake duration tracking in lb.Node
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 committed Mar 7, 2024
1 parent 78a6aaf commit cbd636e
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 54 deletions.
30 changes: 0 additions & 30 deletions internal/cmgr/cmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmgr

import (
"context"
"net/http"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -175,32 +174,3 @@ func (cm *cmgrImpl) Start(ctx context.Context, errCH chan error) {
}
}
}

type syncReq struct {
RelayLabel string `json:"relay_label"`
Stats conn.Stats `json:"stats"`
}

func (cm *cmgrImpl) syncOnce() error {
cm.l.Infof("sync once total closed connections: %d", cm.countClosedConnection())
// todo: opt lock
cm.lock.Lock()

reqs := []syncReq{}
for label, conns := range cm.closedConnectionsMap {
for _, c := range conns {
reqs = append(reqs, syncReq{
RelayLabel: label,
Stats: *c.GetStats(),
})
}
}
cm.closedConnectionsMap = make(map[string][]conn.RelayConn)
cm.lock.Unlock()
if cm.cfg.NeedSync() {
return myhttp.PostJson(http.DefaultClient, cm.cfg.SyncURL, &reqs)
} else {
cm.l.Debugf("remove %d closed connections", len(reqs))
}
return nil
}
53 changes: 53 additions & 0 deletions internal/cmgr/metric_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package cmgr

import (
"net/http"

"github.com/Ehco1996/ehco/internal/conn"
myhttp "github.com/Ehco1996/ehco/pkg/http"
)

type StatsPerRule struct {
RelayLabel string `json:"relay_label"`

Up int64 `json:"up_bytes"`
Down int64 `json:"down_bytes"`
ConnectionCnt int `json:"connection_count"`
HandShakeLatency int64 `json:"latency_in_ms"`
}

type syncReq struct {
Stats []StatsPerRule `json:"stats"`
}

func (cm *cmgrImpl) syncOnce() error {
cm.l.Infof("sync once total closed connections: %d", cm.countClosedConnection())
// todo: opt lock
cm.lock.Lock()

req := syncReq{Stats: []StatsPerRule{}}
for label, conns := range cm.closedConnectionsMap {
s := StatsPerRule{
RelayLabel: label,
}
var totalLatency int64
for _, c := range conns {
s.ConnectionCnt++
s.Up += c.GetStats().Up
s.Down += c.GetStats().Down
totalLatency += c.GetStats().HandShakeLatency.Milliseconds()
}
if s.ConnectionCnt > 0 {
s.HandShakeLatency = totalLatency / int64(s.ConnectionCnt)
}
req.Stats = append(req.Stats, s)
}
cm.closedConnectionsMap = make(map[string][]conn.RelayConn)
cm.lock.Unlock()
if cm.cfg.NeedSync() {
return myhttp.PostJson(http.DefaultClient, cm.cfg.SyncURL, &req)
} else {
cm.l.Debugf("remove %d closed connections", len(req.Stats))
}
return nil
}
28 changes: 21 additions & 7 deletions internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ var (
)

type Stats struct {
Up int64 `json:"up"`
Down int64 `json:"down"`
Up int64
Down int64
HandShakeLatency time.Duration
}

func (s *Stats) Record(up, down int64) {
Expand Down Expand Up @@ -136,6 +137,8 @@ func copyConn(conn1, conn2 *innerConn) error {
return err2
}

type RelayConnOption func(*relayConnImpl)

type RelayConn interface {
// Transport transports data between the client and the remote server.
// The remoteLabel is the label of the remote server.
Expand All @@ -149,14 +152,18 @@ type RelayConn interface {
Close() error
}

func NewRelayConn(relayName string, clientConn, remoteConn net.Conn) RelayConn {
return &relayConnImpl{
func NewRelayConn(relayName string, clientConn, remoteConn net.Conn, opts ...RelayConnOption) RelayConn {
rci := &relayConnImpl{
RelayLabel: relayName,
Stats: &Stats{Up: 0, Down: 0},

clientConn: clientConn,
remoteConn: remoteConn,
}
for _, opt := range opts {
opt(rci)
}
s := &Stats{Up: 0, Down: 0, HandShakeLatency: rci.HandshakeDuration}
rci.Stats = s
return rci
}

type relayConnImpl struct {
Expand All @@ -166,12 +173,19 @@ type relayConnImpl struct {
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time,omitempty"`

Stats *Stats `json:"stats"`
Stats *Stats `json:"stats"`
HandshakeDuration time.Duration

clientConn net.Conn
remoteConn net.Conn
}

func WithHandshakeDuration(duration time.Duration) RelayConnOption {
return func(rci *relayConnImpl) {
rci.HandshakeDuration = duration
}
}

func (rc *relayConnImpl) Transport(remoteLabel string) error {
defer rc.Close() // nolint: errcheck
name := rc.Name()
Expand Down
9 changes: 6 additions & 3 deletions internal/transporter/mtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ func (s *MTCP) dialRemote(remote *lb.Node) (net.Conn, error) {
if err != nil {
return nil, err
}
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(time.Since(t1).Milliseconds()))
latency := time.Since(t1)
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(latency.Milliseconds()))
remote.HandShakeDuration = latency
return mtcpc, nil
}

func (s *MTCP) HandleTCPConn(c net.Conn, remote *lb.Node) error {
mtcpc, err := s.dialRemote(remote)
clonedRemote := remote.Clone()
mtcpc, err := s.dialRemote(clonedRemote)
if err != nil {
return err
}
s.l.Infof("HandleTCPConn from:%s to:%s", c.LocalAddr(), remote.Address)
relayConn := conn.NewRelayConn(s.relayLabel, c, mtcpc)
relayConn := conn.NewRelayConn(s.relayLabel, c, mtcpc, conn.WithHandshakeDuration(clonedRemote.HandShakeDuration))
s.cmgr.AddConnection(relayConn)
defer s.cmgr.RemoveConnection(relayConn)
return relayConn.Transport(remote.Label)
Expand Down
10 changes: 7 additions & 3 deletions internal/transporter/mwss.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,21 @@ func (s *Mwss) dialRemote(remote *lb.Node) (net.Conn, error) {
if err != nil {
return nil, err
}
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(time.Since(t1).Milliseconds()))

latency := time.Since(t1)
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(latency.Milliseconds()))
remote.HandShakeDuration = latency
return mwssc, nil
}

func (s *Mwss) HandleTCPConn(c net.Conn, remote *lb.Node) error {
mwsc, err := s.dialRemote(remote)
clonedRemote := remote.Clone()
mwsc, err := s.dialRemote(clonedRemote)
if err != nil {
return err
}
s.l.Infof("HandleTCPConn from:%s to:%s", c.LocalAddr(), remote.Address)
relayConn := conn.NewRelayConn(s.relayLabel, c, mwsc)
relayConn := conn.NewRelayConn(s.relayLabel, c, mwsc, conn.WithHandshakeDuration(clonedRemote.HandShakeDuration))
s.cmgr.AddConnection(relayConn)
defer s.cmgr.RemoveConnection(relayConn)
return relayConn.Transport(remote.Label)
Expand Down
9 changes: 6 additions & 3 deletions internal/transporter/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ func (raw *Raw) dialRemote(remote *lb.Node) (net.Conn, error) {
if err != nil {
return nil, err
}
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(time.Since(t1).Milliseconds()))
latency := time.Since(t1)
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(latency.Milliseconds()))
remote.HandShakeDuration = latency
return rc, nil
}

Expand All @@ -159,12 +161,13 @@ func (raw *Raw) HandleTCPConn(c net.Conn, remote *lb.Node) error {
metrics.CurConnectionCount.WithLabelValues(remote.Label, metrics.METRIC_CONN_TYPE_TCP).Inc()
defer metrics.CurConnectionCount.WithLabelValues(remote.Label, metrics.METRIC_CONN_TYPE_TCP).Dec()

rc, err := raw.dialRemote(remote)
clonedRemote := remote.Clone()
rc, err := raw.dialRemote(clonedRemote)
if err != nil {
return err
}
raw.l.Infof("HandleTCPConn from %s to %s", c.LocalAddr(), remote.Address)
relayConn := conn.NewRelayConn(raw.relayLabel, c, rc)
relayConn := conn.NewRelayConn(raw.relayLabel, c, rc, conn.WithHandshakeDuration(clonedRemote.HandShakeDuration))
raw.cmgr.AddConnection(relayConn)
defer raw.cmgr.RemoveConnection(relayConn)
return relayConn.Transport(remote.Label)
Expand Down
11 changes: 8 additions & 3 deletions internal/transporter/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,22 @@ func (s *Ws) dialRemote(remote *lb.Node) (net.Conn, error) {
if err != nil {
return nil, err
}
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(time.Since(t1).Milliseconds()))
latency := time.Since(t1)
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(latency.Milliseconds()))
remote.HandShakeDuration = latency
return wsc, nil
}

func (s *Ws) HandleTCPConn(c net.Conn, remote *lb.Node) error {
wsc, err := s.dialRemote(remote)
clonedRemote := remote.Clone()
wsc, err := s.dialRemote(clonedRemote)
if err != nil {
return err
}
s.l.Infof("HandleTCPConn from %s to %s", c.LocalAddr(), remote.Address)
relayConn := conn.NewRelayConn(s.relayLabel, c, wsc)
relayConn := conn.NewRelayConn(
s.relayLabel, c, wsc,
conn.WithHandshakeDuration(clonedRemote.HandShakeDuration))
s.cmgr.AddConnection(relayConn)
defer s.cmgr.RemoveConnection(relayConn)
return relayConn.Transport(remote.Label)
Expand Down
9 changes: 6 additions & 3 deletions internal/transporter/wss.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,21 @@ func (s *Wss) dialRemote(remote *lb.Node) (net.Conn, error) {
if err != nil {
return nil, err
}
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(time.Since(t1).Milliseconds()))
latency := time.Since(t1)
metrics.HandShakeDuration.WithLabelValues(remote.Label).Observe(float64(latency.Milliseconds()))
remote.HandShakeDuration = latency
return wssc, nil
}

func (s *Wss) HandleTCPConn(c net.Conn, remote *lb.Node) error {
wssc, err := s.dialRemote(remote)
clonedRemote := remote.Clone()
wssc, err := s.dialRemote(clonedRemote)
if err != nil {
return err
}
s.l.Infof("HandleTCPConn from %s to %s", c.RemoteAddr(), remote.Address)

relayConn := conn.NewRelayConn(s.relayLabel, c, wssc)
relayConn := conn.NewRelayConn(s.relayLabel, c, wssc, conn.WithHandshakeDuration(clonedRemote.HandShakeDuration))
s.cmgr.AddConnection(relayConn)
defer s.cmgr.RemoveConnection(relayConn)
return relayConn.Transport(remote.Label)
Expand Down
16 changes: 14 additions & 2 deletions pkg/lb/round_robin.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
package lb

import (
"time"

"go.uber.org/atomic"
)

// todo: move to internal/lb
type Node struct {
Address string
Label string
Address string
Label string
HandShakeDuration time.Duration
}

func (n *Node) Clone() *Node {
return &Node{
Address: n.Address,
Label: n.Label,
HandShakeDuration: n.HandShakeDuration,
}
}

// RoundRobin is an interface for representing round-robin balancing.
Expand Down

0 comments on commit cbd636e

Please sign in to comment.