Skip to content

Commit

Permalink
clocks: tagged deadlines (algorand#5649)
Browse files Browse the repository at this point in the history
Co-authored-by: Nickolai Zeldovich <[email protected]>
Co-authored-by: chris erway <[email protected]>
Co-authored-by: cce <[email protected]>
Co-authored-by: Pavel Zbitskiy <[email protected]>
  • Loading branch information
5 people authored Aug 18, 2023
1 parent 93161f4 commit 9f74a55
Show file tree
Hide file tree
Showing 19 changed files with 650 additions and 243 deletions.
9 changes: 4 additions & 5 deletions agreement/agreementtest/simulate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/db"
"github.com/algorand/go-algorand/util/timers"
)
Expand All @@ -52,15 +51,15 @@ func makeInstant() *instant {
return i
}

func (i *instant) Decode([]byte) (timers.Clock, error) {
func (i *instant) Decode([]byte) (timers.Clock[agreement.TimeoutType], error) {
return i, nil
}

func (i *instant) Encode() []byte {
return nil
}

func (i *instant) TimeoutAt(d time.Duration) <-chan time.Time {
func (i *instant) TimeoutAt(d time.Duration, timeoutType agreement.TimeoutType) <-chan time.Time {
ta := make(chan time.Time)
select {
case <-i.timeoutAtCalled:
Expand All @@ -69,13 +68,13 @@ func (i *instant) TimeoutAt(d time.Duration) <-chan time.Time {
return ta
}

if d == agreement.FilterTimeout(0, protocol.ConsensusCurrentVersion) && !i.HasPending("pseudonode") {
if timeoutType == agreement.TimeoutFilter && !i.HasPending("pseudonode") {
close(ta)
}
return ta
}

func (i *instant) Zero() timers.Clock {
func (i *instant) Zero() timers.Clock[agreement.TimeoutType] {
i.Z0 <- struct{}{}
// pause here until runRound is called
i.Z1 <- struct{}{}
Expand Down
7 changes: 3 additions & 4 deletions agreement/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package agreement
import (
"context"
"fmt"
"time"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/logging"
Expand Down Expand Up @@ -190,7 +189,7 @@ func (d *demux) verifyBundle(ctx context.Context, m message, r round, p period,
// next blocks until it observes an external input event of interest for the state machine.
//
// If ok is false, there are no more events so the agreement service should quit.
func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Duration, currentRound round) (e externalEvent, ok bool) {
func (d *demux) next(s *Service, deadline Deadline, fastDeadline Deadline, currentRound round) (e externalEvent, ok bool) {
defer func() {
if !ok {
return
Expand Down Expand Up @@ -250,8 +249,8 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat
}

ledgerNextRoundCh := s.Ledger.Wait(nextRound)
deadlineCh := s.Clock.TimeoutAt(deadline)
fastDeadlineCh := s.Clock.TimeoutAt(fastDeadline)
deadlineCh := s.Clock.TimeoutAt(deadline.Duration, deadline.Type)
fastDeadlineCh := s.Clock.TimeoutAt(fastDeadline.Duration, fastDeadline.Type)

d.UpdateEventsQueue(eventQueueDemux, 0)
d.monitor.dec(demuxCoserviceType)
Expand Down
10 changes: 5 additions & 5 deletions agreement/demux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,14 +422,14 @@ func TestDemuxNext(t *testing.T) {
}

// implement timers.Clock
func (t *demuxTester) Zero() timers.Clock {
func (t *demuxTester) Zero() timers.Clock[TimeoutType] {
// we don't care about this function in this test.
return t
}

// implement timers.Clock
func (t *demuxTester) TimeoutAt(delta time.Duration) <-chan time.Time {
if delta == fastTimeoutChTime {
func (t *demuxTester) TimeoutAt(delta time.Duration, timeoutType TimeoutType) <-chan time.Time {
if timeoutType == TimeoutFastRecovery {
return nil
}

Expand All @@ -450,7 +450,7 @@ func (t *demuxTester) Encode() []byte {
}

// implement timers.Clock
func (t *demuxTester) Decode([]byte) (timers.Clock, error) {
func (t *demuxTester) Decode([]byte) (timers.Clock[TimeoutType], error) {
// we don't care about this function in this test.
return t, nil
}
Expand Down Expand Up @@ -675,7 +675,7 @@ func (t *demuxTester) TestUsecase(testcase demuxTestUsecase) bool {
close(s.quit)
}

e, ok := dmx.next(s, time.Second, fastTimeoutChTime, 300)
e, ok := dmx.next(s, Deadline{Duration: time.Second, Type: TimeoutDeadline}, Deadline{Duration: fastTimeoutChTime, Type: TimeoutFastRecovery}, 300)

if !assert.Equal(t, testcase.ok, ok) {
return false
Expand Down
4 changes: 2 additions & 2 deletions agreement/fuzzer/fuzzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Fuzzer struct {
wallClock int32
agreements []*agreement.Service
facades []*NetworkFacade
clocks []timers.Clock
clocks []timers.Clock[agreement.TimeoutType]
disconnected [][]bool
crashAccessors []db.Accessor
router *Router
Expand Down Expand Up @@ -80,7 +80,7 @@ func MakeFuzzer(config FuzzerConfig) *Fuzzer {
networkName: config.FuzzerName,
agreements: make([]*agreement.Service, config.NodesCount),
facades: make([]*NetworkFacade, config.NodesCount),
clocks: make([]timers.Clock, config.NodesCount),
clocks: make([]timers.Clock[agreement.TimeoutType], config.NodesCount),
disconnected: make([][]bool, config.NodesCount),
crashAccessors: make([]db.Accessor, config.NodesCount),
accounts: make([]account.Participation, config.NodesCount),
Expand Down
9 changes: 5 additions & 4 deletions agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/timers"
Expand All @@ -48,7 +49,7 @@ type NetworkFacadeMessage struct {
type NetworkFacade struct {
network.GossipNode
NetworkFilter
timers.Clock
timers.Clock[agreement.TimeoutType]
nodeID int
mux *network.Multiplexer
fuzzer *Fuzzer
Expand Down Expand Up @@ -345,7 +346,7 @@ func (n *NetworkFacade) Disconnect(sender network.Peer) {
n.fuzzer.Disconnect(n.nodeID, sourceNode)
}

func (n *NetworkFacade) Zero() timers.Clock {
func (n *NetworkFacade) Zero() timers.Clock[agreement.TimeoutType] {
n.clockSync.Lock()
defer n.clockSync.Unlock()

Expand Down Expand Up @@ -375,7 +376,7 @@ func (n *NetworkFacade) Rezero() {
// Since implements the Clock interface.
func (n *NetworkFacade) Since() time.Duration { return 0 }

func (n *NetworkFacade) TimeoutAt(d time.Duration) <-chan time.Time {
func (n *NetworkFacade) TimeoutAt(d time.Duration, timeoutType agreement.TimeoutType) <-chan time.Time {
defer n.timeoutAtInitOnce.Do(func() {
n.timeoutAtInitWait.Done()
})
Expand Down Expand Up @@ -414,7 +415,7 @@ func (n *NetworkFacade) Encode() []byte {
return buf.Bytes()
}

func (n *NetworkFacade) Decode(in []byte) (timers.Clock, error) {
func (n *NetworkFacade) Decode(in []byte) (timers.Clock[agreement.TimeoutType], error) {
n.clockSync.Lock()
defer n.clockSync.Unlock()

Expand Down
Loading

0 comments on commit 9f74a55

Please sign in to comment.