Skip to content

Commit

Permalink
Revert "poll on FolderBranch in SubMan for empty TLF (keybase#23854)" (
Browse files Browse the repository at this point in the history
…keybase#23933)

This reverts commit bc0aa80.
  • Loading branch information
songgao authored Apr 24, 2020
1 parent 0c01269 commit b45846f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 189 deletions.
177 changes: 28 additions & 149 deletions go/kbfs/libkbfs/subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,12 @@ import (

"github.com/keybase/client/go/kbfs/data"
"github.com/keybase/client/go/kbfs/tlfhandle"
"github.com/keybase/client/go/logger"
"github.com/keybase/client/go/protocol/keybase1"
"github.com/pkg/errors"
"golang.org/x/net/context"
"golang.org/x/time/rate"
)

const (
folderBranchPollingInterval = time.Second
maxPurgeableSubscriptionManagerClient = 3
)

// userPath is always the full path including the /keybase prefix, but may
// not be canonical or cleaned. The goal is to track whatever the user of this
// type is dealing with without needing them to know if a path is canonicalized
Expand Down Expand Up @@ -130,7 +124,6 @@ type nonPathSubscription struct {
type subscriptionManager struct {
clientID SubscriptionManagerClientID
config Config
log logger.Logger
notifier SubscriptionNotifier

onlineStatusTracker *onlineStatusTracker
Expand All @@ -143,7 +136,6 @@ type subscriptionManager struct {
nonPathSubscriptionIDToTopic map[SubscriptionID]keybase1.SubscriptionTopic
subscriptionIDs map[SubscriptionID]bool
subscriptionCountByFolderBranch map[data.FolderBranch]int
folderBranchPollerCancelers map[SubscriptionID]context.CancelFunc
}

func (sm *subscriptionManager) notifyOnlineStatus() {
Expand All @@ -165,11 +157,9 @@ func newSubscriptionManager(clientID SubscriptionManagerClientID, config Config,
nonPathSubscriptionIDToTopic: make(map[SubscriptionID]keybase1.SubscriptionTopic),
clientID: clientID,
config: config,
log: config.MakeLogger("SubMan"),
notifier: notifier,
subscriptionIDs: make(map[SubscriptionID]bool),
subscriptionCountByFolderBranch: make(map[data.FolderBranch]int),
folderBranchPollerCancelers: make(map[SubscriptionID]context.CancelFunc),
}
sm.onlineStatusTracker = newOnlineStatusTracker(config, sm.notifyOnlineStatus)
return sm
Expand Down Expand Up @@ -287,31 +277,41 @@ func (sm *subscriptionManager) makeNonPathSubscriptionDebouncedNotify(
}, limit)
}

type subscribePathRequest struct {
sid SubscriptionID
path string // original, uncleaned path from GUI
topic keybase1.PathSubscriptionTopic
deduplicateInterval *time.Duration
}

func (sm *subscriptionManager) subscribePathWithFolderBranchLocked(
req subscribePathRequest,
parsedPath *parsedPath, fb data.FolderBranch) error {
// SubscribePath implements the SubscriptionManager interface.
func (sm *subscriptionManager) SubscribePath(ctx context.Context,
sid SubscriptionID, path string, topic keybase1.PathSubscriptionTopic,
deduplicateInterval *time.Duration) error {
parsedPath, err := parsePath(userPath(path))
if err != nil {
return err
}
fb, err := parsedPath.getFolderBranch(ctx, sm.config)
if err != nil {
return err
}
if fb == (data.FolderBranch{}) {
// ignore non-existent TLF.
// TODO: deal with this case HOTPOTP-501
return nil
}
nitp := getCleanInTlfPath(parsedPath)

ref := pathSubscriptionRef{
folderBranch: fb,
path: nitp,
}

subscriptionIDSetter, err := sm.checkSubscriptionIDLocked(req.sid)
sm.lock.Lock()
defer sm.lock.Unlock()
subscriptionIDSetter, err := sm.checkSubscriptionIDLocked(sid)
if err != nil {
return err
}
sm.registerForChangesLocked(ref.folderBranch)

limit := rate.Inf
if req.deduplicateInterval != nil {
limit = rate.Every(*req.deduplicateInterval)
if deduplicateInterval != nil {
limit = rate.Every(*deduplicateInterval)
}
ps, ok := sm.pathSubscriptions[ref]
if !ok {
Expand All @@ -328,131 +328,14 @@ func (sm *subscriptionManager) subscribePathWithFolderBranchLocked(
ps.debouncedNotify.shutdown()
ps.debouncedNotify = sm.makePathSubscriptionDebouncedNotify(ref, limit)
}
ps.subscriptionIDs[req.sid] = req.topic
ps.pathsToNotify[req.path] = struct{}{}
ps.subscriptionIDs[sid] = topic
ps.pathsToNotify[path] = struct{}{}

sm.pathSubscriptionIDToRef[req.sid] = ref
sm.pathSubscriptionIDToRef[sid] = ref
subscriptionIDSetter()
return nil
}

func (sm *subscriptionManager) cancelAndDeleteFolderBranchPollerLocked(
sid SubscriptionID) (deleted bool) {
if cancel, ok := sm.folderBranchPollerCancelers[sid]; ok {
cancel()
delete(sm.folderBranchPollerCancelers, sid)
return true
}
return false
}

func (sm *subscriptionManager) cancelAndDeleteFolderBranchPoller(
sid SubscriptionID) (deleted bool) {
sm.lock.Lock()
defer sm.lock.Unlock()
return sm.cancelAndDeleteFolderBranchPollerLocked(sid)
}

func (sm *subscriptionManager) pollOnFolderBranchForSubscribePathRequest(
ctx context.Context, req subscribePathRequest, parsedPath *parsedPath) {
ticker := time.NewTicker(folderBranchPollingInterval)
for {
select {
case <-ticker.C:
fb, err := parsedPath.getFolderBranch(ctx, sm.config)
if err != nil {
_ = sm.cancelAndDeleteFolderBranchPoller(req.sid)
return
}

if fb == (data.FolderBranch{}) {
continue
}

// We have a folderBranch now! Go ahead and complete the
// subscription, and send a notification too.

sm.lock.Lock()
defer sm.lock.Unlock()
// Check if we're done while holding the lock to protect
// against racing against unsubscribe.
select {
case <-ctx.Done():
// No need to call cancelAndDeleteFolderBranchPollerLocked here
// since we always cancel and delete at the same tiem and if
// it's canceled it must have been deleted too.
return
default:
}

err = sm.subscribePathWithFolderBranchLocked(req, parsedPath, fb)
if err != nil {
sm.log.CErrorf(ctx,
"subscribePathWithFolderBranchLocked sid=%s err=%v", req.sid, err)
}

sm.notifier.OnPathChange(
sm.clientID, []SubscriptionID{req.sid},
req.path, []keybase1.PathSubscriptionTopic{req.topic})

_ = sm.cancelAndDeleteFolderBranchPollerLocked(req.sid)
return
case <-ctx.Done():
_ = sm.cancelAndDeleteFolderBranchPoller(req.sid)
return
}
}
}

func (sm *subscriptionManager) subscribePathWithoutFolderBranchLocked(
req subscribePathRequest, parsedPath *parsedPath) {
ctx, cancel := context.WithCancel(context.Background())
sm.folderBranchPollerCancelers[req.sid] = cancel
go sm.pollOnFolderBranchForSubscribePathRequest(ctx, req, parsedPath)
}

// SubscribePath implements the SubscriptionManager interface.
func (sm *subscriptionManager) SubscribePath(ctx context.Context,
sid SubscriptionID, path string, topic keybase1.PathSubscriptionTopic,
deduplicateInterval *time.Duration) error {
parsedPath, err := parsePath(userPath(path))
if err != nil {
return err
}

// Lock here to protect against racing with unsubscribe. Specifically, we
// don't want to launch the poller if an unsubscribe call for this sid
// comes in before we get fb from parsedPath.getFolderBranch().
//
// We could still end up with a lingering subscription if unsubscribe
// happens too fast and RPC somehow gives use the unsubscribe call before
// the subscribe call, but that's probably rare enough to ignore here.
//
// In the future if this end up contributing a deadlock because
// folderBranch starts using the subscription manager somehow, we can add a
// "recently unsubscribed" cache to the subscription manager and move this
// lock further down. This cache should also mitigate the issue where the
// unsubscribe call gets deliverd before subscribe.
sm.lock.Lock()
defer sm.lock.Unlock()

fb, err := parsedPath.getFolderBranch(ctx, sm.config)
if err != nil {
return err
}
req := subscribePathRequest{
sid: sid,
path: path,
topic: topic,
deduplicateInterval: deduplicateInterval,
}
if fb != (data.FolderBranch{}) {
return sm.subscribePathWithFolderBranchLocked(req, parsedPath, fb)
}
sm.subscribePathWithoutFolderBranchLocked(req, parsedPath)
return nil
}

// SubscribeNonPath implements the SubscriptionManager interface.
func (sm *subscriptionManager) SubscribeNonPath(
ctx context.Context, sid SubscriptionID, topic keybase1.SubscriptionTopic,
Expand Down Expand Up @@ -491,12 +374,6 @@ func (sm *subscriptionManager) SubscribeNonPath(

func (sm *subscriptionManager) unsubscribePathLocked(
ctx context.Context, subscriptionID SubscriptionID) {
// First check if this is a subscription we don't yet have a folderBranch
// for.
if sm.cancelAndDeleteFolderBranchPollerLocked(subscriptionID) {
return
}

ref, ok := sm.pathSubscriptionIDToRef[subscriptionID]
if !ok {
return
Expand Down Expand Up @@ -641,6 +518,8 @@ type subscriptionManagerManager struct {
purgeableClientIDsFIFO []SubscriptionManagerClientID
}

const maxPurgeableSubscriptionManagerClient = 3

func newSubscriptionManagerManager(config Config) *subscriptionManagerManager {
return &subscriptionManagerManager{
config: config,
Expand Down
40 changes: 0 additions & 40 deletions go/kbfs/libkbfs/subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,43 +203,3 @@ func TestSubscriptionManagerFavoritesChange(t *testing.T) {
t.Logf("Waiting for last notification (done1) before finishing the test.")
waiter1()
}

func TestSubscriptionManagerSubscribePathNoFolderBranch(t *testing.T) {
config, sm, notifier, finish := initSubscriptionManagerTest(t)
defer finish()

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
ctx, err := libcontext.NewContextWithCancellationDelayer(
libcontext.NewContextReplayable(
ctx, func(c context.Context) context.Context {
return ctx
}))
require.NoError(t, err)

waiter0, done0 := waitForCall(t, 4*time.Second)

t.Logf("Subscribe to CHILDREN at TLF root using sid1, before we have a folderBranch. Then create a file. We should get a notification.")
sid1 := SubscriptionID("sid1")

err = sm.SubscribePath(ctx, sid1, "/keybase/private/jdoe",
keybase1.PathSubscriptionTopic_CHILDREN, nil)
require.NoError(t, err)
notifier.EXPECT().OnPathChange(testSubscriptionManagerClientID,
[]SubscriptionID{sid1}, "/keybase/private/jdoe",
[]keybase1.PathSubscriptionTopic{keybase1.PathSubscriptionTopic_CHILDREN}).AnyTimes().Do(done0)

tlfHandle, err := GetHandleFromFolderNameAndType(
ctx, config.KBPKI(), config.MDOps(), config, "jdoe", tlf.Private)
require.NoError(t, err)
rootNode, _, err := config.KBFSOps().GetOrCreateRootNode(
ctx, tlfHandle, data.MasterBranch)
require.NoError(t, err)
_, _, err = config.KBFSOps().CreateFile(
ctx, rootNode, rootNode.ChildName("file"), false, NoExcl)
require.NoError(t, err)
err = config.KBFSOps().SyncAll(ctx, rootNode.GetFolderBranch())
require.NoError(t, err)

waiter0()
}

0 comments on commit b45846f

Please sign in to comment.