PRT-343 reach by default to maximum number of connections available (lavanet#263)

* PRT-343 adding connection resource management on stress and relax

* PRT-343 adding tests to connector and grpc connector.

* PRT-343 lint and log fix

* PRT-343 lint

* PRT-343 more indicative prints and improved logic for removing a free client
ranlavanet authored Jan 25, 2023
1 parent 560abd6 commit dc177fa
Showing 8 changed files with 217 additions and 29 deletions.
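Taken together, the connector changes below implement a grow-under-stress, shrink-on-relax pool: GetRpc asynchronously dials extra connections whenever the free list is no larger than the number of clients currently in use, and ReturnRpc closes a returned connection once the free list already exceeds the in-use count plus the initially requested pool size. The standalone Go sketch below shows that heuristic in isolation; the pool and conn types are illustrative stand-ins for the real Connector and rpcclient.Client, and growth happens synchronously here instead of in a background goroutine as in the real code.

package main

import (
	"fmt"
	"sync"
)

type conn struct{ id int }

type pool struct {
	mu       sync.Mutex
	free     []*conn
	used     int
	baseline int // number of connections requested at startup
	nextID   int
}

func newPool(n int) *pool {
	p := &pool{baseline: n}
	for i := 0; i < n; i++ {
		p.free = append(p.free, p.newConn())
	}
	return p
}

func (p *pool) newConn() *conn {
	p.nextID++
	return &conn{id: p.nextID}
}

// get mirrors GetRpc: when the free list shrinks to the number of clients
// already in use, grow the pool ("stress").
func (p *pool) get() *conn {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.free) <= p.used {
		p.free = append(p.free, p.newConn())
	}
	c := p.free[0]
	p.free = p.free[1:]
	p.used++
	return c
}

// put mirrors ReturnRpc: once the free list exceeds used+baseline, discard
// the returned connection instead of keeping it idle ("relax").
func (p *pool) put(c *conn) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.used--
	if len(p.free) > p.used+p.baseline {
		return // would call Close() on a real connection
	}
	p.free = append(p.free, c)
}

func main() {
	p := newPool(2)
	a, b, c := p.get(), p.get(), p.get() // third get grows past the baseline
	fmt.Println("in use:", p.used, "free:", len(p.free))
	p.put(a)
	p.put(b)
	p.put(c) // the surplus connection is discarded rather than kept idle
	fmt.Println("in use:", p.used, "free:", len(p.free))
}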
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yml
@@ -82,4 +82,4 @@ jobs:
### Run relayer unitests
######################################################
- name: Run Relayer unit Tests
run: go test ./relayer/lavasession/ ./protocol/chaintracker/ -v
run: go test ./relayer/lavasession/ ./protocol/chaintracker/ ./relayer/chainproxy/ -v
2 changes: 1 addition & 1 deletion cmd/lavad/main.go
@@ -241,7 +241,7 @@ func main() {
cmdPortalServer.Flags().String(performance.PprofAddressFlagName, "", "pprof server address, used for code profiling")
cmdPortalServer.Flags().String(performance.CacheFlagName, "", "address for a cache server to improve performance")
cmdServer.Flags().String(performance.CacheFlagName, "", "address for a cache server to improve performance")
cmdServer.Flags().Uint(chainproxy.ParallelConnectionsFlag, chainproxy.DefaultNumberOfParallelConnections, "parallel connections")
cmdServer.Flags().Uint(chainproxy.ParallelConnectionsFlag, chainproxy.NumberOfParallelConnections, "parallel connections")

rootCmd.AddCommand(cmdServer)
rootCmd.AddCommand(cmdPortalServer)
133 changes: 110 additions & 23 deletions relayer/chainproxy/connector.go
@@ -22,32 +22,41 @@ import (
const (
DialTimeout = 500 * time.Millisecond
ParallelConnectionsFlag = "parallel-connections"
DefaultNumberOfParallelConnections = 10
MaximumNumberOfParallelConnectionsAttempts = 10
)

var NumberOfParallelConnections uint = 10

type Connector struct {
lock utils.LavaMutex
freeClients []*rpcclient.Client
usedClients int
addr string
}

func NewConnector(ctx context.Context, nConns uint, addr string) *Connector {
NumberOfParallelConnections = nConns // set number of parallel connections requested by user (or default.)
connector := &Connector{
freeClients: make([]*rpcclient.Client, 0, nConns),
addr: addr,
}
reachedClientLimit := false

for i := uint(0); i < nConns; i++ {
if reachedClientLimit {
break
}
var rpcClient *rpcclient.Client
var err error
numberOfConnectionAttempts := 0
for {
numberOfConnectionAttempts += 1
if numberOfConnectionAttempts > MaximumNumberOfParallelConnectionsAttempts {
utils.LavaFormatFatal("Reached maximum number of parallel connections attempts, consider decreasing number of connections",
nil,
&map[string]string{"Number of parallel connections": strconv.FormatUint(uint64(nConns), 10)},
utils.LavaFormatError("Reached maximum number of parallel connections attempts, consider decreasing number of connections",
nil, &map[string]string{"Number of parallel connections": strconv.FormatUint(uint64(nConns), 10), "Currently Connected": strconv.FormatUint(uint64(len(connector.freeClients)), 10)},
)
reachedClientLimit = true
break
}
if ctx.Err() != nil {
connector.Close()
@@ -56,12 +65,10 @@ func NewConnector(ctx context.Context, nConns uint, addr string) *Connector {
nctx, cancel := context.WithTimeout(ctx, DialTimeout)
rpcClient, err = rpcclient.DialContext(nctx, addr)
if err != nil {
utils.LavaFormatWarning("Could not connect to the node, retrying",
err,
&map[string]string{
"Current Number Of Connections": strconv.FormatUint(uint64(i), 10),
"Number Of Attempts Remaining": strconv.Itoa(numberOfConnectionAttempts),
})
utils.LavaFormatWarning("Could not connect to the node, retrying", err, &map[string]string{
"Current Number Of Connections": strconv.FormatUint(uint64(i), 10),
"Number Of Attempts Remaining": strconv.Itoa(numberOfConnectionAttempts),
})
cancel()
continue
}
@@ -101,27 +108,59 @@ func (connector *Connector) Close() {
}
}

func (connector *Connector) GetRpc(block bool) (*rpcclient.Client, error) {
func (connector *Connector) increaseNumberOfClients(ctx context.Context, numberOfFreeClients int) {
utils.LavaFormatDebug("increasing number of clients", &map[string]string{"numberOfFreeClients": strconv.Itoa(numberOfFreeClients)})
var rpcClient *rpcclient.Client
var err error
for connectionAttempt := 0; connectionAttempt < MaximumNumberOfParallelConnectionsAttempts; connectionAttempt++ {
nctx, cancel := context.WithTimeout(ctx, DialTimeout)
rpcClient, err = rpcclient.DialContext(nctx, connector.addr)
if err != nil {
utils.LavaFormatDebug(
"increaseNumberOfClients, Could not connect to the node, retrying",
&map[string]string{"err": err.Error(), "Number Of Attempts": strconv.Itoa(connectionAttempt)})
cancel()
continue
}
cancel()

connector.lock.Lock() // add connection to free list.
defer connector.lock.Unlock()
connector.freeClients = append(connector.freeClients, rpcClient)
return
}
utils.LavaFormatDebug("Failed increasing number of clients", nil)
}

func (connector *Connector) GetRpc(ctx context.Context, block bool) (*rpcclient.Client, error) {
connector.lock.Lock()
defer connector.lock.Unlock()
numberOfFreeClients := len(connector.freeClients)
if numberOfFreeClients <= connector.usedClients { // if we reached half of the free clients start creating new connections
go connector.increaseNumberOfClients(ctx, numberOfFreeClients) // increase asynchronously the free list.
}

if len(connector.freeClients) == 0 {
if numberOfFreeClients == 0 {
if !block {
return nil, errors.New("out of clients")
} else {
for {
connector.lock.Unlock()
// if we reached 0 connections we need to create more connections
// before sleeping, increase asynchronously the free list.
go connector.increaseNumberOfClients(ctx, numberOfFreeClients)
time.Sleep(50 * time.Millisecond)
connector.lock.Lock()
if len(connector.freeClients) != 0 {
numberOfFreeClients = len(connector.freeClients)
if numberOfFreeClients != 0 {
break
}
}
}
}

ret := connector.freeClients[len(connector.freeClients)-1]
connector.freeClients = connector.freeClients[:len(connector.freeClients)-1]
ret := connector.freeClients[0]
connector.freeClients = connector.freeClients[1:]
connector.usedClients++

return ret, nil
@@ -132,31 +171,44 @@ func (connector *Connector) ReturnRpc(rpc *rpcclient.Client) {
defer connector.lock.Unlock()

connector.usedClients--
if len(connector.freeClients) > (connector.usedClients + int(NumberOfParallelConnections) /* the number we started with */) {
rpc.Close() // close connection
return // return without appending back to decrease idle connections
}
connector.freeClients = append(connector.freeClients, rpc)
}

type GRPCConnector struct {
lock sync.RWMutex
freeClients []*grpc.ClientConn
usedClients int
addr string
}

func NewGRPCConnector(ctx context.Context, nConns uint, addr string) *GRPCConnector {
connector := &GRPCConnector{
freeClients: make([]*grpc.ClientConn, 0, nConns),
addr: addr,
}

NumberOfParallelConnections = nConns // set number of parallel connections requested by user (or default.)
reachedClientLimit := false

for i := uint(0); i < nConns; i++ {
if reachedClientLimit {
break
}
var grpcClient *grpc.ClientConn
var err error
numberOfConnectionAttempts := 0
for {
numberOfConnectionAttempts += 1
if numberOfConnectionAttempts > MaximumNumberOfParallelConnectionsAttempts {
utils.LavaFormatFatal("Reached maximum number of parallel connections attempts, consider decreasing number of connections",
nil,
&map[string]string{"Number of parallel connections": strconv.FormatUint(uint64(nConns), 10)},
utils.LavaFormatError("Reached maximum number of parallel connections attempts, consider decreasing number of connections",
nil, &map[string]string{"Number of parallel connections": strconv.FormatUint(uint64(nConns), 10), "Currently Connected": strconv.FormatUint(uint64(len(connector.freeClients)), 10)},
)
reachedClientLimit = true
break
}
if ctx.Err() != nil {
connector.Close()
@@ -178,27 +230,58 @@ func NewGRPCConnector(ctx context.Context, nConns uint, addr string) *GRPCConnector {
return connector
}

func (connector *GRPCConnector) GetRpc(block bool) (*grpc.ClientConn, error) {
func (connector *GRPCConnector) increaseNumberOfClients(ctx context.Context, numberOfFreeClients int) {
utils.LavaFormatDebug("increasing number of clients", &map[string]string{"numberOfFreeClients": strconv.Itoa(numberOfFreeClients)})
var grpcClient *grpc.ClientConn
var err error
for connectionAttempt := 0; connectionAttempt < MaximumNumberOfParallelConnectionsAttempts; connectionAttempt++ {
nctx, cancel := context.WithTimeout(ctx, DialTimeout)
grpcClient, err = grpc.DialContext(nctx, connector.addr, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
utils.LavaFormatDebug("increaseNumberOfClients, Could not connect to the node, retrying", &map[string]string{"err": err.Error(), "Number Of Attempts": strconv.Itoa(connectionAttempt)})
cancel()
continue
}
cancel()

connector.lock.Lock() // add connection to free list.
defer connector.lock.Unlock()
connector.freeClients = append(connector.freeClients, grpcClient)
return
}
utils.LavaFormatDebug("increasing number of clients failed", nil)
}

func (connector *GRPCConnector) GetRpc(ctx context.Context, block bool) (*grpc.ClientConn, error) {
connector.lock.Lock()
defer connector.lock.Unlock()

if len(connector.freeClients) == 0 {
numberOfFreeClients := len(connector.freeClients)
if numberOfFreeClients <= connector.usedClients { // if we reached half of the free clients start creating new connections
go connector.increaseNumberOfClients(ctx, numberOfFreeClients) // increase asynchronously the free list.
}

if numberOfFreeClients == 0 {
if !block {
return nil, errors.New("out of clients")
} else {
for {
connector.lock.Unlock()
// if we reached 0 connections we need to create more connections
// before sleeping, increase asynchronously the free list.
go connector.increaseNumberOfClients(ctx, numberOfFreeClients)
time.Sleep(50 * time.Millisecond)
connector.lock.Lock()
if len(connector.freeClients) != 0 {
numberOfFreeClients = len(connector.freeClients)
if numberOfFreeClients != 0 {
break
}
}
}
}

ret := connector.freeClients[len(connector.freeClients)-1]
connector.freeClients = connector.freeClients[:len(connector.freeClients)-1]
ret := connector.freeClients[0]
connector.freeClients = connector.freeClients[1:]
connector.usedClients++

return ret, nil
@@ -209,6 +292,10 @@ func (connector *GRPCConnector) ReturnRpc(rpc *grpc.ClientConn) {
defer connector.lock.Unlock()

connector.usedClients--
if len(connector.freeClients) > (connector.usedClients + int(NumberOfParallelConnections) /* the number we started with */) {
rpc.Close() // close connection
return // return without appending back to decrease idle connections
}
connector.freeClients = append(connector.freeClients, rpc)
}
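For callers, the visible API change in this file is that GetRpc now takes a context, which the connector also uses when dialing additional nodes in the background. A minimal, hypothetical caller-side sketch follows; the node address and the surrounding setup are placeholders rather than code from this repository.

package main

import (
	"context"
	"fmt"

	"github.com/lavanet/lava/relayer/chainproxy"
)

func main() {
	ctx := context.Background()
	// Start with the configured number of parallel connections; the pool can
	// grow past this under load and shrink back once connections are returned.
	connector := chainproxy.NewConnector(ctx, chainproxy.NumberOfParallelConnections, "http://127.0.0.1:8545")

	rpcClient, err := connector.GetRpc(ctx, true) // block until a client is free
	if err != nil {
		fmt.Println("no client available:", err)
		return
	}
	// ... use rpcClient for the node call here ...

	// Returning the client lets the pool reuse it, or close it when the free
	// list already exceeds usedClients plus the initial pool size.
	connector.ReturnRpc(rpcClient)
}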

101 changes: 101 additions & 0 deletions relayer/chainproxy/connector_test.go
@@ -0,0 +1,101 @@
package chainproxy

import (
"context"
"log"
"net"
"net/http"
"net/rpc"
"testing"
"time"

"github.com/lavanet/lava/relayer/chainproxy/rpcclient"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

const (
listenerAddress = "localhost:1234"
listenerAddressTcp = "http://localhost:1234"
numberOfClients = 5
)

type Args struct{}

type TimeServer int64

func (t *TimeServer) GiveServerTime(args *Args, reply *int64) error {
// Set the value at the pointer got from the client
*reply = time.Now().Unix()
return nil
}

func createGRPCServer(t *testing.T) *grpc.Server {
lis, err := net.Listen("tcp", listenerAddress)
require.Nil(t, err)
s := grpc.NewServer()
go s.Serve(lis) // serve in a different thread
return s
}

func createRPCServer(t *testing.T) net.Listener {
timeserver := new(TimeServer)
// Register the timeserver object upon which the GiveServerTime
// function will be called from the RPC server (from the client)
rpc.Register(timeserver)
// Registers an HTTP handler for RPC messages
rpc.HandleHTTP()
// Start listening for the requests on port 1234
listener, err := net.Listen("tcp", listenerAddress)
if err != nil {
log.Fatal("Listener error: ", err)
}
// Serve accepts incoming HTTP connections on the listener l, creating
// a new service goroutine for each. The service goroutines read requests
// and then call handler to reply to them
go http.Serve(listener, nil)

return listener
}

func TestConnector(t *testing.T) {
	listener := createRPCServer(t) // create an RPC server so we can connect to its endpoint and validate everything works.
defer listener.Close()
ctx := context.Background()
conn := NewConnector(ctx, numberOfClients, listenerAddressTcp)
require.Equal(t, len(conn.freeClients), numberOfClients)
increasedClients := numberOfClients * 2 // increase to double the number of clients
rpcList := make([]*rpcclient.Client, increasedClients)
for i := 0; i < increasedClients; i++ {
rpc, err := conn.GetRpc(ctx, true)
require.Nil(t, err)
rpcList[i] = rpc
}
require.Equal(t, conn.usedClients, increasedClients) // checking we have used clients
for i := 0; i < increasedClients; i++ {
conn.ReturnRpc(rpcList[i])
}
require.Equal(t, conn.usedClients, 0) // checking we dont have clients used
require.Equal(t, len(conn.freeClients), increasedClients) // checking we cleaned clients
}

func TestConnectorGrpc(t *testing.T) {
server := createGRPCServer(t) // create a grpcServer so we can connect to its endpoint and validate everything works.
defer server.Stop()
ctx := context.Background()
conn := NewGRPCConnector(ctx, numberOfClients, listenerAddress)
require.Equal(t, len(conn.freeClients), numberOfClients)
increasedClients := numberOfClients * 2 // increase to double the number of clients
rpcList := make([]*grpc.ClientConn, increasedClients)
for i := 0; i < increasedClients; i++ {
rpc, err := conn.GetRpc(ctx, true)
require.Nil(t, err)
rpcList[i] = rpc
}
require.Equal(t, conn.usedClients, increasedClients) // checking we have used clients
for i := 0; i < increasedClients; i++ {
conn.ReturnRpc(rpcList[i])
}
require.Equal(t, conn.usedClients, 0) // checking we dont have clients used
require.Equal(t, len(conn.freeClients), increasedClients) // checking we cleaned clients
}
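The gRPC connector mirrors the same API with *grpc.ClientConn in place of *rpcclient.Client. A hedged usage sketch, again with a placeholder address and with the actual client-stub call elided:

package main

import (
	"context"
	"fmt"

	"github.com/lavanet/lava/relayer/chainproxy"
)

func main() {
	ctx := context.Background()
	pool := chainproxy.NewGRPCConnector(ctx, chainproxy.NumberOfParallelConnections, "127.0.0.1:9090")

	conn, err := pool.GetRpc(ctx, true) // returns a *grpc.ClientConn, blocking until one is free
	if err != nil {
		fmt.Println("no gRPC connection available:", err)
		return
	}
	// ... wrap conn in a generated client stub and issue the call here ...

	// Hand the connection back so it can be reused, or closed if the pool is
	// already holding more idle connections than usedClients plus the initial size.
	pool.ReturnRpc(conn)
}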
2 changes: 1 addition & 1 deletion relayer/chainproxy/grpc.go
@@ -309,7 +309,7 @@ func (nm *GrpcMessage) Send(ctx context.Context, ch chan interface{}) (relayRepl
if ch != nil {
return nil, "", nil, utils.LavaFormatError("Subscribe is not allowed on rest", nil, nil)
}
conn, err := nm.cp.conn.GetRpc(true)
conn, err := nm.cp.conn.GetRpc(ctx, true)
if err != nil {
return nil, "", nil, utils.LavaFormatError("grpc get connection failed ", err, nil)
}
2 changes: 1 addition & 1 deletion relayer/chainproxy/jsonRPC.go
@@ -399,7 +399,7 @@ func (nm *JrpcMessage) RequestedBlock() int64 {

func (nm *JrpcMessage) Send(ctx context.Context, ch chan interface{}) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) {
// Get node
rpc, err := nm.cp.conn.GetRpc(true)
rpc, err := nm.cp.conn.GetRpc(ctx, true)
if err != nil {
return nil, "", nil, err
}
2 changes: 1 addition & 1 deletion relayer/chainproxy/tendermintRPC.go
@@ -372,7 +372,7 @@ func (cp *tendermintRpcChainProxy) PortalStart(ctx context.Context, privKey *btc

func (nm *TendemintRpcMessage) Send(ctx context.Context, ch chan interface{}) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) {
// Get node
rpc, err := nm.cp.conn.GetRpc(true)
rpc, err := nm.cp.conn.GetRpc(ctx, true)
if err != nil {
return nil, "", nil, err
}