Skip to content

Commit

Permalink
adding delete to bulk indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
untoldone committed Jan 17, 2015
1 parent bc9b2a9 commit e3520fc
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
6 changes: 6 additions & 0 deletions lib/corebulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,12 @@ func (b *BulkIndexer) Update(index string, _type string, id, ttl string, date *t
return nil
}

func (b *BulkIndexer) Delete(index, _type, id string, refresh bool) {
queryLine := fmt.Sprintf("{\"delete\":{\"_index\":%q,\"_type\":%q,\"_id\":%q,\"refresh\":%t}}\n", index, _type, id, refresh)
b.bulkChannel <- []byte(queryLine)
return
}

func (b *BulkIndexer) UpdateWithPartialDoc(index string, _type string, id, ttl string, date *time.Time, partialDoc interface{}, upsert bool, refresh bool) error {

var data map[string]interface{} = make(map[string]interface{})
Expand Down
27 changes: 27 additions & 0 deletions lib/corebulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,33 @@ func TestBulkSmallBatch(t *testing.T) {

}

func TestBulkDelete(t *testing.T) {
InitTests(true)

c := NewTestConn()
indexer := c.NewBulkIndexer(1)
sentBytes := []byte{}

indexer.Sender = func(buf *bytes.Buffer) error {
sentBytes = append(sentBytes, buf.Bytes()...)
return nil
}

indexer.Start()

indexer.Delete("fake", "fake_type", "1", true)

indexer.Flush()
indexer.Stop()

sent := string(sentBytes)

expected := `{"delete":{"_index":"fake","_type":"fake_type","_id":"1","refresh":true}}
`
asExpected := sent == expected
assert.T(t, asExpected, fmt.Sprintf("Should have sent '%s' but actually sent '%s'", expected, sent))
}

func XXXTestBulkErrors(t *testing.T) {
// lets set a bad port, and hope we get a conn refused error?
c := NewTestConn()
Expand Down

0 comments on commit e3520fc

Please sign in to comment.