Skip to content

Commit

Permalink
got tests to compile; still need to check that tests actually pass
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeremy Shute committed Jun 19, 2014
1 parent b8e5ca5 commit 495f810
Show file tree
Hide file tree
Showing 44 changed files with 166 additions and 202 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func main() {
flag.Parse()
log.SetFlags(log.Ltime | log.Lshortfile)

c := elastigo.NewConnection()
c := elastigo.NewConn()
c.Domain = *eshost
response, _ := c.Index("twitter", "tweet", "1", nil, NewTweet("kimchy", "Search is cool"))
c.Flush()
Expand Down
4 changes: 2 additions & 2 deletions lib/baserequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"time"
)

func (c *Connection) DoCommand(method string, url string, args map[string]interface{}, data interface{}) ([]byte, error) {
func (c *Conn) DoCommand(method string, url string, args map[string]interface{}, data interface{}) ([]byte, error) {
var response map[string]interface{}
var body []byte
var httpStatusCode int
Expand Down Expand Up @@ -82,7 +82,7 @@ func (e ESError) Error() string {
// Exists allows the caller to check for the existence of a document using HEAD
// This appears to be broken in elasticsearch 0.19.10, currently
// returning nothing
func (c *Connection) Exists(index string, _type string, id string, args map[string]interface{}) (BaseResponse, error) {
func (c *Conn) Exists(index string, _type string, id string, args map[string]interface{}) (BaseResponse, error) {
var response map[string]interface{}
var body []byte
var url string
Expand Down
4 changes: 2 additions & 2 deletions lib/clusterhealth.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// TODO: implement wait_for_status, timeout, wait_for_relocating_shards, wait_for_nodes
// TODO: implement level (Can be one of cluster, indices or shards. Controls the details level of the health
// information returned. Defaults to cluster.)
func (c *Connection) Health(indices ...string) (ClusterHealthResponse, error) {
func (c *Conn) Health(indices ...string) (ClusterHealthResponse, error) {
var url string
var retval ClusterHealthResponse
if len(indices) > 0 {
Expand Down Expand Up @@ -78,7 +78,7 @@ func (f ClusterStateFilter) Parameterize() []string {
return parts
}

func (c *Connection) ClusterState(filter ClusterStateFilter) (ClusterStateResponse, error) {
func (c *Conn) ClusterState(filter ClusterStateFilter) (ClusterStateResponse, error) {
var parameters []string
var url string
var retval ClusterStateResponse
Expand Down
4 changes: 2 additions & 2 deletions lib/clusternodesinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (

// The cluster nodes info API allows to retrieve one or more (or all) of the cluster nodes information.
// information can be one of jvm, process
func (c *Connection) AllNodesInfo() (NodeInfo, error) {
func (c *Conn) AllNodesInfo() (NodeInfo, error) {
return c.NodesInfo([]string{"_all"}, "_all")
}

func (c *Connection) NodesInfo(information []string, nodes ...string) (NodeInfo, error) {
func (c *Conn) NodesInfo(information []string, nodes ...string) (NodeInfo, error) {
var url string
var retval NodeInfo
url = fmt.Sprintf("/_nodes/%s/%s", strings.Join(nodes, ","), strings.Join(information, ","))
Expand Down
3 changes: 2 additions & 1 deletion lib/clusternodesinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
)

func TestGetAll(t *testing.T) {
nodesInfo, err := AllNodesInfo()
c := NewConn()
nodesInfo, err := c.AllNodesInfo()
//log.Println(out)
assert.T(t, err == nil, fmt.Sprintf("should not have gotten error, received :%v", err))
assert.T(t, nodesInfo.ClusterName == "elasticsearch", fmt.Sprintf("clustername should have been elasticsearch, received :%v", err))
Expand Down
2 changes: 1 addition & 1 deletion lib/clusternodesshutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// delay is an integer representing the number of seconds
// passing "" or "_all" for the nodes parameter will shut down all nodes
// see http://www.elasticsearch.org/guide/reference/api/admin-cluster-nodes-shutdown/
func (c *Connection) NodesShutdown(delay int, nodes ...string) error {
func (c *Conn) NodesShutdown(delay int, nodes ...string) error {
shutdownUrl := fmt.Sprintf("/_cluster/nodes/%s/_shutdown", strings.Join(nodes, ","))
if delay > 0 {
var values url.Values = url.Values{}
Expand Down
2 changes: 1 addition & 1 deletion lib/clusterreroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// The cluster reroute API allows to explicitly execute a cluster reroute allocation command.
// see http://www.elasticsearch.org/guide/reference/api/admin-cluster-reroute/
// NOTE(review): the previous comment described the cluster health API and linked to its docs — apparent copy-paste; confirm intended description.
func (c *Connection) Reroute(dryRun bool, commands Commands) (ClusterHealthResponse, error) {
func (c *Conn) Reroute(dryRun bool, commands Commands) (ClusterHealthResponse, error) {
var url string
var retval ClusterHealthResponse

Expand Down
2 changes: 1 addition & 1 deletion lib/clusterstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

// UpdateSetting updates cluster-wide settings for the given filter indices.
// NOTE(review): the previous comment described the cluster state API ("State gets the comprehensive state information...") and linked to admin-cluster-state — it does not match this function's name; confirm the intended contract.
func (c *Connection) UpdateSetting(args map[string]interface{}, filter_indices ...string) (ClusterStateResponse, error) {
func (c *Conn) UpdateSetting(args map[string]interface{}, filter_indices ...string) (ClusterStateResponse, error) {
var url string
var retval ClusterStateResponse

Expand Down
2 changes: 1 addition & 1 deletion lib/clusterupdatesettings.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// UpdateSettings allows to update cluster wide specific settings. Defaults to Transient setting
// Settings updated can either be persistent (applied cross restarts) or transient (will not survive a full cluster restart).
// http://www.elasticsearch.org/guide/reference/api/admin-cluster-update-settings.html
func (c *Connection) UpdateSettings(settingType string, key string, value int) (ClusterSettingsResponse, error) {
func (c *Conn) UpdateSettings(settingType string, key string, value int) (ClusterSettingsResponse, error) {
var retval ClusterSettingsResponse
if settingType != "transient" && settingType != "persistent" {
return retval, fmt.Errorf("settingType must be one of transient or persistent, you passed %s", settingType)
Expand Down
12 changes: 6 additions & 6 deletions lib/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
DefaultDecayDuration = 0
)

type Connection struct {
type Conn struct {
// Maintain these for backwards compatibility
Protocol string
Domain string
Expand All @@ -49,8 +49,8 @@ type Connection struct {
DecayDuration time.Duration
}

func NewConnection() *Connection {
return &Connection{
func NewConn() *Conn {
return &Conn{
// Maintain these for backwards compatibility
Protocol: DefaultProtocol,
Domain: DefaultDomain,
Expand All @@ -60,7 +60,7 @@ func NewConnection() *Connection {
}
}

func (c *Connection) SetHosts(newhosts []string) {
func (c *Conn) SetHosts(newhosts []string) {

// Store the new host list
c.Hosts = newhosts
Expand All @@ -71,7 +71,7 @@ func (c *Connection) SetHosts(newhosts []string) {
}

// Set up the host pool to be used
func (c *Connection) initializeHostPool() {
func (c *Conn) initializeHostPool() {

// If no hosts are set, fallback to defaults
if len(c.Hosts) == 0 {
Expand All @@ -93,7 +93,7 @@ func (c *Connection) initializeHostPool() {
c.Hosts, c.DecayDuration, &hostpool.LinearEpsilonValueCalculator{})
}

func (c *Connection) NewRequest(method, path, query string) (*Request, error) {
func (c *Conn) NewRequest(method, path, query string) (*Request, error) {
// Setup the hostpool on our first run
c.once.Do(c.initializeHostPool)

Expand Down
14 changes: 7 additions & 7 deletions lib/corebulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type ErrorBuffer struct {
// A bulk indexer creates goroutines, and channels for connecting and sending data
// to elasticsearch in bulk, using buffers.
type BulkIndexer struct {
connection *Connection
conn *Conn

// We are creating a variable defining the func responsible for sending
// to allow a mock sendor for test purposes
Expand Down Expand Up @@ -86,7 +86,7 @@ type BulkIndexer struct {

// Number of documents we have send through so far on this session
docCt int
// Max number of http connections in flight at one time
// Max number of http conns in flight at one time
maxConns int
// If we are indexing enough docs per bufferdelaymax, we won't need to do time
// based eviction, else we do.
Expand All @@ -97,8 +97,8 @@ type BulkIndexer struct {
sendWg *sync.WaitGroup
}

func (c *Connection) NewBulkIndexer(maxConns int) *BulkIndexer {
b := BulkIndexer{connection: c, sendBuf: make(chan *bytes.Buffer, maxConns)}
func (c *Conn) NewBulkIndexer(maxConns int) *BulkIndexer {
b := BulkIndexer{conn: c, sendBuf: make(chan *bytes.Buffer, maxConns)}
b.needsTimeBasedFlush = true
b.buf = new(bytes.Buffer)
b.maxConns = maxConns
Expand All @@ -119,7 +119,7 @@ func (c *Connection) NewBulkIndexer(maxConns int) *BulkIndexer {
//
// done := make(chan bool)
// BulkIndexerGlobalRun(100, done)
func (c *Connection) NewBulkIndexerErrors(maxConns, retrySeconds int) *BulkIndexer {
func (c *Conn) NewBulkIndexerErrors(maxConns, retrySeconds int) *BulkIndexer {
b := c.NewBulkIndexer(maxConns)
b.RetryForSeconds = retrySeconds
b.ErrorChannel = make(chan *ErrorBuffer, 20)
Expand All @@ -132,7 +132,7 @@ func (b *BulkIndexer) Run(done chan bool) {

go func() {
if b.BulkSender == nil {
b.BulkSender = b.connection.BulkSend
b.BulkSender = b.conn.BulkSend
}
// Backwards compatibility
b.shutdownChan = done
Expand Down Expand Up @@ -315,7 +315,7 @@ func (b *BulkIndexer) Update(index string, _type string, id, ttl string, date *t

// This does the actual send of a buffer, which has already been formatted
// into bytes of ES formatted bulk data
func (c *Connection) BulkSend(buf *bytes.Buffer) error {
func (c *Conn) BulkSend(buf *bytes.Buffer) error {
_, err := c.DoCommand("POST", "/_bulk", nil, buf)
if err != nil {
BulkErrorCt += 1
Expand Down
44 changes: 25 additions & 19 deletions lib/corebulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func init() {
}

// take two ints, compare, need to be within 5%
func CloseInt(a, b int) bool {
func closeInt(a, b int) bool {
c := float64(a) / float64(b)
if c >= .95 && c <= 1.05 {
return true
Expand All @@ -52,13 +52,14 @@ func CloseInt(a, b int) bool {

func TestBulkIndexerBasic(t *testing.T) {
InitTests(true)
indexer := NewBulkIndexer(3)
c := NewConn()
indexer := c.NewBulkIndexer(3)
indexer.BulkSender = func(buf *bytes.Buffer) error {
messageSets += 1
totalBytesSent += buf.Len()
buffers = append(buffers, buf)
// log.Printf("buffer:%s", string(buf.Bytes()))
return BulkSend(buf)
return c.BulkSend(buf)
}
done := make(chan bool)
indexer.Run(done)
Expand All @@ -68,7 +69,7 @@ func TestBulkIndexerBasic(t *testing.T) {

err := indexer.Index("users", "user", "1", "", &date, data, true)

WaitFor(func() bool {
waitFor(func() bool {
return len(buffers) > 0
}, 5)
// part of request is url, so lets factor that in
Expand All @@ -88,21 +89,22 @@ func TestBulkIndexerBasic(t *testing.T) {

assert.T(t, BulkErrorCt == 0, fmt.Sprintf("Should not have any errors %d", BulkErrorCt))
expectedBytes = 282 // with refresh
assert.T(t, CloseInt(totalBytesSent, expectedBytes), fmt.Sprintf("Should have sent %v bytes but was %v", expectedBytes, totalBytesSent))
assert.T(t, closeInt(totalBytesSent, expectedBytes), fmt.Sprintf("Should have sent %v bytes but was %v", expectedBytes, totalBytesSent))

done <- true
}

// currently broken in drone.io
func XXXTestBulkUpdate(t *testing.T) {
InitTests(true)
api.Port = "9200"
indexer := NewBulkIndexer(3)
c := NewConn()
c.Port = "9200"
indexer := c.NewBulkIndexer(3)
indexer.BulkSender = func(buf *bytes.Buffer) error {
messageSets += 1
totalBytesSent += buf.Len()
buffers = append(buffers, buf)
return BulkSend(buf)
return c.BulkSend(buf)
}
done := make(chan bool)
indexer.Run(done)
Expand All @@ -113,7 +115,7 @@ func XXXTestBulkUpdate(t *testing.T) {
}

// Lets make sure the data is in the index ...
_, err := Index("users", "user", "5", nil, user)
_, err := c.Index("users", "user", "5", nil, user)

// script and params
data := map[string]interface{}{
Expand All @@ -126,13 +128,13 @@ func XXXTestBulkUpdate(t *testing.T) {
// indexer.Flush()
done <- true

WaitFor(func() bool {
waitFor(func() bool {
return len(buffers) > 0
}, 5)

assert.T(t, BulkErrorCt == 0 && err == nil, fmt.Sprintf("Should not have any errors, bulkErrorCt:%v, err:%v", BulkErrorCt, err))

response, err := Get("users", "user", "5", nil)
response, err := c.Get("users", "user", "5", nil)
assert.T(t, err == nil, fmt.Sprintf("Should not have any errors %v", err))
newCount := response.Source.(map[string]interface{})["count"]
assert.T(t, newCount.(float64) == 3,
Expand All @@ -141,20 +143,21 @@ func XXXTestBulkUpdate(t *testing.T) {

func TestBulkSmallBatch(t *testing.T) {
InitTests(true)
c := NewConn()

done := make(chan bool)

date := time.Unix(1257894000, 0)
data := map[string]interface{}{"name": "smurfs", "age": 22, "date": time.Unix(1257894000, 0)}

// Now tests small batches
indexersm := NewBulkIndexer(1)
indexersm := c.NewBulkIndexer(1)
indexersm.BufferDelayMax = 100 * time.Millisecond
indexersm.BulkMaxDocs = 2
messageSets = 0
indexersm.BulkSender = func(buf *bytes.Buffer) error {
messageSets += 1
return BulkSend(buf)
return c.BulkSend(buf)
}
indexersm.Run(done)
<-time.After(time.Millisecond * 20)
Expand All @@ -170,13 +173,14 @@ func TestBulkSmallBatch(t *testing.T) {
}

func XXXTestBulkErrors(t *testing.T) {
// lets set a bad port, and hope we get a connection refused error?
api.Port = "27845"
// lets set a bad port, and hope we get a conn refused error?
c := NewConn()
c.Port = "27845"
defer func() {
api.Port = "9200"
c.Port = "9200"
}()
BulkDelaySeconds = 1
indexer := NewBulkIndexerErrors(10, 1)
indexer := c.NewBulkIndexerErrors(10, 1)
done := make(chan bool)
indexer.Run(done)
errorCt := 0
Expand Down Expand Up @@ -209,14 +213,15 @@ BenchmarkBulkSend 18:33:00 bulk_test.go:131: Sent 1 messages in 0 sets totaling
*/
func BenchmarkBulkSend(b *testing.B) {
InitTests(true)
c := NewConn()
b.StartTimer()
totalBytes := 0
sets := 0
GlobalBulkIndexer.BulkSender = func(buf *bytes.Buffer) error {
totalBytes += buf.Len()
sets += 1
//log.Println("got bulk")
return BulkSend(buf)
return c.BulkSend(buf)
}
for i := 0; i < b.N; i++ {
about := make([]byte, 1000)
Expand All @@ -241,6 +246,7 @@ BenchmarkBulkSendBytes 18:33:05 bulk_test.go:169: Sent 1 messages in 0 sets tota
*/
func BenchmarkBulkSendBytes(b *testing.B) {
InitTests(true)
c := NewConn()
about := make([]byte, 1000)
rand.Read(about)
data := map[string]interface{}{"name": "smurfs", "age": 22, "date": time.Unix(1257894000, 0), "about": about}
Expand All @@ -251,7 +257,7 @@ func BenchmarkBulkSendBytes(b *testing.B) {
GlobalBulkIndexer.BulkSender = func(buf *bytes.Buffer) error {
totalBytes += buf.Len()
sets += 1
return BulkSend(buf)
return c.BulkSend(buf)
}
for i := 0; i < b.N; i++ {
IndexBulk("users", "user", strconv.Itoa(i), nil, body, true)
Expand Down
2 changes: 1 addition & 1 deletion lib/corecount.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type CountResponse struct {
// The query can either be provided using a simple query string as a parameter,
// or using the Query DSL defined within the request body.
// http://www.elasticsearch.org/guide/reference/api/count.html
func (c *Connection) Count(index string, _type string, args map[string]interface{}) (CountResponse, error) {
func (c *Conn) Count(index string, _type string, args map[string]interface{}) (CountResponse, error) {
var url string
var retval CountResponse
url = fmt.Sprintf("/%s/%s/_count", index, _type)
Expand Down
2 changes: 1 addition & 1 deletion lib/coredelete.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

// Delete API allows to delete a typed JSON document from a specific index based on its id.
// http://www.elasticsearch.org/guide/reference/api/delete.html
func (c *Connection) Delete(index string, _type string, id string, args map[string]interface{}) (BaseResponse, error) {
func (c *Conn) Delete(index string, _type string, id string, args map[string]interface{}) (BaseResponse, error) {
var url string
var retval BaseResponse
url = fmt.Sprintf("/%s/%s/%s", index, _type, id)
Expand Down
2 changes: 1 addition & 1 deletion lib/coredeletebyquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// The query can either be provided using a simple query string as a parameter, or using the Query DSL defined within
// the request body.
// see: http://www.elasticsearch.org/guide/reference/api/delete-by-query.html
func (c *Connection) DeleteByQuery(indices []string, types []string, args map[string]interface{}, query interface{}) (BaseResponse, error) {
func (c *Conn) DeleteByQuery(indices []string, types []string, args map[string]interface{}, query interface{}) (BaseResponse, error) {
var url string
var retval BaseResponse
if len(indices) > 0 && len(types) > 0 {
Expand Down
Loading

0 comments on commit 495f810

Please sign in to comment.