Skip to content

Commit

Permalink
Periodically check if new InfluxDB DBRPs have been created influxdata…
Browse files Browse the repository at this point in the history
…#553 (influxdata#604)

* Periodically check if new InfluxDB DBRPs have been created influxdata#553
  • Loading branch information
titilambert authored and Nathaniel Cook committed Jun 3, 2016
1 parent efa47b3 commit ee9fd2b
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ In order to know if subscription writes are being dropped you should monitor the
- [#570](https://github.com/influxdata/kapacitor/issues/570): Removes panic in SMTP service on failed close connection.
- [#587](https://github.com/influxdata/kapacitor/issues/587): Allow number literals without leading zeros.
- [#584](https://github.com/influxdata/kapacitor/issues/584): Do not block during startup to send usage stats.
- [#553](https://github.com/influxdata/kapacitor/issues/553): Periodically check if new InfluxDB DBRPs have been created.

## v0.13.1 [2016-05-13]

Expand Down
5 changes: 5 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ data_dir = "/var/lib/kapacitor"
# one of 'udp', 'http', or 'https'.
subscription-protocol = "http"

# Subscriptions resync time interval
# Useful if you want to subscribe to new created databases
# without restart Kapacitord
subscriptions-sync-interval = "1m0s"

# Host part of a bind addres for UDP listeners.
# For example if a UDP listener is using port 1234
# and `udp-bind = "hostname_or_ip"`,
Expand Down
34 changes: 20 additions & 14 deletions services/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (

const (
// Maximum time to try and connect to InfluxDB during startup.
DefaultStartUpTimeout = time.Minute * 5
DefaultStartUpTimeout = time.Minute * 5
DefaultSubscriptionSyncInterval = time.Minute * 1

DefaultSubscriptionProtocol = "http"
)
Expand All @@ -34,15 +35,16 @@ type Config struct {
// Use SSL but skip chain & host verification
InsecureSkipVerify bool `toml:"insecure-skip-verify"`

Timeout toml.Duration `toml:"timeout"`
DisableSubscriptions bool `toml:"disable-subscriptions"`
SubscriptionProtocol string `toml:"subscription-protocol"`
Subscriptions map[string][]string `toml:"subscriptions"`
ExcludedSubscriptions map[string][]string `toml:"excluded-subscriptions"`
UDPBind string `toml:"udp-bind"`
UDPBuffer int `toml:"udp-buffer"`
UDPReadBuffer int `toml:"udp-read-buffer"`
StartUpTimeout toml.Duration `toml:"startup-timeout"`
Timeout toml.Duration `toml:"timeout"`
DisableSubscriptions bool `toml:"disable-subscriptions"`
SubscriptionProtocol string `toml:"subscription-protocol"`
Subscriptions map[string][]string `toml:"subscriptions"`
ExcludedSubscriptions map[string][]string `toml:"excluded-subscriptions"`
UDPBind string `toml:"udp-bind"`
UDPBuffer int `toml:"udp-buffer"`
UDPReadBuffer int `toml:"udp-read-buffer"`
StartUpTimeout toml.Duration `toml:"startup-timeout"`
SubscriptionSyncInterval toml.Duration `toml:"subscriptions-sync-interval"`
}

func NewConfig() Config {
Expand All @@ -57,22 +59,26 @@ func NewConfig() Config {
ExcludedSubscriptions: map[string][]string{
stats.DefaultDatabse: []string{stats.DefaultRetentionPolicy},
},
UDPBuffer: udp.DefaultBuffer,
StartUpTimeout: toml.Duration(DefaultStartUpTimeout),
SubscriptionProtocol: DefaultSubscriptionProtocol,
UDPBuffer: udp.DefaultBuffer,
StartUpTimeout: toml.Duration(DefaultStartUpTimeout),
SubscriptionProtocol: DefaultSubscriptionProtocol,
SubscriptionSyncInterval: toml.Duration(DefaultSubscriptionSyncInterval),
}
}

func (c *Config) SetDefaultValues() {
if c.UDPBuffer == 0 {
c.UDPBuffer = udp.DefaultBuffer
}
if c.StartUpTimeout == toml.Duration(0) {
if c.StartUpTimeout == 0 {
c.StartUpTimeout = toml.Duration(DefaultStartUpTimeout)
}
if c.SubscriptionProtocol == "" {
c.SubscriptionProtocol = DefaultSubscriptionProtocol
}
if c.SubscriptionSyncInterval == toml.Duration(0) {
c.SubscriptionSyncInterval = toml.Duration(DefaultSubscriptionSyncInterval)
}
}

func (c Config) Validate() error {
Expand Down
88 changes: 54 additions & 34 deletions services/influxdb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,24 @@ func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string
exSubs[se] = true
}
}
runningSubs := make(map[subEntry]bool, len(c.Subscriptions))
clusters[c.Name] = &influxdb{
configs: urls,
configSubs: subs,
exConfigSubs: exSubs,
hostname: hostname,
httpPort: httpPort,
logger: l,
udpBind: c.UDPBind,
udpBuffer: c.UDPBuffer,
udpReadBuffer: c.UDPReadBuffer,
startupTimeout: time.Duration(c.StartUpTimeout),
clusterID: clusterID,
subName: subName,
disableSubs: c.DisableSubscriptions,
protocol: c.SubscriptionProtocol,
configs: urls,
configSubs: subs,
exConfigSubs: exSubs,
hostname: hostname,
httpPort: httpPort,
logger: l,
udpBind: c.UDPBind,
udpBuffer: c.UDPBuffer,
udpReadBuffer: c.UDPReadBuffer,
startupTimeout: time.Duration(c.StartUpTimeout),
subscriptionSyncInterval: time.Duration(c.SubscriptionSyncInterval),
clusterID: clusterID,
subName: subName,
disableSubs: c.DisableSubscriptions,
protocol: c.SubscriptionProtocol,
runningSubs: runningSubs,
}
if defaultInfluxDB == i {
defaultInfluxDBName = c.Name
Expand Down Expand Up @@ -144,22 +147,25 @@ func (s *Service) NewNamedClient(name string) (client.Client, error) {
}

type influxdb struct {
configs []client.HTTPConfig
i int
configSubs map[subEntry]bool
exConfigSubs map[subEntry]bool
hostname string
httpPort int
logger *log.Logger
protocol string
udpBind string
udpBuffer int
udpReadBuffer int
startupTimeout time.Duration
disableSubs bool

clusterID string
subName string
configs []client.HTTPConfig
i int
configSubs map[subEntry]bool
exConfigSubs map[subEntry]bool
hostname string
httpPort int
logger *log.Logger
protocol string
udpBind string
udpBuffer int
udpReadBuffer int
startupTimeout time.Duration
subscriptionSyncInterval time.Duration
disableSubs bool
runningSubs map[subEntry]bool

clusterID string
subName string
subSyncTicker *time.Ticker

PointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
Expand Down Expand Up @@ -187,13 +193,25 @@ type subInfo struct {

func (s *influxdb) Open() error {
if !s.disableSubs {
return s.linkSubscriptions()
err := s.linkSubscriptions()
if s.subscriptionSyncInterval != 0 {
s.subSyncTicker = time.NewTicker(s.subscriptionSyncInterval)
go func() {
for _ = range s.subSyncTicker.C {
s.linkSubscriptions()
}
}()
}
return err
}
return nil
}

func (s *influxdb) Close() error {
var lastErr error
if s.subscriptionSyncInterval != 0 {
s.subSyncTicker.Stop()
}
for _, service := range s.services {
err := service.Close()
if err != nil {
Expand Down Expand Up @@ -229,7 +247,7 @@ func (s *influxdb) NewClient() (c client.Client, err error) {
}

func (s *influxdb) linkSubscriptions() error {
s.logger.Println("I! linking subscriptions")
s.logger.Println("D! linking subscriptions")
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = s.startupTimeout
ticker := backoff.NewTicker(b)
Expand Down Expand Up @@ -373,7 +391,7 @@ func (s *influxdb) linkSubscriptions() error {
startedSubs := make(map[subEntry]bool)
all := len(s.configSubs) == 0
for se, si := range existingSubs {
if (s.configSubs[se] || all) && !s.exConfigSubs[se] {
if (s.configSubs[se] || all) && !s.exConfigSubs[se] && !s.runningSubs[se] {
// Check if this kapacitor instance is in the list of hosts
for _, dest := range si.Destinations {
u, err := url.Parse(dest)
Expand All @@ -391,6 +409,7 @@ func (s *influxdb) linkSubscriptions() error {
}
}
startedSubs[se] = true
s.runningSubs[se] = true
break
}
}
Expand All @@ -399,7 +418,7 @@ func (s *influxdb) linkSubscriptions() error {
// create and start any new subscriptions
for _, se := range allSubs {
// If we have been configured to subscribe and the subscription is not started yet.
if (s.configSubs[se] || all) && !startedSubs[se] && !s.exConfigSubs[se] {
if (s.configSubs[se] || all) && !startedSubs[se] && !s.exConfigSubs[se] && !s.runningSubs[se] {
var destination string
switch s.protocol {
case "http", "https":
Expand All @@ -418,6 +437,7 @@ func (s *influxdb) linkSubscriptions() error {
if err != nil {
return err
}
s.runningSubs[se] = true
}
}

Expand Down

0 comments on commit ee9fd2b

Please sign in to comment.