Skip to content

Commit

Permalink
move Syncer to G, and batch calls into send stale notifications CORE-…
Browse files Browse the repository at this point in the history
…4757 (keybase#6378)

* wip

* one second

* fix tests

* lock full reload

* fix syncer init

* fix

* just clear user
  • Loading branch information
mmaxim authored Mar 27, 2017
1 parent e3400a1 commit 8d31882
Showing 11 changed files with 127 additions and 37 deletions.
5 changes: 1 addition & 4 deletions go/chat/inboxsource.go
Original file line number Diff line number Diff line change
@@ -421,8 +421,6 @@ type HybridInboxSource struct {
libkb.Contextified
utils.DebugLabeler
*baseInboxSource

syncer *Syncer
}

func NewHybridInboxSource(g *libkb.GlobalContext,
@@ -433,7 +431,6 @@ func NewHybridInboxSource(g *libkb.GlobalContext,
Contextified: libkb.NewContextified(g),
DebugLabeler: utils.NewDebugLabeler(g, "HybridInboxSource", false),
baseInboxSource: newBaseInboxSource(g, getChatInterface, tlfInfoSource),
syncer: NewSyncer(g),
}
}

@@ -574,7 +571,7 @@ func (s *HybridInboxSource) handleInboxError(ctx context.Context, err storage.Er
if verr, ok := err.(storage.VersionMismatchError); ok {
s.Debug(ctx, "handleInboxError: version mismatch, syncing and sending stale notifications: %s",
verr.Error())
s.syncer.Sync(ctx, s.getChatInterface(), uid)
s.G().Syncer.Sync(ctx, s.getChatInterface(), uid)
return nil
}
return err
4 changes: 2 additions & 2 deletions go/chat/push.go
Original file line number Diff line number Diff line change
@@ -195,8 +195,8 @@ func (g *PushHandler) Activity(ctx context.Context, m gregor.OutOfBandMessage, b
if pushErr != nil {
g.Debug(ctx, "chat activity: newMessage: push error, alerting")
}
NewSyncer(g.G()).SendChatStaleNotifications(context.Background(), m.UID().Bytes(),
[]chat1.ConversationID{nm.ConvID})
g.G().Syncer.SendChatStaleNotifications(context.Background(), m.UID().Bytes(),
[]chat1.ConversationID{nm.ConvID}, true)
}

if badger != nil && nm.UnreadUpdate != nil {
1 change: 1 addition & 0 deletions go/chat/sender_test.go
Original file line number Diff line number Diff line change
@@ -148,6 +148,7 @@ func setupTest(t *testing.T, numUsers int) (*kbtest.ChatMockWorld, chat1.RemoteI
tc.G.MessageDeliverer.(*Deliverer).SetClock(world.Fc)
tc.G.MessageDeliverer.Start(context.TODO(), u.User.GetUID().ToBytes())
tc.G.MessageDeliverer.Connected(context.TODO())
tc.G.Syncer = NewSyncer(tc.G)

return world, ri, sender, baseSender, &listener, tlf
}
106 changes: 92 additions & 14 deletions go/chat/sync.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package chat

import (
"context"
"encoding/hex"
"sync"
"time"

"github.com/keybase/client/go/chat/storage"
"github.com/keybase/client/go/chat/types"
@@ -11,6 +12,8 @@ import (
"github.com/keybase/client/go/protocol/chat1"
"github.com/keybase/client/go/protocol/gregor1"
"github.com/keybase/client/go/protocol/keybase1"
"github.com/keybase/clockwork"
"golang.org/x/net/context"
)

type Syncer struct {
@@ -20,14 +23,85 @@ type Syncer struct {

isConnected bool
offlinables []types.Offlinable

notificationLock sync.Mutex
clock clockwork.Clock
sendDelay time.Duration
shutdownCh chan struct{}
fullReloadCh chan gregor1.UID
flushCh chan struct{}
notificationQueue map[string][]chat1.ConversationID
}

func NewSyncer(g *libkb.GlobalContext) *Syncer {
return &Syncer{
Contextified: libkb.NewContextified(g),
DebugLabeler: utils.NewDebugLabeler(g, "Syncer", false),
isConnected: true,
s := &Syncer{
Contextified: libkb.NewContextified(g),
DebugLabeler: utils.NewDebugLabeler(g, "Syncer", false),
isConnected: true,
clock: clockwork.NewRealClock(),
shutdownCh: make(chan struct{}),
fullReloadCh: make(chan gregor1.UID),
flushCh: make(chan struct{}),
notificationQueue: make(map[string][]chat1.ConversationID),
sendDelay: time.Millisecond * 1000,
}

go s.sendNotificationLoop()
return s
}

func (s *Syncer) SetClock(clock clockwork.Clock) {
s.clock = clock
}

func (s *Syncer) Shutdown() {
s.Debug(context.Background(), "shutting down")
close(s.shutdownCh)
}

func (s *Syncer) dedupConvIDs(convIDs []chat1.ConversationID) (res []chat1.ConversationID) {
m := make(map[string]bool)
for _, convID := range convIDs {
m[convID.String()] = true
}
for hexConvID := range m {
convID, _ := hex.DecodeString(hexConvID)
res = append(res, convID)
}
return res
}

func (s *Syncer) sendNotificationsOnce() {
s.notificationLock.Lock()
defer s.notificationLock.Unlock()
for uid, convIDs := range s.notificationQueue {
convIDs = s.dedupConvIDs(convIDs)
s.Debug(context.Background(), "flushing notifications: uid: %s len: %d", uid, len(convIDs))
s.G().NotifyRouter.HandleChatThreadsStale(context.Background(), keybase1.UID(uid), convIDs)
}
s.notificationQueue = make(map[string][]chat1.ConversationID)
}

func (s *Syncer) sendNotificationLoop() {
s.Debug(context.Background(), "starting notification loop")
for {
select {
case <-s.shutdownCh:
return
case uid := <-s.fullReloadCh:
s.notificationLock.Lock()
kuid := keybase1.UID(uid.String())
s.G().NotifyRouter.HandleChatInboxStale(context.Background(), kuid)
s.G().NotifyRouter.HandleChatThreadsStale(context.Background(), kuid, nil)
s.notificationQueue[uid.String()] = nil
s.notificationLock.Unlock()
case <-s.clock.After(s.sendDelay):
s.sendNotificationsOnce()
case <-s.flushCh:
s.sendNotificationsOnce()
}
}

}

func (s *Syncer) getConvIDs(convs []chat1.Conversation) (res []chat1.ConversationID) {
@@ -38,15 +112,19 @@ func (s *Syncer) getConvIDs(convs []chat1.Conversation) (res []chat1.Conversatio
}

func (s *Syncer) SendChatStaleNotifications(ctx context.Context, uid gregor1.UID,
convIDs []chat1.ConversationID) {

kuid := keybase1.UID(uid.String())
convIDs []chat1.ConversationID, immediate bool) {
if len(convIDs) == 0 {
s.Debug(ctx, "sending inbox stale message")
s.G().NotifyRouter.HandleChatInboxStale(context.Background(), kuid)
s.fullReloadCh <- uid
} else {
s.Debug(ctx, "sending threads stale message: len: %d", len(convIDs))
s.notificationLock.Lock()
s.notificationQueue[uid.String()] = append(s.notificationQueue[uid.String()], convIDs...)
s.notificationLock.Unlock()
if immediate {
s.flushCh <- struct{}{}
}
}
s.Debug(ctx, "sending threads stale message: len: %d", len(convIDs))
s.G().NotifyRouter.HandleChatThreadsStale(context.Background(), kuid, convIDs)
}

func (s *Syncer) isServerInboxClear(ctx context.Context, inbox *storage.Inbox, srvVers int) bool {
@@ -146,7 +224,7 @@ func (s *Syncer) sync(ctx context.Context, cli chat1.RemoteInterface, uid gregor
s.Debug(ctx, "Sync: failed to clear inbox: %s", err.Error())
}
// Send notifications for a full clear
s.SendChatStaleNotifications(ctx, uid, nil)
s.SendChatStaleNotifications(ctx, uid, nil, true)
case chat1.SyncInboxResType_CURRENT:
s.Debug(ctx, "Sync: version is current, standing pat: %v", vers)
case chat1.SyncInboxResType_INCREMENTAL:
@@ -158,10 +236,10 @@ func (s *Syncer) sync(ctx context.Context, cli chat1.RemoteInterface, uid gregor
s.Debug(ctx, "Sync: failed to sync conversations to inbox: %s", err.Error())

// Send notifications for a full clear
s.SendChatStaleNotifications(ctx, uid, nil)
s.SendChatStaleNotifications(ctx, uid, nil, true)
} else {
// Send notifications for a successful partial sync
s.SendChatStaleNotifications(ctx, uid, s.getConvIDs(incr.Convs))
s.SendChatStaleNotifications(ctx, uid, s.getConvIDs(incr.Convs), true)
}
}

11 changes: 6 additions & 5 deletions go/chat/sync_test.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package chat

import (
"testing"
"time"

"github.com/keybase/client/go/chat/storage"
"github.com/keybase/client/go/kbtest"
@@ -83,13 +84,13 @@ func TestSyncerConnected(t *testing.T) {
require.NoError(t, syncer.Sync(context.TODO(), ri, uid))
select {
case <-list.inboxStale:
default:
case <-time.After(20 * time.Second):
require.Fail(t, "no inbox stale received")
}
select {
case cids := <-list.threadsStale:
require.Zero(t, len(cids))
default:
case <-time.After(20 * time.Second):
require.Fail(t, "no threads stale received")
}
_, _, err := ibox.ReadAll(context.TODO())
@@ -122,7 +123,7 @@ func TestSyncerConnected(t *testing.T) {
case cids := <-list.threadsStale:
require.Equal(t, 1, len(cids))
require.Equal(t, convs[1].GetConvID(), cids[0])
default:
case <-time.After(20 * time.Second):
require.Fail(t, "no threads stale received")
}
vers, iconvs, err := ibox.ReadAll(context.TODO())
@@ -151,13 +152,13 @@ func TestSyncerConnected(t *testing.T) {
require.NoError(t, syncer.Sync(context.TODO(), ri, uid))
select {
case <-list.inboxStale:
default:
case <-time.After(20 * time.Second):
require.Fail(t, "no inbox stale received")
}
select {
case cids := <-list.threadsStale:
require.Zero(t, len(cids))
default:
case <-time.After(20 * time.Second):
require.Fail(t, "no threads stale received")
}
_, _, err = ibox.ReadAll(context.TODO())
10 changes: 10 additions & 0 deletions go/chat/types/interfaces.go
Original file line number Diff line number Diff line change
@@ -82,3 +82,13 @@ type ServerCacheVersions interface {
MatchInbox(ctx context.Context, vers int) (int, error)
Fetch(ctx context.Context) (chat1.ServerCacheVers, error)
}

type Syncer interface {
Connected(ctx context.Context, cli chat1.RemoteInterface, uid gregor1.UID) error
Disconnected(ctx context.Context)
Sync(ctx context.Context, cli chat1.RemoteInterface, uid gregor1.UID) error
RegisterOfflinable(offlinable Offlinable)
SendChatStaleNotifications(ctx context.Context, uid gregor1.UID, convIDs []chat1.ConversationID,
immediate bool)
Shutdown()
}
4 changes: 4 additions & 0 deletions go/libkb/globals.go
Original file line number Diff line number Diff line change
@@ -106,6 +106,7 @@ type GlobalContext struct {
ConvSource chattypes.ConversationSource // source of remote message bodies for chat
MessageDeliverer chattypes.MessageDeliverer // background message delivery service
ServerCacheVersions chattypes.ServerCacheVersions // server side versions for chat caches
Syncer chattypes.Syncer // keeps various parts of chat system in sync

// Can be overloaded by tests to get an improvement in performance
NewTriplesec func(pw []byte, salt []byte) (Triplesec, error)
@@ -484,6 +485,9 @@ func (g *GlobalContext) Shutdown() error {
if g.MessageDeliverer != nil {
g.MessageDeliverer.Stop(context.Background())
}
if g.Syncer != nil {
g.Syncer.Shutdown()
}

for _, hook := range g.ShutdownHooks {
epick.Push(hook())
1 change: 1 addition & 0 deletions go/service/chat_local_test.go
Original file line number Diff line number Diff line change
@@ -81,6 +81,7 @@ func (c *chatTestContext) as(t *testing.T, user *kbtest.FakeUser) *chatTestUserC
func() chat1.RemoteInterface { return mockRemote },
h.tlfInfoSource)
tc.G.ServerCacheVersions = storage.NewServerVersions(tc.G)
tc.G.Syncer = chat.NewSyncer(tc.G)

h.setTestRemoteClient(mockRemote)
h.gh = newGregorHandler(tc.G)
11 changes: 2 additions & 9 deletions go/service/gregor.go
Original file line number Diff line number Diff line change
@@ -118,7 +118,6 @@ type gregorHandler struct {
firehoseHandlers []libkb.GregorFirehoseHandler
badger *badges.Badger
chatHandler *chat.PushHandler
chatSync *chat.Syncer
reachability *reachability

// This mutex protects the con object
@@ -175,16 +174,10 @@ func newGregorHandler(g *libkb.GlobalContext) *gregorHandler {
freshReplay: true,
pushStateFilter: func(m gregor.Message) bool { return true },
badger: nil,
chatSync: chat.NewSyncer(g),
chatHandler: chat.NewPushHandler(g),
broadcastCh: make(chan gregor1.Message, 10000),
}

// Set up Offlinables on Syncer
gh.chatSync.RegisterOfflinable(g.InboxSource)
gh.chatSync.RegisterOfflinable(g.ConvSource)
gh.chatSync.RegisterOfflinable(g.MessageDeliverer)

// Attempt to create a gregor client initially, if we are not logged in
// or don't have user/device info in G, then this won't work
if err := gh.resetGregorClient(); err != nil {
@@ -518,7 +511,7 @@ func (g *gregorHandler) OnConnect(ctx context.Context, conn *rpc.Connection,
if err == nil {
chatCli := chat1.RemoteClient{Cli: cli}
uid := gcli.User.(gregor1.UID)
if err := g.chatSync.Connected(ctx, chatCli, uid); err != nil {
if err := g.G().Syncer.Connected(ctx, chatCli, uid); err != nil {
return err
}
}
@@ -569,7 +562,7 @@ func (g *gregorHandler) OnDisconnected(ctx context.Context, status rpc.Disconnec
g.Debug(context.Background(), "disconnected: %v", status)

// Alert chat syncer that we are now disconnected
g.chatSync.Disconnected(ctx)
g.G().Syncer.Disconnected(ctx)

// Call out to reachability module if we have one
if g.reachability != nil {
2 changes: 1 addition & 1 deletion go/service/kbfs.go
Original file line number Diff line number Diff line change
@@ -92,7 +92,7 @@ func (h *KBFSHandler) notifyConversation(uid keybase1.UID, filename string, publ
}

h.G().Log.Debug("sending ChatThreadsStale notification (conversations: %d)", len(convIDs))
chat.NewSyncer(h.G()).SendChatStaleNotifications(context.Background(), uid.ToBytes(), convIDs)
h.G().Syncer.SendChatStaleNotifications(context.Background(), uid.ToBytes(), convIDs, false)
}

func (h *KBFSHandler) conversationIDs(uid keybase1.UID, tlf string, public bool) ([]chat1.ConversationID, error) {
9 changes: 7 additions & 2 deletions go/service/main.go
Original file line number Diff line number Diff line change
@@ -252,6 +252,8 @@ func (d *Service) createMessageDeliverer() {

sender := chat.NewBlockingSender(d.G(), chat.NewBoxer(d.G(), tlf), d.attachmentstore, ri)
d.G().MessageDeliverer = chat.NewDeliverer(d.G(), sender)

d.G().Syncer.RegisterOfflinable(d.G().MessageDeliverer)
}

func (d *Service) startMessageDeliverer() {
@@ -267,11 +269,14 @@ func (d *Service) createChatSources() {

boxer := chat.NewBoxer(d.G(), tlf)
d.G().InboxSource = chat.NewInboxSource(d.G(), d.G().Env.GetInboxSourceType(), ri, tlf)

d.G().ConvSource = chat.NewConversationSource(d.G(), d.G().Env.GetConvSourceType(),
boxer, storage.New(d.G()), ri)

d.G().ServerCacheVersions = storage.NewServerVersions(d.G())
d.G().Syncer = chat.NewSyncer(d.G())

// Set up Offlinables on Syncer
d.G().Syncer.RegisterOfflinable(d.G().InboxSource)
d.G().Syncer.RegisterOfflinable(d.G().ConvSource)

// Add a tlfHandler into the user changed handler group so we can keep identify info
// fresh

0 comments on commit 8d31882

Please sign in to comment.