Skip to content

Commit

Permalink
rename BulkSync and move error counts around
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeremy Shute committed Jun 19, 2014
1 parent 495f810 commit f36b337
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 120 deletions.
98 changes: 17 additions & 81 deletions lib/corebulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,20 @@ import (
"errors"
"fmt"
"io"
// "log"
"strconv"
"sync"
"time"
)

var (
const (
// Max buffer size in bytes before flushing to elasticsearch
BulkMaxBuffer = 1048576
// Max number of Docs to hold in buffer before forcing flush
BulkMaxDocs = 100
// Max delay before forcing a flush to Elasticsearch
BulkDelaySeconds = 5
// Keep a running total of errors seen, since it is in the background
BulkErrorCt uint64
// maximum wait shutdown seconds
MAX_SHUTDOWN_SECS = 5

// There is one Global Bulk Indexer for convenience
GlobalBulkIndexer *BulkIndexer
)

type ErrorBuffer struct {
Expand All @@ -51,7 +45,7 @@ type BulkIndexer struct {

// We are creating a variable defining the func responsible for sending
// to allow a mock sender for test purposes
BulkSender func(*bytes.Buffer) error
Sender func(*bytes.Buffer) error

// If we encounter an error in sending, we are going to retry for this long
// before returning an error
Expand All @@ -64,6 +58,9 @@ type BulkIndexer struct {
// channel for sending to background indexer
bulkChannel chan []byte

// numErrors is a running total of errors seen
numErrors uint64

// shutdown channel
shutdownChan chan bool
// Channel to shutdown http send go-routines
Expand Down Expand Up @@ -97,6 +94,10 @@ type BulkIndexer struct {
sendWg *sync.WaitGroup
}

// NumErrors returns the running total of errors seen while sending bulk
// requests in the background.
//
// NOTE(review): numErrors is incremented from the background send
// goroutines (see Send) without synchronization, so an unsynchronized
// read here may race — consider sync/atomic. TODO confirm the intended
// concurrency contract.
func (b *BulkIndexer) NumErrors() uint64 {
	return b.numErrors
}

func (c *Conn) NewBulkIndexer(maxConns int) *BulkIndexer {
b := BulkIndexer{conn: c, sendBuf: make(chan *bytes.Buffer, maxConns)}
b.needsTimeBasedFlush = true
Expand Down Expand Up @@ -131,8 +132,9 @@ func (c *Conn) NewBulkIndexerErrors(maxConns, retrySeconds int) *BulkIndexer {
func (b *BulkIndexer) Run(done chan bool) {

go func() {
if b.BulkSender == nil {
b.BulkSender = b.conn.BulkSend
// XXX(j): Refactor this stuff to use an interface.
if b.Sender == nil {
b.Sender = b.Send
}
// Backwards compatibility
b.shutdownChan = done
Expand Down Expand Up @@ -190,7 +192,7 @@ func (b *BulkIndexer) startHttpSender() {
select {
case buf := <-b.sendBuf:
b.sendWg.Add(1)
err := b.BulkSender(buf)
err := b.Sender(buf)

// Perhaps a b.FailureStrategy(err) ?? with different types of strategies
// 1. Retry, then panic
Expand All @@ -199,7 +201,7 @@ func (b *BulkIndexer) startHttpSender() {
if err != nil {
if b.RetryForSeconds > 0 {
time.Sleep(time.Second * time.Duration(b.RetryForSeconds))
err = b.BulkSender(buf)
err = b.Sender(buf)
if err == nil {
// Successfully re-sent with no error
b.sendWg.Done()
Expand Down Expand Up @@ -315,10 +317,10 @@ func (b *BulkIndexer) Update(index string, _type string, id, ttl string, date *t

// This does the actual send of a buffer, which has already been formatted
// into bytes of ES formatted bulk data
func (c *Conn) BulkSend(buf *bytes.Buffer) error {
_, err := c.DoCommand("POST", "/_bulk", nil, buf)
func (b *BulkIndexer) Send(buf *bytes.Buffer) error {
_, err := b.conn.DoCommand("POST", "/_bulk", nil, buf)
if err != nil {
BulkErrorCt += 1
b.numErrors += 1
return err
}
return nil
Expand Down Expand Up @@ -382,69 +384,3 @@ func WriteBulkBytes(op string, index string, _type string, id, ttl string, date
buf.WriteRune('\n')
return buf.Bytes(), nil
}

// IndexBulk buffers an "index" bulk operation that adds or updates a typed
// JSON document in a specific index, making it searchable. Operations are
// buffered and occasionally flushed to Elasticsearch in the background.
//
// This relies on the one Global Bulk Indexer; you can also create your own
// non-global indexers and call their Index functions instead.
//
// http://www.elasticsearch.org/guide/reference/api/bulk.html
func IndexBulk(index string, _type string, id string, date *time.Time, data interface{}, refresh bool) error {
	//{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
	if GlobalBulkIndexer == nil {
		panic("Must have Global Bulk Indexer to use this Func")
	}
	body, err := WriteBulkBytes("index", index, _type, id, "", date, data, refresh)
	if err != nil {
		return err
	}
	GlobalBulkIndexer.bulkChannel <- body
	return nil
}

// UpdateBulk buffers an "update" bulk operation for the given document on
// the one Global Bulk Indexer; it panics when no global indexer exists.
func UpdateBulk(index string, _type string, id string, date *time.Time, data interface{}, refresh bool) error {
	//{ "update" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
	if GlobalBulkIndexer == nil {
		panic("Must have Global Bulk Indexer to use this Func")
	}
	body, err := WriteBulkBytes("update", index, _type, id, "", date, data, refresh)
	if err != nil {
		return err
	}
	GlobalBulkIndexer.bulkChannel <- body
	return nil
}

// IndexBulkTtl buffers an "index" bulk operation with a TTL, adding or
// updating a typed JSON document in a specific index so it becomes
// searchable. Operations are buffered and occasionally flushed to
// Elasticsearch in the background.
//
// This relies on the one Global Bulk Indexer; you can also create your own
// non-global indexers and call their IndexTtl functions instead.
//
// http://www.elasticsearch.org/guide/reference/api/bulk.html
func IndexBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}, refresh bool) error {
	//{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
	if GlobalBulkIndexer == nil {
		panic("Must have Global Bulk Indexer to use this Func")
	}
	body, err := WriteBulkBytes("index", index, _type, id, ttl, date, data, refresh)
	if err != nil {
		return err
	}
	GlobalBulkIndexer.bulkChannel <- body
	return nil
}

// UpdateBulkTtl buffers an "update" bulk operation with a TTL for the given
// document on the one Global Bulk Indexer; it panics when no global indexer
// exists.
func UpdateBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}, refresh bool) error {
	//{ "update" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
	if GlobalBulkIndexer == nil {
		panic("Must have Global Bulk Indexer to use this Func")
	}
	body, err := WriteBulkBytes("update", index, _type, id, ttl, date, data, refresh)
	if err != nil {
		return err
	}
	GlobalBulkIndexer.bulkChannel <- body
	return nil
}
61 changes: 31 additions & 30 deletions lib/corebulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ func TestBulkIndexerBasic(t *testing.T) {
InitTests(true)
c := NewConn()
indexer := c.NewBulkIndexer(3)
indexer.BulkSender = func(buf *bytes.Buffer) error {
indexer.Sender = func(buf *bytes.Buffer) error {
messageSets += 1
totalBytesSent += buf.Len()
buffers = append(buffers, buf)
// log.Printf("buffer:%s", string(buf.Bytes()))
return c.BulkSend(buf)
return indexer.Send(buf)
}
done := make(chan bool)
indexer.Run(done)
Expand All @@ -75,7 +75,7 @@ func TestBulkIndexerBasic(t *testing.T) {
// part of request is url, so lets factor that in
//totalBytesSent = totalBytesSent - len(*eshost)
assert.T(t, len(buffers) == 1, fmt.Sprintf("Should have sent one operation but was %d", len(buffers)))
assert.T(t, BulkErrorCt == 0 && err == nil, fmt.Sprintf("Should not have any errors. BulkErroCt: %v, err:%v", BulkErrorCt, err))
assert.T(t, indexer.NumErrors() == 0 && err == nil, fmt.Sprintf("Should not have any errors. BulkErroCt: %v, err:%v", indexer.NumErrors(), err))
expectedBytes := 166
assert.T(t, totalBytesSent == expectedBytes, fmt.Sprintf("Should have sent %v bytes but was %v", expectedBytes, totalBytesSent))

Expand All @@ -87,7 +87,7 @@ func TestBulkIndexerBasic(t *testing.T) {
assert.T(t, err == nil, fmt.Sprintf("Should have nil error =%v", err))
assert.T(t, len(buffers) == 2, fmt.Sprintf("Should have another buffer ct=%d", len(buffers)))

assert.T(t, BulkErrorCt == 0, fmt.Sprintf("Should not have any errors %d", BulkErrorCt))
assert.T(t, indexer.NumErrors() == 0, fmt.Sprintf("Should not have any errors %d", indexer.NumErrors()))
expectedBytes = 282 // with refresh
assert.T(t, closeInt(totalBytesSent, expectedBytes), fmt.Sprintf("Should have sent %v bytes but was %v", expectedBytes, totalBytesSent))

Expand All @@ -100,11 +100,11 @@ func XXXTestBulkUpdate(t *testing.T) {
c := NewConn()
c.Port = "9200"
indexer := c.NewBulkIndexer(3)
indexer.BulkSender = func(buf *bytes.Buffer) error {
indexer.Sender = func(buf *bytes.Buffer) error {
messageSets += 1
totalBytesSent += buf.Len()
buffers = append(buffers, buf)
return c.BulkSend(buf)
return indexer.Send(buf)
}
done := make(chan bool)
indexer.Run(done)
Expand Down Expand Up @@ -132,7 +132,7 @@ func XXXTestBulkUpdate(t *testing.T) {
return len(buffers) > 0
}, 5)

assert.T(t, BulkErrorCt == 0 && err == nil, fmt.Sprintf("Should not have any errors, bulkErrorCt:%v, err:%v", BulkErrorCt, err))
assert.T(t, indexer.NumErrors() == 0 && err == nil, fmt.Sprintf("Should not have any errors, bulkErrorCt:%v, err:%v", indexer.NumErrors(), err))

response, err := c.Get("users", "user", "5", nil)
assert.T(t, err == nil, fmt.Sprintf("Should not have any errors %v", err))
Expand All @@ -151,22 +151,22 @@ func TestBulkSmallBatch(t *testing.T) {
data := map[string]interface{}{"name": "smurfs", "age": 22, "date": time.Unix(1257894000, 0)}

// Now tests small batches
indexersm := c.NewBulkIndexer(1)
indexersm.BufferDelayMax = 100 * time.Millisecond
indexersm.BulkMaxDocs = 2
indexer := c.NewBulkIndexer(1)
indexer.BufferDelayMax = 100 * time.Millisecond
indexer.BulkMaxDocs = 2
messageSets = 0
indexersm.BulkSender = func(buf *bytes.Buffer) error {
indexer.Sender = func(buf *bytes.Buffer) error {
messageSets += 1
return c.BulkSend(buf)
return indexer.Send(buf)
}
indexersm.Run(done)
indexer.Run(done)
<-time.After(time.Millisecond * 20)

indexersm.Index("users", "user", "2", "", &date, data, true)
indexersm.Index("users", "user", "3", "", &date, data, true)
indexersm.Index("users", "user", "4", "", &date, data, true)
indexer.Index("users", "user", "2", "", &date, data, true)
indexer.Index("users", "user", "3", "", &date, data, true)
indexer.Index("users", "user", "4", "", &date, data, true)
<-time.After(time.Millisecond * 200)
// indexersm.Flush()
// indexer.Flush()
done <- true
assert.T(t, messageSets == 2, fmt.Sprintf("Should have sent 2 message sets %d", messageSets))

Expand All @@ -179,7 +179,6 @@ func XXXTestBulkErrors(t *testing.T) {
defer func() {
c.Port = "9200"
}()
BulkDelaySeconds = 1
indexer := c.NewBulkIndexerErrors(10, 1)
done := make(chan bool)
indexer.Run(done)
Expand All @@ -204,47 +203,48 @@ func XXXTestBulkErrors(t *testing.T) {
}

/*
BenchmarkBulkSend 18:33:00 bulk_test.go:131: Sent 1 messages in 0 sets totaling 0 bytes
BenchmarkSend 18:33:00 bulk_test.go:131: Sent 1 messages in 0 sets totaling 0 bytes
18:33:00 bulk_test.go:131: Sent 100 messages in 1 sets totaling 145889 bytes
18:33:01 bulk_test.go:131: Sent 10000 messages in 100 sets totaling 14608888 bytes
18:33:05 bulk_test.go:131: Sent 20000 messages in 99 sets totaling 14462790 bytes
20000 234526 ns/op
*/
func BenchmarkBulkSend(b *testing.B) {
func BenchmarkSend(b *testing.B) {
InitTests(true)
c := NewConn()
b.StartTimer()
totalBytes := 0
sets := 0
GlobalBulkIndexer.BulkSender = func(buf *bytes.Buffer) error {
indexer := c.NewBulkIndexer(1)
indexer.Sender = func(buf *bytes.Buffer) error {
totalBytes += buf.Len()
sets += 1
//log.Println("got bulk")
return c.BulkSend(buf)
return indexer.Send(buf)
}
for i := 0; i < b.N; i++ {
about := make([]byte, 1000)
rand.Read(about)
data := map[string]interface{}{"name": "smurfs", "age": 22, "date": time.Unix(1257894000, 0), "about": about}
IndexBulk("users", "user", strconv.Itoa(i), nil, data, true)
indexer.Index("users", "user", strconv.Itoa(i), "", nil, data, true)
}
log.Printf("Sent %d messages in %d sets totaling %d bytes \n", b.N, sets, totalBytes)
if BulkErrorCt != 0 {
if indexer.NumErrors() != 0 {
b.Fail()
}
}

/*
TODO: this should be faster than above
BenchmarkBulkSendBytes 18:33:05 bulk_test.go:169: Sent 1 messages in 0 sets totaling 0 bytes
BenchmarkSendBytes 18:33:05 bulk_test.go:169: Sent 1 messages in 0 sets totaling 0 bytes
18:33:05 bulk_test.go:169: Sent 100 messages in 2 sets totaling 292299 bytes
18:33:09 bulk_test.go:169: Sent 10000 messages in 99 sets totaling 14473800 bytes
10000 373529 ns/op
*/
func BenchmarkBulkSendBytes(b *testing.B) {
func BenchmarkSendBytes(b *testing.B) {
InitTests(true)
c := NewConn()
about := make([]byte, 1000)
Expand All @@ -254,16 +254,17 @@ func BenchmarkBulkSendBytes(b *testing.B) {
b.StartTimer()
totalBytes := 0
sets := 0
GlobalBulkIndexer.BulkSender = func(buf *bytes.Buffer) error {
indexer := c.NewBulkIndexer(1)
indexer.Sender = func(buf *bytes.Buffer) error {
totalBytes += buf.Len()
sets += 1
return c.BulkSend(buf)
return indexer.Send(buf)
}
for i := 0; i < b.N; i++ {
IndexBulk("users", "user", strconv.Itoa(i), nil, body, true)
indexer.Index("users", "user", strconv.Itoa(i), "", nil, body, true)
}
log.Printf("Sent %d messages in %d sets totaling %d bytes \n", b.N, sets, totalBytes)
if BulkErrorCt != 0 {
if indexer.NumErrors() != 0 {
b.Fail()
}
}
2 changes: 1 addition & 1 deletion lib/coreexample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func ExampleBulkIndexer_responses() {

indexer := c.NewBulkIndexer(10)
// Create a custom Sender Func, to allow inspection of response/error
indexer.BulkSender = func(buf *bytes.Buffer) error {
indexer.Sender = func(buf *bytes.Buffer) error {
// @buf is the buffer of docs about to be written
respJson, err := c.DoCommand("POST", "/_bulk", nil, buf)
if err != nil {
Expand Down
Loading

0 comments on commit f36b337

Please sign in to comment.