diff --git a/go/kbfs/libkbfs/prefetcher.go b/go/kbfs/libkbfs/prefetcher.go index eac8b1f66b73..6dc53011c138 100644 --- a/go/kbfs/libkbfs/prefetcher.go +++ b/go/kbfs/libkbfs/prefetcher.go @@ -155,6 +155,10 @@ type blockPrefetcher struct { // channel that's always closed, to avoid overhead on certain requests closedCh <-chan struct{} + pauseLock sync.RWMutex + paused bool + pausedCh chan struct{} + // map to channels for cancelling queued prefetches queuedPrefetchHandlesLock sync.Mutex queuedPrefetchHandles map[data.BlockPointer]queuedPrefetch @@ -197,6 +201,7 @@ func newBlockPrefetcher(retriever BlockRetriever, queuedPrefetchHandles: make(map[data.BlockPointer]queuedPrefetch), rescheduled: make(map[kbfsblock.ID]*rescheduledPrefetch), closedCh: closedCh, + pausedCh: make(chan struct{}), } if config != nil { p.log = config.MakeLogger("PRE") @@ -1304,10 +1309,32 @@ func (p *blockPrefetcher) handlePrefetchRequest(req *prefetchRequest) { } } +func (p *blockPrefetcher) setPaused(paused bool) { + p.pauseLock.Lock() + defer p.pauseLock.Unlock() + oldPaused := p.paused + p.paused = paused + if oldPaused != paused { + close(p.pausedCh) + p.pausedCh = make(chan struct{}) + } +} + +func (p *blockPrefetcher) getPaused() (paused bool, ch <-chan struct{}) { + p.pauseLock.RLock() + defer p.pauseLock.RUnlock() + return p.paused, p.pausedCh +} + func (p *blockPrefetcher) handleAppStateChange( appState *keybase1.MobileAppState) { - // Pause the prefetcher on cell connections. + defer func() { + p.setPaused(false) + }() + + // Pause the prefetcher when backgrounded. for *appState != keybase1.MobileAppState_FOREGROUND { + p.setPaused(true) p.log.CDebugf( context.TODO(), "Pausing prefetcher while backgrounded") select { @@ -1344,6 +1371,10 @@ func (p *blockPrefetcher) handleNetStateChange( return } + defer func() { + p.setPaused(false) + }() + for *netState == keybase1.MobileNetworkState_CELLULAR { // Default to not syncing while on a cell network. syncOnCellular := false @@ -1360,6 +1391,7 @@ func (p *blockPrefetcher) handleNetStateChange( break } + p.setPaused(true) p.log.CDebugf( context.TODO(), "Pausing prefetcher on cell network") select { @@ -1632,6 +1664,29 @@ func (p *blockPrefetcher) ProcessBlockForPrefetch(ctx context.Context, var errPrefetcherAlreadyShutDown = errors.New("Already shut down") +func (p *blockPrefetcher) proxyWaitCh( + ctx context.Context, ptr data.BlockPointer, + c <-chan <-chan struct{}) <-chan struct{} { + p.log.CDebugf( + ctx, "Proxying the wait channel for %s while prefetching is paused", + ptr) + proxyCh := make(chan struct{}) + go func() { + var waitCh <-chan struct{} + select { + case waitCh = <-c: + case <-p.shutdownCh: + return + } + select { + case <-waitCh: + close(proxyCh) + case <-p.shutdownCh: + } + }() + return proxyCh +} + // WaitChannelForBlockPrefetch implements the Prefetcher interface for // blockPrefetcher. func (p *blockPrefetcher) WaitChannelForBlockPrefetch( @@ -1649,14 +1704,32 @@ func (p *blockPrefetcher) WaitChannelForBlockPrefetch( case <-ctx.Done(): return nil, ctx.Err() } + + // If we're paused for some reason, we still want to return a + // channel quickly to the caller, so proxy the real wait channel + // and return right away. The caller can still wait on the proxy + // channel while the real request is waiting on the prefetcher + // request queue. + paused, pausedCh := p.getPaused() + if paused { + return p.proxyWaitCh(ctx, ptr, c), nil + } + // Wait for response. - select { - case waitCh := <-c: - return waitCh, nil - case <-p.shutdownCh: - return nil, errPrefetcherAlreadyShutDown - case <-ctx.Done(): - return nil, ctx.Err() + for { + select { + case waitCh := <-c: + return waitCh, nil + case <-pausedCh: + paused, pausedCh = p.getPaused() + if paused { + return p.proxyWaitCh(ctx, ptr, c), nil + } + case <-p.shutdownCh: + return nil, errPrefetcherAlreadyShutDown + case <-ctx.Done(): + return nil, ctx.Err() + } } } diff --git a/go/kbfs/libkbfs/prefetcher_test.go b/go/kbfs/libkbfs/prefetcher_test.go index 39652250859f..ba8d06f5e150 100644 --- a/go/kbfs/libkbfs/prefetcher_test.go +++ b/go/kbfs/libkbfs/prefetcher_test.go @@ -2371,6 +2371,14 @@ func TestPrefetcherCellularPause(t *testing.T) { last = <-callCh require.Equal(t, keybase1.MobileNetworkState_CELLULAR, last) + t.Log("Make sure we get a wait channel right away") + ptr := makeRandomBlockPointer(t) + ctx, cancel := context.WithTimeout( + context.Background(), individualTestTimeout) + defer cancel() + _, err := q.Prefetcher().WaitChannelForBlockPrefetch(ctx, ptr) + require.NoError(t, err) + t.Log("Unpause it to make it notify again") stateCh <- keybase1.MobileNetworkState_NONE notifySyncCh(t, prefetchSyncCh)