Skip to content

Commit

Permalink
Added a basic integration with dymension settlement layer (dymensionx…
Browse files Browse the repository at this point in the history
…yz#76)

* Added a basic integration with dymension settlement layer for writing and reading state updates.

* Updated github workflow to access private dymension repo.

* Updated github workflow go version on build.

* Updated golangcli job with access to private repo using token.

* Removed redundant test file.

* Removed package comment from cosmosclient.

* Ran go fmt on directory.

* Changed golangci-lint version in workflow.
  • Loading branch information
omritoptix authored Sep 11, 2022
1 parent c915ee2 commit 3d03386
Show file tree
Hide file tree
Showing 23 changed files with 2,735 additions and 207 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,20 @@ jobs:
golangci:
name: lint
runs-on: ubuntu-latest
env:
GOPRIVATE: "github.com/dymensionxyz/*"
GH_ACCESS_TOKEN: "${{ secrets.GH_ACCESS_TOKEN }}"
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: 1.17
go-version: 1.18
- run: git config --global url.https://[email protected]/.insteadOf https://github.com/
- name: golangci-lint
uses: golangci/[email protected]
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: latest
version: v1.47

# Optional: working directory, useful for monorepos
# working-directory: somedir
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ jobs:

build:
runs-on: ubuntu-latest
env:
GOPRIVATE: "github.com/dymensionxyz/*"
GH_ACCESS_TOKEN: "${{ secrets.GH_ACCESS_TOKEN }}"
steps:
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.17
go-version: 1.18
- run: git config --global url.https://[email protected]/.insteadOf https://github.com/

- name: Build
run: go build -v ./...
Expand Down
176 changes: 96 additions & 80 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,10 @@ type Manager struct {
store store.Store
executor *state.BlockExecutor

dalc da.DataAvailabilityLayerClient
settlementlc settlement.LayerClient
retriever da.BatchRetriever
// daHeight is the height of the latest processed DA block
daHeight uint64
dalc da.DataAvailabilityLayerClient
settlementClient settlement.LayerClient
retriever da.BatchRetriever

// TODO(omritotpix): Remove the header sync fields
syncTargetDiode diodes.Diode

batchInProcess atomic.Value
Expand Down Expand Up @@ -89,7 +86,7 @@ func NewManager(
mempool mempool.Mempool,
proxyApp proxy.AppConnConsensus,
dalc da.DataAvailabilityLayerClient,
settlementlc settlement.LayerClient,
settlementClient settlement.LayerClient,
eventBus *tmtypes.EventBus,
pubsub *pubsub.Server,
logger log.Logger,
Expand All @@ -98,9 +95,6 @@ func NewManager(
if err != nil {
return nil, err
}
if s.DAHeight < conf.DAStartHeight {
s.DAHeight = conf.DAStartHeight
}

proposerAddress, err := getAddress(proposerKey)
if err != nil {
Expand Down Expand Up @@ -129,17 +123,16 @@ func NewManager(
batchInProcess.Store(false)

agg := &Manager{
pubsub: pubsub,
proposerKey: proposerKey,
conf: conf,
genesis: genesis,
lastState: s,
store: store,
executor: exec,
dalc: dalc,
settlementlc: settlementlc,
retriever: dalc.(da.BatchRetriever), // TODO(tzdybal): do it in more gentle way (after MVP)
daHeight: s.DAHeight,
pubsub: pubsub,
proposerKey: proposerKey,
conf: conf,
genesis: genesis,
lastState: s,
store: store,
executor: exec,
dalc: dalc,
settlementClient: settlementClient,
retriever: dalc.(da.BatchRetriever),
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
syncTargetDiode: diodes.NewOneToOne(1, nil),
blockInCh: make(chan newBlockEvent, 100),
Expand Down Expand Up @@ -168,14 +161,13 @@ func (m *Manager) SetDALC(dalc da.DataAvailabilityLayerClient) {
}

// getLatestBatchFromSL gets the latest batch from the SL
func (m *Manager) getLatestBatchFromSL(ctx context.Context) (settlement.ResultRetrieveBatch, error) {
var resultRetrieveBatch settlement.ResultRetrieveBatch
func (m *Manager) getLatestBatchFromSL(ctx context.Context) (*settlement.ResultRetrieveBatch, error) {
var resultRetrieveBatch *settlement.ResultRetrieveBatch
var err error
// TODO(omritoptix): Move to a separate function.
// Get latest batch from SL
err = retry.Do(
func() error {
resultRetrieveBatch, err = m.settlementlc.RetrieveBatch()
resultRetrieveBatch, err = m.settlementClient.RetrieveBatch()
if err != nil {
return err
}
Expand Down Expand Up @@ -326,17 +318,20 @@ func (m *Manager) syncUntilTarget(ctx context.Context, syncTarget uint64) {
currentHeight := m.store.Height()
for currentHeight < syncTarget {
m.logger.Info("Syncing until target", "current height", currentHeight, "syncTarget", syncTarget)
resultRetrieveBatch, err := m.settlementlc.RetrieveBatch(currentHeight + 1)
// TODO(omritoptix): Handle in case we get batchNotfound
resultRetrieveBatch, err := m.settlementClient.RetrieveBatch(atomic.LoadUint64(&m.lastState.SLStateIndex) + 1)
if err != nil {
m.logger.Error("error while retrieving batch", "error", err)
m.logger.Error("Failed to sync until target. error while retrieving batch", "error", err)
continue
}
err = m.processNextDABatch(resultRetrieveBatch.MetaData.DA.Height)
err = m.processNextDABatch(ctx, resultRetrieveBatch.MetaData.DA.Height)
if err != nil {
m.logger.Error("error while processing next DA batch", "error", err)
m.logger.Error("Failed to sync until target. error while processing next DA batch", "error", err)
break
}
err = m.updateStateIndex(resultRetrieveBatch.StateIndex)
if err != nil {
return
}
currentHeight = m.store.Height()
}
}
Expand All @@ -349,53 +344,61 @@ func (m *Manager) ApplyBlockLoop(ctx context.Context) {
block := blockEvent.block
commit := blockEvent.commit
daHeight := blockEvent.daHeight
m.logger.Debug("block body retrieved from DALC",
"height", block.Header.Height,
"daHeight", daHeight,
"hash", block.Hash(),
)
if block.Header.Height > m.store.Height() {
m.logger.Info("Syncing block", "height", block.Header.Height)
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, block)
if err != nil {
m.logger.Error("failed to ApplyBlock", "error", err)
continue
}
err = m.store.SaveBlock(block, commit)
if err != nil {
m.logger.Error("failed to save block", "error", err)
continue
}
var appHash []byte
err = m.executor.Commit(ctx, &newState, block, responses)
if err != nil {
m.logger.Error("failed to Commit", "error", err)
continue
}
m.store.SetHeight(block.Header.Height)

err = m.store.SaveBlockResponses(block.Header.Height, responses)
if err != nil {
m.logger.Error("failed to save block responses", "error", err)
continue
}

copy(newState.AppHash[:], appHash)
newState.DAHeight = daHeight
m.lastState = newState
err = m.store.UpdateState(m.lastState)
if err != nil {
m.logger.Error("failed to save updated state", "error", err)
continue
}
err := m.applyBlock(ctx, block, commit, daHeight)
if err != nil {
continue
}
case <-ctx.Done():
return
}
}
}

func (m *Manager) processNextDABatch(daHeight uint64) error {
func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *types.Commit, daHeight uint64) error {
m.logger.Debug("block body retrieved from DALC",
"height", block.Header.Height,
"daHeight", daHeight,
"hash", block.Hash(),
)
if block.Header.Height > m.store.Height() {
m.logger.Info("Syncing block", "height", block.Header.Height)
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, block)
if err != nil {
m.logger.Error("failed to ApplyBlock", "error", err)
return err
}
err = m.store.SaveBlock(block, commit)
if err != nil {
m.logger.Error("failed to save block", "error", err)
return err
}
var appHash []byte
err = m.executor.Commit(ctx, &newState, block, responses)
if err != nil {
m.logger.Error("failed to Commit", "error", err)
return err
}
m.store.SetHeight(block.Header.Height)

err = m.store.SaveBlockResponses(block.Header.Height, responses)
if err != nil {
m.logger.Error("failed to save block responses", "error", err)
return err
}

copy(newState.AppHash[:], appHash)

m.lastState = newState
err = m.store.UpdateState(m.lastState)
if err != nil {
m.logger.Error("failed to save updated state", "error", err)
return err
}
}
return nil
}

func (m *Manager) processNextDABatch(ctx context.Context, daHeight uint64) error {
m.logger.Debug("trying to retrieve batch from DA", "daHeight", daHeight)
batchResp, err := m.fetchBatch(daHeight)
if err != nil {
Expand All @@ -405,7 +408,10 @@ func (m *Manager) processNextDABatch(daHeight uint64) error {
m.logger.Debug("retrieved batches", "n", len(batchResp.Batches), "daHeight", daHeight)
for _, batch := range batchResp.Batches {
for i, block := range batch.Blocks {
m.blockInCh <- newBlockEvent{block, batch.Commits[i], daHeight}
err := m.applyBlock(ctx, block, batch.Commits[i], daHeight)
if err != nil {
return err
}
}
}
return nil
Expand Down Expand Up @@ -500,7 +506,6 @@ func (m *Manager) publishBlock(ctx context.Context) error {
return err
}

newState.DAHeight = atomic.LoadUint64(&m.daHeight)
// After this call m.lastState is the NEW state returned from ApplyBlock
m.lastState = newState

Expand Down Expand Up @@ -548,17 +553,27 @@ func (m *Manager) submitNextBatch(ctx context.Context) {
return
}
// Submit batch to SL
// TODO(omritoptix): Handle a case where the SL submission fails due to syncTarget out of sync. In that case
// we'll want to update the syncTarget before returning.
err = m.submitBatchToSL(ctx, nextBatch, resultSubmitToDA)
// TODO(omritoptix): Handle a case where the SL submission fails due to syncTarget out of sync with the latestHeight in the SL.
// In that case we'll want to update the syncTarget before returning.
_, err = m.submitBatchToSL(ctx, nextBatch, resultSubmitToDA)
if err != nil {
m.logger.Error("Failed to submit next batch to SL Layer", "startHeight", startHeight, "endHeight", endHeight, "error", err)
return
}
// Update the sync target
// Update the sync target and the state
atomic.StoreUint64(&m.syncTarget, endHeight)
}

func (m *Manager) updateStateIndex(stateIndex uint64) error {
atomic.StoreUint64(&m.lastState.SLStateIndex, stateIndex)
err := m.store.UpdateState(m.lastState)
if err != nil {
m.logger.Error("Failed to update state", "error", err)
return err
}
return nil
}

func (m *Manager) createNextDABatch(startHeight uint64, endHeight uint64) (*types.Batch, error) {
// Create the batch
batch := &types.Batch{
Expand Down Expand Up @@ -586,20 +601,21 @@ func (m *Manager) createNextDABatch(startHeight uint64, endHeight uint64) (*type
return batch, nil
}

func (m *Manager) submitBatchToSL(ctx context.Context, batch *types.Batch, resultSubmitToDA *da.ResultSubmitBatch) error {
func (m *Manager) submitBatchToSL(ctx context.Context, batch *types.Batch, resultSubmitToDA *da.ResultSubmitBatch) (*settlement.ResultSubmitBatch, error) {
var resultSubmitToSL *settlement.ResultSubmitBatch
// Submit batch to SL
err := retry.Do(func() error {
resultSubmitToSL := m.settlementlc.SubmitBatch(batch, resultSubmitToDA)
resultSubmitToSL = m.settlementClient.SubmitBatch(batch, resultSubmitToDA)
if resultSubmitToSL.Code != settlement.StatusSuccess {
err := fmt.Errorf("failed to submit batch to SL layer: %s", resultSubmitToSL.Message)
return err
}
return nil
}, retry.Context(ctx), retry.LastErrorOnly(true))
if err != nil {
return err
return nil, err
}
return nil
return resultSubmitToSL, nil
}

func (m *Manager) submitBatchToDA(ctx context.Context, batch *types.Batch) (*da.ResultSubmitBatch, error) {
Expand Down
Loading

0 comments on commit 3d03386

Please sign in to comment.