From 1a2ccef3b262080554b1d80591f1ac6133402491 Mon Sep 17 00:00:00 2001 From: Youngteac Hong Date: Sat, 9 Jan 2021 18:04:32 +0900 Subject: [PATCH] Extract MemPubSub from PubSub (#141) Before implementing the distributed PubSub implementation using ETCD, we need to extract the temporarily implemented MemoryPubSub and leave only the high-level abstraction logic in PubSub. --- yorkie/backend/backend.go | 5 +- yorkie/backend/pubsub/mempubsub/mempubsub.go | 156 +++++++++++++++++++ yorkie/backend/pubsub/pubsub.go | 152 +++--------------- yorkie/config.sample.json | 36 ++--- 4 files changed, 197 insertions(+), 152 deletions(-) create mode 100644 yorkie/backend/pubsub/mempubsub/mempubsub.go diff --git a/yorkie/backend/backend.go b/yorkie/backend/backend.go index 6bd099a79..46a594507 100644 --- a/yorkie/backend/backend.go +++ b/yorkie/backend/backend.go @@ -23,6 +23,7 @@ import ( "github.com/yorkie-team/yorkie/yorkie/backend/db" "github.com/yorkie-team/yorkie/yorkie/backend/db/mongo" "github.com/yorkie-team/yorkie/yorkie/backend/pubsub" + "github.com/yorkie-team/yorkie/yorkie/backend/pubsub/mempubsub" "github.com/yorkie-team/yorkie/yorkie/backend/sync" ) @@ -43,7 +44,7 @@ type Backend struct { DB db.DB MutexMap *sync.MutexMap - PubSub *pubsub.PubSub + PubSub pubsub.PubSub // closing is closed by backend close. closing chan struct{} @@ -67,7 +68,7 @@ func New(conf *Config, mongoConf *mongo.Config) (*Backend, error) { Config: conf, DB: client, MutexMap: sync.NewMutexMap(), - PubSub: pubsub.New(), + PubSub: mempubsub.New(), closing: make(chan struct{}), }, nil } diff --git a/yorkie/backend/pubsub/mempubsub/mempubsub.go b/yorkie/backend/pubsub/mempubsub/mempubsub.go new file mode 100644 index 000000000..ba8165058 --- /dev/null +++ b/yorkie/backend/pubsub/mempubsub/mempubsub.go @@ -0,0 +1,156 @@ +/* + * Copyright 2020 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mempubsub + +import ( + "sync" + + "github.com/yorkie-team/yorkie/pkg/document/time" + "github.com/yorkie-team/yorkie/pkg/log" + "github.com/yorkie-team/yorkie/pkg/types" + "github.com/yorkie-team/yorkie/yorkie/backend/pubsub" +) + +// subscriptions is a collection of subscriptions that subscribe to a specific +// topic. +type subscriptions struct { + internalMap map[string]*pubsub.Subscription +} + +func newSubscriptions() *subscriptions { + return &subscriptions{ + internalMap: make(map[string]*pubsub.Subscription), + } +} + +// Add adds the given subscription. +func (s *subscriptions) Add(sub *pubsub.Subscription) { + s.internalMap[sub.ID()] = sub +} + +// Map returns the internal map of this subscriptions. +func (s *subscriptions) Map() map[string]*pubsub.Subscription { + return s.internalMap +} + +// Delete deletes the subscription of the given id. +func (s *subscriptions) Delete(id string) { + if subscription, ok := s.internalMap[id]; ok { + subscription.Close() + } + delete(s.internalMap, id) +} + +// MemPubSub is the memory implementation of MemPubSub, used for single agent or +// tests. +type MemPubSub struct { + mu *sync.RWMutex + subscriptionsMap map[string]*subscriptions +} + +// New creates an instance of MemoryPubSub. +func New() *MemPubSub { + return &MemPubSub{ + mu: &sync.RWMutex{}, + subscriptionsMap: make(map[string]*subscriptions), + } +} + +// Subscribe subscribes to the given topics. +func (m *MemPubSub) Subscribe( + subscriber types.Client, + topics []string, +) (*pubsub.Subscription, map[string][]types.Client, error) { + if len(topics) == 0 { + return nil, nil, pubsub.ErrEmptyTopics + } + + m.mu.Lock() + defer m.mu.Unlock() + + log.Logger.Debugf( + `Subscribe(%s,%s) Start`, + topics[0], + subscriber.ID.String(), + ) + + sub := pubsub.NewSubscription(subscriber) + peersMap := make(map[string][]types.Client) + + for _, topic := range topics { + if _, ok := m.subscriptionsMap[topic]; !ok { + m.subscriptionsMap[topic] = newSubscriptions() + } + m.subscriptionsMap[topic].Add(sub) + + var peers []types.Client + for _, sub := range m.subscriptionsMap[topic].Map() { + peers = append(peers, sub.Subscriber()) + } + peersMap[topic] = peers + } + + log.Logger.Debugf( + `Subscribe(%s,%s) End`, + topics[0], + subscriber.ID.String(), + ) + return sub, peersMap, nil +} + +// Unsubscribe unsubscribes the given topics. +func (m *MemPubSub) Unsubscribe(topics []string, sub *pubsub.Subscription) { + m.mu.Lock() + defer m.mu.Unlock() + + log.Logger.Debugf(`Unsubscribe(%s,%s) Start`, topics[0], sub.SubscriberID()) + + for _, topic := range topics { + if subs, ok := m.subscriptionsMap[topic]; ok { + subs.Delete(sub.ID()) + } + } + log.Logger.Debugf(`Unsubscribe(%s,%s) End`, topics[0], sub.SubscriberID()) +} + +// Publish publishes the given event to the given Topic. +func (m *MemPubSub) Publish( + publisherID *time.ActorID, + topic string, + event pubsub.DocEvent, +) { + m.mu.RLock() + defer m.mu.RUnlock() + + log.Logger.Debugf(`Publish(%s,%s) Start`, event.DocKey, publisherID.String()) + + if subs, ok := m.subscriptionsMap[topic]; ok { + for _, sub := range subs.Map() { + if sub.Subscriber().ID.Compare(publisherID) != 0 { + log.Logger.Debugf( + `Publish(%s,%s) to %s`, + event.DocKey, + publisherID.String(), + sub.SubscriberID(), + ) + sub.Events() <- event + } + } + } + + log.Logger.Debugf(`Publish(%s,%s) End`, event.DocKey, publisherID.String()) +} diff --git a/yorkie/backend/pubsub/pubsub.go b/yorkie/backend/pubsub/pubsub.go index 3c10250a2..0a55a8e75 100644 --- a/yorkie/backend/pubsub/pubsub.go +++ b/yorkie/backend/pubsub/pubsub.go @@ -18,12 +18,10 @@ package pubsub import ( "errors" - "sync" "github.com/google/uuid" "github.com/yorkie-team/yorkie/pkg/document/time" - "github.com/yorkie-team/yorkie/pkg/log" "github.com/yorkie-team/yorkie/pkg/types" ) @@ -48,7 +46,8 @@ type Subscription struct { events chan DocEvent } -func newSubscription(subscriber types.Client) *Subscription { +// NewSubscription creates a new instance of Subscription. +func NewSubscription(subscriber types.Client) *Subscription { return &Subscription{ id: uuid.New().String(), subscriber: subscriber, @@ -59,8 +58,13 @@ func newSubscription(subscriber types.Client) *Subscription { } } +// ID returns the id of this subscription. +func (s *Subscription) ID() string { + return s.id +} + // Events returns the DocEvent channel of this subscription. -func (s *Subscription) Events() <-chan DocEvent { +func (s *Subscription) Events() chan DocEvent { return s.events } @@ -84,133 +88,17 @@ func (s *Subscription) Close() { close(s.events) } -// Subscriptions is a collection of subscriptions that subscribe to a specific -// topic. -type Subscriptions struct { - internalMap map[string]*Subscription -} - -func newSubscriptions() *Subscriptions { - return &Subscriptions{ - internalMap: make(map[string]*Subscription), - } -} - -// Add adds the given subscription. -func (s *Subscriptions) Add(sub *Subscription) { - s.internalMap[sub.id] = sub -} - -// Map returns the internal map of this Subscriptions. -func (s *Subscriptions) Map() map[string]*Subscription { - return s.internalMap -} - -// Delete deletes the subscription of the given id. -func (s *Subscriptions) Delete(id string) { - if subscription, ok := s.internalMap[id]; ok { - subscription.Close() - } - delete(s.internalMap, id) -} - // PubSub is a structure to support event publishing/subscription. -// TODO: Temporary Memory PubSub. -// - We will need to replace this with distributed pubSub. -type PubSub struct { - mu *sync.RWMutex - subscriptionsMap map[string]*Subscriptions -} - -// New creates an instance of Pubsub. -func New() *PubSub { - return &PubSub{ - mu: &sync.RWMutex{}, - subscriptionsMap: make(map[string]*Subscriptions), - } -} - -// Subscribe subscribes to the given topics. -func (m *PubSub) Subscribe( - subscriber types.Client, - topics []string, -) (*Subscription, map[string][]types.Client, error) { - if len(topics) == 0 { - return nil, nil, ErrEmptyTopics - } - - m.mu.Lock() - defer m.mu.Unlock() - - log.Logger.Debugf( - `Subscribe(%s,%s) Start`, - topics[0], - subscriber.ID.String(), - ) - - subscription := newSubscription(subscriber) - peersMap := make(map[string][]types.Client) - - for _, topic := range topics { - if _, ok := m.subscriptionsMap[topic]; !ok { - m.subscriptionsMap[topic] = newSubscriptions() - } - m.subscriptionsMap[topic].Add(subscription) - - var peers []types.Client - for _, sub := range m.subscriptionsMap[topic].Map() { - peers = append(peers, sub.subscriber) - } - peersMap[topic] = peers - } - - log.Logger.Debugf( - `Subscribe(%s,%s) End`, - topics[0], - subscriber.ID.String(), - ) - return subscription, peersMap, nil -} - -// Unsubscribe unsubscribes the given topics. -func (m *PubSub) Unsubscribe(topics []string, sub *Subscription) { - m.mu.Lock() - defer m.mu.Unlock() - - log.Logger.Debugf(`Unsubscribe(%s,%s) Start`, topics[0], sub.SubscriberID()) - - for _, topic := range topics { - if subscriptions, ok := m.subscriptionsMap[topic]; ok { - subscriptions.Delete(sub.id) - } - } - log.Logger.Debugf(`Unsubscribe(%s,%s) End`, topics[0], sub.SubscriberID()) -} - -// Publish publishes the given event to the given Topic. -func (m *PubSub) Publish( - publisherID *time.ActorID, - topic string, - event DocEvent, -) { - m.mu.RLock() - defer m.mu.RUnlock() - - log.Logger.Debugf(`Publish(%s,%s) Start`, event.DocKey, publisherID.String()) - - if subscriptions, ok := m.subscriptionsMap[topic]; ok { - for _, sub := range subscriptions.Map() { - if sub.subscriber.ID.Compare(publisherID) != 0 { - log.Logger.Debugf( - `Publish(%s,%s) to %s`, - event.DocKey, - publisherID.String(), - sub.SubscriberID(), - ) - sub.events <- event - } - } - } - - log.Logger.Debugf(`Publish(%s,%s) End`, event.DocKey, publisherID.String()) +type PubSub interface { + // Subscribe subscribes to the given topics. + Subscribe( + subscriber types.Client, + topics []string, + ) (*Subscription, map[string][]types.Client, error) + + // Unsubscribe unsubscribes the given topics. + Unsubscribe(topics []string, sub *Subscription) + + // Publish publishes the given event to the given Topic. + Publish(publisherID *time.ActorID, topic string, event DocEvent) } diff --git a/yorkie/config.sample.json b/yorkie/config.sample.json index 7cf624c51..faf69a26c 100644 --- a/yorkie/config.sample.json +++ b/yorkie/config.sample.json @@ -1,20 +1,20 @@ { - "RPC": { - "Port": 11101, - "CertFile": "", - "KeyFile": "" - }, - "Metrics": { - "Port": 11102 - }, - "Mongo": { - "ConnectionTimeoutSec": 5, - "ConnectionURI": "mongodb://localhost:27017", - "YorkieDatabase": "yorkie-meta", - "PingTimeoutSec": 5 - }, - "Backend": { - "SnapshotThreshold": 500, - "SnapshotInterval": 100 - } + "RPC": { + "Port": 11101, + "CertFile": "", + "KeyFile": "" + }, + "Metrics": { + "Port": 11102 + }, + "Mongo": { + "ConnectionTimeoutSec": 5, + "ConnectionURI": "mongodb://localhost:27017", + "YorkieDatabase": "yorkie-meta", + "PingTimeoutSec": 5 + }, + "Backend": { + "SnapshotThreshold": 500, + "SnapshotInterval": 100 + } }