Skip to content

Commit

Permalink
Add IP to Pool txs (0xPolygonHermez#1817)
Browse files Browse the repository at this point in the history
  • Loading branch information
ToniRamirezM authored Mar 16, 2023
1 parent 992a322 commit d7b8575
Show file tree
Hide file tree
Showing 21 changed files with 194 additions and 161 deletions.
13 changes: 13 additions & 0 deletions db/migrations/pool/0003.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +migrate Up
ALTER TABLE pool.transaction
DROP COLUMN failed_counter;

ALTER TABLE pool.transaction
ADD COLUMN ip VARCHAR;

-- +migrate Down
ALTER TABLE pool.transaction
ADD COLUMN failed_counter DECIMAL(78, 0) DEFAULT 0;

ALTER TABLE pool.transaction
DROP COLUMN ip;
12 changes: 12 additions & 0 deletions db/migrations/state/0004.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- +migrate Up
CREATE TABLE state.event
(
event_type VARCHAR NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
ip VARCHAR,
tx_hash VARCHAR,
payload VARCHAR
);

-- +migrate Down
DROP table state.event;
10 changes: 6 additions & 4 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/big"
"net/http"

"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/log"
Expand Down Expand Up @@ -699,11 +700,12 @@ func (e *EthEndpoints) newPendingTransactionFilter(wsConn *websocket.Conn) (inte
// SendRawTransaction has two different ways to handle new transactions:
// - for Sequencer nodes it tries to add the tx to the pool
// - for Non-Sequencer nodes it relays the Tx to the Sequencer node
func (e *EthEndpoints) SendRawTransaction(input string) (interface{}, rpcError) {
func (e *EthEndpoints) SendRawTransaction(httpRequest *http.Request, input string) (interface{}, rpcError) {
if e.cfg.SequencerNodeURI != "" {
return e.relayTxToSequencerNode(input)
} else {
return e.tryToAddTxToPool(input)
ip := httpRequest.Header.Get("X-Forwarded-For")
return e.tryToAddTxToPool(input, ip)
}
}

Expand All @@ -722,14 +724,14 @@ func (e *EthEndpoints) relayTxToSequencerNode(input string) (interface{}, rpcErr
return txHash, nil
}

func (e *EthEndpoints) tryToAddTxToPool(input string) (interface{}, rpcError) {
func (e *EthEndpoints) tryToAddTxToPool(input, ip string) (interface{}, rpcError) {
tx, err := hexToTx(input)
if err != nil {
return rpcErrorResponse(invalidParamsErrorCode, "invalid tx input", err)
}

log.Infof("adding TX to the pool: %v", tx.Hash().Hex())
if err := e.pool.AddTx(context.Background(), *tx); err != nil {
if err := e.pool.AddTx(context.Background(), *tx, ip); err != nil {
return rpcErrorResponse(defaultErrorCode, err.Error(), nil)
}
log.Infof("TX added to the pool: %v", tx.Hash().Hex())
Expand Down
12 changes: 6 additions & 6 deletions jsonrpc/endpoints_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2735,7 +2735,7 @@ func TestSendRawTransactionViaGeth(t *testing.T) {
})

m.Pool.
On("AddTx", context.Background(), txMatchByHash).
On("AddTx", context.Background(), txMatchByHash, "").
Return(nil).
Once()
},
Expand All @@ -2752,7 +2752,7 @@ func TestSendRawTransactionViaGeth(t *testing.T) {
})

m.Pool.
On("AddTx", context.Background(), txMatchByHash).
On("AddTx", context.Background(), txMatchByHash, "").
Return(errors.New("failed to add TX to the pool")).
Once()
},
Expand Down Expand Up @@ -2810,7 +2810,7 @@ func TestSendRawTransactionJSONRPCCall(t *testing.T) {
},
SetupMocks: func(t *testing.T, m *mocks, tc testCase) {
m.Pool.
On("AddTx", context.Background(), mock.IsType(types.Transaction{})).
On("AddTx", context.Background(), mock.IsType(types.Transaction{}), "").
Return(nil).
Once()
},
Expand All @@ -2832,7 +2832,7 @@ func TestSendRawTransactionJSONRPCCall(t *testing.T) {
},
SetupMocks: func(t *testing.T, m *mocks, tc testCase) {
m.Pool.
On("AddTx", context.Background(), mock.IsType(types.Transaction{})).
On("AddTx", context.Background(), mock.IsType(types.Transaction{}), "").
Return(errors.New("failed to add TX to the pool")).
Once()
},
Expand Down Expand Up @@ -2900,7 +2900,7 @@ func TestSendRawTransactionViaGethForNonSequencerNode(t *testing.T) {
})

m.Pool.
On("AddTx", context.Background(), txMatchByHash).
On("AddTx", context.Background(), txMatchByHash, "").
Return(nil).
Once()
},
Expand All @@ -2917,7 +2917,7 @@ func TestSendRawTransactionViaGethForNonSequencerNode(t *testing.T) {
})

