Skip to content

Commit

Permalink
Merge pull request influxdata#7477 from influxdata/er-service-create
Browse files Browse the repository at this point in the history
Input plugins should retry database creation
  • Loading branch information
e-dard authored Oct 18, 2016
2 parents cea7690 + 168c91c commit 4776e8f
Show file tree
Hide file tree
Showing 9 changed files with 533 additions and 171 deletions.
56 changes: 33 additions & 23 deletions services/collectd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ type Service struct {
typesdb gollectd.Types
addr net.Addr

mu sync.Mutex
done chan struct{}
mu sync.RWMutex
ready bool // Has the required database been created?
done chan struct{} // Is the service closing or closed?

// expvar-based stats.
stats *Statistics
Expand Down Expand Up @@ -96,11 +97,6 @@ func (s *Service) Open() error {
return fmt.Errorf("PointsWriter is nil")
}

if _, err := s.MetaClient.CreateDatabase(s.Config.Database); err != nil {
s.Logger.Printf("Failed to ensure target database %s exists: %s", s.Config.Database, err.Error())
return err
}

if s.typesdb == nil {
// Open collectd types.
if stat, err := os.Stat(s.Config.TypesDB); err != nil {
Expand Down Expand Up @@ -178,12 +174,11 @@ func (s *Service) Open() error {
s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration))
s.batcher.Start()

// Create waitgroup for signalling goroutines to stop.
// Create waitgroup for signalling goroutines to stop and start goroutines
// that process collectd packets.
s.wg.Add(2)

// Start goroutines that process collectd packets.
go s.serve()
go s.writePoints()
go func() { defer s.wg.Done(); s.serve() }()
go func() { defer s.wg.Done(); s.writePoints() }()

return nil
}
Expand Down Expand Up @@ -215,13 +210,6 @@ func (s *Service) Close() error {
return nil
}

// Closed returns true if the service is currently closed.
func (s *Service) Closed() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.closed()
}

