Skip to content

Commit

Permalink
Merge pull request mattbaird#32 from araddon/flush_30
Browse files Browse the repository at this point in the history
Fix Flush() on bulk to ensure work in process is sent, completed before returning
  • Loading branch information
mattbaird committed Aug 9, 2013
2 parents 5eaa93f + 53e96e9 commit 9187aa0
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 47 deletions.
34 changes: 30 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,50 @@ Adding content to Elasticsearch
----------------------------------------------

examples:


import "github.com/mattbaird/elastigo/api"
import "github.com/mattbaird/elastigo/core"

type Tweet struct {
User string `json:"user"`
Message string `json:"message"`
}

// Set the Elasticsearch Host to Connect to
api.Domain = "localhost"
// api.Port = "9300"

// add single go struct entity
response, _ := core.Index(true, "twitter", "tweet", "1", NewTweet("kimchy", "Search is cool"))
response, _ := core.Index(true, "twitter", "tweet", "1", Tweet{"kimchy", "Search is cool"})

// you have bytes
tw := Tweet{"kimchy", "Search is cool part 2"}
bytesLine, err := json.Marshal(tw)
response, _ := core.Index(true, "twitter", "tweet", "2", bytesLine)

// Bulk Indexing
core.IndexBulk("twitter", "tweet", "3", &time.Now(), NewTweet("kimchy", "Search is now cooler"))
core.IndexBulk("twitter", "tweet", "3", &time.Now(), Tweet{"kimchy", "Search is now cooler"})

// Search Using Raw json String
searchJson := `{
"query" : {
"term" : { "user" : "kimchy" }
}
}`
out, err := core.SearchRequest(true, "twitter", "tweet", searchJson, "")
if len(out.Hits.Hits) == 1 {
fmt.Println(string(out.Hits.Hits[0].Source))
}


Search Examples
Search DSL Examples
-------------------------

A Faceted, ranged Search using the `Search DSL` :

import "github.com/mattbaird/elastigo/api"
import "github.com/mattbaird/elastigo/core"

// Set the Elasticsearch Host to Connect to
api.Domain = "localhost"
// api.Port = "9300"
Expand Down Expand Up @@ -97,6 +120,9 @@ Adding content to Elasticsearch in Bulk

example:

import "github.com/mattbaird/elastigo/api"
import "github.com/mattbaird/elastigo/core"

// Set the Elasticsearch Host to Connect to
api.Domain = "localhost"
// api.Port = "9300"
Expand Down
17 changes: 17 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/mattbaird/elastigo/core"
"github.com/mattbaird/elastigo/indices"
"log"
"time"
)

