Skip to content

Commit

Permalink
Refactor DB transactions to be used concurrently. (0xPolygonHermez#599)
Browse files Browse the repository at this point in the history
* Refactor DB transactions to be used concurrently.

* refactor common postgres store functionality to separate package

* pg store tests, including concurrent transactions

* Add db tx concurrency test at the top-level state

* add txBundleID parameter to all State methods
  • Loading branch information
fgimenez authored Apr 25, 2022
1 parent cb8e275 commit 8bfad9f
Show file tree
Hide file tree
Showing 57 changed files with 1,144 additions and 885 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ test: compile-scs ## Runs only short tests without checking race conditions
.PHONY: test-full
test-full: build-docker compile-scs ## Runs all tests checking race conditions
$(STOPDB)
$(RUNDB); sleep 5
$(RUNDB); sleep 7
trap '$(STOPDB)' EXIT; MallocNanoZone=0 go test -race -p 1 -timeout 600s ./...

.PHONY: install-linter
Expand Down
10 changes: 5 additions & 5 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ func (a *Aggregator) Start() {
case <-time.After(a.cfg.IntervalToConsolidateState.Duration):

// 1. check, if state is synced
lastConsolidatedBatch, err := a.State.GetLastBatch(a.ctx, false)
lastConsolidatedBatch, err := a.State.GetLastBatch(a.ctx, false, "")
if err != nil {
log.Warnf("failed to get last consolidated batch, err: %v", err)
continue
}
lastConsolidatedEthBatchNum, err := a.State.GetLastBatchNumberConsolidatedOnEthereum(a.ctx)
lastConsolidatedEthBatchNum, err := a.State.GetLastBatchNumberConsolidatedOnEthereum(a.ctx, "")
if err != nil {
log.Warnf("failed to get last eth batch, err: %v", err)
continue
Expand All @@ -95,7 +95,7 @@ func (a *Aggregator) Start() {
// 2. find next batch to consolidate
delete(batchesSent, lastConsolidatedBatch.Number().Uint64())

batchToConsolidate, err := a.State.GetBatchByNumber(a.ctx, lastConsolidatedBatch.Number().Uint64()+1)
batchToConsolidate, err := a.State.GetBatchByNumber(a.ctx, lastConsolidatedBatch.Number().Uint64()+1, "")

if err != nil {
if err == state.ErrNotFound {
Expand Down Expand Up @@ -126,13 +126,13 @@ func (a *Aggregator) Start() {
}

// 4. send zki + txs to the prover
stateRootConsolidated, err := a.State.GetStateRootByBatchNumber(a.ctx, lastConsolidatedBatch.Number().Uint64())
stateRootConsolidated, err := a.State.GetStateRootByBatchNumber(a.ctx, lastConsolidatedBatch.Number().Uint64(), "")
if err != nil {
log.Warnf("failed to get current state root, err: %v", err)
continue
}

stateRootToConsolidate, err := a.State.GetStateRootByBatchNumber(a.ctx, batchToConsolidate.Number().Uint64())
stateRootToConsolidate, err := a.State.GetStateRootByBatchNumber(a.ctx, batchToConsolidate.Number().Uint64(), "")
if err != nil {
log.Warnf("failed to get state root to consolidate, err: %v", err)
continue
Expand Down
10 changes: 5 additions & 5 deletions aggregator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ type aggregatorTxProfitabilityChecker interface {

// stateInterface gathers the methods to interract with the state.
type stateInterface interface {
GetLastBatch(ctx context.Context, isVirtual bool) (*state.Batch, error)
GetLastBatchNumberConsolidatedOnEthereum(ctx context.Context) (uint64, error)
GetBatchByNumber(ctx context.Context, batchNumber uint64) (*state.Batch, error)
GetStateRootByBatchNumber(ctx context.Context, batchNumber uint64) ([]byte, error)
GetSequencer(ctx context.Context, address common.Address) (*state.Sequencer, error)
GetLastBatch(ctx context.Context, isVirtual bool, txBundleID string) (*state.Batch, error)
GetLastBatchNumberConsolidatedOnEthereum(ctx context.Context, txBundleID string) (uint64, error)
GetBatchByNumber(ctx context.Context, batchNumber uint64, txBundleID string) (*state.Batch, error)
GetStateRootByBatchNumber(ctx context.Context, batchNumber uint64, txBundleID string) ([]byte, error)
GetSequencer(ctx context.Context, address common.Address, txBundleID string) (*state.Sequencer, error)
}
2 changes: 1 addition & 1 deletion aggregator/profitabilitychecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (pc *TxProfitabilityCheckerAcceptAll) IsProfitable(ctx context.Context, mat
}

func isConsolidatedBatchAppeared(ctx context.Context, state stateInterface, intervalAfterWhichBatchConsolidatedAnyway time.Duration) (bool, error) {
batch, err := state.GetLastBatch(ctx, false)
batch, err := state.GetLastBatch(ctx, false, "")
if err != nil {
return false, fmt.Errorf("failed to get last consolidated batch, err: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func registerSequencer(ctx *cli.Context) error {
stateDb := state.NewPostgresStorage(sqlDB)
st := state.NewState(stateCfg, stateDb, tr)

_, err = st.GetSequencer(ctx.Context, etherman.GetAddress())
_, err = st.GetSequencer(ctx.Context, etherman.GetAddress(), "")
if errors.Is(err, state.ErrNotFound) { //If It doesn't exist, register the sequencer
tx, err := etherman.RegisterSequencer(url)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ import (

type mtStore interface {
SupportsDBTransactions() bool
BeginDBTransaction(ctx context.Context) error
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
Get(ctx context.Context, key []byte) ([]byte, error)
Set(ctx context.Context, key []byte, value []byte) error
BeginDBTransaction(ctx context.Context, txBundleID string) error
Commit(ctx context.Context, txBundleID string) error
Rollback(ctx context.Context, txBundleID string) error
Get(ctx context.Context, key []byte, txBundleID string) ([]byte, error)
Set(ctx context.Context, key []byte, value []byte, txBundleID string) error
}

func start(ctx *cli.Context) error {
Expand Down
4 changes: 2 additions & 2 deletions config/config.debug.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ ProverURI = "localhost:50051"

[MTServer]
Host = "0.0.0.0"
Port = 50052
Port = 50060
StoreBackend = "PostgreSQL"

[MTClient]
URI = "127.0.0.1:50052"
URI = "127.0.0.1:50060"
4 changes: 2 additions & 2 deletions config/config.local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ ProverURI = "hez-prover:50051"

[MTServer]
Host = "0.0.0.0"
Port = 50052
Port = 50060
StoreBackend = "PostgreSQL"

[MTClient]
URI = "127.0.0.1:50052"
URI = "127.0.0.1:50060"
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ func Test_Defaults(t *testing.T) {
},
{
path: "MTServer.Port",
expectedValue: 50052,
expectedValue: 50060,
},
{
path: "MTServer.StoreBackend",
expectedValue: tree.PgMTStoreBackend,
},
{
path: "MTClient.URI",
expectedValue: "127.0.0.1:50052",
expectedValue: "127.0.0.1:50060",
},
{
path: "Database.MaxConns",
Expand Down
4 changes: 2 additions & 2 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ ProverURI = "0.0.0.0:50051"
[MTServer]
Host = "0.0.0.0"
Port = 50052
Port = 50060
StoreBackend = "PostgreSQL"
[MTClient]
URI = "127.0.0.1:50052"
URI = "127.0.0.1:50060"
`
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ services:
hez-postgres:
container_name: hez-postgres
image: postgres
deploy:
resources:
limits:
memory: 2G
reservations:
memory: 1G
ports:
- 5432:5432
environment:
Expand Down
4 changes: 2 additions & 2 deletions gasprice/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ type pool interface {

// stateInterface gathers the methods required to interact with the state.
type stateInterface interface {
GetLastBatchNumber(ctx context.Context) (uint64, error)
GetTxsByBatchNum(ctx context.Context, batchNum uint64) ([]*types.Transaction, error)
GetLastBatchNumber(ctx context.Context, txBundleID string) (uint64, error)
GetTxsByBatchNum(ctx context.Context, batchNum uint64, txBundleID string) ([]*types.Transaction, error)
}
4 changes: 2 additions & 2 deletions gasprice/lastnbatches.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewEstimatorLastNBatches(cfg Config, state stateInterface) *LastNBatches {

// GetAvgGasPrice calculate avg gas price from last n batches.
func (g *LastNBatches) GetAvgGasPrice(ctx context.Context) (*big.Int, error) {
batchNumber, err := g.state.GetLastBatchNumber(ctx)
batchNumber, err := g.state.GetLastBatchNumber(ctx, "")
if err != nil {
return nil, fmt.Errorf("failed to get last batch number, err: %v", err)
}
Expand Down Expand Up @@ -98,7 +98,7 @@ func (g *LastNBatches) GetAvgGasPrice(ctx context.Context) (*big.Int, error) {

// getBatchTxsTips calculates batch transaction gas fees.
func (g *LastNBatches) getBatchTxsTips(ctx context.Context, batchNum uint64, limit int, ignorePrice *big.Int, result chan results, quit chan struct{}) {
txs, err := g.state.GetTxsByBatchNum(ctx, batchNum)
txs, err := g.state.GetTxsByBatchNum(ctx, batchNum, "")
if txs == nil {
select {
case result <- results{nil, err}:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/go-git/go-billy/v5 v5.3.1
github.com/go-git/go-git/v5 v5.4.2
github.com/gobuffalo/packr/v2 v2.8.3
github.com/google/uuid v1.3.0
github.com/hermeznetwork/tracerr v0.3.2
github.com/iden3/go-iden3-crypto v0.0.14-0.20220413123345-edc36bfa5247
github.com/jackc/pgconn v1.12.0
Expand Down Expand Up @@ -64,7 +65,6 @@ require (
github.com/google/flatbuffers v1.12.1 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,9 @@ github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0=
Expand Down
4 changes: 2 additions & 2 deletions jsonrpc/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ type traceTransactionResponse struct {
func (d *Debug) TraceTransaction(hash common.Hash) (interface{}, error) {
ctx := context.Background()

tx, err := d.state.GetTransactionByHash(ctx, hash)
tx, err := d.state.GetTransactionByHash(ctx, hash, "")
if errors.Is(err, state.ErrNotFound) {
return genesisIsNotTraceableError{}, nil
}

rcpt, err := d.state.GetTransactionReceipt(ctx, hash)
rcpt, err := d.state.GetTransactionReceipt(ctx, hash, "")
if errors.Is(err, state.ErrNotFound) {
return genesisIsNotTraceableError{}, nil
}
Expand Down
Loading

0 comments on commit 8bfad9f

Please sign in to comment.