Skip to content

Commit

Permalink
Simplify consensus context atomic values (ava-labs#2473)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Jan 13, 2023
1 parent 166fe00 commit 92690ac
Show file tree
Hide file tree
Showing 46 changed files with 299 additions and 367 deletions.
6 changes: 3 additions & 3 deletions api/health/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ func TestPassingChecks(t *testing.T) {
func TestPassingThenFailingChecks(t *testing.T) {
require := require.New(t)

var shouldCheckErr utils.AtomicBool
var shouldCheckErr utils.Atomic[bool]
check := CheckerFunc(func(context.Context) (interface{}, error) {
if shouldCheckErr.GetValue() {
if shouldCheckErr.Get() {
return errUnhealthy.Error(), errUnhealthy
}
return "", nil
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestPassingThenFailingChecks(t *testing.T) {
require.True(liveness)
}

shouldCheckErr.SetValue(true)
shouldCheckErr.Set(true)

awaitHealthy(h, false)
awaitLiveness(h, false)
Expand Down
8 changes: 4 additions & 4 deletions api/health/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ func (w *worker) RegisterCheck(name string, checker Checker) error {
}

func (w *worker) RegisterMonotonicCheck(name string, checker Checker) error {
var result utils.AtomicInterface
return w.RegisterCheck(name, CheckerFunc(func(ctx context.Context) (interface{}, error) {
details := result.GetValue()
var result utils.Atomic[any]
return w.RegisterCheck(name, CheckerFunc(func(ctx context.Context) (any, error) {
details := result.Get()
if details != nil {
return details, nil
}

details, err := checker.HealthCheck(ctx)
if err == nil {
result.SetValue(details)
result.Set(details)
}
return details, err
}))
Expand Down
2 changes: 1 addition & 1 deletion api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func lockMiddleware(
// not done state-syncing/bootstrapping, writes back an error.
func rejectMiddleware(handler http.Handler, ctx *snow.ConsensusContext) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // If chain isn't done bootstrapping, ignore API calls
if ctx.GetState() != snow.NormalOp {
if ctx.State.Get() != snow.NormalOp {
w.WriteHeader(http.StatusServiceUnavailable)
// Doesn't matter if there's an error while writing. They'll get the StatusServiceUnavailable code.
_, _ = w.Write([]byte("API call rejected because chain is not done bootstrapping"))
Expand Down
9 changes: 2 additions & 7 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,14 +454,9 @@ func (m *manager) buildChain(chainParams ChainParameters, sb Subnet) (*chain, er
ConsensusAcceptor: m.ConsensusAcceptorGroup,
Registerer: consensusMetrics,
}
// We set the state to Initializing here because failing to set the state
// before it's first access would cause a panic.
ctx.SetState(snow.Initializing)

if subnetConfig, ok := m.SubnetConfigs[chainParams.SubnetID]; ok {
if subnetConfig.ValidatorOnly {
ctx.SetValidatorOnly()
}
ctx.ValidatorOnly.Set(subnetConfig.ValidatorOnly)
}

// Get a factory for the vm we want to use on our chain
Expand Down Expand Up @@ -1095,7 +1090,7 @@ func (m *manager) IsBootstrapped(id ids.ID) bool {
return false
}

return chain.Context().GetState() == snow.NormalOp
return chain.Context().State.Get() == snow.NormalOp
}

func (m *manager) subnetsNotBootstrapped() []ids.ID {
Expand Down
8 changes: 4 additions & 4 deletions database/leveldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Database struct {
// metrics is only initialized and used when [MetricUpdateFrequency] is >= 0
// in the config
metrics metrics
closed utils.AtomicBool
closed utils.Atomic[bool]
closeOnce sync.Once
// closeCh is closed when Close() is called.
closeCh chan struct{}
Expand Down Expand Up @@ -351,7 +351,7 @@ func (db *Database) Compact(start []byte, limit []byte) error {
}

func (db *Database) Close() error {
db.closed.SetValue(true)
db.closed.Set(true)
db.closeOnce.Do(func() {
close(db.closeCh)
})
Expand All @@ -360,7 +360,7 @@ func (db *Database) Close() error {
}

func (db *Database) HealthCheck(context.Context) (interface{}, error) {
if db.closed.GetValue() {
if db.closed.Get() {
return nil, database.ErrClosed
}
return nil, nil
Expand Down Expand Up @@ -447,7 +447,7 @@ type iter struct {

func (it *iter) Next() bool {
// Short-circuit and set an error if the underlying database has been closed.
if it.db.closed.GetValue() {
if it.db.closed.Get() {
it.key = nil
it.val = nil
it.err = database.ErrClosed
Expand Down
6 changes: 3 additions & 3 deletions database/rpcdb/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
type DatabaseClient struct {
client rpcdbpb.DatabaseClient

closed utils.AtomicBool
closed utils.Atomic[bool]
}

// NewClient returns a database instance connected to a remote database instance
Expand Down Expand Up @@ -127,7 +127,7 @@ func (db *DatabaseClient) Compact(start, limit []byte) error {

// Close attempts to close the database
func (db *DatabaseClient) Close() error {
db.closed.SetValue(true)
db.closed.Set(true)
resp, err := db.client.Close(context.Background(), &rpcdbpb.CloseRequest{})
if err != nil {
return err
Expand Down Expand Up @@ -239,7 +239,7 @@ type iterator struct {
// Next attempts to move the iterator to the next element and returns if this
// succeeded
func (it *iterator) Next() bool {
if it.db.closed.GetValue() {
if it.db.closed.Get() {
it.data = nil
it.errs.Add(database.ErrClosed)
return false
Expand Down
20 changes: 10 additions & 10 deletions network/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ type peer struct {
// True if this peer has sent us a valid Version message and
// is running a compatible version.
// Only modified on the connection's reader routine.
gotVersion utils.AtomicBool
gotVersion utils.Atomic[bool]

// True if the peer:
// * Has sent us a Version message
// * Has sent us a PeerList message
// * Is running a compatible version
// Only modified on the connection's reader routine.
finishedHandshake utils.AtomicBool
finishedHandshake utils.Atomic[bool]

// onFinishHandshake is closed when the peer finishes the p2p handshake.
onFinishHandshake chan struct{}
Expand Down Expand Up @@ -226,7 +226,7 @@ func (p *peer) LastReceived() time.Time {
}

func (p *peer) Ready() bool {
return p.finishedHandshake.GetValue()
return p.finishedHandshake.Get()
}

func (p *peer) AwaitReady(ctx context.Context) error {
Expand Down Expand Up @@ -638,7 +638,7 @@ func (p *peer) sendNetworkMessages() {
return
}

if p.finishedHandshake.GetValue() {
if p.finishedHandshake.Get() {
if err := p.VersionCompatibility.Compatible(p.version); err != nil {
p.Log.Debug("disconnecting from peer",
zap.String("reason", "version not compatible"),
Expand Down Expand Up @@ -689,7 +689,7 @@ func (p *peer) handle(msg message.InboundMessage) {
msg.OnFinishedHandling()
return
}
if !p.finishedHandshake.GetValue() {
if !p.finishedHandshake.Get() {
p.Log.Debug(
"dropping message",
zap.String("reason", "handshake isn't finished"),
Expand Down Expand Up @@ -794,7 +794,7 @@ func (p *peer) observeUptime(subnetID ids.ID, uptime uint32) {
}

func (p *peer) handleVersion(msg *p2p.Version) {
if p.gotVersion.GetValue() {
if p.gotVersion.Get() {
// TODO: this should never happen, should we close the connection here?
p.Log.Verbo("dropping duplicated version message",
zap.Stringer("nodeID", p.id),
Expand Down Expand Up @@ -926,7 +926,7 @@ func (p *peer) handleVersion(msg *p2p.Version) {
return
}

p.gotVersion.SetValue(true)
p.gotVersion.Set(true)

peerIPs, err := p.Network.Peers(p.id)
if err != nil {
Expand Down Expand Up @@ -957,13 +957,13 @@ func (p *peer) handleVersion(msg *p2p.Version) {
}

func (p *peer) handlePeerList(msg *p2p.PeerList) {
if !p.finishedHandshake.GetValue() {
if !p.gotVersion.GetValue() {
if !p.finishedHandshake.Get() {
if !p.gotVersion.Get() {
return
}

p.Network.Connected(p.id)
p.finishedHandshake.SetValue(true)
p.finishedHandshake.Set(true)
close(p.onFinishHandshake)
}

Expand Down
19 changes: 8 additions & 11 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ type Node struct {
shutdownOnce sync.Once

// True if node is shutting down or is done shutting down
shuttingDown utils.AtomicBool
shuttingDown utils.Atomic[bool]

// Sets the exit code
shuttingDownExitCode utils.AtomicInterface
shuttingDownExitCode utils.Atomic[int]

// Incremented only once on initialization.
// Decremented when node is done shutting down.
Expand Down Expand Up @@ -285,7 +285,7 @@ func (n *Node) initNetworking(primaryNetVdrs validators.Set) error {
// shutdown.
timer := timer.NewTimer(func() {
// If the timeout fires and we're already shutting down, nothing to do.
if !n.shuttingDown.GetValue() {
if !n.shuttingDown.Get() {
n.Log.Warn("failed to connect to bootstrap nodes",
zap.Stringer("beacons", n.beacons),
zap.Duration("duration", n.Config.BootstrapBeaconConnectionTimeout),
Expand Down Expand Up @@ -362,7 +362,7 @@ func (n *Node) Dispatch() error {
// When [n].Shutdown() is called, [n.APIServer].Close() is called.
// This causes [n.APIServer].Dispatch() to return an error.
// If that happened, don't log/return an error here.
if !n.shuttingDown.GetValue() {
if !n.shuttingDown.Get() {
n.Log.Fatal("API server dispatch failed",
zap.Error(err),
)
Expand Down Expand Up @@ -1338,10 +1338,10 @@ func (n *Node) Initialize(
// Shutdown this node
// May be called multiple times
func (n *Node) Shutdown(exitCode int) {
if !n.shuttingDown.GetValue() { // only set the exit code once
n.shuttingDownExitCode.SetValue(exitCode)
if !n.shuttingDown.Get() { // only set the exit code once
n.shuttingDownExitCode.Set(exitCode)
}
n.shuttingDown.SetValue(true)
n.shuttingDown.Set(true)
n.shutdownOnce.Do(n.shutdown)
}

Expand Down Expand Up @@ -1425,8 +1425,5 @@ func (n *Node) shutdown() {
}

func (n *Node) ExitCode() int {
if exitCode, ok := n.shuttingDownExitCode.GetValue().(int); ok {
return exitCode
}
return 0
return n.shuttingDownExitCode.Get()
}
48 changes: 5 additions & 43 deletions snow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,55 +75,17 @@ type ConsensusContext struct {
// accepted.
ConsensusAcceptor Acceptor

// Non-zero iff this chain bootstrapped.
state utils.AtomicInterface
// State indicates the current state of this consensus instance.
State utils.Atomic[State]

// True iff this chain is executing transactions as part of bootstrapping.
executing utils.AtomicBool
Executing utils.Atomic[bool]

// True iff this chain is currently state-syncing
stateSyncing utils.AtomicBool
StateSyncing utils.Atomic[bool]

// Indicates this chain is available to only validators.
validatorOnly utils.AtomicBool
}

func (ctx *ConsensusContext) SetState(newState State) {
ctx.state.SetValue(newState)
}

func (ctx *ConsensusContext) GetState() State {
stateInf := ctx.state.GetValue()
return stateInf.(State)
}

// IsExecuting returns true iff this chain is still executing transactions.
func (ctx *ConsensusContext) IsExecuting() bool {
return ctx.executing.GetValue()
}

// Executing marks this chain as executing or not.
// Set to "true" if there's an ongoing transaction.
func (ctx *ConsensusContext) Executing(b bool) {
ctx.executing.SetValue(b)
}

func (ctx *ConsensusContext) IsRunningStateSync() bool {
return ctx.stateSyncing.GetValue()
}

func (ctx *ConsensusContext) RunningStateSync(b bool) {
ctx.stateSyncing.SetValue(b)
}

// IsValidatorOnly returns true iff this chain is available only to validators
func (ctx *ConsensusContext) IsValidatorOnly() bool {
return ctx.validatorOnly.GetValue()
}

// SetValidatorOnly marks this chain as available only to validators
func (ctx *ConsensusContext) SetValidatorOnly() {
ctx.validatorOnly.SetValue(true)
ValidatorOnly utils.Atomic[bool]
}

func DefaultContextTest() *Context {
Expand Down
2 changes: 1 addition & 1 deletion snow/engine/avalanche/bootstrap/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (*bootstrapper) Notify(context.Context, common.Message) error {
func (b *bootstrapper) Start(ctx context.Context, startReqID uint32) error {
b.Ctx.Log.Info("starting bootstrap")

b.Ctx.SetState(snow.Bootstrapping)
b.Ctx.State.Set(snow.Bootstrapping)
if err := b.VM.SetState(ctx, snow.Bootstrapping); err != nil {
return fmt.Errorf("failed to notify VM that bootstrapping has started: %w",
err)
Expand Down
Loading

0 comments on commit 92690ac

Please sign in to comment.