Skip to content

Commit

Permalink
event log (0xPolygonHermez#1989)
Browse files Browse the repository at this point in the history
* event log

* event log

* event log

* event log

* event log

* fix config

* reuse code

* fix

* fix

* fix Makefile

* remove default config

* remove default config

* remove default config

* remove default config
  • Loading branch information
ToniRamirezM authored Apr 5, 2023
1 parent 2b94b84 commit 9453210
Show file tree
Hide file tree
Showing 39 changed files with 744 additions and 365 deletions.
101 changes: 82 additions & 19 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/etherman"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
"github.com/0xPolygonHermez/zkevm-node/event"
"github.com/0xPolygonHermez/zkevm-node/event/nileventstorage"
"github.com/0xPolygonHermez/zkevm-node/event/pgeventstorage"
"github.com/0xPolygonHermez/zkevm-node/gasprice"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/client"
Expand Down Expand Up @@ -72,6 +75,24 @@ func start(cliCtx *cli.Context) error {
}
checkStateMigrations(c.StateDB)

// Prepare event log
var eventLog *event.EventLog
var eventStorage event.Storage

if c.EventLog.DB.Name == "" {
eventStorage, err = pgeventstorage.NewPostgresEventStorage(c.EventLog.DB)
if err != nil {
log.Fatal(err)
}
} else {
eventStorage, err = nileventstorage.NewNilEventStorage()
if err != nil {
log.Fatal(err)
}
}

eventLog = event.NewEventLog(c.EventLog, eventStorage)

stateSqlDB, err := db.NewSQLDB(c.StateDB)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -105,7 +126,7 @@ func start(cliCtx *cli.Context) error {
log.Infof("Chain ID read from POE SC = %v", l2ChainID)

ctx := context.Background()
st := newState(ctx, c, l2ChainID, forkIDIntervals, stateSqlDB)
st := newState(ctx, c, l2ChainID, forkIDIntervals, stateSqlDB, eventLog)

ethTxManagerStorage, err := ethtxmanager.NewPostgresStorage(c.StateDB)
if err != nil {
Expand All @@ -114,19 +135,41 @@ func start(cliCtx *cli.Context) error {

etm := ethtxmanager.New(c.EthTxManager, etherman, ethTxManagerStorage, st)

ev := &event.Event{
ReceivedAt: time.Now(),
Source: event.Source_Node,
Level: event.Level_Info,
EventID: event.EventID_NodeComponentStarted,
}

for _, component := range components {
switch component {
case AGGREGATOR:
log.Info("Running aggregator")
ev.Component = event.Component_Aggregator
ev.Description = "Running aggregator"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
go runAggregator(ctx, c.Aggregator, etherman, etm, st)
case SEQUENCER:
log.Info("Running sequencer")
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
seq := createSequencer(*c, poolInstance, ethTxManagerStorage, st)
ev.Component = event.Component_Sequencer
ev.Description = "Running sequencer"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
seq := createSequencer(*c, poolInstance, ethTxManagerStorage, st, eventLog)
go seq.Start(ctx)
case RPC:
log.Info("Running JSON-RPC server")
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
ev.Component = event.Component_RPC
ev.Description = "Running JSON-RPC server"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
if c.RPC.EnableL2SuggestedGasPricePolling {
// Needed for rejecting transactions with too low gas price
poolInstance.StartPollingMinSuggestedGasPrice(ctx)
Expand All @@ -137,19 +180,39 @@ func start(cliCtx *cli.Context) error {
}
go runJSONRPCServer(*c, poolInstance, st, apis)
case SYNCHRONIZER:
log.Info("Running synchronizer")
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
ev.Component = event.Component_Synchronizer
ev.Description = "Running synchronizer"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
go runSynchronizer(*c, etherman, etm, st, poolInstance)
case BROADCAST:
log.Info("Running broadcast service")
ev.Component = event.Component_Broadcast
ev.Description = "Running broadcast service"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
go runBroadcastServer(c.BroadcastServer, st)
case ETHTXMANAGER:
log.Info("Running eth tx manager service")
ev.Component = event.Component_EthTxManager
ev.Description = "Running eth tx manager service"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
etm := createEthTxManager(*c, ethTxManagerStorage, st)
go etm.Start()
case L2GASPRICER:
log.Info("Running L2 gasPricer")
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
ev.Component = event.Component_GasPricer
ev.Description = "Running L2 gasPricer"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
go runL2GasPriceSuggester(c.L2GasPriceSuggester, st, poolInstance, etherman)
}
}
Expand Down Expand Up @@ -236,7 +299,7 @@ func runJSONRPCServer(c config.Config, pool *pool.Pool, st *state.State, apis ma
}
}

func createSequencer(cfg config.Config, pool *pool.Pool, etmStorage *ethtxmanager.PostgresStorage, st *state.State) *sequencer.Sequencer {
func createSequencer(cfg config.Config, pool *pool.Pool, etmStorage *ethtxmanager.PostgresStorage, st *state.State, eventLog *event.EventLog) *sequencer.Sequencer {
etherman, err := newEtherman(cfg)
if err != nil {
log.Fatal(err)
Expand All @@ -251,7 +314,7 @@ func createSequencer(cfg config.Config, pool *pool.Pool, etmStorage *ethtxmanage

ethTxManager := ethtxmanager.New(cfg.EthTxManager, etherman, etmStorage, st)

seq, err := sequencer.New(cfg.Sequencer, pool, st, etherman, ethTxManager)
seq, err := sequencer.New(cfg.Sequencer, pool, st, etherman, ethTxManager, eventLog)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -302,7 +365,7 @@ func waitSignal(cancelFuncs []context.CancelFunc) {
}
}

func newState(ctx context.Context, c *config.Config, l2ChainID uint64, forkIDIntervals []state.ForkIDInterval, sqlDB *pgxpool.Pool) *state.State {
func newState(ctx context.Context, c *config.Config, l2ChainID uint64, forkIDIntervals []state.ForkIDInterval, sqlDB *pgxpool.Pool, eventLog *event.EventLog) *state.State {
stateDb := state.NewPostgresStorage(sqlDB)
executorClient, _, _ := executor.NewExecutorClient(ctx, c.Executor)
stateDBClient, _, _ := merkletree.NewMTDBServiceClient(ctx, c.MTClient)
Expand All @@ -314,17 +377,17 @@ func newState(ctx context.Context, c *config.Config, l2ChainID uint64, forkIDInt
ForkIDIntervals: forkIDIntervals,
}

st := state.NewState(stateCfg, stateDb, executorClient, stateTree)
st := state.NewState(stateCfg, stateDb, executorClient, stateTree, eventLog)
return st
}

func createPool(cfgPool pool.Config, l2BridgeAddr common.Address, l2ChainID uint64, st *state.State) *pool.Pool {
func createPool(cfgPool pool.Config, l2BridgeAddr common.Address, l2ChainID uint64, st *state.State, eventLog *event.EventLog) *pool.Pool {
runPoolMigrations(cfgPool.DB)
poolStorage, err := pgpoolstorage.NewPostgresPoolStorage(cfgPool.DB)
if err != nil {
log.Fatal(err)
}
poolInstance := pool.NewPool(cfgPool, poolStorage, st, l2BridgeAddr, l2ChainID)
poolInstance := pool.NewPool(cfgPool, poolStorage, st, l2BridgeAddr, l2ChainID, eventLog)
return poolInstance
}

Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/etherman"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
"github.com/0xPolygonHermez/zkevm-node/event"
"github.com/0xPolygonHermez/zkevm-node/gasprice"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc"
"github.com/0xPolygonHermez/zkevm-node/log"
Expand Down Expand Up @@ -67,6 +68,7 @@ type Config struct {
MTClient merkletree.Config
StateDB db.Config
Metrics metrics.Config
EventLog event.Config
}

// Default parses the default configuration values.
Expand Down
1 change: 0 additions & 1 deletion config/environments/local/local.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,3 @@ Enabled = false
ProfilingHost = "0.0.0.0"
ProfilingPort = 6060
ProfilingEnabled = false

20 changes: 20 additions & 0 deletions db/migrations/state/0005.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- +migrate Up
DROP table state.event;
DROP table state.debug;

-- +migrate Down
CREATE TABLE IF NOT EXISTS state.event
(
event_type VARCHAR NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
ip VARCHAR,
tx_hash VARCHAR,
payload VARCHAR
);

CREATE TABLE IF NOT EXISTS state.debug
(
error_type VARCHAR,
timestamp timestamp,
payload VARCHAR
);
14 changes: 14 additions & 0 deletions db/scripts/init_event_db.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TYPE level_t AS ENUM ('emerg', 'alert', 'crit', 'err', 'warning', 'notice', 'info', 'debug');

CREATE TABLE public.event (
id BIGSERIAL PRIMARY KEY,
received_at timestamp WITH TIME ZONE default CURRENT_TIMESTAMP,
ip_address inet,
source varchar(32) not null,
component varchar(32),
level level_t not null,
event_id varchar(32) not null,
description text,
data bytea,
json jsonb
);
9 changes: 9 additions & 0 deletions event/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package event

import "github.com/0xPolygonHermez/zkevm-node/db"

// Config for event
type Config struct {
// DB is the database configuration
DB db.Config `mapstructure:"DB"`
}
88 changes: 88 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package event

import (
"math/big"
"time"
)

// EventID is the ID of the event
type EventID string

// Source is the source of the event
type Source string

// Component is the component that triggered the event
type Component string

// Level is the level of the event
type Level string

const (
// EventID_NodeComponentStarted is triggered when the node starts
EventID_NodeComponentStarted = "NODE COMPONENT STARTED"
// EventID_PreexecutionOOC is triggered when an OOC error is detected during the preexecution
EventID_PreexecutionOOC EventID = "PRE EXECUTION OOC"
// EventID_PreexecutionOOG is triggered when an OOG error is detected during the preexecution
EventID_PreexecutionOOG EventID = "PRE EXECUTION OOG"
// EventID_ExecutorError is triggered when an error is detected during the execution
EventID_ExecutorError EventID = "EXECUTOR ERROR"
// EventID_ReprocessFullBatchOOC is triggered when an OOC error is detected during the reprocessing of a full batch
EventID_ReprocessFullBatchOOC EventID = "REPROCESS FULL BATCH OOC"
// EventID_ExecutorRLPError is triggered when an RLP error is detected during the execution
EventID_ExecutorRLPError EventID = "EXECUTOR RLP ERROR"
// EventID_FinalizerHalt is triggered when the finalizer halts
EventID_FinalizerHalt EventID = "FINALIZER HALT"

// Source_Node is the source of the event
Source_Node Source = "node"

// Component_RPC is the component that triggered the event
Component_RPC Component = "rpc"
// Component_Pool is the component that triggered the event
Component_Pool Component = "pool"
// Component_Sequencer is the component that triggered the event
Component_Sequencer Component = "sequencer"
// Component_Synchronizer is the component that triggered the event
Component_Synchronizer Component = "synchronizer"
// Component_Aggregator is the component that triggered the event
Component_Aggregator Component = "aggregator"
// Component_EthTxManager is the component that triggered the event
Component_EthTxManager Component = "ethtxmanager"
// Component_GasPricer is the component that triggered the event
Component_GasPricer Component = "gaspricer"
// Component_Executor is the component that triggered the event
Component_Executor Component = "executor"
// Component_Broadcast is the component that triggered the event
Component_Broadcast Component = "broadcast"

// Level_Emergency is the most severe level
Level_Emergency Level = "emerg"
// Level_Alert is the second most severe level
Level_Alert Level = "alert"
// Level_Critical is the third most severe level
Level_Critical Level = "crit"
// Level_Error is the fourth most severe level
Level_Error Level = "err"
// Level_Warning is the fifth most severe level
Level_Warning Level = "warning"
// Level_Notice is the sixth most severe level
Level_Notice Level = "notice"
// Level_Info is the seventh most severe level
Level_Info Level = "info"
// Level_Debug is the least severe level
Level_Debug Level = "debug"
)

// Event represents a event that may be investigated
type Event struct {
Id big.Int
ReceivedAt time.Time
IPAddress string
Source Source
Component Component
Level Level
EventID EventID
Description string
Data []byte
Json interface{}
}
53 changes: 53 additions & 0 deletions event/eventlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package event

import (
"context"
"encoding/json"
"time"

"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor/pb"
)

// EventLog is the main struct for the event log
type EventLog struct {
cfg Config
storage Storage
}

// NewEventLog creates and initializes an instance of EventLog
func NewEventLog(cfg Config, storage Storage) *EventLog {
return &EventLog{
cfg: cfg,
storage: storage,
}
}

// LogEvent is used to store an event for runtime debugging
func (e *EventLog) LogEvent(ctx context.Context, event *Event) error {
return e.storage.LogEvent(ctx, event)
}

// LogExecutorError is used to store Executor error for runtime debugging
func (e *EventLog) LogExecutorError(ctx context.Context, responseError pb.ExecutorError, processBatchRequest *pb.ProcessBatchRequest) {
timestamp := time.Now()
log.Errorf("error found in the executor: %v at %v", responseError, timestamp)
payload, err := json.Marshal(processBatchRequest)
if err != nil {
log.Errorf("error marshaling payload: %v", err)
} else {
event := &Event{
ReceivedAt: timestamp,
Source: Source_Node,
Component: Component_Executor,
Level: Level_Error,
EventID: EventID_ExecutorError,
Description: responseError.String(),
Json: string(payload),
}
err = e.storage.LogEvent(ctx, event)
if err != nil {
log.Errorf("error storing event: %v", err)
}
}
}
Loading

0 comments on commit 9453210

Please sign in to comment.