Skip to content

Commit

Permalink
[pocketbase#1628] fixed realtime panic on concurrent clients iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
ganigeorgiev committed Jan 18, 2023
1 parent c1921ae commit 7001a22
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 13 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
## v0.11.3

- Fix realtime API panic on concurrent clients iteration ([#1628](https://github.com/pocketbase/pocketbase/issues/1628))

- `app.SubscriptionsBroker().Clients()` now returns a shallow copy of the underlying map.

- Added `Discard()` and `IsDiscarded()` helper methods to the `subscriptions.Client` interface.

- Slow clients should no longer "block" the main action completion.


## v0.11.2

- Fixed `fs.DeleteByPrefix()` hang on invalid S3 settings ([#1575](https://github.com/pocketbase/pocketbase/discussions/1575#discussioncomment-4661089)).
Expand Down
29 changes: 23 additions & 6 deletions apis/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pocketbase/pocketbase/forms"
"github.com/pocketbase/pocketbase/models"
"github.com/pocketbase/pocketbase/resolvers"
"github.com/pocketbase/pocketbase/tools/routine"
"github.com/pocketbase/pocketbase/tools/search"
"github.com/pocketbase/pocketbase/tools/subscriptions"
)
Expand Down Expand Up @@ -43,10 +44,14 @@ func (api *realtimeApi) connect(c echo.Context) error {
client := subscriptions.NewDefaultClient()
api.app.SubscriptionsBroker().Register(client)
defer func() {
api.app.OnRealtimeDisconnectRequest().Trigger(&core.RealtimeDisconnectEvent{
disconnectEvent := &core.RealtimeDisconnectEvent{
HttpContext: c,
Client: client,
})
}

if err := api.app.OnRealtimeDisconnectRequest().Trigger(disconnectEvent); err != nil && api.app.IsDebug() {
log.Println(err)
}

api.app.SubscriptionsBroker().Unregister(client.Id())
}()
Expand Down Expand Up @@ -259,21 +264,27 @@ func (api *realtimeApi) bindEvents() {

api.app.OnModelAfterCreate().PreAdd(func(e *core.ModelEvent) error {
if record, ok := e.Model.(*models.Record); ok {
api.broadcastRecord("create", record)
if err := api.broadcastRecord("create", record); err != nil && api.app.IsDebug() {
log.Println(err)
}
}
return nil
})

api.app.OnModelAfterUpdate().PreAdd(func(e *core.ModelEvent) error {
if record, ok := e.Model.(*models.Record); ok {
api.broadcastRecord("update", record)
if err := api.broadcastRecord("update", record); err != nil && api.app.IsDebug() {
log.Println(err)
}
}
return nil
})

api.app.OnModelBeforeDelete().Add(func(e *core.ModelEvent) error {
if record, ok := e.Model.(*models.Record); ok {
api.broadcastRecord("delete", record)
if err := api.broadcastRecord("delete", record); err != nil && api.app.IsDebug() {
log.Println(err)
}
}
return nil
})
Expand Down Expand Up @@ -370,6 +381,8 @@ func (api *realtimeApi) broadcastRecord(action string, record *models.Record) er
encodedData := string(dataBytes)

for _, client := range clients {
client := client

for subscription, rule := range subscriptionRuleMap {
if !client.HasSubscription(subscription) {
continue
Expand Down Expand Up @@ -398,7 +411,11 @@ func (api *realtimeApi) broadcastRecord(action string, record *models.Record) er
}
}

client.Channel() <- msg
routine.FireAndForget(func() {
if !client.IsDiscarded() {
client.Channel() <- msg
}
})
}
}

Expand Down
20 changes: 13 additions & 7 deletions tools/subscriptions/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ func NewBroker() *Broker {
}
}

// Clients returns all registered clients.
// Clients returns a shallow copy of all registered clients indexed
// with their connection id.
func (b *Broker) Clients() map[string]Client {
b.mux.RLock()
defer b.mux.RUnlock()

return b.clients
copy := make(map[string]Client, len(b.clients))

for id, c := range b.clients {
copy[id] = c
}

return copy
}

// ClientById finds a registered client by its id.
Expand Down Expand Up @@ -56,9 +63,8 @@ func (b *Broker) Unregister(clientId string) {
b.mux.Lock()
defer b.mux.Unlock()

// Note:
// There is no need to explicitly close the client's channel since it will be GC-ed anyway.
// Addinitionally, closing the channel explicitly could panic when there are several
// subscriptions attached to the client that needs to receive the same event.
delete(b.clients, clientId)
if client, ok := b.clients[clientId]; ok {
client.Discard()
delete(b.clients, clientId)
}
}
7 changes: 7 additions & 0 deletions tools/subscriptions/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ func TestClients(t *testing.T) {
b.Register(subscriptions.NewDefaultClient())
b.Register(subscriptions.NewDefaultClient())

// check if it is a shallow copy
clients := b.Clients()
for k := range clients {
delete(clients, k)
}

// should return a new copy
if total := len(b.Clients()); total != 2 {
t.Fatalf("Expected 2 clients, got %v", total)
}
Expand Down
33 changes: 33 additions & 0 deletions tools/subscriptions/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ type Client interface {

// Get retrieves the key value from the client's context.
Get(key string) any

// Discard marks the client as "discarded", meaning that it
// shouldn't be used anymore for sending new messages.
//
// It is safe to call Discard() multiple times.
Discard()

// IsDiscarded indicates whether the client has been "discarded"
// and should no longer be used.
IsDiscarded() bool
}

// ensures that DefaultClient satisfies the Client interface
Expand All @@ -45,6 +55,7 @@ var _ Client = (*DefaultClient)(nil)
// DefaultClient defines a generic subscription client.
type DefaultClient struct {
mux sync.RWMutex
isDiscarded bool
id string
store map[string]any
channel chan Message
Expand All @@ -63,11 +74,17 @@ func NewDefaultClient() *DefaultClient {

// Id implements the [Client.Id] interface method.
func (c *DefaultClient) Id() string {
c.mux.RLock()
defer c.mux.RUnlock()

return c.id
}

// Channel implements the [Client.Channel] interface method.
func (c *DefaultClient) Channel() chan Message {
c.mux.RLock()
defer c.mux.RUnlock()

return c.channel
}

Expand Down Expand Up @@ -139,3 +156,19 @@ func (c *DefaultClient) Set(key string, value any) {

c.store[key] = value
}

// Discard implements the [Client.Discard] interface method.
func (c *DefaultClient) Discard() {
c.mux.Lock()
defer c.mux.Unlock()

c.isDiscarded = true
}

// IsDiscarded implements the [Client.IsDiscarded] interface method.
func (c *DefaultClient) IsDiscarded() bool {
c.mux.RLock()
defer c.mux.RUnlock()

return c.isDiscarded
}
14 changes: 14 additions & 0 deletions tools/subscriptions/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,17 @@ func TestSetAndGet(t *testing.T) {
t.Errorf("Expected 1, got %v", result)
}
}

func TestDiscard(t *testing.T) {
c := subscriptions.NewDefaultClient()

if v := c.IsDiscarded(); v {
t.Fatal("Expected false, got true")
}

c.Discard()

if v := c.IsDiscarded(); !v {
t.Fatal("Expected true, got false")
}
}

0 comments on commit 7001a22

Please sign in to comment.