Skip to content

Commit

Permalink
Merge pull request mattbaird#31 from dmichael/bulk-update
Browse files Browse the repository at this point in the history
Bulk interfaces now support update
  • Loading branch information
mattbaird committed Aug 9, 2013
2 parents 9187aa0 + c02faf5 commit e90a899
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 6 deletions.
65 changes: 59 additions & 6 deletions core/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strconv"
"sync"
"time"
"fmt"
"errors"
)

var (
Expand Down Expand Up @@ -259,7 +261,18 @@ func (b *BulkIndexor) send(buf *bytes.Buffer) {
// http://www.elasticsearch.org/guide/reference/api/bulk.html
//
// Index queues a bulk "index" operation for the given document. The
// action line and the document payload are serialized by WriteBulkBytes
// and pushed onto the indexor's bulk channel for batched delivery.
// Returns a serialization error, if any; a nil error means the request
// was queued (not that it has been sent).
func (b *BulkIndexor) Index(index string, _type string, id, ttl string, date *time.Time, data interface{}) error {
	//{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
	by, err := WriteBulkBytes("index", index, _type, id, ttl, date, data)
	if err != nil {
		u.Error(err)
		return err
	}
	b.bulkChannel <- by
	return nil
}

func (b *BulkIndexor) Update(index string, _type string, id, ttl string, date *time.Time, data interface{}) error {
//{ "update" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
by, err := WriteBulkBytes("update", index, _type, id, ttl, date, data)
if err != nil {
u.Error(err)
return err
Expand All @@ -282,15 +295,26 @@ func BulkSend(buf *bytes.Buffer) error {

// Given a set of arguments for op, index, type, id, data create a set of bytes that is formatted for a bulk index/update request
// http://www.elasticsearch.org/guide/reference/api/bulk.html
func IndexBulkBytes(index string, _type string, id, ttl string, date *time.Time, data interface{}) ([]byte, error) {
//{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
func WriteBulkBytes(op string, index string, _type string, id, ttl string, date *time.Time, data interface{}) ([]byte, error) {
// only index and update are currently supported
if op != "index" && op != "update" {
return nil, errors.New(fmt.Sprintf("Operation '%s' is not yet supported", op))
}

// First line
buf := bytes.Buffer{}
buf.WriteString(`{"index":{"_index":"`)
buf.WriteString(fmt.Sprintf(`{"%s":{"_index":"`, op))
buf.WriteString(index)
buf.WriteString(`","_type":"`)
buf.WriteString(_type)
buf.WriteString(`","_id":"`)
buf.WriteString(id)

if op == "update" {
buf.WriteString(`","retry_on_conflict":"3`)
buf.WriteString(ttl)
}

if len(ttl) > 0 {
buf.WriteString(`","ttl":"`)
buf.WriteString(ttl)
Expand All @@ -301,6 +325,7 @@ func IndexBulkBytes(index string, _type string, id, ttl string, date *time.Time,
}
buf.WriteString(`"}}`)
buf.WriteByte('\n')

switch v := data.(type) {
case *bytes.Buffer:
io.Copy(&buf, v)
Expand All @@ -320,6 +345,7 @@ func IndexBulkBytes(index string, _type string, id, ttl string, date *time.Time,
return buf.Bytes(), nil
}


// The index bulk API adds or updates a typed JSON document to a specific index, making it searchable.
// it operates by buffering requests, and occasionally flushing to elasticsearch
//
Expand All @@ -332,7 +358,20 @@ func IndexBulk(index string, _type string, id string, date *time.Time, data inte
if GlobalBulkIndexor == nil {
panic("Must have Global Bulk Indexor to use this Func")
}
by, err := IndexBulkBytes(index, _type, id, "", date, data)
by, err := WriteBulkBytes("index", index, _type, id, "", date, data)
if err != nil {
return err
}
GlobalBulkIndexor.bulkChannel <- by
return nil
}

func UpdateBulk(index string, _type string, id string, date *time.Time, data interface{}) error {
//{ "update" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
if GlobalBulkIndexor == nil {
panic("Must have Global Bulk Indexor to use this Func")
}
by, err := WriteBulkBytes("update", index, _type, id, "", date, data)
if err != nil {
return err
}
Expand All @@ -352,7 +391,21 @@ func IndexBulkTtl(index string, _type string, id, ttl string, date *time.Time, d
if GlobalBulkIndexor == nil {
panic("Must have Global Bulk Indexor to use this Func")
}
by, err := IndexBulkBytes(index, _type, id, ttl, date, data)
by, err := WriteBulkBytes("index", index, _type, id, ttl, date, data)
if err != nil {
return err
}
GlobalBulkIndexor.bulkChannel <- by
return nil
}


func UpdateBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}) error {
//{ "update" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
if GlobalBulkIndexor == nil {
panic("Must have Global Bulk Indexor to use this Func")
}
by, err := WriteBulkBytes("update", index, _type, id, ttl, date, data)
if err != nil {
return err
}
Expand Down
48 changes: 48 additions & 0 deletions core/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,52 @@ func TestBulkIndexorBasic(t *testing.T) {
u.Assert(u.CloseInt(totalBytesSent, 257), t, "Should have sent 257 bytes but was %v", totalBytesSent)

}

// TestBulkUpdate verifies that BulkIndexor.Update emits a bulk "update"
// operation: it seeds document users/user/5 with count=1, queues a script
// update that adds 2, flushes the indexor, and asserts the stored count is 3.
// NOTE(review): requires a live Elasticsearch on :9200 — TODO confirm the
// test suite guarantees that.
func TestBulkUpdate(t *testing.T) {
	InitTests(true)
	api.Port = "9200"
	indexor := NewBulkIndexor(3)
	// Capture every flushed buffer into the package-level test globals so
	// the assertions below can inspect what was sent, then forward the
	// buffer to Elasticsearch via BulkSend.
	indexor.BulkSendor = func(buf *bytes.Buffer) error {
		messageSets += 1
		totalBytesSent += buf.Len()
		buffers = append(buffers, buf)
		u.Debug(string(buf.Bytes()))
		return BulkSend(buf)
	}
	done := make(chan bool)
	indexor.Run(done)

	date := time.Unix(1257894000, 0)
	user := map[string]interface{}{
		"name": "smurfs", "age": 22, "date": time.Unix(1257894000, 0), "count": 1,
	}

	// Let's make sure the data is in the index ...
	// NOTE(review): this seed call's error is never checked — err is
	// overwritten by the Update call below before any assertion sees it.
	_, err := Index(true, "users", "user", "5", user)

	// script and params
	data := map[string]interface{}{
		"script": "ctx._source.count += 2",
	}
	err = indexor.Update("users", "user", "5", "", &date, data)
	// So here's the deal. Flushing does seem to work, you just have to give the
	// channel a moment to receive the message ...
	// <- time.After(time.Millisecond * 20)
	// indexor.Flush()
	done <- true

	// Poll (up to 5s) until at least one buffer has been flushed by BulkSendor.
	WaitFor(func() bool {
		return len(buffers) > 0
	}, 5)

	u.Assert(BulkErrorCt == 0 && err == nil, t, "Should not have any errors %v", err)

	// Re-fetch the document and confirm the script incremented count 1 -> 3.
	// JSON numbers decode as float64, hence the float comparison below.
	response, err := Get(true, "users", "user", "5")
	u.Assert(err == nil, t, "Should not have any errors %v", err)
	newCount := response.Source.(map[string]interface {})["count"]
	u.Assert( newCount.(float64) == 3, t, "Should have update count: %#v ... %#v", response.Source.(map[string]interface {})["count"], response)
}

func TestBulkSmallBatch(t *testing.T) {
InitTests(true)

Expand All @@ -90,6 +136,8 @@ func TestBulkSmallBatch(t *testing.T) {
indexorsm.Index("users", "user", "3", "", &date, data)
indexorsm.Index("users", "user", "4", "", &date, data)
<-time.After(time.Millisecond * 200)
// indexorsm.Flush()
done <- true
Assert(messageSets == 2, t, "Should have sent 2 message sets %d", messageSets)

}
Expand Down

0 comments on commit e90a899

Please sign in to comment.