Skip to content

Commit

Permalink
Fixing Flush to wait for http sends to complete, refs mattbaird#30
Browse files Browse the repository at this point in the history
  • Loading branch information
araddon committed Aug 3, 2013
1 parent 75c5785 commit 37111c1
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 21 deletions.
55 changes: 41 additions & 14 deletions core/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var (
BulkDelaySeconds = 5
// Keep a running total of errors seen, since it is in the background
BulkErrorCt uint64
// maximum wait shutdown seconds
MAX_SHUTDOWN_SECS = 5

// There is one Global Bulk Indexor for convenience
GlobalBulkIndexor *BulkIndexor
Expand Down Expand Up @@ -68,9 +70,10 @@ type BulkIndexor struct {
// shutdown channel
shutdownChan chan bool

// buffers
// Channel to send a complete byte.Buffer to the http sendor
sendBuf chan *bytes.Buffer
buf *bytes.Buffer
// byte buffer for docs that have been converted to bytes, but not yet sent
buf *bytes.Buffer
// Buffer for Max number of time before forcing flush
BufferDelayMax time.Duration
// Max buffer size in bytes before flushing to elasticsearch
Expand All @@ -85,18 +88,22 @@ type BulkIndexor struct {
// If we are indexing enough docs per bufferdelaymax, we won't need to do time
// based eviction, else we do.
needsTimeBasedFlush bool
mu sync.Mutex
// Lock for document writes/operations
mu sync.Mutex
// Wait Group for the http sends
sendWg *sync.WaitGroup
}

func NewBulkIndexor(maxConns int) *BulkIndexor {
b := BulkIndexor{sendBuf: make(chan *bytes.Buffer, maxConns)}
b.needsTimeBasedFlush = true
b.buf = new(bytes.Buffer)
b.maxConns = maxConns
b.BulkMaxBuffer = BulkMaxBuffer
b.BulkMaxDocs = BulkMaxDocs
b.BufferDelayMax = time.Duration(BulkDelaySeconds) * time.Second
b.buf = new(bytes.Buffer)
b.maxConns = maxConns
b.bulkChannel = make(chan []byte, 100)
b.sendWg = new(sync.WaitGroup)
return &b
}

Expand All @@ -107,17 +114,10 @@ func NewBulkIndexor(maxConns int) *BulkIndexor {
// done := make(chan bool)
// BulkIndexorGlobalRun(100, done)
func NewBulkIndexorErrors(maxConns, retrySeconds int) *BulkIndexor {
b := BulkIndexor{sendBuf: make(chan *bytes.Buffer, maxConns)}
b.needsTimeBasedFlush = true
b.buf = new(bytes.Buffer)
b.maxConns = maxConns
b.BulkMaxBuffer = BulkMaxBuffer
b.BulkMaxDocs = BulkMaxDocs
b.BufferDelayMax = time.Duration(BulkDelaySeconds) * time.Second
b := NewBulkIndexor(maxConns)
b.RetryForSeconds = retrySeconds
b.bulkChannel = make(chan []byte, 100)
b.ErrorChannel = make(chan *ErrorBuffer, 20)
return &b
return b
}

// Starts this bulk Indexor running, this Run opens a go routine so is
Expand All @@ -137,13 +137,35 @@ func (b *BulkIndexor) Run(done chan bool) {
}()
}

// Make a channel that will close when the given WaitGroup is done.
func wgChan(wg *sync.WaitGroup) <-chan interface{} {
ch := make(chan interface{})
go func() {
wg.Wait()
close(ch)
}()
return ch
}

// Flush all current documents to ElasticSearch
func (b *BulkIndexor) Flush() {
b.mu.Lock()
if b.docCt > 0 {
b.send(b.buf)
}
b.mu.Unlock()
for {
select {
case <-wgChan(b.sendWg):
// done
u.Info("Normal Wait Group Shutdown")
return
case <-time.After(time.Second * time.Duration(MAX_SHUTDOWN_SECS)):
// timeout!
u.Error("Timeout in Shutdown!")
return
}
}
}

func (b *BulkIndexor) startHttpSendor() {
Expand All @@ -156,6 +178,7 @@ func (b *BulkIndexor) startHttpSendor() {
go func() {
for {
buf := <-b.sendBuf
b.sendWg.Add(1)
err := b.BulkSendor(buf)

// Perhaps a b.FailureStrategy(err) ?? with different types of strategies
Expand All @@ -167,6 +190,9 @@ func (b *BulkIndexor) startHttpSendor() {
time.Sleep(time.Second * time.Duration(b.RetryForSeconds))
err = b.BulkSendor(buf)
if err == nil {
// at this point we are Abandoning the documents, which is not
// TODO: some better retry mechanisms
b.sendWg.Done()
continue
}
}
Expand All @@ -175,6 +201,7 @@ func (b *BulkIndexor) startHttpSendor() {
b.ErrorChannel <- &ErrorBuffer{err, buf}
}
}
b.sendWg.Done()
}
}()
}
Expand Down
14 changes: 7 additions & 7 deletions core/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func init() {
u.SetupLogging("debug")
}
}
func TestBulk(t *testing.T) {
func TestBulkIndexorBasic(t *testing.T) {
InitTests(true)
indexor := NewBulkIndexor(3)
indexor.BulkSendor = func(buf *bytes.Buffer) error {
Expand All @@ -55,14 +55,14 @@ func TestBulk(t *testing.T) {
u.Assert(totalBytesSent == 145, t, "Should have sent 135 bytes but was %v", totalBytesSent)

err = indexor.Index("users", "user", "2", "", nil, data)

WaitFor(func() bool {
return len(buffers) > 1
}, 5)
<-time.After(time.Millisecond * 10) // we need to wait for doc to hit send channel
// this will test to ensure that Flush actually catches a doc
indexor.Flush()
totalBytesSent = totalBytesSent - len(*eshost)
u.Assert(len(buffers) == 2, t, "Should have nil error, and another buffer")
u.Assert(err == nil, t, "Should have nil error =%v", err)
u.Assert(len(buffers) == 2, t, "Should have another buffer ct=%d", len(buffers))

u.Assert(BulkErrorCt == 0 && err == nil, t, "Should not have any errors")
u.Assert(BulkErrorCt == 0, t, "Should not have any errors %d", BulkErrorCt)
u.Assert(u.CloseInt(totalBytesSent, 257), t, "Should have sent 257 bytes but was %v", totalBytesSent)

}
Expand Down

0 comments on commit 37111c1

Please sign in to comment.