Skip to content

Commit

Permalink
add support for different subscription modes
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jun 2, 2017
1 parent 7237730 commit c187664
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 52 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

### Features

- [#1413](https://github.com/influxdata/kapacitor/issues/1413): Add subscriptions modes to InfluxDB subscriptions.

### Bugfixes
- [#1400](https://github.com/influxdata/kapacitor/issues/1400): Allow for `.yml` file extensions in `define-topic-handler`

- [#1400](https://github.com/influxdata/kapacitor/issues/1400): Allow for `.yml` file extensions in `define-topic-handler`
- [#1402](https://github.com/influxdata/kapacitor/pull/1402): Fix http server error logging.

## v1.3.1 [2017-06-02]
Expand Down
12 changes: 11 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (s *Server) appendInfluxDBService() error {
if err != nil {
return errors.Wrap(err, "failed to get http port")
}
srv, err := influxdb.NewService(c, httpPort, s.config.Hostname, s.config.HTTP.AuthEnabled, l)
srv, err := influxdb.NewService(c, httpPort, s.config.Hostname, varsIDer{}, s.config.HTTP.AuthEnabled, l)
if err != nil {
return err
}
Expand Down Expand Up @@ -1037,3 +1037,13 @@ func (qe *Queryexecutor) Authorize(u *meta.UserInfo, q *influxql.Query, db strin
func (qe *Queryexecutor) ExecuteQuery(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) {
return nil, errors.New("cannot execute queries against Kapacitor")
}

type varsIDer struct {
}

func (v varsIDer) ClusterID() uuid.UUID {
return vars.ClusterIDVar.UUIDValue()
}
func (v varsIDer) ServerID() uuid.UUID {
return vars.ServerIDVar.UUIDValue()
}
11 changes: 11 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5522,6 +5522,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"ssl-key": "",
"startup-timeout": "1h0m0s",
"subscription-protocol": "http",
"subscription-mode": "cluster",
"subscriptions": nil,
"subscriptions-sync-interval": "1m0s",
"timeout": "0s",
Expand Down Expand Up @@ -5553,6 +5554,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"ssl-key": "",
"startup-timeout": "1h0m0s",
"subscription-protocol": "http",
"subscription-mode": "cluster",
"subscriptions": nil,
"subscriptions-sync-interval": "1m0s",
"timeout": "0s",
Expand Down Expand Up @@ -5594,6 +5596,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"ssl-key": "",
"startup-timeout": "1h0m0s",
"subscription-protocol": "http",
"subscription-mode": "cluster",
"subscriptions": nil,
"subscriptions-sync-interval": "1m0s",
"timeout": "0s",
Expand Down Expand Up @@ -5625,6 +5628,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"ssl-key": "",
"startup-timeout": "1h0m0s",
"subscription-protocol": "http",
"subscription-mode": "cluster",
"subscriptions": nil,
"subscriptions-sync-interval": "1m0s",
"timeout": "0s",
Expand Down Expand Up @@ -5667,6 +5671,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"ssl-key": "",
"startup-timeout": "1h0m0s",
"subscription-protocol": "https",
"subscription-mode": "cluster",
"subscriptions": map[string]interface{}{"_internal": []interface{}{"monitor"}},
"subscriptions-sync-interval": "1m0s",
"timeout": "0s",
Expand Down Expand Up @@ -5698,6 +5703,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"ssl-key": "",
"startup-timeout": "1h0m0s",
"subscription-protocol": "https",
"subscription-mode": "cluster",
"subscriptions": map[string]interface{}{"_internal": []interface{}{"monitor"}},
"subscriptions-sync-interval": "1m0s",
"timeout": "0s",
Expand Down Expand Up @@ -5736,6 +5742,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"ssl-key": "",
"startup-timeout": "1h0m0s",
"subscription-protocol": "https",
"subscription-mode": "cluster",
"subscriptions": map[string]interface{}{"_internal": []interface{}{"monitor"}},
"subscriptions-sync-interval": "1m0s",
"timeout": "0s",
Expand Down Expand Up @@ -5767,6 +5774,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"ssl-key": "",
"startup-timeout": "1h0m0s",
"subscription-protocol": "https",
"subscription-mode": "cluster",
"subscriptions": map[string]interface{}{"_internal": []interface{}{"monitor"}},
"subscriptions-sync-interval": "1m0s",
"timeout": "0s",
Expand Down Expand Up @@ -5809,6 +5817,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"ssl-key": "",
"startup-timeout": "1h0m0s",
"subscription-protocol": "https",
"subscription-mode": "cluster",
"subscriptions": map[string]interface{}{"_internal": []interface{}{"monitor"}},
"subscriptions-sync-interval": "1m0s",
"timeout": "0s",
Expand Down Expand Up @@ -5839,6 +5848,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"ssl-key": "",
"startup-timeout": "5m0s",
"subscription-protocol": "http",
"subscription-mode": "cluster",
"subscriptions": nil,
"subscriptions-sync-interval": "1m0s",
"timeout": "0s",
Expand Down Expand Up @@ -5872,6 +5882,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"startup-timeout": "5m0s",
"subscription-protocol": "http",
"subscriptions": nil,
"subscription-mode": "cluster",
"subscriptions-sync-interval": "1m0s",
"timeout": "0s",
"udp-bind": "",
Expand Down
33 changes: 33 additions & 0 deletions services/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ const (
DefaultSubscriptionProtocol = "http"
)

type SubscriptionMode int

const (
// ClusterMode means that there will be one set of subscriptions per cluster.
ClusterMode SubscriptionMode = iota
// ServerMode means that there will be one set of subscriptions per server.
ServerMode
)

type Config struct {
Enabled bool `toml:"enabled" override:"enabled"`
Name string `toml:"name" override:"name"`
Expand All @@ -39,6 +48,7 @@ type Config struct {
Timeout toml.Duration `toml:"timeout" override:"timeout"`
DisableSubscriptions bool `toml:"disable-subscriptions" override:"disable-subscriptions"`
SubscriptionProtocol string `toml:"subscription-protocol" override:"subscription-protocol"`
SubscriptionMode SubscriptionMode `toml:"subscription-mode" override:"subscription-mode"`
Subscriptions map[string][]string `toml:"subscriptions" override:"subscriptions"`
ExcludedSubscriptions map[string][]string `toml:"excluded-subscriptions" override:"excluded-subscriptions"`
KapacitorHostname string `toml:"kapacitor-hostname" override:"kapacitor-hostname"`
Expand Down Expand Up @@ -67,6 +77,7 @@ func (c *Config) Init() {
c.StartUpTimeout = toml.Duration(DefaultStartUpTimeout)
c.SubscriptionProtocol = DefaultSubscriptionProtocol
c.SubscriptionSyncInterval = toml.Duration(DefaultSubscriptionSyncInterval)
c.SubscriptionMode = ClusterMode
}

func (c *Config) ApplyConditionalDefaults() {
Expand Down Expand Up @@ -113,3 +124,25 @@ func (c Config) Validate() error {
}
return nil
}

func (m SubscriptionMode) MarshalText() ([]byte, error) {
switch m {
case ClusterMode:
return []byte("cluster"), nil
case ServerMode:
return []byte("server"), nil
default:
return nil, fmt.Errorf("unknown subscription mode %q", m)
}
}
func (m *SubscriptionMode) UnmarshalText(text []byte) error {
switch s := string(text); s {
case "cluster":
*m = ClusterMode
case "server":
*m = ServerMode
default:
return fmt.Errorf("unknown subscription mode %q", s)
}
return nil
}
Loading

0 comments on commit c187664

Please sign in to comment.