From cc60647827500a35d2287f5e1eab5e32b92ff01c Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 18 Oct 2016 11:52:12 +0100 Subject: [PATCH 1/4] collectd should retry creating database --- services/collectd/service.go | 56 ++++++++++++-------- services/collectd/service_test.go | 88 ++++++++++++++++++++++++++++--- 2 files changed, 113 insertions(+), 31 deletions(-) diff --git a/services/collectd/service.go b/services/collectd/service.go index 5e66e72a000..aa11aae7d08 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -54,8 +54,9 @@ type Service struct { typesdb gollectd.Types addr net.Addr - mu sync.Mutex - done chan struct{} + mu sync.RWMutex + ready bool // Has the required database been created? + done chan struct{} // Is the service closing or closed? // expvar-based stats. stats *Statistics @@ -96,11 +97,6 @@ func (s *Service) Open() error { return fmt.Errorf("PointsWriter is nil") } - if _, err := s.MetaClient.CreateDatabase(s.Config.Database); err != nil { - s.Logger.Printf("Failed to ensure target database %s exists: %s", s.Config.Database, err.Error()) - return err - } - if s.typesdb == nil { // Open collectd types. if stat, err := os.Stat(s.Config.TypesDB); err != nil { @@ -178,12 +174,11 @@ func (s *Service) Open() error { s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration)) s.batcher.Start() - // Create waitgroup for signalling goroutines to stop. + // Create waitgroup for signalling goroutines to stop and start goroutines + // that process collectd packets. s.wg.Add(2) - - // Start goroutines that process collectd packets. - go s.serve() - go s.writePoints() + go func() { defer s.wg.Done(); s.serve() }() + go func() { defer s.wg.Done(); s.writePoints() }() return nil } @@ -215,13 +210,6 @@ func (s *Service) Close() error { return nil } -// Closed returns true if the service is currently closed. -func (s *Service) Closed() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.closed() -} - func (s *Service) closed() bool { select { case <-s.done: @@ -232,6 +220,26 @@ func (s *Service) closed() bool { return s.done == nil } +// createInternalStorage ensures that the required database has been created. +func (s *Service) createInternalStorage() error { + s.mu.RLock() + ready := s.ready + s.mu.RUnlock() + if ready { + return nil + } + + if _, err := s.MetaClient.CreateDatabase(s.Config.Database); err != nil { + return err + } + + // The service is now ready. + s.mu.Lock() + s.ready = true + s.mu.Unlock() + return nil +} + // SetLogOutput sets the writer to which all logs are written. It must not be // called after Open is called. func (s *Service) SetLogOutput(w io.Writer) { @@ -280,8 +288,6 @@ func (s *Service) Addr() net.Addr { } func (s *Service) serve() { - defer s.wg.Done() - // From https://collectd.org/wiki/index.php/Binary_protocol // 1024 bytes (payload only, not including UDP / IP headers) // In versions 4.0 through 4.7, the receive buffer has a fixed size @@ -331,13 +337,17 @@ func (s *Service) handleMessage(buffer []byte) { } func (s *Service) writePoints() { - defer s.wg.Done() - for { select { case <-s.done: return case batch := <-s.batcher.Out(): + // Will attempt to create database if not yet created. + if err := s.createInternalStorage(); err != nil { + s.Logger.Printf("Required database %s not yet created: %s", s.Config.Database, err.Error()) + continue + } + if err := s.PointsWriter.WritePoints(s.Config.Database, s.Config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil { atomic.AddInt64(&s.stats.BatchesTransmitted, 1) atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) diff --git a/services/collectd/service_test.go b/services/collectd/service_test.go index 76ae6e0327d..e3e35145b6e 100644 --- a/services/collectd/service_test.go +++ b/services/collectd/service_test.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "log" "net" + "strings" "testing" "time" @@ -52,26 +53,73 @@ func TestService_OpenClose(t *testing.T) { } } -// Test that the service checks / creates the target database on startup. +// Test that the service checks / creates the target database every time we +// try to write points. func TestService_CreatesDatabase(t *testing.T) { t.Parallel() s := NewTestService(1, time.Second) - var created bool + s.WritePointsFn = func(string, string, models.ConsistencyLevel, []models.Point) error { + return nil + } + + called := make(chan struct{}) s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) { if name != s.Config.Database { t.Errorf("\n\texp = %s\n\tgot = %s\n", s.Config.Database, name) } - created = true + called <- struct{}{} + return nil, errors.New("an error") + } + + if err := s.Service.Open(); err != nil { + t.Fatal(err) + } + + points, err := models.ParsePointsString(`cpu value=1`) + if err != nil { + t.Fatal(err) + } + + s.Service.batcher.In() <- points[0] // Send a point. + select { + case <-called: + // OK + case <-time.NewTimer(time.Second).C: + t.Fatalf("Service should have attempted to created database") + } + + // ready status should not have been switched due to meta client error. + s.Service.mu.RLock() + ready := s.Service.ready + s.Service.mu.RUnlock() + + if got, exp := ready, false; got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + + // This time MC won't cause an error. + s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) { + called <- struct{}{} return nil, nil } - s.Service.Open() - s.Service.Close() + s.Service.batcher.In() <- points[0] // Send a point. + select { + case <-called: + // OK + case <-time.NewTimer(time.Second).C: + t.Fatalf("Service should have attempted to created database") + } + + // ready status should not have been switched due to meta client error. + s.Service.mu.RLock() + ready = s.Service.ready + s.Service.mu.RUnlock() - if !created { - t.Errorf("CreateDatabaseIfNotExists should have been called when the service opened.") + if got, exp := ready, true; got != exp { + t.Fatalf("got %v, expected %v", got, exp) } } @@ -280,7 +328,31 @@ func check(err error) { // Raw data sent by collectd, captured using Wireshark. var testData = func() []byte { - b, err := hex.DecodeString("000000167066312d36322d3231302d39342d313733000001000c00000000544928ff0007000c00000000000000050002000c656e74726f7079000004000c656e74726f7079000006000f0001010000000000007240000200086370750000030006310000040008637075000005000969646c65000006000f0001000000000000a674620005000977616974000006000f0001000000000000000000000200076466000003000500000400076466000005000d6c6976652d636f7700000600180002010100000000a090b641000000a0cb6a2742000200086370750000030006310000040008637075000005000e696e74657272757074000006000f00010000000000000000fe0005000c736f6674697271000006000f000100000000000000000000020007646600000300050000040007646600000500096c6976650000060018000201010000000000000000000000e0ec972742000200086370750000030006310000040008637075000005000a737465616c000006000f00010000000000000000000003000632000005000975736572000006000f0001000000000000005f36000500096e696365000006000f0001000000000000000ad80002000e696e746572666163650000030005000004000e69665f6f6374657473000005000b64756d6d79300000060018000200000000000000000000000000000000041a000200076466000004000764660000050008746d70000006001800020101000000000000f240000000a0ea972742000200086370750000030006320000040008637075000005000b73797374656d000006000f00010000000000000045d30002000e696e746572666163650000030005000004000f69665f7061636b657473000005000b64756d6d79300000060018000200000000000000000000000000000000000f000200086370750000030006320000040008637075000005000969646c65000006000f0001000000000000a66480000200076466000003000500000400076466000005000d72756e2d6c6f636b000006001800020101000000000000000000000000000054410002000e696e74657266616365000004000e69665f6572726f7273000005000b64756d6d793000000600180002000000000000000000000000000000000000000200086370750000030006320000040008637075000005000977616974000006000f00010000000000000000000005000e696e74657272757074000006000f0001000000000000000132") + data := []string{ + "000000167066312d36322d3231302d39342d313733000001000c00000000544928ff0007000c0000000", + "0000000050002000c656e74726f7079000004000c656e74726f7079000006000f000101000000000000", + "7240000200086370750000030006310000040008637075000005000969646c65000006000f000100000", + "0000000a674620005000977616974000006000f00010000000000000000000002000764660000030005", + "00000400076466000005000d6c6976652d636f7700000600180002010100000000a090b641000000a0c", + "b6a2742000200086370750000030006310000040008637075000005000e696e74657272757074000006", + "000f00010000000000000000fe0005000c736f6674697271000006000f0001000000000000000000000", + "20007646600000300050000040007646600000500096c69766500000600180002010100000000000000", + "00000000e0ec972742000200086370750000030006310000040008637075000005000a737465616c000", + "006000f00010000000000000000000003000632000005000975736572000006000f0001000000000000", + "005f36000500096e696365000006000f0001000000000000000ad80002000e696e74657266616365000", + "0030005000004000e69665f6f6374657473000005000b64756d6d793000000600180002000000000000", + "00000000000000000000041a000200076466000004000764660000050008746d7000000600180002010", + "1000000000000f240000000a0ea97274200020008637075000003000632000004000863707500000500", + "0b73797374656d000006000f00010000000000000045d30002000e696e7465726661636500000300050", + "00004000f69665f7061636b657473000005000b64756d6d793000000600180002000000000000000000", + "00000000000000000f000200086370750000030006320000040008637075000005000969646c6500000", + "6000f0001000000000000a66480000200076466000003000500000400076466000005000d72756e2d6c", + "6f636b000006001800020101000000000000000000000000000054410002000e696e746572666163650", + "00004000e69665f6572726f7273000005000b64756d6d79300000060018000200000000000000000000", + "00000000000000000002000863707500000300063200000400086370750000050009776169740000060", + "00f00010000000000000000000005000e696e74657272757074000006000f0001000000000000000132", + } + b, err := hex.DecodeString(strings.Join(data, "")) check(err) return b }() From 0470f384781d6fab8c63b266ad7839149e291f5d Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 18 Oct 2016 13:02:36 +0100 Subject: [PATCH 2/4] opentsdb should retry creating its database --- services/collectd/service_test.go | 6 +- services/opentsdb/service.go | 42 +++++++++---- services/opentsdb/service_test.go | 97 +++++++++++++++++++++++++++---- 3 files changed, 121 insertions(+), 24 deletions(-) diff --git a/services/collectd/service_test.go b/services/collectd/service_test.go index e3e35145b6e..5dd588b945a 100644 --- a/services/collectd/service_test.go +++ b/services/collectd/service_test.go @@ -76,6 +76,7 @@ func TestService_CreatesDatabase(t *testing.T) { if err := s.Service.Open(); err != nil { t.Fatal(err) } + defer s.Service.Close() points, err := models.ParsePointsString(`cpu value=1`) if err != nil { @@ -83,11 +84,12 @@ func TestService_CreatesDatabase(t *testing.T) { } s.Service.batcher.In() <- points[0] // Send a point. + s.Service.batcher.Flush() select { case <-called: // OK case <-time.NewTimer(time.Second).C: - t.Fatalf("Service should have attempted to created database") + t.Fatal("Service should have attempted to create database") } // ready status should not have been switched due to meta client error. @@ -110,7 +112,7 @@ func TestService_CreatesDatabase(t *testing.T) { case <-called: // OK case <-time.NewTimer(time.Second).C: - t.Fatalf("Service should have attempted to created database") + t.Fatal("Service should have attempted to create database") } // ready status should not have been switched due to meta client error. diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index 420130d75f6..bed22b886ec 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -46,12 +46,14 @@ type Service struct { ln net.Listener // main listener httpln *chanListener // http channel-based listener - mu sync.Mutex wg sync.WaitGroup - done chan struct{} tls bool cert string + mu sync.RWMutex + ready bool // Has the required database been created? + done chan struct{} // Is the service closing or closed? + BindAddress string Database string RetentionPolicy string @@ -110,11 +112,6 @@ func (s *Service) Open() error { s.Logger.Println("Starting OpenTSDB service") - if _, err := s.MetaClient.CreateDatabase(s.Database); err != nil { - s.Logger.Printf("Failed to ensure target database %s exists: %s", s.Database, err.Error()) - return err - } - s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout) s.batcher.Start() @@ -202,6 +199,26 @@ func (s *Service) closed() bool { } } +// createInternalStorage ensures that the required database has been created. +func (s *Service) createInternalStorage() error { + s.mu.RLock() + ready := s.ready + s.mu.RUnlock() + if ready { + return nil + } + + if _, err := s.MetaClient.CreateDatabase(s.Database); err != nil { + return err + } + + // The service is now ready. + s.mu.Lock() + s.ready = true + s.mu.Unlock() + return nil +} + // SetLogOutput sets the writer to which all logs are written. It must not be // called after Open is called. func (s *Service) SetLogOutput(w io.Writer) { @@ -438,7 +455,15 @@ func (s *Service) serveHTTP() { func (s *Service) processBatches(batcher *tsdb.PointBatcher) { for { select { + case <-s.done: + return case batch := <-batcher.Out(): + // Will attempt to create database if not yet created. + if err := s.createInternalStorage(); err != nil { + s.Logger.Printf("Required database %s does not yet exist: %s", s.Database, err.Error()) + continue + } + if err := s.PointsWriter.WritePoints(s.Database, s.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil { atomic.AddInt64(&s.stats.BatchesTransmitted, 1) atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) @@ -446,9 +471,6 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { s.Logger.Printf("failed to write point batch to database %q: %s", s.Database, err) atomic.AddInt64(&s.stats.BatchesTransmitFail, 1) } - - case <-s.done: - return } } } diff --git a/services/opentsdb/service_test.go b/services/opentsdb/service_test.go index 8e1642e51c1..e7138a2882a 100644 --- a/services/opentsdb/service_test.go +++ b/services/opentsdb/service_test.go @@ -1,6 +1,7 @@ -package opentsdb_test +package opentsdb import ( + "errors" "fmt" "io/ioutil" "log" @@ -16,11 +17,10 @@ import ( "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" - "github.com/influxdata/influxdb/services/opentsdb" ) func Test_Service_OpenClose(t *testing.T) { - service := NewService("db0", "127.0.0.1:45362") + service := NewTestService("db0", "127.0.0.1:45362") // Closing a closed service is fine. if err := service.Service.Close(); err != nil { @@ -56,11 +56,84 @@ func Test_Service_OpenClose(t *testing.T) { } } +// Ensure a point can be written via the telnet protocol. +func TestService_CreatesDatabase(t *testing.T) { + t.Parallel() + + database := "db0" + s := NewTestService(database, "127.0.0.1:0") + s.WritePointsFn = func(string, string, models.ConsistencyLevel, []models.Point) error { + return nil + } + + called := make(chan struct{}) + s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) { + if name != database { + t.Errorf("\n\texp = %s\n\tgot = %s\n", database, name) + } + called <- struct{}{} + return nil, errors.New("an error") + } + + if err := s.Service.Open(); err != nil { + t.Fatal(err) + } + // defer s.Service.Close() + + points, err := models.ParsePointsString(`cpu value=1`) + if err != nil { + t.Fatal(err) + } + + s.Service.batcher.In() <- points[0] // Send a point. + s.Service.batcher.Flush() + select { + case <-called: + // OK + case <-time.NewTimer(time.Millisecond).C: + t.Fatal("Service should have attempted to create database") + } + + // ready status should not have been switched due to meta client error. + s.Service.mu.RLock() + ready := s.Service.ready + s.Service.mu.RUnlock() + + if got, exp := ready, false; got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + + // This time MC won't cause an error. + s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) { + called <- struct{}{} + return nil, nil + } + + s.Service.batcher.In() <- points[0] // Send a point. + s.Service.batcher.Flush() + select { + case <-called: + // OK + case <-time.NewTimer(time.Second).C: + t.Fatal("Service should have attempted to create database") + } + + // ready status should not have been switched due to meta client error. + s.Service.mu.RLock() + ready = s.Service.ready + s.Service.mu.RUnlock() + + if got, exp := ready, true; got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + +} + // Ensure a point can be written via the telnet protocol. func TestService_Telnet(t *testing.T) { t.Parallel() - s := NewService("db0", "127.0.0.1:0") + s := NewTestService("db0", "127.0.0.1:0") if err := s.Service.Open(); err != nil { t.Fatal(err) } @@ -123,7 +196,7 @@ func TestService_Telnet(t *testing.T) { func TestService_HTTP(t *testing.T) { t.Parallel() - s := NewService("db0", "127.0.0.1:0") + s := NewTestService("db0", "127.0.0.1:0") if err := s.Service.Open(); err != nil { t.Fatal(err) } @@ -169,15 +242,15 @@ func TestService_HTTP(t *testing.T) { } } -type Service struct { - Service *opentsdb.Service +type TestService struct { + Service *Service MetaClient *internal.MetaClientMock WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error } -// NewService returns a new instance of Service. -func NewService(database string, bind string) *Service { - s, err := opentsdb.NewService(opentsdb.Config{ +// NewTestService returns a new instance of Service. +func NewTestService(database string, bind string) *TestService { + s, err := NewService(Config{ BindAddress: bind, Database: database, ConsistencyLevel: "one", @@ -187,7 +260,7 @@ func NewService(database string, bind string) *Service { panic(err) } - service := &Service{ + service := &TestService{ Service: s, MetaClient: &internal.MetaClientMock{}, } @@ -208,6 +281,6 @@ func NewService(database string, bind string) *Service { return service } -func (s *Service) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { +func (s *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { return s.WritePointsFn(database, retentionPolicy, consistencyLevel, points) } From 73c267abc16f31ad64061b3400c95ab11fcb7f7f Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 18 Oct 2016 13:45:58 +0100 Subject: [PATCH 3/4] udp service should retry database creation --- services/collectd/service_test.go | 16 ++-- services/opentsdb/service_test.go | 14 ++-- services/udp/service.go | 35 ++++++-- services/udp/service_test.go | 127 ++++++++++++++++++++++++------ 4 files changed, 153 insertions(+), 39 deletions(-) diff --git a/services/collectd/service_test.go b/services/collectd/service_test.go index 5dd588b945a..b1cc7e9cd6a 100644 --- a/services/collectd/service_test.go +++ b/services/collectd/service_test.go @@ -69,14 +69,15 @@ func TestService_CreatesDatabase(t *testing.T) { if name != s.Config.Database { t.Errorf("\n\texp = %s\n\tgot = %s\n", s.Config.Database, name) } - called <- struct{}{} + // Allow some time for the caller to return and the ready status to + // be set. + time.AfterFunc(10*time.Millisecond, func() { called <- struct{}{} }) return nil, errors.New("an error") } if err := s.Service.Open(); err != nil { t.Fatal(err) } - defer s.Service.Close() points, err := models.ParsePointsString(`cpu value=1`) if err != nil { @@ -88,7 +89,7 @@ func TestService_CreatesDatabase(t *testing.T) { select { case <-called: // OK - case <-time.NewTimer(time.Second).C: + case <-time.NewTimer(5 * time.Second).C: t.Fatal("Service should have attempted to create database") } @@ -103,15 +104,18 @@ func TestService_CreatesDatabase(t *testing.T) { // This time MC won't cause an error. s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) { - called <- struct{}{} + // Allow some time for the caller to return and the ready status to + // be set. + time.AfterFunc(10*time.Millisecond, func() { called <- struct{}{} }) return nil, nil } s.Service.batcher.In() <- points[0] // Send a point. + s.Service.batcher.Flush() select { case <-called: // OK - case <-time.NewTimer(time.Second).C: + case <-time.NewTimer(5 * time.Second).C: t.Fatal("Service should have attempted to create database") } @@ -123,6 +127,8 @@ func TestService_CreatesDatabase(t *testing.T) { if got, exp := ready, true; got != exp { t.Fatalf("got %v, expected %v", got, exp) } + + s.Service.Close() } // Test that the collectd service correctly batches points by BatchSize. diff --git a/services/opentsdb/service_test.go b/services/opentsdb/service_test.go index e7138a2882a..745ca409216 100644 --- a/services/opentsdb/service_test.go +++ b/services/opentsdb/service_test.go @@ -71,14 +71,15 @@ func TestService_CreatesDatabase(t *testing.T) { if name != database { t.Errorf("\n\texp = %s\n\tgot = %s\n", database, name) } - called <- struct{}{} + // Allow some time for the caller to return and the ready status to + // be set. + time.AfterFunc(10*time.Millisecond, func() { called <- struct{}{} }) return nil, errors.New("an error") } if err := s.Service.Open(); err != nil { t.Fatal(err) } - // defer s.Service.Close() points, err := models.ParsePointsString(`cpu value=1`) if err != nil { @@ -90,7 +91,7 @@ func TestService_CreatesDatabase(t *testing.T) { select { case <-called: // OK - case <-time.NewTimer(time.Millisecond).C: + case <-time.NewTimer(5 * time.Second).C: t.Fatal("Service should have attempted to create database") } @@ -105,7 +106,9 @@ func TestService_CreatesDatabase(t *testing.T) { // This time MC won't cause an error. s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) { - called <- struct{}{} + // Allow some time for the caller to return and the ready status to + // be set. + time.AfterFunc(10*time.Millisecond, func() { called <- struct{}{} }) return nil, nil } @@ -114,7 +117,7 @@ func TestService_CreatesDatabase(t *testing.T) { select { case <-called: // OK - case <-time.NewTimer(time.Second).C: + case <-time.NewTimer(5 * time.Second).C: t.Fatal("Service should have attempted to create database") } @@ -127,6 +130,7 @@ func TestService_CreatesDatabase(t *testing.T) { t.Fatalf("got %v, expected %v", got, exp) } + s.Service.Close() } // Ensure a point can be written via the telnet protocol. diff --git a/services/udp/service.go b/services/udp/service.go index f9403d8e0ab..f5b964e2bbc 100644 --- a/services/udp/service.go +++ b/services/udp/service.go @@ -43,8 +43,9 @@ type Service struct { addr *net.UDPAddr wg sync.WaitGroup - mu sync.Mutex - done chan struct{} + mu sync.RWMutex + ready bool // Has the required database been created? + done chan struct{} // Is the service closing or closed? parserChan chan []byte batcher *tsdb.PointBatcher @@ -93,10 +94,6 @@ func (s *Service) Open() (err error) { return errors.New("database has to be specified in config") } - if _, err := s.MetaClient.CreateDatabase(s.config.Database); err != nil { - return errors.New("Failed to ensure target database exists") - } - s.addr, err = net.ResolveUDPAddr("udp", s.config.BindAddress) if err != nil { s.Logger.Printf("Failed to resolve UDP address %s: %s", s.config.BindAddress, err) @@ -162,6 +159,12 @@ func (s *Service) writer() { for { select { case batch := <-s.batcher.Out(): + // Will attempt to create database if not yet created. + if err := s.createInternalStorage(); err != nil { + s.Logger.Printf("Required database %s does not yet exist: %s", s.config.Database, err.Error()) + continue + } + if err := s.PointsWriter.WritePoints(s.config.Database, s.config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil { atomic.AddInt64(&s.stats.BatchesTransmitted, 1) atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) @@ -270,6 +273,26 @@ func (s *Service) closed() bool { return s.done == nil } +// createInternalStorage ensures that the required database has been created. +func (s *Service) createInternalStorage() error { + s.mu.RLock() + ready := s.ready + s.mu.RUnlock() + if ready { + return nil + } + + if _, err := s.MetaClient.CreateDatabase(s.config.Database); err != nil { + return err + } + + // The service is now ready. + s.mu.Lock() + s.ready = true + s.mu.Unlock() + return nil +} + // SetLogOutput sets the writer to which all logs are written. It must not be // called after Open is called. func (s *Service) SetLogOutput(w io.Writer) { diff --git a/services/udp/service_test.go b/services/udp/service_test.go index 27c857e260c..9a0935a384c 100644 --- a/services/udp/service_test.go +++ b/services/udp/service_test.go @@ -1,75 +1,156 @@ -package udp_test +package udp import ( + "errors" "io/ioutil" "testing" + "time" "github.com/influxdata/influxdb/internal" + "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" - "github.com/influxdata/influxdb/services/udp" ) func TestService_OpenClose(t *testing.T) { - service := NewService(nil) + service := NewTestService(nil) // Closing a closed service is fine. - if err := service.UDPService.Close(); err != nil { + if err := service.Service.Close(); err != nil { t.Fatal(err) } // Closing a closed service again is fine. - if err := service.UDPService.Close(); err != nil { + if err := service.Service.Close(); err != nil { t.Fatal(err) } - if err := service.UDPService.Open(); err != nil { + if err := service.Service.Open(); err != nil { t.Fatal(err) } // Opening an already open service is fine. - if err := service.UDPService.Open(); err != nil { + if err := service.Service.Open(); err != nil { t.Fatal(err) } // Reopening a previously opened service is fine. - if err := service.UDPService.Close(); err != nil { + if err := service.Service.Close(); err != nil { t.Fatal(err) } - if err := service.UDPService.Open(); err != nil { + if err := service.Service.Open(); err != nil { t.Fatal(err) } // Tidy up. - if err := service.UDPService.Close(); err != nil { + if err := service.Service.Close(); err != nil { t.Fatal(err) } } -type Service struct { - UDPService *udp.Service - MetaClient *internal.MetaClientMock +func TestService_CreatesDatabase(t *testing.T) { + t.Parallel() + + s := NewTestService(nil) + s.WritePointsFn = func(string, string, models.ConsistencyLevel, []models.Point) error { + return nil + } + + called := make(chan struct{}) + s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) { + if name != s.Config.Database { + t.Errorf("\n\texp = %s\n\tgot = %s\n", s.Config.Database, name) + } + // Allow some time for the caller to return and the ready status to + // be set. + time.AfterFunc(10*time.Millisecond, func() { called <- struct{}{} }) + return nil, errors.New("an error") + } + + if err := s.Service.Open(); err != nil { + t.Fatal(err) + } + + points, err := models.ParsePointsString(`cpu value=1`) + if err != nil { + t.Fatal(err) + } + + s.Service.batcher.In() <- points[0] // Send a point. + s.Service.batcher.Flush() + select { + case <-called: + // OK + case <-time.NewTimer(5 * time.Second).C: + t.Fatal("Service should have attempted to create database") + } + + // ready status should not have been switched due to meta client error. + s.Service.mu.RLock() + ready := s.Service.ready + s.Service.mu.RUnlock() + + if got, exp := ready, false; got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + + // This time MC won't cause an error. + s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) { + // Allow some time for the caller to return and the ready status to + // be set. + time.AfterFunc(10*time.Millisecond, func() { called <- struct{}{} }) + return nil, nil + } + + s.Service.batcher.In() <- points[0] // Send a point. + s.Service.batcher.Flush() + select { + case <-called: + // OK + case <-time.NewTimer(5 * time.Second).C: + t.Fatal("Service should have attempted to create database") + } + + // ready status should now be true. + s.Service.mu.RLock() + ready = s.Service.ready + s.Service.mu.RUnlock() + + if got, exp := ready, true; got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + + s.Service.Close() +} + +type TestService struct { + Service *Service + Config Config + MetaClient *internal.MetaClientMock + WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error } -func NewService(c *udp.Config) *Service { +func NewTestService(c *Config) *TestService { if c == nil { - defaultC := udp.NewConfig() + defaultC := NewConfig() c = &defaultC } - service := &Service{ - UDPService: udp.NewService(*c), + service := &TestService{ + Service: NewService(*c), + Config: *c, MetaClient: &internal.MetaClientMock{}, } - service.MetaClient.CreateDatabaseFn = func(string) (*meta.DatabaseInfo, error) { return nil, nil } - - // Set the Meta Client - service.UDPService.MetaClient = service.MetaClient - if !testing.Verbose() { - service.UDPService.SetLogOutput(ioutil.Discard) + service.Service.SetLogOutput(ioutil.Discard) } + service.Service.MetaClient = service.MetaClient + service.Service.PointsWriter = service return service } + +func (s *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { + return s.WritePointsFn(database, retentionPolicy, consistencyLevel, points) +} From 168c91cc6751b2307b3d7835a2cc13d7c51cfc34 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 18 Oct 2016 14:19:48 +0100 Subject: [PATCH 4/4] graphite service should retry database creation --- services/graphite/parser_test.go | 8 ++ services/graphite/service.go | 60 ++++++---- services/graphite/service_test.go | 179 ++++++++++++++++++------------ 3 files changed, 158 insertions(+), 89 deletions(-) diff --git a/services/graphite/parser_test.go b/services/graphite/parser_test.go index 85f2713dba0..8e5e52a7025 100644 --- a/services/graphite/parser_test.go +++ b/services/graphite/parser_test.go @@ -707,3 +707,11 @@ func TestApplyTemplateFieldError(t *testing.T) { "'field' can only be used once in each template: current.users.logged_in") } } + +// Test Helpers +func errstr(err error) string { + if err != nil { + return err.Error() + } + return "" +} diff --git a/services/graphite/service.go b/services/graphite/service.go index 9ac0468b8d4..cb7d43be57b 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -45,8 +45,6 @@ func (c *tcpConnection) Close() { // Service represents a Graphite service. type Service struct { - mu sync.Mutex - bindAddress string database string retentionPolicy string @@ -71,8 +69,11 @@ type Service struct { addr net.Addr udpConn *net.UDPConn - wg sync.WaitGroup - done chan struct{} + wg sync.WaitGroup + + mu sync.RWMutex + ready bool // Has the required database been created? + done chan struct{} // Is the service closing or closed? Monitor interface { RegisterDiagnosticsClient(name string, client diagnostics.Client) @@ -141,21 +142,6 @@ func (s *Service) Open() error { s.Monitor.RegisterDiagnosticsClient(s.diagsKey, s) } - if db := s.MetaClient.Database(s.database); db != nil { - if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil { - spec := meta.RetentionPolicySpec{Name: s.retentionPolicy} - if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec); err != nil { - s.logger.Printf("Failed to ensure target retention policy %s exists: %s", s.database, err.Error()) - } - } - } else { - spec := meta.RetentionPolicySpec{Name: s.retentionPolicy} - if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, &spec); err != nil { - s.logger.Printf("Failed to ensure target database %s exists: %s", s.database, err.Error()) - return err - } - } - s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout) s.batcher.Start() @@ -236,6 +222,36 @@ func (s *Service) closed() bool { return s.done == nil } +// createInternalStorage ensures that the required database has been created. +func (s *Service) createInternalStorage() error { + s.mu.RLock() + ready := s.ready + s.mu.RUnlock() + if ready { + return nil + } + + if db := s.MetaClient.Database(s.database); db != nil { + if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil { + spec := meta.RetentionPolicySpec{Name: s.retentionPolicy} + if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec); err != nil { + return err + } + } + } else { + spec := meta.RetentionPolicySpec{Name: s.retentionPolicy} + if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, &spec); err != nil { + return err + } + } + + // The service is now ready. + s.mu.Lock() + s.ready = true + s.mu.Unlock() + return nil +} + // SetLogOutput sets the writer to which all logs are written. It must not be // called after Open is called. func (s *Service) SetLogOutput(w io.Writer) { @@ -422,6 +438,12 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { for { select { case batch := <-batcher.Out(): + // Will attempt to create database if not yet created. + if err := s.createInternalStorage(); err != nil { + s.logger.Printf("Required database or retention policy do not yet exist: %s", err.Error()) + continue + } + if err := s.PointsWriter.WritePoints(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil { atomic.AddInt64(&s.stats.BatchesTransmitted, 1) atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) diff --git a/services/graphite/service_test.go b/services/graphite/service_test.go index 0b259daf1ba..468637f47ce 100644 --- a/services/graphite/service_test.go +++ b/services/graphite/service_test.go @@ -1,6 +1,7 @@ -package graphite_test +package graphite import ( + "errors" "fmt" "io/ioutil" "net" @@ -10,61 +11,135 @@ import ( "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/services/graphite" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/toml" ) func Test_Service_OpenClose(t *testing.T) { - c := graphite.Config{BindAddress: ":35422"} - service := NewService(&c) + c := Config{BindAddress: ":35422"} + service := NewTestService(&c) // Closing a closed service is fine. - if err := service.GraphiteService.Close(); err != nil { + if err := service.Service.Close(); err != nil { t.Fatal(err) } // Closing a closed service again is fine. - if err := service.GraphiteService.Close(); err != nil { + if err := service.Service.Close(); err != nil { t.Fatal(err) } - if err := service.GraphiteService.Open(); err != nil { + if err := service.Service.Open(); err != nil { t.Fatal(err) } // Opening an already open service is fine. - if err := service.GraphiteService.Open(); err != nil { + if err := service.Service.Open(); err != nil { t.Fatal(err) } // Reopening a previously opened service is fine. - if err := service.GraphiteService.Close(); err != nil { + if err := service.Service.Close(); err != nil { t.Fatal(err) } - if err := service.GraphiteService.Open(); err != nil { + if err := service.Service.Open(); err != nil { t.Fatal(err) } // Tidy up. - if err := service.GraphiteService.Close(); err != nil { + if err := service.Service.Close(); err != nil { t.Fatal(err) } } +func TestService_CreatesDatabase(t *testing.T) { + t.Parallel() + + s := NewTestService(nil) + s.WritePointsFn = func(string, string, models.ConsistencyLevel, []models.Point) error { + return nil + } + + called := make(chan struct{}) + s.MetaClient.CreateDatabaseWithRetentionPolicyFn = func(name string, _ *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) { + if name != s.Service.database { + t.Errorf("\n\texp = %s\n\tgot = %s\n", s.Service.database, name) + } + // Allow some time for the caller to return and the ready status to + // be set. + time.AfterFunc(10*time.Millisecond, func() { called <- struct{}{} }) + return nil, errors.New("an error") + } + + if err := s.Service.Open(); err != nil { + t.Fatal(err) + } + + points, err := models.ParsePointsString(`cpu value=1`) + if err != nil { + t.Fatal(err) + } + + s.Service.batcher.In() <- points[0] // Send a point. + s.Service.batcher.Flush() + select { + case <-called: + // OK + case <-time.NewTimer(5 * time.Second).C: + t.Fatal("Service should have attempted to create database") + } + + // ready status should not have been switched due to meta client error. + s.Service.mu.RLock() + ready := s.Service.ready + s.Service.mu.RUnlock() + + if got, exp := ready, false; got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + + // This time MC won't cause an error. + s.MetaClient.CreateDatabaseWithRetentionPolicyFn = func(name string, _ *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) { + // Allow some time for the caller to return and the ready status to + // be set. + time.AfterFunc(10*time.Millisecond, func() { called <- struct{}{} }) + return nil, nil + } + + s.Service.batcher.In() <- points[0] // Send a point. + s.Service.batcher.Flush() + select { + case <-called: + // OK + case <-time.NewTimer(5 * time.Second).C: + t.Fatal("Service should have attempted to create database") + } + + // ready status should now be true. + s.Service.mu.RLock() + ready = s.Service.ready + s.Service.mu.RUnlock() + + if got, exp := ready, true; got != exp { + t.Fatalf("got %v, expected %v", got, exp) + } + + s.Service.Close() +} + func Test_Service_TCP(t *testing.T) { t.Parallel() now := time.Now().UTC().Round(time.Second) - config := graphite.Config{} + config := Config{} config.Database = "graphitedb" config.BatchSize = 0 // No batching. config.BatchTimeout = toml.Duration(time.Second) config.BindAddress = ":0" - service := NewService(&config) + service := NewTestService(&config) // Allow test to wait until points are written. var wg sync.WaitGroup @@ -91,25 +166,12 @@ func Test_Service_TCP(t *testing.T) { return nil } - var created bool - service.MetaClient.CreateDatabaseWithRetentionPolicyFn = func(db string, _ *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) { - if db != config.Database { - t.Fatalf("got %s, expected %s", db, config.Database) - } - created = true - return nil, nil - } - - if err := service.GraphiteService.Open(); err != nil { + if err := service.Service.Open(); err != nil { t.Fatalf("failed to open Graphite service: %s", err.Error()) } - if !created { - t.Fatalf("failed to create target database") - } - // Connect to the graphite endpoint we just spun up - _, port, _ := net.SplitHostPort(service.GraphiteService.Addr().String()) + _, port, _ := net.SplitHostPort(service.Service.Addr().String()) conn, err := net.Dial("tcp", "127.0.0.1:"+port) if err != nil { t.Fatal(err) @@ -134,14 +196,14 @@ func Test_Service_UDP(t *testing.T) { now := time.Now().UTC().Round(time.Second) - config := graphite.Config{} + config := Config{} config.Database = "graphitedb" config.BatchSize = 0 // No batching. config.BatchTimeout = toml.Duration(time.Second) config.BindAddress = ":10000" config.Protocol = "udp" - service := NewService(&config) + service := NewTestService(&config) // Allow test to wait until points are written. var wg sync.WaitGroup @@ -165,25 +227,12 @@ func Test_Service_UDP(t *testing.T) { return nil } - var created bool - service.MetaClient.CreateDatabaseWithRetentionPolicyFn = func(db string, _ *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) { - if db != config.Database { - t.Fatalf("got %s, expected %s", db, config.Database) - } - created = true - return nil, nil - } - - if err := service.GraphiteService.Open(); err != nil { + if err := service.Service.Open(); err != nil { t.Fatalf("failed to open Graphite service: %s", err.Error()) } - if !created { - t.Fatalf("failed to create target database") - } - // Connect to the graphite endpoint we just spun up - _, port, _ := net.SplitHostPort(service.GraphiteService.Addr().String()) + _, port, _ := net.SplitHostPort(service.Service.Addr().String()) conn, err := net.Dial("udp", "127.0.0.1:"+port) if err != nil { t.Fatal(err) @@ -200,27 +249,26 @@ func Test_Service_UDP(t *testing.T) { conn.Close() } -type Service struct { - GraphiteService *graphite.Service - +type TestService struct { + Service *Service MetaClient *internal.MetaClientMock WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error } -func NewService(c *graphite.Config) *Service { +func NewTestService(c *Config) *TestService { if c == nil { - defaultC := graphite.NewConfig() + defaultC := NewConfig() c = &defaultC } - gservice, err := graphite.NewService(*c) + gservice, err := NewService(*c) if err != nil { panic(err) } - service := &Service{ - GraphiteService: gservice, - MetaClient: &internal.MetaClientMock{}, + service := &TestService{ + Service: gservice, + MetaClient: &internal.MetaClientMock{}, } service.MetaClient.CreateRetentionPolicyFn = func(string, *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) { @@ -239,26 +287,17 @@ func NewService(c *graphite.Config) *Service { return nil, nil } - // Set the Meta Client - service.GraphiteService.MetaClient = service.MetaClient - - // Set the PointsWriter - service.GraphiteService.PointsWriter = service - if !testing.Verbose() { - service.GraphiteService.SetLogOutput(ioutil.Discard) + service.Service.SetLogOutput(ioutil.Discard) } + + // Set the Meta Client and PointsWriter. + service.Service.MetaClient = service.MetaClient + service.Service.PointsWriter = service + return service } -func (s *Service) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { +func (s *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error { return s.WritePointsFn(database, retentionPolicy, consistencyLevel, points) } - -// Test Helpers -func errstr(err error) string { - if err != nil { - return err.Error() - } - return "" -}