Skip to content

Commit

Permalink
Ensure chain notifier is started before accessed.
Browse files Browse the repository at this point in the history
The use case comes from the RPC layer that is ready before the
chain notifier which is used in the sub server.
  • Loading branch information
roeierez authored and Roasbeef committed May 19, 2020
1 parent 470537e commit e52982f
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 0 deletions.
10 changes: 10 additions & 0 deletions chainntnfs/bitcoindnotify/bitcoind.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type BitcoindNotifier struct {
epochClientCounter uint64 // To be used atomically.

start sync.Once
active int32 // To be used atomically.
stopped int32 // To be used atomically.

chainConn *chain.BitcoindClient
Expand Down Expand Up @@ -130,6 +131,11 @@ func (b *BitcoindNotifier) Stop() error {
return nil
}

// Started returns true if this instance has been started, and false otherwise.
func (b *BitcoindNotifier) Started() bool {
return atomic.LoadInt32(&b.active) != 0
}

func (b *BitcoindNotifier) startNotifier() error {
// Connect to bitcoind, and register for notifications on connected,
// and disconnected blocks.
Expand Down Expand Up @@ -158,6 +164,10 @@ func (b *BitcoindNotifier) startNotifier() error {
b.wg.Add(1)
go b.notificationDispatcher()

// Set the active flag now that we've completed the full
// startup.
atomic.StoreInt32(&b.active, 1)

return nil
}

Expand Down
10 changes: 10 additions & 0 deletions chainntnfs/btcdnotify/btcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type BtcdNotifier struct {
epochClientCounter uint64 // To be used atomically.

start sync.Once
active int32 // To be used atomically.
stopped int32 // To be used atomically.

chainConn *rpcclient.Client
Expand Down Expand Up @@ -141,6 +142,11 @@ func (b *BtcdNotifier) Start() error {
return startErr
}

// Started returns true if this instance has been started, and false otherwise.
func (b *BtcdNotifier) Started() bool {
return atomic.LoadInt32(&b.active) != 0
}

// Stop shutsdown the BtcdNotifier.
func (b *BtcdNotifier) Stop() error {
// Already shutting down?
Expand Down Expand Up @@ -212,6 +218,10 @@ func (b *BtcdNotifier) startNotifier() error {
b.wg.Add(1)
go b.notificationDispatcher()

// Set the active flag now that we've completed the full
// startup.
atomic.StoreInt32(&b.active, 1)

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions chainntnfs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ type ChainNotifier interface {
// ready, and able to receive notification registrations from clients.
Start() error

// Started returns true if this instance has been started, and false otherwise.
Started() bool

// Stops the concrete ChainNotifier. Once stopped, the ChainNotifier
// should disallow any future requests from potential clients.
// Additionally, all pending client notifications will be canceled
Expand Down
10 changes: 10 additions & 0 deletions chainntnfs/neutrinonotify/neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type NeutrinoNotifier struct {
epochClientCounter uint64 // To be used atomically.

start sync.Once
active int32 // To be used atomically.
stopped int32 // To be used atomically.

bestBlockMtx sync.RWMutex
Expand Down Expand Up @@ -144,6 +145,11 @@ func (n *NeutrinoNotifier) Stop() error {
return nil
}

// Started returns true if this instance has been started, and false otherwise.
func (n *NeutrinoNotifier) Started() bool {
return atomic.LoadInt32(&n.active) != 0
}

func (n *NeutrinoNotifier) startNotifier() error {
// Start our concurrent queues before starting the rescan, to ensure
// onFilteredBlockConnected and onRelavantTx callbacks won't be
Expand Down Expand Up @@ -200,6 +206,10 @@ func (n *NeutrinoNotifier) startNotifier() error {
n.wg.Add(1)
go n.notificationDispatcher()

// Set the active flag now that we've completed the full
// startup.
atomic.StoreInt32(&n.active, 1)

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions contractcourt/chain_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (m *mockNotifier) Start() error {
return nil
}

func (m *mockNotifier) Started() bool {
return true
}

func (m *mockNotifier) Stop() error {
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,10 @@ func (m *mockNotifier) Start() error {
return nil
}

func (m *mockNotifier) Started() bool {
return true
}

func (m *mockNotifier) Stop() error {
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions fundingmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (m *mockNotifier) Start() error {
return nil
}

func (m *mockNotifier) Started() bool {
return true
}

func (m *mockNotifier) Stop() error {
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions htlcswitch/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,10 @@ func (m *mockNotifier) Start() error {
return nil
}

func (m *mockNotifier) Started() bool {
return true
}

func (m *mockNotifier) Stop() error {
return nil
}
Expand Down
17 changes: 17 additions & 0 deletions lnrpc/chainrpc/chainnotifier_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ var (
// has been shut down.
ErrChainNotifierServerShuttingDown = errors.New("chain notifier RPC " +
"subserver shutting down")

// ErrChainNotifierServerNotActive indicates that the chain notifier hasn't
// finished the startup process.
ErrChainNotifierServerNotActive = errors.New("chain notifier RPC is" +
"still in the process of starting")
)

// fileExists reports whether the named file or directory exists.
Expand Down Expand Up @@ -196,6 +201,10 @@ func (s *Server) RegisterWithRootServer(grpcServer *grpc.Server) error {
func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest,
confStream ChainNotifier_RegisterConfirmationsNtfnServer) error {

if !s.cfg.ChainNotifier.Started() {
return ErrChainNotifierServerNotActive
}

// We'll start by reconstructing the RPC request into what the
// underlying ChainNotifier expects.
var txid chainhash.Hash
Expand Down Expand Up @@ -292,6 +301,10 @@ func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest,
func (s *Server) RegisterSpendNtfn(in *SpendRequest,
spendStream ChainNotifier_RegisterSpendNtfnServer) error {

if !s.cfg.ChainNotifier.Started() {
return ErrChainNotifierServerNotActive
}

// We'll start by reconstructing the RPC request into what the
// underlying ChainNotifier expects.
var op *wire.OutPoint
Expand Down Expand Up @@ -399,6 +412,10 @@ func (s *Server) RegisterSpendNtfn(in *SpendRequest,
func (s *Server) RegisterBlockEpochNtfn(in *BlockEpoch,
epochStream ChainNotifier_RegisterBlockEpochNtfnServer) error {

if !s.cfg.ChainNotifier.Started() {
return ErrChainNotifierServerNotActive
}

// We'll start by reconstructing the RPC request into what the
// underlying ChainNotifier expects.
var hash chainhash.Hash
Expand Down
4 changes: 4 additions & 0 deletions mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ func (m *mockNotfier) Start() error {
return nil
}

func (m *mockNotfier) Started() bool {
return true
}

func (m *mockNotfier) Stop() error {
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions sweep/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ func (m *MockNotifier) Start() error {
return nil
}

// Started checks if started
func (m *MockNotifier) Started() bool {
return true
}

// Stop the notifier.
func (m *MockNotifier) Stop() error {
return nil
Expand Down

0 comments on commit e52982f

Please sign in to comment.