Skip to content

Commit

Permalink
feat: Enable shared mock settlement (dymensionxyz#549)
Browse files Browse the repository at this point in the history
Co-authored-by: Omri <[email protected]>
  • Loading branch information
srene and omritoptix authored Feb 2, 2024
1 parent 9129983 commit 996e681
Show file tree
Hide file tree
Showing 12 changed files with 1,387 additions and 3 deletions.
1 change: 0 additions & 1 deletion .go-version

This file was deleted.

5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"path/filepath"
"time"

"github.com/dymensionxyz/dymint/da/grpc"
"github.com/dymensionxyz/dymint/settlement"
"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down Expand Up @@ -32,7 +33,9 @@ type NodeConfig struct {
SettlementLayer string `mapstructure:"settlement_layer"`
SettlementConfig settlement.Config `mapstructure:",squash"`
Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"`
BootstrapTime time.Duration `mapstructure:"bootstrap_time"`
//Config params for mock grpc da
DAGrpc grpc.Config `mapstructure:",squash"`
BootstrapTime time.Duration `mapstructure:"bootstrap_time"`
}

// BlockManagerConfig consists of all parameters required by BlockManagerConfig
Expand Down
16 changes: 16 additions & 0 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"path/filepath"
"time"

"github.com/dymensionxyz/dymint/da/grpc"
"github.com/dymensionxyz/dymint/settlement"
)

Expand Down Expand Up @@ -50,15 +51,30 @@ func DefaultConfig(home, chainId string) *NodeConfig {
chainId = DefaultChainID
}

//Setting default params for sl grpc mock
defaultSlGrpcConfig := settlement.GrpcConfig{
Host: "127.0.0.1",
Port: 7981,
RefreshTime: 1000,
}

defaultSLconfig := settlement.Config{
KeyringBackend: "test",
NodeAddress: "http://127.0.0.1:36657",
RollappID: chainId,
KeyringHomeDir: keyringDir,
DymAccountName: "sequencer",
GasPrices: "0.025udym",
SLGrpc: defaultSlGrpcConfig,
}
cfg.SettlementConfig = defaultSLconfig

//Setting default params for da grpc mock
defaultDAGrpc := grpc.Config{
Host: "127.0.0.1",
Port: 7980,
}
cfg.DAGrpc = defaultDAGrpc

return cfg
}
8 changes: 8 additions & 0 deletions settlement/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ type Config struct {

//For testing only. probably should be refactored
ProposerPubKey string `json:"proposer_pub_key"`
//Config used for sl shared grpc mock
SLGrpc GrpcConfig `mapstructure:",squash"`
}

type GrpcConfig struct {
Host string `json:"host"`
Port int `json:"port"`
RefreshTime int `json:"refresh_time"`
}

func (c Config) Validate() error {
Expand Down
319 changes: 319 additions & 0 deletions settlement/grpc/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
package grpc

import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"path/filepath"
"strconv"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p/core/crypto"
tmp2p "github.com/tendermint/tendermint/p2p"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/cosmos/cosmos-sdk/crypto/keys/ed25519"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
rollapptypes "github.com/dymensionxyz/dymension/x/rollapp/types"
"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/log"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"

"github.com/tendermint/tendermint/libs/pubsub"

slmock "github.com/dymensionxyz/dymint/settlement/grpc/mockserv/proto"
)

// LayerClient is an extension of the base settlement layer client
// for usage in tests and local development.
type LayerClient struct {
*settlement.BaseLayerClient
}

var _ settlement.LayerI = (*LayerClient)(nil)

// Init initializes the mock layer client.
func (m *LayerClient) Init(config settlement.Config, pubsub *pubsub.Server, logger log.Logger, options ...settlement.Option) error {
HubClientMock, err := newHubClient(config, pubsub, logger)
if err != nil {
return err
}
baseOptions := []settlement.Option{
settlement.WithHubClient(HubClientMock),
}
if options == nil {
options = baseOptions
} else {
options = append(baseOptions, options...)
}
m.BaseLayerClient = &settlement.BaseLayerClient{}
err = m.BaseLayerClient.Init(config, pubsub, logger, options...)
if err != nil {
return err
}
return nil
}

