Skip to content

Commit

Permalink
Changed message receiving channel to handle the entire BroadcastInbox…
Browse files Browse the repository at this point in the history
… message so the clients can get the sequence number. Changed the SyncMessages function to ConfirmedAccumulator function. Added new functionality for the broadcaster to broadcast the confirmedAccumulator message. Wrote corresponding test. Added versioning to the BroadcastMessage. Added functionality for the arb-relay to handle updating its cache. In progress handling for client test to validate the confirmed accumulator is being sent.
  • Loading branch information
geetaristo authored and hkalodner committed May 10, 2021
1 parent 94d7ddf commit 5ca3712
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 40 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"sol_docs:generate": "python docs/sol_contract_docs/generate_docs.py"
},
"engines": {
"node": ">= 8.0.0 < 13.0.0",
"node": ">= 8.0.0 < 16.0.0",
"npm": "^6.0.0",
"yarn": "^1.0.0"
},
Expand Down
24 changes: 10 additions & 14 deletions packages/arb-node-core/cmd/arb-relay/arb-relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,17 @@ func (ar *ArbRelay) Start() {
logger.Error().Err(err).Msg("broadcast client unable to connect")
}

_ = messages
/*
go func() {
for {
select {
case receivedMsgs := <-messages:
for i := range receivedMsgs.Messages {
m := receivedMsgs.Messages[i]
ar.broadcaster.Broadcast(m.BeforeAccumulator, m.InboxMessage, m.Signature)
}
}
confirmedAccumulator := ar.broadcastClient.ConfirmedAccumulatorListener
go func() {
for {
select {
case msg := <-messages:
ar.broadcaster.Broadcast(msg.FeedItem.PrevAcc, msg.FeedItem.BatchItem, msg.Signature)
case ca := <-confirmedAccumulator:
ar.broadcaster.ConfirmedAccumulator(ca)
}
}()
*/
}
}()
}

func (ar *ArbRelay) Stop() {
Expand Down
28 changes: 20 additions & 8 deletions packages/arb-util/broadcastclient/broadcastclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ import (
"sync"
"time"

"github.com/offchainlabs/arbitrum/packages/arb-node-core/monitor"

"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/mailru/easygo/netpoll"
"github.com/offchainlabs/arbitrum/packages/arb-util/broadcaster"
"github.com/offchainlabs/arbitrum/packages/arb-util/common"
"github.com/rs/zerolog/log"
)

Expand All @@ -43,6 +42,7 @@ type BroadcastClient struct {
startingBroadcastClientMutex *sync.Mutex
RetryCount int
retrying bool
ConfirmedAccumulatorListener chan common.Hash
}

var logger = log.With().Caller().Str("component", "broadcaster").Logger()
Expand All @@ -60,16 +60,22 @@ func NewBroadcastClient(websocketUrl string, lastInboxSeqNum *big.Int) *Broadcas
return bc
}