func (s *Service) closed() bool {
select {
case <-s.done:
Expand All @@ -232,6 +220,26 @@ func (s *Service) closed() bool {
return s.done == nil
}

// createInternalStorage ensures that the required database has been created.
func (s *Service) createInternalStorage() error {
s.mu.RLock()
ready := s.ready
s.mu.RUnlock()
if ready {
return nil
}

if _, err := s.MetaClient.CreateDatabase(s.Config.Database); err != nil {
return err
}

// The service is now ready.
s.mu.Lock()
s.ready = true
s.mu.Unlock()
return nil
}

// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
Expand Down Expand Up @@ -280,8 +288,6 @@ func (s *Service) Addr() net.Addr {
}

func (s *Service) serve() {
defer s.wg.Done()

// From https://collectd.org/wiki/index.php/Binary_protocol
// 1024 bytes (payload only, not including UDP / IP headers)
// In versions 4.0 through 4.7, the receive buffer has a fixed size
Expand Down Expand Up @@ -331,13 +337,17 @@ func (s *Service) handleMessage(buffer []byte) {
}

func (s *Service) writePoints() {
defer s.wg.Done()

for {
select {
case <-s.done:
return
case batch := <-s.batcher.Out():
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Printf("Required database %s not yet created: %s", s.Config.Database, err.Error())
continue
}

if err := s.PointsWriter.WritePoints(s.Config.Database, s.Config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
Expand Down
96 changes: 88 additions & 8 deletions services/collectd/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"log"
"net"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -52,27 +53,82 @@ func TestService_OpenClose(t *testing.T) {
}
}

// Test that the service checks / creates the target database on startup.
// Test that the service checks / creates the target database every time we
// try to write points.
func TestService_CreatesDatabase(t *testing.T) {
t.Parallel()

s := NewTestService(1, time.Second)

var created bool
s.WritePointsFn = func(string, string, models.ConsistencyLevel, []models.Point) error {
return nil
}

called := make(chan struct{})
s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) {
if name != s.Config.Database {
t.Errorf("\n\texp = %s\n\tgot = %s\n", s.Config.Database, name)
}
created = true
// Allow some time for the caller to return and the ready status to
// be set.
time.AfterFunc(10*time.Millisecond, func() { called <- struct{}{} })
return nil, errors.New("an error")
}

if err := s.Service.Open(); err != nil {
t.Fatal(err)
}

points, err := models.ParsePointsString(`cpu value=1`)
if err != nil {
t.Fatal(err)
}

s.Service.batcher.In() <- points[0] // Send a point.
s.Service.batcher.Flush()
select {
case <-called:
// OK
case <-time.NewTimer(5 * time.Second).C:
t.Fatal("Service should have attempted to create database")
}

// ready status should not have been switched due to meta client error.
s.Service.mu.RLock()
ready := s.Service.ready
s.Service.mu.RUnlock()

if got, exp := ready, false; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

// This time MC won't cause an error.
s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) {
// Allow some time for the caller to return and the ready status to
// be set.
time.AfterFunc(10*time.Millisecond, func() { called <- struct{}{} })
return nil, nil
}

s.Service.Open()
s.Service.Close()
s.Service.batcher.In() <- points[0] // Send a point.
s.Service.batcher.Flush()
select {
case <-called:
// OK
case <-time.NewTimer(5 * time.Second).C:
t.Fatal("Service should have attempted to create database")
}

// ready status should not have been switched due to meta client error.
s.Service.mu.RLock()
ready = s.Service.ready
s.Service.mu.RUnlock()

if !created {
t.Errorf("CreateDatabaseIfNotExists should have been called when the service opened.")
if got, exp := ready, true; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

s.Service.Close()
}

// Test that the collectd service correctly batches points by BatchSize.
Expand Down Expand Up @@ -280,7 +336,31 @@ func check(err error) {

// Raw data sent by collectd, captured using Wireshark.
var testData = func() []byte {
b, err := hex.DecodeString("000000167066312d36322d3231302d39342d313733000001000c00000000544928ff0007000c00000000000000050002000c656e74726f7079000004000c656e74726f7079000006000f0001010000000000007240000200086370750000030006310000040008637075000005000969646c65000006000f0001000000000000a674620005000977616974000006000f0001000000000000000000000200076466000003000500000400076466000005000d6c6976652d636f7700000600180002010100000000a090b641000000a0cb6a2742000200086370750000030006310000040008637075000005000e696e74657272757074000006000f00010000000000000000fe0005000c736f6674697271000006000f000100000000000000000000020007646600000300050000040007646600000500096c6976650000060018000201010000000000000000000000e0ec972742000200086370750000030006310000040008637075000005000a737465616c000006000f00010000000000000000000003000632000005000975736572000006000f0001000000000000005f36000500096e696365000006000f0001000000000000000ad80002000e696e746572666163650000030005000004000e69665f6f6374657473000005000b64756d6d79300000060018000200000000000000000000000000000000041a000200076466000004000764660000050008746d70000006001800020101000000000000f240000000a0ea972742000200086370750000030006320000040008637075000005000b73797374656d000006000f00010000000000000045d30002000e696e746572666163650000030005000004000f69665f7061636b657473000005000b64756d6d79300000060018000200000000000000000000000000000000000f000200086370750000030006320000040008637075000005000969646c65000006000f0001000000000000a66480000200076466000003000500000400076466000005000d72756e2d6c6f636b000006001800020101000000000000000000000000000054410002000e696e74657266616365000004000e69665f6572726f7273000005000b64756d6d793000000600180002000000000000000000000000000000000000000200086370750000030006320000040008637075000005000977616974000006000f00010000000000000000000005000e696e74657272757074000006000f0001000000000000000132")
data := []string{
"000000167066312d36322d3231302d39342d313733000001000c00000000544928ff0007000c0000000",
"0000000050002000c656e74726f7079000004000c656e74726f7079000006000f000101000000000000",
"7240000200086370750000030006310000040008637075000005000969646c65000006000f000100000",
"0000000a674620005000977616974000006000f00010000000000000000000002000764660000030005",
"00000400076466000005000d6c6976652d636f7700000600180002010100000000a090b641000000a0c",
"b6a2742000200086370750000030006310000040008637075000005000e696e74657272757074000006",
"000f00010000000000000000fe0005000c736f6674697271000006000f0001000000000000000000000",
"20007646600000300050000040007646600000500096c69766500000600180002010100000000000000",
"00000000e0ec972742000200086370750000030006310000040008637075000005000a737465616c000",
"006000f00010000000000000000000003000632000005000975736572000006000f0001000000000000",
"005f36000500096e696365000006000f0001000000000000000ad80002000e696e74657266616365000",
"0030005000004000e69665f6f6374657473000005000b64756d6d793000000600180002000000000000",
"00000000000000000000041a000200076466000004000764660000050008746d7000000600180002010",
"1000000000000f240000000a0ea97274200020008637075000003000632000004000863707500000500",
"0b73797374656d000006000f00010000000000000045d30002000e696e7465726661636500000300050",
"00004000f69665f7061636b657473000005000b64756d6d793000000600180002000000000000000000",
"00000000000000000f000200086370750000030006320000040008637075000005000969646c6500000",
"6000f0001000000000000a66480000200076466000003000500000400076466000005000d72756e2d6c",
"6f636b000006001800020101000000000000000000000000000054410002000e696e746572666163650",
"00004000e69665f6572726f7273000005000b64756d6d79300000060018000200000000000000000000",
"00000000000000000002000863707500000300063200000400086370750000050009776169740000060",
"00f00010000000000000000000005000e696e74657272757074000006000f0001000000000000000132",
}
b, err := hex.DecodeString(strings.Join(data, ""))
check(err)
return b
}()
Expand Down
8 changes: 8 additions & 0 deletions services/graphite/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,3 +707,11 @@ func TestApplyTemplateFieldError(t *testing.T) {
"'field' can only be used once in each template: current.users.logged_in")
}
}

// Test Helpers
func errstr(err error) string {
if err != nil {
return err.Error()
}
return ""
}
60 changes: 41 additions & 19 deletions services/graphite/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func (c *tcpConnection) Close() {

// Service represents a Graphite service.
type Service struct {
mu sync.Mutex

bindAddress string
database string
retentionPolicy string
Expand All @@ -71,8 +69,11 @@ type Service struct {
addr net.Addr
udpConn *net.UDPConn

wg sync.WaitGroup
done chan struct{}
wg sync.WaitGroup

mu sync.RWMutex
ready bool // Has the required database been created?
done chan struct{} // Is the service closing or closed?

Monitor interface {
RegisterDiagnosticsClient(name string, client diagnostics.Client)
Expand Down Expand Up @@ -141,21 +142,6 @@ func (s *Service) Open() error {
s.Monitor.RegisterDiagnosticsClient(s.diagsKey, s)
}

if db := s.MetaClient.Database(s.database); db != nil {
if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil {
spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec); err != nil {
s.logger.Printf("Failed to ensure target retention policy %s exists: %s", s.database, err.Error())
}
}
} else {
spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, &spec); err != nil {
s.logger.Printf("Failed to ensure target database %s exists: %s", s.database, err.Error())
return err
}
}

s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout)
s.batcher.Start()

Expand Down Expand Up @@ -236,6 +222,36 @@ func (s *Service) closed() bool {
return s.done == nil
}

// createInternalStorage ensures that the required database has been created.
func (s *Service) createInternalStorage() error {
s.mu.RLock()
ready := s.ready
s.mu.RUnlock()
if ready {
return nil
}

if db := s.MetaClient.Database(s.database); db != nil {
if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil {
spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec); err != nil {
return err
}
}
} else {
spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, &spec); err != nil {
return err
}
}

// The service is now ready.
s.mu.Lock()
s.ready = true
s.mu.Unlock()
return nil
}

// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
Expand Down Expand Up @@ -422,6 +438,12 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
for {
select {
case batch := <-batcher.Out():
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.logger.Printf("Required database or retention policy do not yet exist: %s", err.Error())
continue
}

if err := s.PointsWriter.WritePoints(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
Expand Down
Loading

0 comments on commit 4776e8f

Please sign in to comment.