diff --git a/.golangci.yml b/.golangci.yml index a5ea0cb9..8a8ea9d3 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -16,3 +16,4 @@ issues: linters: - gochecknoglobals - funlen + - godox diff --git a/docs/hub/config.md b/docs/hub/config.md index ac2b0314..ac0614c6 100644 --- a/docs/hub/config.md +++ b/docs/hub/config.md @@ -15,33 +15,34 @@ Most configuration parameters are hot reloaded: changes made to environment vari When using environment variables, list must be space separated. As flags parameters, they must be comma separated. -| Parameter | Description | -|----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `acme_cert_dir` | the directory where to store Let's Encrypt certificates | -| `acme_hosts` | a list of hosts for which Let's Encrypt certificates must be issued | -| `acme_http01_addr` | the address used by the acme server to listen on (example: `0.0.0.0:8080`), defaults to `:http`. | -| `addr` | the address to listen on (example: `127.0.0.1:3000`, defaults to `:http` or `:https` depending if HTTPS is enabled or not). Note that Let's Encrypt only supports the default port: to use Let's Encrypt, **do not set this parameter**. | -| `allow_anonymous` | set to `1` to allow subscribers with no valid JWT to connect | -| `cert_file` | a cert file (to use a custom certificate) | -| `key_file` | a key file (to use a custom certificate) | -| `compress` | set to `0` to disable HTTP compression support, defaults to enabled | -| `cors_allowed_origins` | a list of allowed CORS origins, can be `*` for all | -| `debug` | set to `1` to enable the debug mode, **dangerous, don't enable in production** (logs updates' content, why an update is not send to a specific subscriber and recovery stack traces) | -| `demo` | set to `1` to enable the demo mode (automatically enabled when `debug=1`) | -| `heartbeat_interval` | interval between heartbeats (useful with some proxies, and old browsers), defaults to `15s`, set to `0s` to disable | -| `transport_url` | URL representation of the history database. Provided database are `null` to disabled history, `bolt` to use [bbolt](https://github.com/etcd-io/bbolt) (example `bolt:///var/run/mercure.db?size=100&cleanup_frequency=0.4`), defaults to `bolt://updates.db` | -| `jwt_key` | the JWT key to use for both publishers and subscribers | -| `jwt_algorithm` | the JWT verification algorithm to use for both publishers and subscribers, e.g. HS256 (default) or RS512 | -| `log_format` | the log format, can be `JSON`, `FLUENTD` or `TEXT` (default) | -| `publish_allowed_origins` | a list of origins allowed to publish (only applicable when using cookie-based auth) | -| `publisher_jwt_key` | must contain the secret key to valid publishers' JWT, can be omitted if `jwt_key` is set | -| `publisher_jwt_algorithm` | the JWT verification algorithm to use for publishers, e.g. HS256 (default) or RS512 | -| `read_timeout` | maximum duration for reading the entire request, including the body, set to `0s` to disable (default), example: `2m` | -| `subscriber_jwt_key` | must contain the secret key to valid subscribers' JWT, can be omitted if `jwt_key` is set | -| `subscriber_jwt_algorithm` | the JWT verification algorithm to use for subscribers, e.g. HS256 (default) or RS512 | -| `write_timeout` | maximum duration before timing out writes of the response, set to `0s` to disable (default), example: `2m` | -| `use_forwarded_headers` | set to `1` to use the `X-Forwarded-For`, and `X-Real-IP` for the remote (client) IP address, `X-Forwarded-Proto` or `X-Forwarded-Scheme` for the scheme (http or https), `X-Forwarded-Host` for the host and the RFC 7239 `Forwarded` header, which may include both client IPs and schemes. If this option is enabled, the reverse proxy must override or remove these headers or you will be at risk | - +| Parameter | Description | +|----------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `acme_cert_dir` | the directory where to store Let's Encrypt certificates | +| `acme_hosts` | a list of hosts for which Let's Encrypt certificates must be issued | +| `acme_http01_addr` | the address used by the acme server to listen on (example: `0.0.0.0:8080`), defaults to `:http`. | +| `addr` | the address to listen on (example: `127.0.0.1:3000`, defaults to `:http` or `:https` depending if HTTPS is enabled or not). Note that Let's Encrypt only supports the default port: to use Let's Encrypt, **do not set this parameter**. | +| `allow_anonymous` | set to `true` to allow subscribers with no valid JWT to connect | +| `cert_file` | a cert file (to use a custom certificate) | +| `key_file` | a key file (to use a custom certificate) | +| `compress` | set to `false` to disable HTTP compression support, defaults to enabled | +| `cors_allowed_origins` | a list of allowed CORS origins, can be `*` for all | +| `debug` | set to `true` to enable the debug mode, **dangerous, don't enable in production** (logs updates' content, why an update is not send to a specific subscriber and recovery stack traces) | +| `demo` | set to `true` to enable the demo mode (automatically enabled when `debug=true`) | +| `heartbeat_interval` | interval between heartbeats (useful with some proxies, and old browsers), defaults to `15s`, set to `0s` to disable | +| `transport_url` | URL representation of the history database. Provided database are `null` to disabled history, `bolt` to use [bbolt](https://github.com/etcd-io/bbolt) (example `bolt:///var/run/mercure.db?size=100&cleanup_frequency=0.4`), defaults to `bolt://updates.db` | +| `jwt_key` | the JWT key to use for both publishers and subscribers | +| `jwt_algorithm` | the JWT verification algorithm to use for both publishers and subscribers, e.g. HS256 (default) or RS512 | +| `log_format` | the log format, can be `JSON`, `FLUENTD` or `TEXT` (default) | +| `publish_allowed_origins` | a list of origins allowed to publish (only applicable when using cookie-based auth) | +| `publisher_jwt_key` | must contain the secret key to valid publishers' JWT, can be omitted if `jwt_key` is set | +| `publisher_jwt_algorithm` | the JWT verification algorithm to use for publishers, e.g. HS256 (default) or RS512 | +| `read_timeout` | maximum duration for reading the entire request, including the body, set to `0s` to disable (default), example: `2m` | +| `subscriber_jwt_key` | must contain the secret key to valid subscribers' JWT, can be omitted if `jwt_key` is set | +| `subscriber_jwt_algorithm` | the JWT verification algorithm to use for subscribers, e.g. HS256 (default) or RS512 | +| `write_timeout` | maximum duration before timing out writes of the response, set to `0s` to disable (default), example: `2m` | +| `use_forwarded_headers` | set to `true` to use the `X-Forwarded-For`, and `X-Real-IP` for the remote (client) IP address, `X-Forwarded-Proto` or `X-Forwarded-Scheme` for the scheme (http or https), `X-Forwarded-Host` for the host and the RFC 7239 `Forwarded` header, which may include both client IPs and schemes. If this option is enabled, the reverse proxy must override or remove these headers or you will be at risk | +| `dispatch_subscriptions` | set to `true` to dispatch updates when a subscription between the Hub and a subscriber is established or closed. The topic follows the template `https://mercure.rocks/subscriptions/{subscriptionID}`. To receive connection updates, subscribers must have `https://mercure.rocks/targets/subscriptions` or an URL matching the template `https://mercure.rocks/targets/subscriptions/{topic}` (`{topic}` is URL-encoded topic of the subscription) as targets | +| `subscriptions_include_ip` | set to `true` to include the subscriber's IP in the subscription update | If `acme_hosts` or both `cert_file` and `key_file` are provided, an HTTPS server supporting HTTP/2 connection will be started. If not, an HTTP server will be started (**not secure**). diff --git a/hub/config.go b/hub/config.go index 7f43dfb6..d436af39 100644 --- a/hub/config.go +++ b/hub/config.go @@ -23,6 +23,8 @@ func SetConfigDefaults(v *viper.Viper) { v.SetDefault("compress", false) v.SetDefault("use_forwarded_headers", false) v.SetDefault("demo", false) + v.SetDefault("dispatch_subscriptions", false) + v.SetDefault("subscriptions_include_ip", false) } // ValidateConfig validates a Viper instance @@ -64,6 +66,8 @@ func SetFlags(fs *pflag.FlagSet, v *viper.Viper) { fs.BoolP("use-forwarded-headers", "f", false, "enable headers forwarding") fs.BoolP("demo", "D", false, "enable the demo mode") fs.StringP("log-format", "l", "", "the log format (JSON, FLUENTD or TEXT)") + fs.BoolP("dispatch-subscriptions", "s", false, "dispatch updates when subscriptions are created or terminated") + fs.BoolP("subscriptions-include-ip", "I", false, "include the IP address of the subscriber in the subscription update") fs.VisitAll(func(f *pflag.Flag) { v.BindPFlag(strings.ReplaceAll(f.Name, "-", "_"), fs.Lookup(f.Name)) diff --git a/hub/config_test.go b/hub/config_test.go index cdb0dd3c..0f4de3a8 100644 --- a/hub/config_test.go +++ b/hub/config_test.go @@ -37,7 +37,7 @@ func TestSetFlags(t *testing.T) { fs := pflag.NewFlagSet("test", pflag.PanicOnError) SetFlags(fs, v) - assert.Subset(t, []string{"cert_file", "compress", "demo", "jwt_algorithm", "transport_url", "acme_hosts", "acme_cert_dir", "subscriber_jwt_key", "log_format", "jwt_key", "allow_anonymous", "debug", "read_timeout", "publisher_jwt_algorithm", "write_timeout", "key_file", "use_forwarded_headers", "subscriber_jwt_algorithm", "addr", "publisher_jwt_key", "heartbeat_interval", "cors_allowed_origins", "publish_allowed_origins"}, v.AllKeys()) + assert.Subset(t, v.AllKeys(), []string{"cert_file", "compress", "demo", "jwt_algorithm", "transport_url", "acme_hosts", "acme_cert_dir", "subscriber_jwt_key", "log_format", "jwt_key", "allow_anonymous", "debug", "read_timeout", "publisher_jwt_algorithm", "write_timeout", "key_file", "use_forwarded_headers", "subscriber_jwt_algorithm", "addr", "publisher_jwt_key", "heartbeat_interval", "cors_allowed_origins", "publish_allowed_origins", "dispatch_subscriptions", "subscriptions_include_ip"}) } func TestInitConfig(t *testing.T) { diff --git a/hub/subscribe.go b/hub/subscribe.go index 85699539..d36ed799 100644 --- a/hub/subscribe.go +++ b/hub/subscribe.go @@ -2,16 +2,29 @@ package hub import ( "context" + "encoding/json" "fmt" "io" + "net" "net/http" + "net/url" "strings" "time" + "github.com/gofrs/uuid" log "github.com/sirupsen/logrus" "github.com/yosida95/uritemplate" ) +type subscription struct { + ID string `json:"@id"` + Type string `json:"@type"` + Topic string `json:"topic"` + Active bool `json:"active"` + mercureClaim + Address string `json:"address,omitempty"` +} + // SubscribeHandler create a keep alive connection and send the events to the subscribers func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) { f, ok := w.(http.Flusher) @@ -80,39 +93,57 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri } fields["subscriber_topics"] = topics - var rawTopics = make([]string, 0, len(topics)) - var templateTopics = make([]*uritemplate.Template, 0, len(topics)) - for _, topic := range topics { - if tpl := h.getURITemplate(topic); tpl == nil { - rawTopics = append(rawTopics, topic) - } else { - templateTopics = append(templateTopics, tpl) - } - } + rawTopics, templateTopics := h.parseTopics(topics) authorizedAlltargets, authorizedTargets := authorizedTargets(claims, false) subscriber := NewSubscriber(authorizedAlltargets, authorizedTargets, topics, rawTopics, templateTopics, retrieveLastEventID(r)) + encodedTopics := escapeTopics(topics) + + // Connection events must be sent before creating the pipe to prevent a deadlock + connectionID := uuid.Must(uuid.NewV4()).String() + var address string + if h.config.GetBool("subscriptions_include_ip") { + address, _, _ = net.SplitHostPort(r.RemoteAddr) + } + h.dispatchSubscriptionUpdate(topics, encodedTopics, connectionID, claims, true, address) pipe, err := h.transport.CreatePipe(subscriber.LastEventID) if err != nil { http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + h.dispatchSubscriptionUpdate(topics, encodedTopics, connectionID, claims, false, address) log.WithFields(fields).Error(err) return nil, nil, false } - sendHeaders(w) + log.WithFields(fields).Info("New subscriber") // Listen to the closing of the http connection via the Request's Context go func() { <-r.Context().Done() pipe.Close() + + h.dispatchSubscriptionUpdate(topics, encodedTopics, connectionID, claims, false, address) log.WithFields(fields).Info("Subscriber disconnected") }() return subscriber, pipe, true } +func (h *Hub) parseTopics(topics []string) (rawTopics []string, templateTopics []*uritemplate.Template) { + rawTopics = make([]string, 0, len(topics)) + templateTopics = make([]*uritemplate.Template, 0, len(topics)) + for _, topic := range topics { + if tpl := h.getURITemplate(topic); tpl == nil { + rawTopics = append(rawTopics, topic) + } else { + templateTopics = append(templateTopics, tpl) + } + } + + return rawTopics, templateTopics +} + // getURITemplate retrieves or creates the uritemplate.Template associated with this topic, or nil if it's not a template func (h *Hub) getURITemplate(topic string) *uritemplate.Template { var tpl *uritemplate.Template @@ -202,3 +233,53 @@ func (h *Hub) cleanup(s *Subscriber) { } h.uriTemplates.Unlock() } + +func (h *Hub) dispatchSubscriptionUpdate(topics, encodedTopics []string, connectionID string, claims *claims, active bool, address string) { + if !h.config.GetBool("dispatch_subscriptions") { + return + } + + for k, topic := range topics { + connection := &subscription{ + ID: "https://mercure.rocks/subscriptions/" + encodedTopics[k] + "/" + connectionID, + Type: "https://mercure.rocks/Subscription", + Topic: topic, + Active: active, + Address: address, + } + + if claims == nil { + connection.mercureClaim.Publish = []string{} + connection.mercureClaim.Subscribe = []string{} + } else { + if connection.mercureClaim.Publish == nil { + connection.mercureClaim.Publish = []string{} + } + if connection.mercureClaim.Subscribe == nil { + connection.mercureClaim.Subscribe = []string{} + } + } + + json, err := json.MarshalIndent(connection, "", " ") + if err != nil { + panic(err) + } + + u := &Update{ + Topics: []string{connection.ID}, + Targets: map[string]struct{}{"https://mercure.rocks/targets/subscriptions": {}, "https://mercure.rocks/targets/subscriptions/" + encodedTopics[k]: {}}, + Event: Event{Data: string(json), ID: uuid.Must(uuid.NewV4()).String()}, + } + + h.transport.Write(u) + } +} + +func escapeTopics(topics []string) []string { + encodedTopics := make([]string, 0, len(topics)) + for _, topic := range topics { + encodedTopics = append(encodedTopics, url.QueryEscape(topic)) + } + + return encodedTopics +} diff --git a/hub/subscribe_test.go b/hub/subscribe_test.go index 0af91119..c0c2c2b4 100644 --- a/hub/subscribe_test.go +++ b/hub/subscribe_test.go @@ -8,6 +8,7 @@ import ( "net/http/httptest" "net/url" "os" + "strings" "sync" "testing" "time" @@ -48,6 +49,9 @@ func (rt *responseTester) Write(buf []byte) (int, error) { if rt.body == rt.expectedBody { rt.cancel() + } else if !strings.HasPrefix(rt.expectedBody, rt.body) { + rt.t.Errorf(`Received body "%s" doesn't match expected body "%s"`, rt.body, rt.expectedBody) + rt.cancel() } return len(buf), nil @@ -244,7 +248,7 @@ func testSubscribe(numberOfSubscribers int, t *testing.T) { } func TestSubscribe(t *testing.T) { - log.SetLevel(log.DebugLevel) + log.SetLevel(log.ErrorLevel) testSubscribe(3, t) } @@ -329,6 +333,90 @@ func TestSubscribeTarget(t *testing.T) { hub.Stop() } +func TestSubscriptionEvents(t *testing.T) { + hub := createDummy() + hub.config.Set("dispatch_subscriptions", true) + hub.config.Set("subscriptions_include_ip", true) + + var wg sync.WaitGroup + ctx1, cancel1 := context.WithCancel(context.Background()) + ctx2, cancel2 := context.WithCancel(context.Background()) + wg.Add(3) + go func() { + // Authorized to receive connection events + defer wg.Done() + req := httptest.NewRequest("GET", defaultHubURL+"?topic=https://mercure.rocks/subscriptions/{topic}/{connectionID}", nil).WithContext(ctx1) + req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(hub, subscriberRole, []string{"https://mercure.rocks/targets/subscriptions"})}) + w := httptest.NewRecorder() + hub.SubscribeHandler(w, req) + + resp := w.Result() + defer resp.Body.Close() + body, _ := ioutil.ReadAll(resp.Body) + + assert.Equal(t, http.StatusOK, resp.StatusCode) + bodyContent := string(body) + assert.Contains(t, bodyContent, `data: "@id": "https://mercure.rocks/subscriptions/https%3A%2F%2Fexample.com/`) + assert.Contains(t, bodyContent, `data: "@type": "https://mercure.rocks/Subscription",`) + assert.Contains(t, bodyContent, `data: "topic": "https://example.com",`) + assert.Contains(t, bodyContent, `data: "publish": [],`) + assert.Contains(t, bodyContent, `data: "subscribe": []`) + assert.Contains(t, bodyContent, `data: "active": true,`) + assert.Contains(t, bodyContent, `data: "active": false,`) + assert.Contains(t, bodyContent, `data: "address": "`) + }() + + go func() { + // Not authorized to receive connection events + defer wg.Done() + req := httptest.NewRequest("GET", defaultHubURL+"?topic=https://mercure.rocks/subscriptions/{topic}/{connectionID}", nil).WithContext(ctx2) + req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(hub, subscriberRole, []string{})}) + w := httptest.NewRecorder() + hub.SubscribeHandler(w, req) + + resp := w.Result() + defer resp.Body.Close() + body, _ := ioutil.ReadAll(resp.Body) + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, ":\n", string(body)) + }() + + go func() { + defer wg.Done() + + s, _ := hub.transport.(*LocalTransport) + for { + s.RLock() + ready := len(s.pipes) == 2 + s.RUnlock() + + log.Info("Waiting for subscriber...") + if ready { + break + } + } + + ctx, cancelRequest2 := context.WithCancel(context.Background()) + req := httptest.NewRequest("GET", defaultHubURL+"?topic=https://example.com", nil).WithContext(ctx) + req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(hub, subscriberRole, []string{})}) + + w := &responseTester{ + expectedStatusCode: http.StatusOK, + expectedBody: ":\n", + t: t, + cancel: cancelRequest2, + } + hub.SubscribeHandler(w, req) + time.Sleep(1 * time.Second) // TODO: find a better way to wait for the disconnection update to be dispatched + cancel2() + cancel1() + }() + + wg.Wait() + hub.Stop() +} + func TestSubscribeAllTargets(t *testing.T) { hub := createDummy() s, _ := hub.transport.(*LocalTransport) diff --git a/spec/mercure.md b/spec/mercure.md index c62a7bfd..b8b1db56 100644 --- a/spec/mercure.md +++ b/spec/mercure.md @@ -19,9 +19,8 @@ organization = "Les-Tilleuls.coop" email = "kevin@les-tilleuls.coop" [author.address.postal] city = "Lille" - street = "2 rue Hegel" - code = "59000" - postalline= ["Bâtiment Canal"] + street = "82 rue Winston Churchill" + code = "59160" country = "France" %%% @@ -195,18 +194,18 @@ following data: alternate URLs. The hub **MUST** dispatch this update to subscribers that are subscribed to both canonical or alternate URLs. - * `data`: The content of the new version of this topic. + * `data`: the content of the new version of this topic. - * `target` (optional): Target audience of this update. This key can be present several times. See + * `target` (optional): target audience of this update. This key can be present several times. See section #Authorization for further information. - * `id` (optional): The topic's revision identifier: it will be used as the SSE's `id` property. - If omitted, the hub **MUST** generate a valid globally unique id. It **MAY** be a UUID. Even if - provided, the hub **MAY** ignore the id provided by the client and generate its own id. + * `id` (optional): the topic's revision identifier: it will be used as the SSE's `id` property. If + omitted, the hub **MUST** generate a valid globally unique id. It **MAY** be a UUID [@RFC4122]. + Even if provided, the hub **MAY** ignore the id provided by the client and generate its own id. - * `type` (optional): The SSE's `event` property (a specific event type). + * `type` (optional): the SSE's `event` property (a specific event type). - * `retry` (optional): The SSE's `retry` property (the reconnection time). + * `retry` (optional): the SSE's `retry` property (the reconnection time). In the event of success, the HTTP response's body **MUST** be the `id` associated to this update generated by the hub and a success HTTP status code **MUST** be returned. The publisher **MUST** be @@ -298,7 +297,7 @@ authorized to receive updates destined for all targets. To allow re-establishment in case of connection lost, events dispatched by the hub **SHOULD** include an `id` property. The value contained in this `id` property **SHOULD** be a globally unique -identifier. To do so, a UUID [@!RFC4122] **MAY** be used. +identifier. To do so, a UUID [@RFC4122] **MAY** be used. According to the server-sent events specification, in case of connection lost the subscriber will try to automatically re-connect. During the @@ -324,6 +323,45 @@ after a long disconnection time). The hub **MAY** also specify the reconnection time using the `retry` key, as specified in the server-sent events format. +# Subscription Events + +The hub **MAY** publish an update when a subscription to a topic is created or terminated. If this +feature is implemented by the hub, an update **MUST** be dispatched every time that a subscription +is created or terminated, and for each topic to which the client subscribes. + +The topic of this update **MUST** follow the pattern +`https://mercure.rocks/subscriptions/{topic}/{subscriptionID}` where `topic` is the URL-encoded +value of the subscribed topic and `subscriptionID` is an unique identifier for this subscription. +`subscriptionID` **MAY** be a UUID [@RFC4122]. + +The content of the update **MUST** be a JSON-LD [@!W3C.REC-json-ld-20140116] document containing at +least the following properties: + + * `@id`: the identifier of this update, it **MUST** be the same value as the subscription update's + topic + + * `@type`: the fixed value `https://mercure.rocks/Subscription` + + * `topic`: the topic to which the subscription refers + + * `active`: `true` when the subscription is created, and `false` when it is terminated + + * `subscribe`: the subscription targets provided by the subscriber (see section #Authorization) + + * `publish`: the publication targets provided by the subscriber (see section #Authorization) + + * `address` (optional): the IP address ([@!RFC791], [@!RFC8200]) of the subscriber + +The JSON-LD document **MAY** contain other properties. + +In order to only allow authorized subscribers to receive subscription events, the subscription +update **MUST** be marked as intended for subscribers providing the following targets: + + * the fixed value `https://mercure.rocks/targets/subscriptions` + + * a URL following the pattern `https://mercure.rocks/targets/subscriptions/{topic}` where topic is + the URL-encoded value of the subscribed topic + # Encryption Using HTTPS does not prevent the hub from accessing the update's content. Depending of the intended