Skip to content

Commit

Permalink
[pocketbase#468] added additional realtime events
Browse files Browse the repository at this point in the history
  • Loading branch information
ganigeorgiev committed Dec 2, 2022
1 parent 98cc8e2 commit 23fbfab
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 13 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
```go
app.OnBeforeBootstrap()
app.OnAfterBootstrap()
app.OnRealtimeDisconnectRequest()
app.OnRealtimeBeforeMessageSend()
app.OnRealtimeAfterMessageSend()
```

- Refactored the `migrate` command to support **external JavaScript migration files** using an embedded JS interpreter ([goja](https://github.com/dop251/goja)).
Expand Down
70 changes: 58 additions & 12 deletions apis/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ func (api *realtimeApi) connect(c echo.Context) error {
// register new subscription client
client := subscriptions.NewDefaultClient()
api.app.SubscriptionsBroker().Register(client)
defer api.app.SubscriptionsBroker().Unregister(client.Id())
defer func() {
api.app.OnRealtimeDisconnectRequest().Trigger(&core.RealtimeDisconnectEvent{
HttpContext: c,
Client: client,
})

api.app.SubscriptionsBroker().Unregister(client.Id())
}()

c.Response().Header().Set("Content-Type", "text/event-stream; charset=UTF-8")
c.Response().Header().Set("Cache-Control", "no-store")
Expand All @@ -51,12 +58,12 @@ func (api *realtimeApi) connect(c echo.Context) error {
// https://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
c.Response().Header().Set("X-Accel-Buffering", "no")

event := &core.RealtimeConnectEvent{
connectEvent := &core.RealtimeConnectEvent{
HttpContext: c,
Client: client,
}

if err := api.app.OnRealtimeConnectRequest().Trigger(event); err != nil {
if err := api.app.OnRealtimeConnectRequest().Trigger(connectEvent); err != nil {
return err
}

Expand All @@ -65,10 +72,31 @@ func (api *realtimeApi) connect(c echo.Context) error {
}

// signalize established connection (aka. fire "connect" message)
fmt.Fprint(c.Response(), "id:"+client.Id()+"\n")
fmt.Fprint(c.Response(), "event:PB_CONNECT\n")
fmt.Fprint(c.Response(), "data:{\"clientId\":\""+client.Id()+"\"}\n\n")
c.Response().Flush()
connectMsgEvent := &core.RealtimeMessageEvent{
HttpContext: c,
Client: client,
Message: &subscriptions.Message{
Name: "PB_CONNECT",
Data: `{"clientId":"` + client.Id() + `"}`,
},
}
connectMsgErr := api.app.OnRealtimeBeforeMessageSend().Trigger(connectMsgEvent, func(e *core.RealtimeMessageEvent) error {
w := e.HttpContext.Response()
fmt.Fprint(w, "id:"+client.Id()+"\n")
fmt.Fprint(w, "event:"+e.Message.Name+"\n")
fmt.Fprint(w, "data:"+e.Message.Data+"\n\n")
w.Flush()
return nil
})
if connectMsgErr != nil {
if api.app.IsDebug() {
log.Println("Realtime connection closed (failed to deliver PB_CONNECT):", client.Id(), connectMsgErr)
}
return nil
}
if err := api.app.OnRealtimeAfterMessageSend().Trigger(connectMsgEvent); err != nil && api.app.IsDebug() {
log.Println("OnRealtimeAfterMessageSend PB_CONNECT error:", err)
}

// start an idle timer to keep track of inactive/forgotten connections
idleDuration := 5 * time.Minute
Expand All @@ -88,11 +116,29 @@ func (api *realtimeApi) connect(c echo.Context) error {
return nil
}

w := c.Response()
fmt.Fprint(w, "id:"+client.Id()+"\n")
fmt.Fprint(w, "event:"+msg.Name+"\n")
fmt.Fprint(w, "data:"+msg.Data+"\n\n")
w.Flush()
msgEvent := &core.RealtimeMessageEvent{
HttpContext: c,
Client: client,
Message: &msg,
}
msgErr := api.app.OnRealtimeBeforeMessageSend().Trigger(msgEvent, func(e *core.RealtimeMessageEvent) error {
w := e.HttpContext.Response()
fmt.Fprint(w, "id:"+e.Client.Id()+"\n")
fmt.Fprint(w, "event:"+e.Message.Name+"\n")
fmt.Fprint(w, "data:"+e.Message.Data+"\n\n")
w.Flush()
return nil
})
if msgErr != nil {
if api.app.IsDebug() {
log.Println("Realtime connection closed (failed to deliver message):", client.Id(), msgErr)
}
return nil
}

if err := api.app.OnRealtimeAfterMessageSend().Trigger(msgEvent); err != nil && api.app.IsDebug() {
log.Println("OnRealtimeAfterMessageSend error:", err)
}

idleTimer.Stop()
idleTimer.Reset(idleDuration)
Expand Down
53 changes: 52 additions & 1 deletion apis/realtime_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package apis_test

import (
"errors"
"net/http"
"strings"
"testing"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/pocketbase/pocketbase/core"
"github.com/pocketbase/pocketbase/models"
"github.com/pocketbase/pocketbase/tests"
"github.com/pocketbase/pocketbase/tools/hook"
"github.com/pocketbase/pocketbase/tools/subscriptions"
)

Expand All @@ -25,7 +27,56 @@ func TestRealtimeConnect(t *testing.T) {
`data:{"clientId":`,
},
ExpectedEvents: map[string]int{
"OnRealtimeConnectRequest": 1,
"OnRealtimeConnectRequest": 1,
"OnRealtimeBeforeMessageSend": 1,
"OnRealtimeAfterMessageSend": 1,
"OnRealtimeDisconnectRequest": 1,
},
AfterTestFunc: func(t *testing.T, app *tests.TestApp, e *echo.Echo) {
if len(app.SubscriptionsBroker().Clients()) != 0 {
t.Errorf("Expected the subscribers to be removed after connection close, found %d", len(app.SubscriptionsBroker().Clients()))
}
},
},
{
Name: "PB_CONNECT interrupt",
Method: http.MethodGet,
Url: "/api/realtime",
ExpectedStatus: 200,
ExpectedEvents: map[string]int{
"OnRealtimeConnectRequest": 1,
"OnRealtimeBeforeMessageSend": 1,
"OnRealtimeDisconnectRequest": 1,
},
BeforeTestFunc: func(t *testing.T, app *tests.TestApp, e *echo.Echo) {
app.OnRealtimeBeforeMessageSend().Add(func(e *core.RealtimeMessageEvent) error {
if e.Message.Name == "PB_CONNECT" {
return errors.New("PB_CONNECT error")
}
return nil
})
},
AfterTestFunc: func(t *testing.T, app *tests.TestApp, e *echo.Echo) {
if len(app.SubscriptionsBroker().Clients()) != 0 {
t.Errorf("Expected the subscribers to be removed after connection close, found %d", len(app.SubscriptionsBroker().Clients()))
}
},
},
{
Name: "Skipping/ignoring messages",
Method: http.MethodGet,
Url: "/api/realtime",
ExpectedStatus: 200,
ExpectedEvents: map[string]int{
"OnRealtimeConnectRequest": 1,
"OnRealtimeBeforeMessageSend": 1,
"OnRealtimeAfterMessageSend": 1,
"OnRealtimeDisconnectRequest": 1,
},
BeforeTestFunc: func(t *testing.T, app *tests.TestApp, e *echo.Echo) {
app.OnRealtimeBeforeMessageSend().Add(func(e *core.RealtimeMessageEvent) error {
return hook.StopPropagation
})
},
AfterTestFunc: func(t *testing.T, app *tests.TestApp, e *echo.Echo) {
if len(app.SubscriptionsBroker().Clients()) != 0 {
Expand Down
15 changes: 15 additions & 0 deletions core/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,21 @@ type App interface {
// the SSE client connection.
OnRealtimeConnectRequest() *hook.Hook[*RealtimeConnectEvent]

// OnRealtimeDisconnectRequest hook is triggered on disconnected/interrupted
// SSE client connection.
OnRealtimeDisconnectRequest() *hook.Hook[*RealtimeDisconnectEvent]

// OnRealtimeBeforeMessage hook is triggered right before sending
// an SSE message to a client.
//
// Returning [hook.StopPropagation] will prevent sending the message.
// Returning any other non-nil error will close the realtime connection.
OnRealtimeBeforeMessageSend() *hook.Hook[*RealtimeMessageEvent]

// OnRealtimeBeforeMessage hook is triggered right after sending
// an SSE message to a client.
OnRealtimeAfterMessageSend() *hook.Hook[*RealtimeMessageEvent]

// OnRealtimeBeforeSubscribeRequest hook is triggered before changing
// the client subscriptions, allowing you to further validate and
// modify the submitted change.
Expand Down
18 changes: 18 additions & 0 deletions core/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type BaseApp struct {

// realtime api event hooks
onRealtimeConnectRequest *hook.Hook[*RealtimeConnectEvent]
onRealtimeDisconnectRequest *hook.Hook[*RealtimeDisconnectEvent]
onRealtimeBeforeMessageSend *hook.Hook[*RealtimeMessageEvent]
onRealtimeAfterMessageSend *hook.Hook[*RealtimeMessageEvent]
onRealtimeBeforeSubscribeRequest *hook.Hook[*RealtimeSubscribeEvent]
onRealtimeAfterSubscribeRequest *hook.Hook[*RealtimeSubscribeEvent]

Expand Down Expand Up @@ -153,6 +156,9 @@ func NewBaseApp(dataDir string, encryptionEnv string, isDebug bool) *BaseApp {

// realtime API event hooks
onRealtimeConnectRequest: &hook.Hook[*RealtimeConnectEvent]{},
onRealtimeDisconnectRequest: &hook.Hook[*RealtimeDisconnectEvent]{},
onRealtimeBeforeMessageSend: &hook.Hook[*RealtimeMessageEvent]{},
onRealtimeAfterMessageSend: &hook.Hook[*RealtimeMessageEvent]{},
onRealtimeBeforeSubscribeRequest: &hook.Hook[*RealtimeSubscribeEvent]{},
onRealtimeAfterSubscribeRequest: &hook.Hook[*RealtimeSubscribeEvent]{},

Expand Down Expand Up @@ -471,6 +477,18 @@ func (app *BaseApp) OnRealtimeConnectRequest() *hook.Hook[*RealtimeConnectEvent]
return app.onRealtimeConnectRequest
}

func (app *BaseApp) OnRealtimeDisconnectRequest() *hook.Hook[*RealtimeDisconnectEvent] {
return app.onRealtimeDisconnectRequest
}

func (app *BaseApp) OnRealtimeBeforeMessageSend() *hook.Hook[*RealtimeMessageEvent] {
return app.onRealtimeBeforeMessageSend
}

func (app *BaseApp) OnRealtimeAfterMessageSend() *hook.Hook[*RealtimeMessageEvent] {
return app.onRealtimeAfterMessageSend
}

func (app *BaseApp) OnRealtimeBeforeSubscribeRequest() *hook.Hook[*RealtimeSubscribeEvent] {
return app.onRealtimeBeforeSubscribeRequest
}
Expand Down
11 changes: 11 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ type RealtimeConnectEvent struct {
Client subscriptions.Client
}

type RealtimeDisconnectEvent struct {
HttpContext echo.Context
Client subscriptions.Client
}

type RealtimeMessageEvent struct {
HttpContext echo.Context
Client subscriptions.Client
Message *subscriptions.Message
}

type RealtimeSubscribeEvent struct {
HttpContext echo.Context
Client subscriptions.Client
Expand Down
15 changes: 15 additions & 0 deletions tests/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,21 @@ func NewTestApp(optTestDataDir ...string) (*TestApp, error) {
return nil
})

t.OnRealtimeDisconnectRequest().Add(func(e *core.RealtimeDisconnectEvent) error {
t.EventCalls["OnRealtimeDisconnectRequest"]++
return nil
})

t.OnRealtimeBeforeMessageSend().Add(func(e *core.RealtimeMessageEvent) error {
t.EventCalls["OnRealtimeBeforeMessageSend"]++
return nil
})

t.OnRealtimeAfterMessageSend().Add(func(e *core.RealtimeMessageEvent) error {
t.EventCalls["OnRealtimeAfterMessageSend"]++
return nil
})

t.OnRealtimeBeforeSubscribeRequest().Add(func(e *core.RealtimeSubscribeEvent) error {
t.EventCalls["OnRealtimeBeforeSubscribeRequest"]++
return nil
Expand Down

0 comments on commit 23fbfab

Please sign in to comment.