func (bc *BroadcastClient) Connect() (chan monitor.SequencerFeedItem, error) {
messageReceiver := make(chan monitor.SequencerFeedItem)
func (bc *BroadcastClient) Connect() (chan broadcaster.BroadcastInboxMessage, error) {
messageReceiver := make(chan broadcaster.BroadcastInboxMessage)
bc.ConfirmedAccumulatorListener = make(chan common.Hash)
return bc.connect(messageReceiver)
}

func (bc *BroadcastClient) connect(messageReceiver chan monitor.SequencerFeedItem) (chan monitor.SequencerFeedItem, error) {
func (bc *BroadcastClient) GetConfirmedAccumulatorListner() chan common.Hash {
return bc.ConfirmedAccumulatorListener
}

func (bc *BroadcastClient) connect(messageReceiver chan broadcaster.BroadcastInboxMessage) (chan broadcaster.BroadcastInboxMessage, error) {
if len(bc.websocketUrl) == 0 {
// Nothing to do
return nil, nil
}

logger.Info().Str("url", bc.websocketUrl).Msg("connecting to arbitrum inbox message broadcaster")
conn, _, _, err := ws.DefaultDialer.Dial(context.Background(), bc.websocketUrl)
if err != nil {
Expand Down Expand Up @@ -124,8 +130,14 @@ func (bc *BroadcastClient) connect(messageReceiver chan monitor.SequencerFeedIte
return
}

for _, message := range res.Messages {
messageReceiver <- message.FeedItem
if res.Version == 1 {
for _, message := range res.Messages {
messageReceiver <- *message
}

if res.ConfirmedAccumulator.IsConfirmed {
bc.ConfirmedAccumulatorListener <- res.ConfirmedAccumulator.Accumulator
}
}
})

Expand Down Expand Up @@ -161,7 +173,7 @@ func (bc *BroadcastClient) Ping() (<-chan string, error) {
return out, nil
}

func (bc *BroadcastClient) RetryConnect(messageReceiver chan monitor.SequencerFeedItem) {
func (bc *BroadcastClient) RetryConnect(messageReceiver chan broadcaster.BroadcastInboxMessage) {
MaxWaitMs := 15000
waitMs := 500
bc.retrying = true
Expand Down
8 changes: 6 additions & 2 deletions packages/arb-util/broadcastclient/broadcastclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/offchainlabs/arbitrum/packages/arb-util/broadcaster"
"github.com/offchainlabs/arbitrum/packages/arb-util/inbox"
)

func TestBroadcastClientConnectsAndReceivesMessages(t *testing.T) {
Expand Down Expand Up @@ -51,15 +50,20 @@ func makeBroadcastClient(t *testing.T, expectedCount int, wg *sync.WaitGroup) {
t.Errorf("Can not connect: %v\n", err)
}

accList := broadcastClient.GetConfirmedAccumulatorListner()

for {
select {
case receivedMsg := <-messageReceiver:
t.Logf("Received Message, Sequence Number: %v\n", inbox.GetSequenceNumber(receivedMsg.BatchItem.SequencerMessage))
t.Logf("Received Message, Sequence Message: %v\n", receivedMsg.FeedItem.BatchItem.SequencerMessage)
messageCount++

if messageCount == expectedCount {
broadcastClient.Close()
return
}
case confirmedAccumulator := <-accList:
t.Logf("Received confirmedAccumulator, Sequence Message: %v\n", confirmedAccumulator.ShortString())
}
}

Expand Down
7 changes: 4 additions & 3 deletions packages/arb-util/broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package broadcaster

import (
"github.com/offchainlabs/arbitrum/packages/arb-util/inbox"
"net"
"sync"
"time"

"github.com/offchainlabs/arbitrum/packages/arb-util/inbox"

"github.com/gobwas/ws"
"github.com/gobwas/ws-examples/src/gopool"
"github.com/mailru/easygo/netpoll"
Expand Down Expand Up @@ -205,8 +206,8 @@ func (b *Broadcaster) Broadcast(prevAcc common.Hash, batchItem inbox.SequencerBa
return b.clientManager.Broadcast(prevAcc, batchItem, signature)
}

func (b *Broadcaster) SyncMessages(accumulator common.Hash) {
b.clientManager.syncMessages(accumulator)
func (b *Broadcaster) ConfirmedAccumulator(accumulator common.Hash) {
b.clientManager.confirmedAccumulator(accumulator)
}

func (b *Broadcaster) messageCacheCount() int {
Expand Down
124 changes: 120 additions & 4 deletions packages/arb-util/broadcaster/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/mailru/easygo/netpoll"
"github.com/offchainlabs/arbitrum/packages/arb-util/common"
)

func TestBroadcasterSendsCachedMessagesOnClientConnect(t *testing.T) {
Expand Down Expand Up @@ -51,15 +53,20 @@ func TestBroadcasterSendsCachedMessagesOnClientConnect(t *testing.T) {

wg.Wait()

b.SyncMessages(feedItem1.BatchItem.Accumulator) // remove the first message we generated
if b.messageCacheCount() != 1 { // should have left the second message
// give the above connections time to reconnect
time.Sleep(2 * time.Second)

// Confirmed Accumulator will also broadcast to the clients.
b.ConfirmedAccumulator(feedItem1.BatchItem.Accumulator) // remove the first message we generated
if b.messageCacheCount() != 1 { // should have left the second message
t.Errorf("1. Failed to clear cached inbox message. MessageCacheCount: %v", b.messageCacheCount())
}

b.SyncMessages(feedItem2.BatchItem.Accumulator) // remove the second message we generated
if b.messageCacheCount() != 0 { // should have emptied.
b.ConfirmedAccumulator(feedItem2.BatchItem.Accumulator) // remove the second message we generated
if b.messageCacheCount() != 0 { // should have emptied.
t.Errorf("2. Failed to clear cached inbox message. MessageCacheCount: %v", b.messageCacheCount())
}

}

func connectAndGetCachedMessages(t *testing.T, i int, wg *sync.WaitGroup) {
Expand Down Expand Up @@ -102,6 +109,115 @@ func connectAndGetCachedMessages(t *testing.T, i int, wg *sync.WaitGroup) {
}
}

func TestBroadcasterSendsConfirmedAccumulatorMessages(t *testing.T) {
broadcasterSettings := Settings{
Addr: ":9642",
Workers: 128,
Queue: 1,
IoTimeout: 2 * time.Second,
}

b := NewBroadcaster(broadcasterSettings)

err := b.Start()
if err != nil {
t.Fatal(err)
}
defer b.Stop()

newBroadcastMessage := SequencedMessages()

_, feedItem, _ := newBroadcastMessage()
time.Sleep(1 * time.Second)

accumulatorConfirmed := make(chan common.Hash)
var wg sync.WaitGroup
wg.Add(1)
go receivedConfirmedAccumulator(t, &wg, accumulatorConfirmed)

time.Sleep(2 * time.Second)

// Confirmed Accumulator will also broadcast to the clients.
b.ConfirmedAccumulator(feedItem.BatchItem.Accumulator) // remove the first message we generated

acc := <-accumulatorConfirmed
if acc != feedItem.BatchItem.Accumulator {
t.Error("Did not receive expected accumultaor")
}

wg.Wait()
}

func receivedConfirmedAccumulator(t *testing.T, wg *sync.WaitGroup, accumulatorConfirmed chan common.Hash) {

confirmedAccumulatorReceived := 0
conn, _, _, err := ws.DefaultDialer.Dial(context.Background(), "ws://127.0.0.1:9642/")
if err != nil {
t.Errorf("Can not connect: %v\n", err)
return
}

poller, err := netpoll.New(nil)
if err != nil {
t.Error("error starting net poller")
return
}

desc, err := netpoll.HandleRead(conn)
if err != nil {
t.Error("error getting netpoll descriptor")
return
}

_ = poller.Start(desc, func(ev netpoll.Event) {
if ev&netpoll.EventReadHup != 0 {
t.Error("received hang up")
_ = poller.Stop(desc)
_ = conn.Close()
wg.Done()
return
}

msg, _, err := wsutil.ReadServerData(conn)
if err != nil {
t.Error("error calling ReadServerData")
_ = poller.Stop(desc)
_ = conn.Close()
wg.Done()
return
}

res := BroadcastMessage{}
err = json.Unmarshal(msg, &res)
if err != nil {
logger.Error().Err(err).Msg("error unmarshalling message")
_ = poller.Stop(desc)
_ = conn.Close()
wg.Done()

return
}

if res.Version != 1 {
t.Error("This is not version 1")
}

if res.ConfirmedAccumulator.IsConfirmed {
confirmedAccumulatorReceived++
accumulatorConfirmed <- res.ConfirmedAccumulator.Accumulator
}

if confirmedAccumulatorReceived == 1 { // this gets called twice from the test
_ = poller.Stop(desc)
_ = conn.Close()
wg.Done()
return
}

})

}

func TestBroadcasterRespondsToPing(t *testing.T) {
broadcasterSettings := Settings{
Addr: ":9643",
Expand Down
34 changes: 27 additions & 7 deletions packages/arb-util/broadcaster/clientmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package broadcaster
import (
"bytes"
"encoding/json"
"github.com/offchainlabs/arbitrum/packages/arb-util/inbox"
"math/rand"
"net"
"sort"
"strconv"
"sync"

"github.com/offchainlabs/arbitrum/packages/arb-util/inbox"

"github.com/gobwas/ws"
"github.com/gobwas/ws-examples/src/gopool"
"github.com/gobwas/ws/wsutil"
Expand Down Expand Up @@ -63,8 +64,7 @@ func (cm *ClientManager) Register(conn net.Conn, desc *netpoll.Desc) *ClientConn

if len(cm.broadcastMessages) > 0 {
// send the newly connected client all the messages we've got...
bm := BroadcastMessage{}
bm.Messages = cm.broadcastMessages
bm := BroadcastMessage{Version: 1, Messages: cm.broadcastMessages}

_ = clientConnection.write(bm)
}
Expand Down Expand Up @@ -98,8 +98,8 @@ func (cm *ClientManager) Remove(clientConnection *ClientConnection) {
cm.mu.Unlock()
}

// SyncSequence clears out everything prior to finding the matching accumulator
func (cm *ClientManager) syncMessages(accumulator common.Hash) {
// ConfirmedAccumulator clears out everything prior to finding the matching accumulator
func (cm *ClientManager) confirmedAccumulator(accumulator common.Hash) error {
cm.mu.Lock()
defer cm.mu.Unlock()

Expand All @@ -116,6 +116,27 @@ func (cm *ClientManager) syncMessages(accumulator common.Hash) {
}

cm.broadcastMessages = broadcastMessages

bm := BroadcastMessage{Version: 1}
bm.ConfirmedAccumulator = ConfirmedAccumulator{
IsConfirmed: true,
Accumulator: accumulator,
}

var buf bytes.Buffer
w := wsutil.NewWriter(&buf, ws.StateServerSide, ws.OpText)
encoder := json.NewEncoder(w)
if err := encoder.Encode(bm); err != nil {
return err
}

if err := w.Flush(); err != nil {
return err
}

cm.out <- buf.Bytes()

return nil
}

// Broadcast sends message to all clients.
Expand Down Expand Up @@ -161,8 +182,7 @@ func (cm *ClientManager) Broadcast(prevAcc common.Hash, batchItem inbox.Sequence

}

bm := BroadcastMessage{}
bm.Messages = broadcastMessages
bm := BroadcastMessage{Version: 1, Messages: broadcastMessages}

if err := encoder.Encode(bm); err != nil {
return err
Expand Down
Loading

0 comments on commit 5ca3712

Please sign in to comment.