Skip to content

Commit

Permalink
query: exit if all queries are answered after receiving a response
Browse files Browse the repository at this point in the history
Doing so prevents us from waiting until the query's timeout is reached
if we happened to receive the last response in order to satisfy all of
our in-flight queries.
  • Loading branch information
wpaulino authored and cpacia committed Aug 21, 2019
1 parent 17b4768 commit 39d89fd
Showing 1 changed file with 32 additions and 15 deletions.
47 changes: 32 additions & 15 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,25 @@ func queryChainServiceBatch(
peerStates := make(map[string]wire.Message)
var mtxPeerStates sync.RWMutex

// allDone is a helper closure we'll use to determine whether we can
// exit due to all of our queries being answered.
allDone := func() bool {
for i := 0; i < len(queryStates); i++ {
if atomic.LoadUint32(&queryStates[i]) !=
uint32(queryAnswered) {
return false
}
}
return true
}

// allDoneSignal is a channel we'll close to signal to the main loop
// that all of the queries have been answered.
var allDoneOnce sync.Once
allDoneSignal := make(chan struct{})

peerGoroutine := func(sp *ServerPeer, quit <-chan struct{},
matchSignal <-chan struct{}) {
matchSignal <-chan struct{}, allDoneSignal chan struct{}) {

// Subscribe to messages from the peer.
sp.subscribeRecvMsg(subscription)
Expand Down Expand Up @@ -448,13 +465,22 @@ func queryChainServiceBatch(
break
}

log.Tracef("Query #%v answered, updating state",
handleQuery)

// We got a match signal so we can mark this
// query a success.
atomic.StoreUint32(&queryStates[handleQuery],
uint32(queryAnswered))

log.Tracef("Query #%v answered, updating state",
handleQuery)
// If we're done answering all of our queries,
// we can exit now.
if allDone() {
allDoneOnce.Do(func() {
close(allDoneSignal)
})
return
}
}

// Before exiting the peer goroutine, reset the query
Expand Down Expand Up @@ -495,6 +521,7 @@ func queryChainServiceBatch(
matchSignals[sp] = make(chan struct{})
go peerGoroutine(
peer, peerQuits[sp], matchSignals[sp],
allDoneSignal,
)
}

Expand Down Expand Up @@ -525,20 +552,10 @@ func queryChainServiceBatch(
}
}
case <-time.After(qo.timeout):
// Check if we're done; if so, quit.
allDone := true
for i := 0; i < len(queryStates); i++ {
if atomic.LoadUint32(&queryStates[i]) !=
uint32(queryAnswered) {
allDone = false
}
}
if allDone {
return
}
case <-allDoneSignal:
return
case <-queryQuit:
return

case <-s.quit:
return
}
Expand Down

0 comments on commit 39d89fd

Please sign in to comment.