// HubClient implements The HubClient interface
type HubGrpcClient struct {
ctx context.Context
ProposerPubKey string
slStateIndex uint64
logger log.Logger
pubsub *pubsub.Server
latestHeight uint64
conn *grpc.ClientConn
sl slmock.MockSLClient
stopchan chan struct{}
refreshTime int
}

var _ settlement.HubClient = &HubGrpcClient{}

func newHubClient(config settlement.Config, pubsub *pubsub.Server, logger log.Logger) (*HubGrpcClient, error) {
ctx := context.Background()

latestHeight := uint64(0)
slStateIndex := uint64(0)
proposer, err := initConfig(config)
if err != nil {
return nil, err
}
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))

logger.Debug("GRPC Dial ", "ip", config.SLGrpc.Host)

conn, err := grpc.Dial(config.SLGrpc.Host+":"+strconv.Itoa(config.SLGrpc.Port), opts...)
if err != nil {
logger.Error("Error grpc sl connecting")
return nil, err
}

client := slmock.NewMockSLClient(conn)
stopchan := make(chan struct{})

index, err := client.GetIndex(ctx, &slmock.SLGetIndexRequest{})
if err == nil {
slStateIndex = index.GetIndex()
var settlementBatch rollapptypes.MsgUpdateState
batchReply, err := client.GetBatch(ctx, &slmock.SLGetBatchRequest{Index: slStateIndex})
if err != nil {
return nil, err
}
err = json.Unmarshal(batchReply.GetBatch(), &settlementBatch)
if err != nil {
return nil, errors.New("error unmarshalling batch")
}
latestHeight = settlementBatch.StartHeight + settlementBatch.NumBlocks - 1
}
logger.Debug("Starting grpc SL ", "index", slStateIndex)

return &HubGrpcClient{
ctx: ctx,
ProposerPubKey: proposer,
logger: logger,
pubsub: pubsub,
latestHeight: latestHeight,
slStateIndex: slStateIndex,
conn: conn,
sl: client,
stopchan: stopchan,
refreshTime: config.SLGrpc.RefreshTime,
}, nil
}

func initConfig(conf settlement.Config) (proposer string, err error) {
if conf.KeyringHomeDir == "" {

if conf.ProposerPubKey != "" {
proposer = conf.ProposerPubKey
} else {
_, proposerPubKey, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return "", err
}
pubKeybytes, err := proposerPubKey.Raw()
if err != nil {
return "", err
}

proposer = hex.EncodeToString(pubKeybytes)
}
} else {
proposerKeyPath := filepath.Join(conf.KeyringHomeDir, "config/priv_validator_key.json")
key, err := tmp2p.LoadOrGenNodeKey(proposerKeyPath)
if err != nil {
return "", err
}
proposer = hex.EncodeToString(key.PubKey().Bytes())
}

return
}

// Start starts the mock client
func (c *HubGrpcClient) Start() error {
c.logger.Info("Starting grpc mock settlement")

go func() {
tick := time.NewTicker(time.Duration(c.refreshTime) * time.Millisecond)
defer tick.Stop()
for {
select {
case <-c.stopchan:
// stop
return
case <-tick.C:
index, err := c.sl.GetIndex(c.ctx, &slmock.SLGetIndexRequest{})
if err == nil {
if c.slStateIndex < index.GetIndex() {
c.logger.Info("Simulating new batch event")

time.Sleep(10 * time.Millisecond)
b, err := c.retrieveBatchAtStateIndex(index.GetIndex())
if err != nil {
panic(err)
}
err = c.pubsub.PublishWithEvents(context.Background(), &settlement.EventDataNewSettlementBatchAccepted{EndHeight: b.EndHeight}, map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}})
if err != nil {
panic(err)
}
c.slStateIndex = index.GetIndex()
}
}
}
}
}()
return nil
}

// Stop stops the mock client
func (c *HubGrpcClient) Stop() error {
c.logger.Info("Stopping grpc mock settlement")
close(c.stopchan)
return nil
}

// PostBatch saves the batch to the kv store
func (c *HubGrpcClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) {
settlementBatch := c.convertBatchtoSettlementBatch(batch, daClient, daResult)
c.saveBatch(settlementBatch)
go func() {
// sleep for 10 miliseconds to mimic a delay in batch acceptance
time.Sleep(10 * time.Millisecond)
err := c.pubsub.PublishWithEvents(context.Background(), &settlement.EventDataNewSettlementBatchAccepted{EndHeight: settlementBatch.EndHeight}, map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}})
if err != nil {
panic(err)
}
}()
}

