Skip to content

Commit

Permalink
fix ws subscribe to get filtered log notifications (0xPolygonHermez#2396
Browse files Browse the repository at this point in the history
)
  • Loading branch information
tclemos authored Aug 14, 2023
1 parent d045bd5 commit 077fb50
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 68 deletions.
10 changes: 8 additions & 2 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,9 @@ func (e *EthEndpoints) UninstallFilter(filterID string) (interface{}, types.Erro
func (e *EthEndpoints) Syncing() (interface{}, types.Error) {
return e.txMan.NewDbTxScope(e.state, func(ctx context.Context, dbTx pgx.Tx) (interface{}, types.Error) {
_, err := e.state.GetLastL2BlockNumber(ctx, dbTx)
if err != nil {
if errors.Is(err, state.ErrStateNotSynchronized) {
return nil, types.NewRPCErrorWithData(types.DefaultErrorCode, state.ErrStateNotSynchronized.Error(), nil)
} else if err != nil {
return RPCErrorResponse(types.DefaultErrorCode, "failed to get last block number from state", err)
}

Expand Down Expand Up @@ -997,7 +999,10 @@ func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
}

if changes != nil {
e.sendSubscriptionResponse(filter, changes)
ethLogs := changes.([]types.Log)
for _, ethLog := range ethLogs {
e.sendSubscriptionResponse(filter, ethLog)
}
}
}
}
Expand Down Expand Up @@ -1027,4 +1032,5 @@ func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data interface{}
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
}
log.Debugf("WS message sent: %v", string(message))
}
1 change: 1 addition & 0 deletions jsonrpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (h *Handler) Handle(req handleRequest) types.Response {

// HandleWs handle websocket requests
func (h *Handler) HandleWs(reqBody []byte, wsConn *websocket.Conn, httpReq *http.Request) ([]byte, error) {
log.Debugf("WS message received: %v", string(reqBody))
var req types.Request
if err := json.Unmarshal(reqBody, &req); err != nil {
return types.NewResponse(req, nil, types.NewRPCError(types.InvalidRequestErrorCode, "Invalid json request")).Bytes()
Expand Down
2 changes: 1 addition & 1 deletion jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func RPCErrorResponse(code int, message string, err error) (interface{}, types.E
// RPCErrorResponseWithData formats error to be returned through RPC
func RPCErrorResponseWithData(code int, message string, data *[]byte, err error) (interface{}, types.Error) {
if err != nil {
log.Errorf("%v:%v", message, err.Error())
log.Errorf("%v: %v", message, err.Error())
} else {
log.Error(message)
}
Expand Down
83 changes: 46 additions & 37 deletions state/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -1847,51 +1847,60 @@ func (p *PostgresStorage) IsL2BlockVirtualized(ctx context.Context, blockNumber
// GetLogs returns the logs that match the filter
func (p *PostgresStorage) GetLogs(ctx context.Context, fromBlock uint64, toBlock uint64, addresses []common.Address, topics [][]common.Hash, blockHash *common.Hash, since *time.Time, dbTx pgx.Tx) ([]*types.Log, error) {
const getLogsByBlockHashSQL = `
SELECT t.l2_block_num, b.block_hash, l.tx_hash, l.log_index, l.address, l.data, l.topic0, l.topic1, l.topic2, l.topic3
FROM state.log l
INNER JOIN state.transaction t ON t.hash = l.tx_hash
INNER JOIN state.l2block b ON b.block_num = t.l2_block_num
WHERE b.block_hash = $1
ORDER BY b.block_num ASC, l.log_index ASC`
const getLogsByFilterSQL = `
SELECT t.l2_block_num, b.block_hash, l.tx_hash, l.log_index, l.address, l.data, l.topic0, l.topic1, l.topic2, l.topic3
FROM state.log l
INNER JOIN state.transaction t ON t.hash = l.tx_hash
INNER JOIN state.l2block b ON b.block_num = t.l2_block_num
WHERE b.block_num BETWEEN $1 AND $2 AND (l.address = any($3) OR $3 IS NULL)
AND (l.topic0 = any($4) OR $4 IS NULL)
AND (l.topic1 = any($5) OR $5 IS NULL)
AND (l.topic2 = any($6) OR $6 IS NULL)
AND (l.topic3 = any($7) OR $7 IS NULL)
AND (b.created_at >= $8 OR $8 IS NULL)
ORDER BY b.block_num ASC, l.log_index ASC`

var err error
var rows pgx.Rows
q := p.getExecQuerier(dbTx)
SELECT t.l2_block_num, b.block_hash, l.tx_hash, l.log_index, l.address, l.data, l.topic0, l.topic1, l.topic2, l.topic3
FROM state.log l
INNER JOIN state.transaction t ON t.hash = l.tx_hash
INNER JOIN state.l2block b ON b.block_num = t.l2_block_num
WHERE b.block_hash = $1
AND (l.address = any($2) OR $2 IS NULL)
AND (l.topic0 = any($3) OR $3 IS NULL)
AND (l.topic1 = any($4) OR $4 IS NULL)
AND (l.topic2 = any($5) OR $5 IS NULL)
AND (l.topic3 = any($6) OR $6 IS NULL)
AND (b.created_at >= $7 OR $7 IS NULL)
ORDER BY b.block_num ASC, l.log_index ASC`
const getLogsByBlockNumbersSQL = `
SELECT t.l2_block_num, b.block_hash, l.tx_hash, l.log_index, l.address, l.data, l.topic0, l.topic1, l.topic2, l.topic3
FROM state.log l
INNER JOIN state.transaction t ON t.hash = l.tx_hash
INNER JOIN state.l2block b ON b.block_num = t.l2_block_num
WHERE b.block_num BETWEEN $1 AND $2
AND (l.address = any($3) OR $3 IS NULL)
AND (l.topic0 = any($4) OR $4 IS NULL)
AND (l.topic1 = any($5) OR $5 IS NULL)
AND (l.topic2 = any($6) OR $6 IS NULL)
AND (l.topic3 = any($7) OR $7 IS NULL)
AND (b.created_at >= $8 OR $8 IS NULL)
ORDER BY b.block_num ASC, l.log_index ASC`

var args []interface{}
var query string
if blockHash != nil {
rows, err = q.Query(ctx, getLogsByBlockHashSQL, blockHash.String())
args = []interface{}{blockHash.String()}
query = getLogsByBlockHashSQL
} else {
args = []interface{}{fromBlock, toBlock}
query = getLogsByBlockNumbersSQL
}

if len(addresses) > 0 {
args = append(args, p.addressesToHex(addresses))
} else {
args := []interface{}{fromBlock, toBlock}
args = append(args, nil)
}

if len(addresses) > 0 {
args = append(args, p.addressesToHex(addresses))
for i := 0; i < maxTopics; i++ {
if len(topics) > i && len(topics[i]) > 0 {
args = append(args, p.hashesToHex(topics[i]))
} else {
args = append(args, nil)
}
}

for i := 0; i < maxTopics; i++ {
if len(topics) > i && len(topics[i]) > 0 {
args = append(args, p.hashesToHex(topics[i]))
} else {
args = append(args, nil)
}
}

args = append(args, since)
args = append(args, since)

rows, err = q.Query(ctx, getLogsByFilterSQL, args...)
}
q := p.getExecQuerier(dbTx)
rows, err := q.Query(ctx, query, args...)

if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 077fb50

Please sign in to comment.