Skip to content

Commit

Permalink
prefetcher: proxy wait channel requests while paused
Browse files Browse the repository at this point in the history
If the prefetcher is paused because it's on a cellular network, or the
app has been backgrounded, we shouldn't block the "wait channel"
requests indefinitely.  These are processed on the same queue as
regular prefetch requests, and so can't be processed until we're
unpaused.  But the caller is just asking for a channel they can wait
on; they don't actually expect this call itself to block.

So if the prefetcher is paused, return a proxy channel that the caller
can use, and launch a goroutine that will close this channel when the
request is eventually processed.

I considered _always_ returning a proxy channel, but it seemed nice to
still wait on `ctx.Done()` and the shutdown channel in the case where
we expect the request to be processed quickly.

Issue: HOTPOT-2569
  • Loading branch information
strib committed Apr 23, 2020
1 parent fca44aa commit 3f825d1
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 8 deletions.
89 changes: 81 additions & 8 deletions go/kbfs/libkbfs/prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ type blockPrefetcher struct {
// channel that's always closed, to avoid overhead on certain requests
closedCh <-chan struct{}

pauseLock sync.RWMutex
paused bool
pausedCh chan struct{}

// map to channels for cancelling queued prefetches
queuedPrefetchHandlesLock sync.Mutex
queuedPrefetchHandles map[data.BlockPointer]queuedPrefetch
Expand Down Expand Up @@ -197,6 +201,7 @@ func newBlockPrefetcher(retriever BlockRetriever,
queuedPrefetchHandles: make(map[data.BlockPointer]queuedPrefetch),
rescheduled: make(map[kbfsblock.ID]*rescheduledPrefetch),
closedCh: closedCh,
pausedCh: make(chan struct{}),
}
if config != nil {
p.log = config.MakeLogger("PRE")
Expand Down Expand Up @@ -1304,10 +1309,32 @@ func (p *blockPrefetcher) handlePrefetchRequest(req *prefetchRequest) {
}
}

func (p *blockPrefetcher) setPaused(paused bool) {
p.pauseLock.Lock()
defer p.pauseLock.Unlock()
oldPaused := p.paused
p.paused = paused
if oldPaused != paused {
close(p.pausedCh)
p.pausedCh = make(chan struct{})
}
}

func (p *blockPrefetcher) getPaused() (paused bool, ch <-chan struct{}) {
p.pauseLock.RLock()
defer p.pauseLock.RUnlock()
return p.paused, p.pausedCh
}

func (p *blockPrefetcher) handleAppStateChange(
appState *keybase1.MobileAppState) {
// Pause the prefetcher on cell connections.
defer func() {
p.setPaused(false)
}()

// Pause the prefetcher when backgrounded.
for *appState != keybase1.MobileAppState_FOREGROUND {
p.setPaused(true)
p.log.CDebugf(
context.TODO(), "Pausing prefetcher while backgrounded")
select {
Expand Down Expand Up @@ -1344,6 +1371,10 @@ func (p *blockPrefetcher) handleNetStateChange(
return
}

defer func() {
p.setPaused(false)
}()

for *netState == keybase1.MobileNetworkState_CELLULAR {
// Default to not syncing while on a cell network.
syncOnCellular := false
Expand All @@ -1360,6 +1391,7 @@ func (p *blockPrefetcher) handleNetStateChange(
break
}

p.setPaused(true)
p.log.CDebugf(
context.TODO(), "Pausing prefetcher on cell network")
select {
Expand Down Expand Up @@ -1632,6 +1664,29 @@ func (p *blockPrefetcher) ProcessBlockForPrefetch(ctx context.Context,

var errPrefetcherAlreadyShutDown = errors.New("Already shut down")

func (p *blockPrefetcher) proxyWaitCh(
ctx context.Context, ptr data.BlockPointer,
c <-chan <-chan struct{}) <-chan struct{} {
p.log.CDebugf(
ctx, "Proxying the wait channel for %s while prefetching is paused",
ptr)
proxyCh := make(chan struct{})
go func() {
var waitCh <-chan struct{}
select {
case waitCh = <-c:
case <-p.shutdownCh:
return
}
select {
case <-waitCh:
close(proxyCh)
case <-p.shutdownCh:
}
}()
return proxyCh
}

// WaitChannelForBlockPrefetch implements the Prefetcher interface for
// blockPrefetcher.
func (p *blockPrefetcher) WaitChannelForBlockPrefetch(
Expand All @@ -1649,14 +1704,32 @@ func (p *blockPrefetcher) WaitChannelForBlockPrefetch(
case <-ctx.Done():
return nil, ctx.Err()
}

// If we're paused for some reason, we still want to return a
// channel quickly to the caller, so proxy the real wait channel
// and return right away. The caller can still wait on the proxy
// channel while the real request is waiting on the prefetcher
// request queue.
paused, pausedCh := p.getPaused()
if paused {
return p.proxyWaitCh(ctx, ptr, c), nil
}

// Wait for response.
select {
case waitCh := <-c:
return waitCh, nil
case <-p.shutdownCh:
return nil, errPrefetcherAlreadyShutDown
case <-ctx.Done():
return nil, ctx.Err()
for {
select {
case waitCh := <-c:
return waitCh, nil
case <-pausedCh:
paused, pausedCh = p.getPaused()
if paused {
return p.proxyWaitCh(ctx, ptr, c), nil
}
case <-p.shutdownCh:
return nil, errPrefetcherAlreadyShutDown
case <-ctx.Done():
return nil, ctx.Err()
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions go/kbfs/libkbfs/prefetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2371,6 +2371,14 @@ func TestPrefetcherCellularPause(t *testing.T) {
last = <-callCh
require.Equal(t, keybase1.MobileNetworkState_CELLULAR, last)

t.Log("Make sure we get a wait channel right away")
ptr := makeRandomBlockPointer(t)
ctx, cancel := context.WithTimeout(
context.Background(), individualTestTimeout)
defer cancel()
_, err := q.Prefetcher().WaitChannelForBlockPrefetch(ctx, ptr)
require.NoError(t, err)

t.Log("Unpause it to make it notify again")
stateCh <- keybase1.MobileNetworkState_NONE
notifySyncCh(t, prefetchSyncCh)
Expand Down

0 comments on commit 3f825d1

Please sign in to comment.