var (
Expand Down Expand Up @@ -64,3 +65,19 @@ func main() {
cluster.State("transient", "discovery.zen.minimum_master_nodes", 2)

}

// Tweet is a small example document type, used in the test suite and
// kept similar to the shapes shown in the documentation.
type Tweet struct {
	User     string    `json:"user"`
	PostDate time.Time `json:"postDate"`
	Message  string    `json:"message"`
}

// NewTweet builds a Tweet for the given user and message, stamping
// PostDate with the current time.
func NewTweet(user string, message string) Tweet {
	tw := Tweet{
		User:     user,
		PostDate: time.Now(),
		Message:  message,
	}
	return tw
}

// String renders the tweet as its JSON encoding; a marshal failure
// yields the empty string.
func (t *Tweet) String() string {
	data, _ := json.Marshal(t)
	return string(data)
}
54 changes: 40 additions & 14 deletions core/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var (
BulkDelaySeconds = 5
// Keep a running total of errors seen, since it is in the background
BulkErrorCt uint64
// maximum wait shutdown seconds
MAX_SHUTDOWN_SECS = 5

// There is one Global Bulk Indexor for convenience
GlobalBulkIndexor *BulkIndexor
Expand Down Expand Up @@ -68,9 +70,10 @@ type BulkIndexor struct {
// shutdown channel
shutdownChan chan bool

// buffers
// Channel to send a complete byte.Buffer to the http sendor
sendBuf chan *bytes.Buffer
buf *bytes.Buffer
// byte buffer for docs that have been converted to bytes, but not yet sent
buf *bytes.Buffer
// Buffer for Max number of time before forcing flush
BufferDelayMax time.Duration
// Max buffer size in bytes before flushing to elasticsearch
Expand All @@ -85,18 +88,22 @@ type BulkIndexor struct {
// If we are indexing enough docs per bufferdelaymax, we won't need to do time
// based eviction, else we do.
needsTimeBasedFlush bool
mu sync.Mutex
// Lock for document writes/operations
mu sync.Mutex
// Wait Group for the http sends
sendWg *sync.WaitGroup
}

// NewBulkIndexor returns a bulk indexor that will use up to maxConns
// concurrent http connections for sending bulk requests. Defaults are
// taken from the package-level Bulk* settings; Run must be called to
// start the background senders.
func NewBulkIndexor(maxConns int) *BulkIndexor {
	b := BulkIndexor{sendBuf: make(chan *bytes.Buffer, maxConns)}
	b.needsTimeBasedFlush = true
	b.buf = new(bytes.Buffer)
	b.maxConns = maxConns
	b.BulkMaxBuffer = BulkMaxBuffer
	b.BulkMaxDocs = BulkMaxDocs
	b.BufferDelayMax = time.Duration(BulkDelaySeconds) * time.Second
	b.bulkChannel = make(chan []byte, 100)
	b.sendWg = new(sync.WaitGroup)
	return &b
}

Expand All @@ -107,17 +114,10 @@ func NewBulkIndexor(maxConns int) *BulkIndexor {
// done := make(chan bool)
// BulkIndexorGlobalRun(100, done)
// NewBulkIndexorErrors returns a bulk indexor that additionally reports
// failed sends on its ErrorChannel, retrying each failed send once after
// retrySeconds. The caller must drain ErrorChannel or sends will block.
func NewBulkIndexorErrors(maxConns, retrySeconds int) *BulkIndexor {
	b := NewBulkIndexor(maxConns)
	b.RetryForSeconds = retrySeconds
	b.ErrorChannel = make(chan *ErrorBuffer, 20)
	return b
}

// Starts this bulk Indexor running, this Run opens a go routine so is
Expand All @@ -137,13 +137,35 @@ func (b *BulkIndexor) Run(done chan bool) {
}()
}

// Make a channel that will close when the given WaitGroup is done.
func wgChan(wg *sync.WaitGroup) <-chan interface{} {
ch := make(chan interface{})
go func() {
wg.Wait()
close(ch)
}()
return ch
}

// Flush sends any currently buffered documents to ElasticSearch and then
// waits for in-flight http sends to complete, giving up after
// MAX_SHUTDOWN_SECS rather than blocking forever.
//
// NOTE(review): sendWg.Add(1) is performed by the sendor goroutine only
// after it receives a buffer from sendBuf (see startHttpSendor), so a
// buffer queued by this Flush may not yet be counted when we start
// waiting — confirm whether Add should happen before the buffer is
// queued.
func (b *BulkIndexor) Flush() {
	b.mu.Lock()
	if b.docCt > 0 {
		b.send(b.buf)
	}
	b.mu.Unlock()
	// The original wrapped this select in a for-loop, but both cases
	// return, so the loop body could never run twice; the loop is removed.
	select {
	case <-wgChan(b.sendWg):
		// all outstanding http sends completed
		u.Info("Normal Wait Group Shutdown")
	case <-time.After(time.Second * time.Duration(MAX_SHUTDOWN_SECS)):
		// in-flight work did not finish in time; return instead of hanging
		u.Error("Timeout in Shutdown!")
	}
}

func (b *BulkIndexor) startHttpSendor() {
Expand All @@ -156,6 +178,7 @@ func (b *BulkIndexor) startHttpSendor() {
go func() {
for {
buf := <-b.sendBuf
b.sendWg.Add(1)
err := b.BulkSendor(buf)

// Perhaps a b.FailureStrategy(err) ?? with different types of strategies
Expand All @@ -167,6 +190,8 @@ func (b *BulkIndexor) startHttpSendor() {
time.Sleep(time.Second * time.Duration(b.RetryForSeconds))
err = b.BulkSendor(buf)
if err == nil {
// Successfully re-sent with no error
b.sendWg.Done()
continue
}
}
Expand All @@ -175,6 +200,7 @@ func (b *BulkIndexor) startHttpSendor() {
b.ErrorChannel <- &ErrorBuffer{err, buf}
}
}
b.sendWg.Done()
}
}()
}
Expand Down
14 changes: 7 additions & 7 deletions core/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func init() {
u.SetupLogging("debug")
}
}
func TestBulk(t *testing.T) {
func TestBulkIndexorBasic(t *testing.T) {
InitTests(true)
indexor := NewBulkIndexor(3)
indexor.BulkSendor = func(buf *bytes.Buffer) error {
Expand All @@ -55,14 +55,14 @@ func TestBulk(t *testing.T) {
u.Assert(totalBytesSent == 145, t, "Should have sent 135 bytes but was %v", totalBytesSent)

err = indexor.Index("users", "user", "2", "", nil, data)

WaitFor(func() bool {
return len(buffers) > 1
}, 5)
<-time.After(time.Millisecond * 10) // we need to wait for doc to hit send channel
// this will test to ensure that Flush actually catches a doc
indexor.Flush()
totalBytesSent = totalBytesSent - len(*eshost)
u.Assert(len(buffers) == 2, t, "Should have nil error, and another buffer")
u.Assert(err == nil, t, "Should have nil error =%v", err)
u.Assert(len(buffers) == 2, t, "Should have another buffer ct=%d", len(buffers))

u.Assert(BulkErrorCt == 0 && err == nil, t, "Should not have any errors")
u.Assert(BulkErrorCt == 0, t, "Should not have any errors %d", BulkErrorCt)
u.Assert(u.CloseInt(totalBytesSent, 257), t, "Should have sent 257 bytes but was %v", totalBytesSent)

}
Expand Down
22 changes: 0 additions & 22 deletions testTweet.go

This file was deleted.

0 comments on commit 9187aa0

Please sign in to comment.