m.Pool.
On("AddTx", context.Background(), txMatchByHash).
On("AddTx", context.Background(), txMatchByHash, "").
Return(errors.New("failed to add TX to the pool")).
Once()
},
Expand Down
11 changes: 10 additions & 1 deletion jsonrpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package jsonrpc
import (
"encoding/json"
"fmt"
"net/http"
"reflect"
"strings"
"sync"
Expand Down Expand Up @@ -34,7 +35,8 @@ func (f *funcData) numParams() int {

type handleRequest struct {
Request
wsConn *websocket.Conn
wsConn *websocket.Conn
HttpRequest *http.Request
}

// Handler manage services to handle jsonrpc requests
Expand Down Expand Up @@ -101,12 +103,19 @@ func (h *Handler) Handle(req handleRequest) Response {
requestHasWebSocketConn := req.wsConn != nil
funcHasMoreThanOneInputParams := len(fd.reqt) > 1
firstFuncParamIsWebSocketConn := false
firstFuncParamIsHttpRequest := false
if funcHasMoreThanOneInputParams {
firstFuncParamIsWebSocketConn = fd.reqt[1].AssignableTo(reflect.TypeOf(&websocket.Conn{}))
firstFuncParamIsHttpRequest = fd.reqt[1].AssignableTo(reflect.TypeOf(&http.Request{}))
}
if requestHasWebSocketConn && firstFuncParamIsWebSocketConn {
inArgs[1] = reflect.ValueOf(req.wsConn)
inArgsOffset++
} else if firstFuncParamIsHttpRequest {
// If in the future one endponit needs to have both a websocket connection and an http request
// we will need to modify this code to properly handle it
inArgs[1] = reflect.ValueOf(req.HttpRequest)
inArgsOffset++
}

// check params passed by request match function params
Expand Down
2 changes: 1 addition & 1 deletion jsonrpc/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

// jsonRPCTxPool contains the methods required to interact with the tx pool.
type jsonRPCTxPool interface {
AddTx(ctx context.Context, tx types.Transaction) error
AddTx(ctx context.Context, tx types.Transaction, ip string) error
GetGasPrice(ctx context.Context) (uint64, error)
GetNonce(ctx context.Context, address common.Address) (uint64, error)
GetPendingTxHashesSince(ctx context.Context, since time.Time) ([]common.Hash, error)
Expand Down
10 changes: 5 additions & 5 deletions jsonrpc/mock_pool_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ func (s *Server) handle(w http.ResponseWriter, req *http.Request) {

start := time.Now()
if single {
s.handleSingleRequest(w, data)
s.handleSingleRequest(req, w, data)
} else {
s.handleBatchRequest(w, data)
s.handleBatchRequest(req, w, data)
}
metrics.RequestDuration(start)
}
Expand All @@ -262,14 +262,14 @@ func (s *Server) isSingleRequest(data []byte) (bool, rpcError) {
return x[0] == '{', nil
}

func (s *Server) handleSingleRequest(w http.ResponseWriter, data []byte) {
func (s *Server) handleSingleRequest(httpRequest *http.Request, w http.ResponseWriter, data []byte) {
defer metrics.RequestHandled(metrics.RequestHandledLabelSingle)
request, err := s.parseRequest(data)
if err != nil {
handleError(w, err)
return
}
req := handleRequest{Request: request}
req := handleRequest{Request: request, HttpRequest: httpRequest}
response := s.handler.Handle(req)

respBytes, err := json.Marshal(response)
Expand All @@ -285,7 +285,7 @@ func (s *Server) handleSingleRequest(w http.ResponseWriter, data []byte) {
}
}

func (s *Server) handleBatchRequest(w http.ResponseWriter, data []byte) {
func (s *Server) handleBatchRequest(httpRequest *http.Request, w http.ResponseWriter, data []byte) {
defer metrics.RequestHandled(metrics.RequestHandledLabelBatch)
requests, err := s.parseRequests(data)
if err != nil {
Expand All @@ -296,7 +296,7 @@ func (s *Server) handleBatchRequest(w http.ResponseWriter, data []byte) {
responses := make([]Response, 0, len(requests))

for _, request := range requests {
req := handleRequest{Request: request}
req := handleRequest{Request: request, HttpRequest: httpRequest}
response := s.handler.Handle(req)
responses = append(responses, response)
}
Expand Down
2 changes: 1 addition & 1 deletion pool/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type storage interface {
GetTxs(ctx context.Context, filterStatus TxStatus, isClaims bool, minGasPrice, limit uint64) ([]*Transaction, error)
GetTxFromAddressFromByHash(ctx context.Context, hash common.Hash) (common.Address, uint64, error)
GetTxByHash(ctx context.Context, hash common.Hash) (*Transaction, error)
IncrementFailedCounter(ctx context.Context, hashes []string) error
GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, error)
DeleteTransactionByHash(ctx context.Context, hash common.Hash) error
MarkWIPTxsAsPending(ctx context.Context) error
Expand All @@ -41,4 +40,5 @@ type stateInterface interface {
GetNonce(ctx context.Context, address common.Address, batchNumber uint64, dbTx pgx.Tx) (uint64, error)
GetTransactionByHash(ctx context.Context, transactionHash common.Hash, dbTx pgx.Tx) (*types.Transaction, error)
PreProcessTransaction(ctx context.Context, tx *types.Transaction, dbTx pgx.Tx) (*state.ProcessBatchResponse, error)
AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error
}
Loading

0 comments on commit d7b8575

Please sign in to comment.