Skip to content

Commit

Permalink
Merge pull request maticnetwork#320 from maticnetwork/sidechannel-mod…
Browse files Browse the repository at this point in the history
…ules

Sidechannel modules (WIP)
  • Loading branch information
mankenavenkatesh authored May 2, 2020
2 parents 838c0ed + 4e106f3 commit 772b185
Show file tree
Hide file tree
Showing 33 changed files with 1,541 additions and 551 deletions.
5 changes: 2 additions & 3 deletions bridge/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ func GetStartCmd() *cobra.Command {
Run: func(cmd *cobra.Command, args []string) {

// create codec
cdc := app.MakeCodec()

cdc := app.MakeCodec()
// queue connector & http client
_queueConnector := queue.NewQueueConnector(helper.GetConfig().AmqpURL)
_queueConnector.StartWorker()
Expand All @@ -52,7 +51,7 @@ func GetStartCmd() *cobra.Command {
// selected services to start
services := []common.Service{}
services = append(services,
listener.NewListenerService(cdc, _queueConnector),
listener.NewListenerService(cdc, _queueConnector, _httpClient),
processor.NewProcessorService(cdc, _queueConnector, _httpClient, _txBroadcaster),
)

Expand Down
8 changes: 7 additions & 1 deletion bridge/setu/listener/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/maticnetwork/heimdall/bridge/setu/queue"
"github.com/maticnetwork/heimdall/bridge/setu/util"
"github.com/maticnetwork/heimdall/helper"

httpClient "github.com/tendermint/tendermint/rpc/client"
)

// Listener defines a block header listerner for Rootchain, Maticchain, Heimdall
Expand Down Expand Up @@ -64,12 +66,15 @@ type BaseListener struct {
// queue connector
queueConnector *queue.QueueConnector

// http client to subscribe to
httpClient *httpClient.HTTP

// storage client
storageClient *leveldb.DB
}

// NewBaseListener creates a new BaseListener.
func NewBaseListener(cdc *codec.Codec, queueConnector *queue.QueueConnector, chainClient *ethclient.Client, name string, impl Listener) *BaseListener {
func NewBaseListener(cdc *codec.Codec, queueConnector *queue.QueueConnector, httpClient *httpClient.HTTP, chainClient *ethclient.Client, name string, impl Listener) *BaseListener {

logger := util.Logger().With("service", "listener", "module", name)
contractCaller, err := helper.NewContractCaller()
Expand All @@ -92,6 +97,7 @@ func NewBaseListener(cdc *codec.Codec, queueConnector *queue.QueueConnector, cha

cliCtx: cliCtx,
queueConnector: queueConnector,
httpClient: httpClient,
contractConnector: contractCaller,
chainClient: chainClient,

Expand Down
88 changes: 47 additions & 41 deletions bridge/setu/listener/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package listener
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"

Expand Down Expand Up @@ -64,17 +63,36 @@ func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time.
// the ending of the interval
ticker := time.NewTicker(interval)

var eventTypes []string
eventTypes = append(eventTypes, "message.action='checkpoint'")
eventTypes = append(eventTypes, "message.action='event-record'")
// var eventTypes []string
// eventTypes = append(eventTypes, "message.action='checkpoint'")
// eventTypes = append(eventTypes, "message.action='event-record'")
// eventTypes = append(eventTypes, "message.action='tick'")
// ADD EVENT TYPE for SLASH-LIMIT

// start listening
for {
select {
case <-ticker.C:
fromBlock, toBlock := hl.fetchFromAndToBlock()
if fromBlock < toBlock {
for _, eventType := range eventTypes {
fromBlock, toBlock, err := hl.fetchFromAndToBlock()
if err != nil {
hl.Logger.Error("Error fetching fromBlock and toBlock...skipping events query", "error", err)
} else if fromBlock < toBlock {

hl.Logger.Info("Fetching new events between", "fromBlock", fromBlock, "toBlock", toBlock)

// Querying and processing Begin events
for i := fromBlock; i <= toBlock; i++ {
events, err := helper.GetBeginBlockEvents(hl.httpClient, int64(i))
if err != nil {
hl.Logger.Error("Error fetching begin block events", "error", err)
}
for _, event := range events {
hl.ProcessBlockEvent(sdk.StringifyEvent(event), int64(i))
}
}

// Querying and processing tx Events. Below for loop is kept for future purpose to process events from tx
/* for _, eventType := range eventTypes {
var query []string
query = append(query, eventType)
query = append(query, fmt.Sprintf("tx.height>=%v", fromBlock))
Expand Down Expand Up @@ -107,7 +125,7 @@ func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time.
page = 0
}
}
}
} */
// set last block to storage
if err := hl.storageClient.Put([]byte(heimdallLastBlockKey), []byte(strconv.FormatUint(toBlock, 10)), nil); err != nil {
hl.Logger.Error("hl.storageClient.Put", "Error", err)
Expand All @@ -122,28 +140,25 @@ func (hl *HeimdallListener) StartPolling(ctx context.Context, pollInterval time.
}
}

func (hl *HeimdallListener) fetchFromAndToBlock() (fromBlock uint64, toBlock uint64) {
func (hl *HeimdallListener) fetchFromAndToBlock() (uint64, uint64, error) {
// toBlock - get latest blockheight from heimdall node
fromBlock := uint64(0)
toBlock := uint64(0)

nodeStatus, err := helper.GetNodeStatus(hl.cliCtx)
if err != nil {
hl.Logger.Error("Error while fetching latest block", "error", err)
return
hl.Logger.Error("Error while fetching heimdall node status", "error", err)
return fromBlock, toBlock, err
}
toBlock = uint64(nodeStatus.SyncInfo.LatestBlockHeight)

// Process them after two blocks since side-tx takes 2 block to process
if toBlock <= 2 {
return
}
toBlock = toBlock - 2

// fromBlock - get last block from storage
hasLastBlock, _ := hl.storageClient.Has([]byte(heimdallLastBlockKey), nil)
if hasLastBlock {
lastBlockBytes, err := hl.storageClient.Get([]byte(heimdallLastBlockKey), nil)
if err != nil {
hl.Logger.Info("Error while fetching last block bytes from storage", "error", err)
return
return fromBlock, toBlock, err
}

if result, err := strconv.ParseUint(string(lastBlockBytes), 10, 64); err == nil {
Expand All @@ -152,38 +167,33 @@ func (hl *HeimdallListener) fetchFromAndToBlock() (fromBlock uint64, toBlock uin
} else {
hl.Logger.Info("Error parsing last block bytes from storage", "error", err)
toBlock = 0
return
return fromBlock, toBlock, err
}
}
return
return fromBlock, toBlock, err
}

// ProcessEvent - process event from heimdall.
func (hl *HeimdallListener) ProcessEvent(event sdk.StringEvent, tx sdk.TxResponse) {
hl.Logger.Info("Process received event from Heimdall", "eventType", event.Type)
// ProcessBlockEvent - process Blockevents (BeginBlock, EndBlock events) from heimdall.
func (hl *HeimdallListener) ProcessBlockEvent(event sdk.StringEvent, blockHeight int64) {
hl.Logger.Info("Received block event from Heimdall", "eventType", event.Type)
eventBytes, err := json.Marshal(event)
if err != nil {
hl.Logger.Error("Error while parsing event", "error", err, "eventType", event.Type)
hl.Logger.Error("Error while parsing block event", "error", err, "eventType", event.Type)
return
}

// txBytes, err := json.Marshal(tx)
// if err != nil {
// hl.Logger.Error("Error while parsing tx", "error", err, "txHash", tx.TxHash)
// return
// }

switch event.Type {
case clerkTypes.EventTypeRecord:
hl.sendTask("sendDepositRecordToMatic", eventBytes, tx.Height, tx.TxHash)
hl.sendBlockTask("sendDepositRecordToMatic", eventBytes, blockHeight)
case checkpointTypes.EventTypeCheckpoint:
hl.sendTask("sendCheckpointToRootchain", eventBytes, tx.Height, tx.TxHash)
hl.sendBlockTask("sendCheckpointToRootchain", eventBytes, blockHeight)

default:
hl.Logger.Info("EventType mismatch", "eventType", event.Type)
hl.Logger.Info("BlockEvent Type mismatch", "eventType", event.Type)
}
}

func (hl *HeimdallListener) sendTask(taskName string, eventBytes []byte, txHeight int64, txHash string) {
func (hl *HeimdallListener) sendBlockTask(taskName string, eventBytes []byte, blockHeight int64) {
// create machinery task
signature := &tasks.Signature{
Name: taskName,
Expand All @@ -194,19 +204,15 @@ func (hl *HeimdallListener) sendTask(taskName string, eventBytes []byte, txHeigh
},
{
Type: "int64",
Value: txHeight,
},
{
Type: "string",
Value: txHash,
Value: blockHeight,
},
},
}
signature.RetryCount = 3
hl.Logger.Info("Sending task", "taskName", taskName, "currentTime", time.Now())
hl.Logger.Info("Sending block level task", "taskName", taskName, "currentTime", time.Now(), "blockHeight", blockHeight)
// send task
_, err := hl.queueConnector.Server.SendTask(signature)
if err != nil {
hl.Logger.Error("Error sending task", "taskName", taskName, "error", err)
hl.Logger.Error("Error sending block level task", "taskName", taskName, "blockHeight", blockHeight, "error", err)
}
}
9 changes: 5 additions & 4 deletions bridge/setu/listener/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/maticnetwork/heimdall/bridge/setu/util"
"github.com/maticnetwork/heimdall/helper"
"github.com/tendermint/tendermint/libs/common"
httpClient "github.com/tendermint/tendermint/rpc/client"
)

const (
Expand All @@ -26,23 +27,23 @@ type ListenerService struct {
}

// NewListenerService returns new service object for listneing to events
func NewListenerService(cdc *codec.Codec, queueConnector *queue.QueueConnector) *ListenerService {
func NewListenerService(cdc *codec.Codec, queueConnector *queue.QueueConnector, httpClient *httpClient.HTTP) *ListenerService {

// creating listener object
listenerService := &ListenerService{}

listenerService.BaseService = *common.NewBaseService(logger, ListenerServiceStr, listenerService)

rootchainListener := NewRootChainListener()
rootchainListener.BaseListener = *NewBaseListener(cdc, queueConnector, helper.GetMainClient(), RootChainListenerStr, rootchainListener)
rootchainListener.BaseListener = *NewBaseListener(cdc, queueConnector, httpClient, helper.GetMainClient(), RootChainListenerStr, rootchainListener)
listenerService.listeners = append(listenerService.listeners, rootchainListener)

maticchainListener := &MaticChainListener{}
maticchainListener.BaseListener = *NewBaseListener(cdc, queueConnector, helper.GetMaticClient(), MaticChainListenerStr, maticchainListener)
maticchainListener.BaseListener = *NewBaseListener(cdc, queueConnector, httpClient, helper.GetMaticClient(), MaticChainListenerStr, maticchainListener)
listenerService.listeners = append(listenerService.listeners, maticchainListener)

heimdallListener := &HeimdallListener{}
heimdallListener.BaseListener = *NewBaseListener(cdc, queueConnector, nil, HeimdallListenerStr, heimdallListener)
heimdallListener.BaseListener = *NewBaseListener(cdc, queueConnector, httpClient, nil, HeimdallListenerStr, heimdallListener)
listenerService.listeners = append(listenerService.listeners, heimdallListener)

return listenerService
Expand Down
12 changes: 9 additions & 3 deletions bridge/setu/processor/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ func (cp *CheckpointProcessor) sendCheckpointToHeimdall(headerBlockStr string) (
// 1. check if i am the current proposer.
// 2. check if this checkpoint has to be submitted to rootchain
// 3. if so, create and broadcast checkpoint transaction to rootchain
func (cp *CheckpointProcessor) sendCheckpointToRootchain(eventBytes string, txHeight int64, txHash string) error {
cp.Logger.Info("Received sendCheckpointToRootchain request", "eventBytes", eventBytes, "txHeight", txHeight, "txHash", txHash)
func (cp *CheckpointProcessor) sendCheckpointToRootchain(eventBytes string, blockHeight int64) error {

cp.Logger.Info("Received sendCheckpointToRootchain request", "eventBytes", eventBytes, "blockHeight", blockHeight)
var event = sdk.StringEvent{}
if err := json.Unmarshal([]byte(eventBytes), &event); err != nil {
cp.Logger.Error("Error unmarshalling event from heimdall", "error", err)
Expand All @@ -171,13 +172,18 @@ func (cp *CheckpointProcessor) sendCheckpointToRootchain(eventBytes string, txHe

var startBlock uint64
var endBlock uint64
var txHash string

for _, attr := range event.Attributes {
if attr.Key == checkpointTypes.AttributeKeyStartBlock {
startBlock, _ = strconv.ParseUint(attr.Value, 10, 64)
}
if attr.Key == checkpointTypes.AttributeKeyEndBlock {
endBlock, _ = strconv.ParseUint(attr.Value, 10, 64)
}
if attr.Key == hmTypes.AttributeKeyTxHash {
txHash = attr.Value
}
}

checkpointContext, err := cp.getCheckpointContext()
Expand All @@ -196,7 +202,7 @@ func (cp *CheckpointProcessor) sendCheckpointToRootchain(eventBytes string, txHe
cp.Logger.Error("Error decoding txHash while sending checkpoint to rootchain", "txHash", txHash, "error", err)
return err
}
if err := cp.createAndSendCheckpointToRootchain(checkpointContext, startBlock, endBlock, txHeight, txHash); err != nil {
if err := cp.createAndSendCheckpointToRootchain(checkpointContext, startBlock, endBlock, blockHeight, txHash); err != nil {
cp.Logger.Error("Error sending checkpoint to rootchain", "error", err)
return err
}
Expand Down
7 changes: 6 additions & 1 deletion bridge/setu/processor/clerk.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (cp *ClerkProcessor) sendStateSyncedToHeimdall(eventName string, logBytes s
"borChainId", chainParams.BorChainID,
"txHash", hmTypes.BytesToHeimdallHash(vLog.TxHash.Bytes()),
"logIndex", uint64(vLog.Index),
"blockNumber", vLog.BlockNumber,
)
return nil
}
Expand All @@ -98,13 +99,17 @@ func (cp *ClerkProcessor) sendStateSyncedToHeimdall(eventName string, logBytes s
"borChainId", chainParams.BorChainID,
"txHash", hmTypes.BytesToHeimdallHash(vLog.TxHash.Bytes()),
"logIndex", uint64(vLog.Index),
"blockNumber", vLog.BlockNumber,
)

msg := clerkTypes.NewMsgEventRecord(
hmTypes.BytesToHeimdallAddress(helper.GetAddress()),
hmTypes.BytesToHeimdallHash(vLog.TxHash.Bytes()),
uint64(vLog.Index),
vLog.BlockNumber,
event.Id.Uint64(),
hmTypes.BytesToHeimdallAddress(event.ContractAddress.Bytes()),
event.Data,
chainParams.BorChainID,
)

Expand All @@ -120,7 +125,7 @@ func (cp *ClerkProcessor) sendStateSyncedToHeimdall(eventName string, logBytes s
// HandleRecordConfirmation - handles clerk record confirmation event from heimdall.
// 1. check if this record has to be broadcasted to maticchain
// 2. create and broadcast record transaction to maticchain
func (cp *ClerkProcessor) sendDepositRecordToMatic(eventBytes string, txHeight int64, txHash string) (err error) {
func (cp *ClerkProcessor) sendDepositRecordToMatic(eventBytes string, blockHeight int64) (err error) {
var event = sdk.StringEvent{}
if err := json.Unmarshal([]byte(eventBytes), &event); err != nil {
cp.Logger.Error("Error unmarshalling event from heimdall", "error", err)
Expand Down
8 changes: 7 additions & 1 deletion bridge/setu/processor/fee.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/maticnetwork/heimdall/helper"
topupTypes "github.com/maticnetwork/heimdall/topup/types"
hmTypes "github.com/maticnetwork/heimdall/types"

sdk "github.com/cosmos/cosmos-sdk/types"
)

// FeeProcessor - process fee related events
Expand Down Expand Up @@ -57,23 +59,27 @@ func (fp *FeeProcessor) sendTopUpFeeToHeimdall(eventName string, logBytes string
fp.Logger.Info("Ignoring task to send topup to heimdall as already processed",
"event", eventName,
"validatorId", event.ValidatorId,
"Signer", event.Signer,
"Fee", event.Fee,
"txHash", hmTypes.BytesToHeimdallHash(vLog.TxHash.Bytes()),
"logIndex", uint64(vLog.Index),
"blockNumber", vLog.BlockNumber,
)
return nil
}

fp.Logger.Info("✅ sending topup to heimdall",
"event", eventName,
"validatorId", event.ValidatorId,
"Signer", event.Signer,
"Fee", event.Fee,
"txHash", hmTypes.BytesToHeimdallHash(vLog.TxHash.Bytes()),
"logIndex", uint64(vLog.Index),
"blockNumber", vLog.BlockNumber,
)

// create msg checkpoint ack message
msg := topupTypes.NewMsgTopup(helper.GetFromAddress(fp.cliCtx), event.ValidatorId.Uint64(), hmTypes.BytesToHeimdallHash(vLog.TxHash.Bytes()), uint64(vLog.Index))
msg := topupTypes.NewMsgTopup(helper.GetFromAddress(fp.cliCtx), event.ValidatorId.Uint64(), hmTypes.BytesToHeimdallAddress(event.Signer.Bytes()), sdk.NewIntFromBigInt(event.Fee), hmTypes.BytesToHeimdallHash(vLog.TxHash.Bytes()), uint64(vLog.Index), vLog.BlockNumber)

// return broadcast to heimdall
if err := fp.txBroadcaster.BroadcastToHeimdall(msg); err != nil {
Expand Down
Loading

0 comments on commit 772b185

Please sign in to comment.