// GetLatestBatch returns the latest batch from the kv store
func (c *HubGrpcClient) GetLatestBatch(rollappID string) (*settlement.ResultRetrieveBatch, error) {
c.logger.Info("GetLatestBatch grpc", "index", c.slStateIndex)
batchResult, err := c.GetBatchAtIndex(rollappID, atomic.LoadUint64(&c.slStateIndex))
if err != nil {
return nil, err
}
return batchResult, nil
}

// GetBatchAtIndex returns the batch at the given index
func (c *HubGrpcClient) GetBatchAtIndex(rollappID string, index uint64) (*settlement.ResultRetrieveBatch, error) {
batchResult, err := c.retrieveBatchAtStateIndex(index)
if err != nil {
return &settlement.ResultRetrieveBatch{
BaseResult: settlement.BaseResult{Code: settlement.StatusError, Message: err.Error()},
}, err
}
return batchResult, nil
}

// GetSequencers returns a list of sequencers. Currently only returns a single sequencer
func (c *HubGrpcClient) GetSequencers(rollappID string) ([]*types.Sequencer, error) {
pubKeyBytes, err := hex.DecodeString(c.ProposerPubKey)
if err != nil {
return nil, err
}
var pubKey cryptotypes.PubKey = &ed25519.PubKey{Key: pubKeyBytes}
return []*types.Sequencer{
{
PublicKey: pubKey,
Status: types.Proposer,
},
}, nil
}

func (c *HubGrpcClient) saveBatch(batch *settlement.Batch) {
c.logger.Debug("Saving batch to grpc settlement layer", "start height",
batch.StartHeight, "end height", batch.EndHeight)
b, err := json.Marshal(batch)
if err != nil {
panic(err)
}
// Save the batch to the next state index
c.logger.Debug("Saving batch to grpc settlement layer", "index", c.slStateIndex+1)
setBatchReply, err := c.sl.SetBatch(c.ctx, &slmock.SLSetBatchRequest{Index: c.slStateIndex + 1, Batch: b})
if err != nil {
panic(err)
}
if setBatchReply.GetResult() != c.slStateIndex+1 {
panic(err)
}

c.slStateIndex = setBatchReply.GetResult()

setIndexReply, err := c.sl.SetIndex(c.ctx, &slmock.SLSetIndexRequest{Index: c.slStateIndex})
if err != nil || setIndexReply.GetIndex() != c.slStateIndex {
panic(err)
}
c.logger.Debug("Setting grpc SL Index to ", "index", setIndexReply.GetIndex())
// Save latest height in memory and in store
atomic.StoreUint64(&c.latestHeight, batch.EndHeight)
}

func (c *HubGrpcClient) convertBatchtoSettlementBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) *settlement.Batch {
settlementBatch := &settlement.Batch{
StartHeight: batch.StartHeight,
EndHeight: batch.EndHeight,
MetaData: &settlement.BatchMetaData{
DA: &settlement.DAMetaData{
Height: daResult.DAHeight,
Client: daClient,
},
},
}
for _, block := range batch.Blocks {
settlementBatch.AppHashes = append(settlementBatch.AppHashes, block.Header.AppHash)
}
return settlementBatch
}

func (c *HubGrpcClient) retrieveBatchAtStateIndex(slStateIndex uint64) (*settlement.ResultRetrieveBatch, error) {
c.logger.Debug("Retrieving batch from grpc settlement layer", "SL state index", slStateIndex)

getBatchReply, err := c.sl.GetBatch(c.ctx, &slmock.SLGetBatchRequest{Index: slStateIndex})
if err != nil {
return nil, settlement.ErrBatchNotFound
}
b := getBatchReply.GetBatch()
if b == nil {
return nil, settlement.ErrBatchNotFound
}
var settlementBatch settlement.Batch
err = json.Unmarshal(b, &settlementBatch)
if err != nil {
return nil, errors.New("error unmarshalling batch")
}
batchResult := settlement.ResultRetrieveBatch{
BaseResult: settlement.BaseResult{Code: settlement.StatusSuccess, StateIndex: slStateIndex},
Batch: &settlementBatch,
}
return &batchResult, nil
}
Loading

0 comments on commit 996e681

Please sign in to comment.