Skip to content

Commit

Permalink
breachabirter+contraccourt: convert ProcessACK to function closure
Browse files Browse the repository at this point in the history
  • Loading branch information
halseth committed May 6, 2021
1 parent 4685341 commit bdc1f31
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 38 deletions.
34 changes: 13 additions & 21 deletions breacharbiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,19 @@ var (

// ContractBreachEvent is an event the breachArbiter will receive in case a
// contract breach is observed on-chain. It contains the necessary information
// to handle the breach, and a ProcessACK channel we will use to ACK the event
// to handle the breach, and a ProcessACK closure we will use to ACK the event
// when we have safely stored all the necessary information.
type ContractBreachEvent struct {
// ChanPoint is the channel point of the breached channel.
ChanPoint wire.OutPoint

// ProcessACK is an error channel where a nil error should be sent
// iff the breach retribution info is safely stored in the retribution
// ProcessACK is an closure that should be called with a nil error iff
// the breach retribution info is safely stored in the retribution
// store. In case storing the information to the store fails, a non-nil
// error should be sent.
ProcessACK chan error
// error should be used. When this closure returns, it means that the
// contract court has marked the channel pending close in the DB, and
// it is safe for the BreachArbiter to carry on its duty.
ProcessACK func(error)

// BreachRetribution is the information needed to act on this contract
// breach.
Expand Down Expand Up @@ -745,10 +747,8 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) {
b.Unlock()
brarLog.Errorf("Unable to check breach info in DB: %v", err)

select {
case breachEvent.ProcessACK <- err:
case <-b.quit:
}
// Notify about the failed lookup and return.
breachEvent.ProcessACK(err)
return
}

Expand All @@ -757,11 +757,7 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) {
// case we can safely ACK the handoff, and return.
if breached {
b.Unlock()

select {
case breachEvent.ProcessACK <- nil:
case <-b.quit:
}
breachEvent.ProcessACK(nil)
return
}

Expand All @@ -782,14 +778,10 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) {
// acknowledgment back to the close observer with the error. If
// the ack is successful, the close observer will mark the
// channel as pending-closed in the channeldb.
select {
case breachEvent.ProcessACK <- err:
// Bail if we failed to persist retribution info.
if err != nil {
return
}
breachEvent.ProcessACK(err)

case <-b.quit:
// Bail if we failed to persist retribution info.
if err != nil {
return
}

Expand Down
43 changes: 28 additions & 15 deletions breacharbiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,9 +1059,12 @@ func TestBreachHandoffSuccess(t *testing.T) {

// Signal a spend of the funding transaction and wait for the close
// observer to exit.
processACK := make(chan error)
breach := &ContractBreachEvent{
ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1),
ChanPoint: *chanPoint,
ProcessACK: func(brarErr error) {
processACK <- brarErr
},
BreachRetribution: &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx,
LocalOutputSignDesc: &input.SignDescriptor{
Expand All @@ -1075,7 +1078,7 @@ func TestBreachHandoffSuccess(t *testing.T) {

// We'll also wait to consume the ACK back from the breach arbiter.
select {
case err := <-breach.ProcessACK:
case err := <-processACK:
if err != nil {
t.Fatalf("handoff failed: %v", err)
}
Expand All @@ -1092,8 +1095,10 @@ func TestBreachHandoffSuccess(t *testing.T) {
// already ACKed, the breach arbiter should immediately ACK and ignore
// this event.
breach = &ContractBreachEvent{
ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1),
ChanPoint: *chanPoint,
ProcessACK: func(brarErr error) {
processACK <- brarErr
},
BreachRetribution: &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx,
LocalOutputSignDesc: &input.SignDescriptor{
Expand All @@ -1108,7 +1113,7 @@ func TestBreachHandoffSuccess(t *testing.T) {

// We'll also wait to consume the ACK back from the breach arbiter.
select {
case err := <-breach.ProcessACK:
case err := <-processACK:
if err != nil {
t.Fatalf("handoff failed: %v", err)
}
Expand Down Expand Up @@ -1140,9 +1145,12 @@ func TestBreachHandoffFail(t *testing.T) {
// Signal the notifier to dispatch spend notifications of the funding
// transaction using the transaction from bob's closing summary.
chanPoint := alice.ChanPoint
processACK := make(chan error)
breach := &ContractBreachEvent{
ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1),
ChanPoint: *chanPoint,
ProcessACK: func(brarErr error) {
processACK <- brarErr
},
BreachRetribution: &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx,
LocalOutputSignDesc: &input.SignDescriptor{
Expand All @@ -1156,7 +1164,7 @@ func TestBreachHandoffFail(t *testing.T) {

// We'll also wait to consume the ACK back from the breach arbiter.
select {
case err := <-breach.ProcessACK:
case err := <-processACK:
if err == nil {
t.Fatalf("breach write should have failed")
}
Expand All @@ -1181,8 +1189,10 @@ func TestBreachHandoffFail(t *testing.T) {
// Signal a spend of the funding transaction and wait for the close
// observer to exit. This time we are allowing the handoff to succeed.
breach = &ContractBreachEvent{
ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1),
ChanPoint: *chanPoint,
ProcessACK: func(brarErr error) {
processACK <- brarErr
},
BreachRetribution: &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx,
LocalOutputSignDesc: &input.SignDescriptor{
Expand All @@ -1196,7 +1206,7 @@ func TestBreachHandoffFail(t *testing.T) {
contractBreaches <- breach

select {
case err := <-breach.ProcessACK:
case err := <-processACK:
if err != nil {
t.Fatalf("handoff failed: %v", err)
}
Expand Down Expand Up @@ -1399,16 +1409,19 @@ func testBreachSpends(t *testing.T, test breachTest) {
t.Fatalf("unable to create breach retribution: %v", err)
}

processACK := make(chan error)
breach := &ContractBreachEvent{
ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1),
ChanPoint: *chanPoint,
ProcessACK: func(brarErr error) {
processACK <- brarErr
},
BreachRetribution: retribution,
}
contractBreaches <- breach

// We'll also wait to consume the ACK back from the breach arbiter.
select {
case err := <-breach.ProcessACK:
case err := <-processACK:
if err != nil {
t.Fatalf("handoff failed: %v", err)
}
Expand Down
17 changes: 15 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,9 +950,22 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
IsOurAddress: cc.Wallet.IsOurAddress,
ContractBreach: func(chanPoint wire.OutPoint,
breachRet *lnwallet.BreachRetribution) error {

// processACK will handle the breachArbiter ACKing the
// event.
finalErr := make(chan error, 1)
processACK := func(brarErr error) {
if brarErr != nil {
finalErr <- brarErr
return
}

finalErr <- nil
}

event := &ContractBreachEvent{
ChanPoint: chanPoint,
ProcessACK: make(chan error, 1),
ProcessACK: processACK,
BreachRetribution: breachRet,
}

Expand All @@ -965,7 +978,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,

// Wait for the breachArbiter to ACK the event.
select {
case err := <-event.ProcessACK:
case err := <-finalErr:
return err
case <-s.quit:
return ErrServerShuttingDown
Expand Down

0 comments on commit bdc1f31

Please sign in to comment.