Skip to content

Commit

Permalink
Add optional gzip compression setting
Browse files Browse the repository at this point in the history
Use `elastic.SetGzip(true)` to enable gzip compression of HTTP
request/response with Elasticsearch. Notice that you need to enable
gzip compression in Elasticsearch first by adding
`http.compression: true` to `elasticsearch.yml` (see
[docs](https://www.elastic.co/guide/en/elasticsearch/reference/2.0/modules-http.html)).
  • Loading branch information
olivere committed Nov 20, 2015
2 parents dd8a42e + d0d9c3e commit c3762bc
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 14 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#
# Please keep this list sorted.

Adam Weiner [@adamweiner](https://github.com/adamweiner)
Alexey Sharov [@nizsheanez](https://github.com/nizsheanez)
Braden Bassingthwaite [@bbassingthwaite-va](https://github.com/bbassingthwaite-va)
Conrad Pankoff [@deoxxa](https://github.com/deoxxa)
Expand Down
25 changes: 16 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

const (
// Version is the current version of Elastic.
Version = "3.0.5"
Version = "3.0.6"

// DefaultUrl is the default endpoint of Elasticsearch on the local machine.
// It is used e.g. when initializing a new Client without a specific URL.
Expand Down Expand Up @@ -76,6 +76,9 @@ const (
// DefaultSendGetBodyAs is the HTTP method to use when elastic is sending
// a GET request with a body.
DefaultSendGetBodyAs = "GET"

// DefaultGzipEnabled specifies if gzip compression is enabled by default.
DefaultGzipEnabled = false
)

var (
Expand Down Expand Up @@ -127,6 +130,7 @@ type Client struct {
basicAuthPassword string // password for HTTP Basic Auth
sendGetBodyAs string // override for when sending a GET with a body
requiredPlugins []string // list of required plugins
gzipEnabled bool // gzip compression enabled or disabled (default)
}

// NewClient creates a new client to work with Elasticsearch.
Expand Down Expand Up @@ -193,6 +197,7 @@ func NewClient(options ...ClientOptionFunc) (*Client, error) {
snifferInterval: DefaultSnifferInterval,
snifferStop: make(chan bool),
sendGetBodyAs: DefaultSendGetBodyAs,
gzipEnabled: DefaultGzipEnabled,
}

// Run the options on it
Expand Down Expand Up @@ -399,6 +404,14 @@ func SetMaxRetries(maxRetries int) func(*Client) error {
}
}

// SetGzip enables or disables gzip compression (disabled by default).
func SetGzip(enabled bool) ClientOptionFunc {
return func(c *Client) error {
c.gzipEnabled = enabled
return nil
}
}

// SetDecoder sets the Decoder to use when decoding data from Elasticsearch.
// DefaultDecoder is used by default.
func SetDecoder(decoder Decoder) func(*Client) error {
Expand Down Expand Up @@ -929,6 +942,7 @@ func (c *Client) PerformRequest(method, path string, params url.Values, body int
basicAuthUsername := c.basicAuthUsername
basicAuthPassword := c.basicAuthPassword
sendGetBodyAs := c.sendGetBodyAs
gzipEnabled := c.gzipEnabled
c.mu.RUnlock()

var err error
Expand Down Expand Up @@ -985,14 +999,7 @@ func (c *Client) PerformRequest(method, path string, params url.Values, body int

// Set body
if body != nil {
switch b := body.(type) {
case string:
req.SetBodyString(b)
break
default:
req.SetBodyJson(body)
break
}
req.SetBody(body, gzipEnabled)
}

// Tracing
Expand Down
70 changes: 65 additions & 5 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package elastic

import (
"bytes"
"compress/gzip"
"encoding/json"
"io"
"io/ioutil"
Expand All @@ -17,6 +18,7 @@ import (
// Elasticsearch-specific HTTP request
type Request http.Request

// NewRequest is a http.Request and adds features such as encoding the body.
func NewRequest(method, url string) (*Request, error) {
req, err := http.NewRequest(method, url, nil)
if err != nil {
Expand All @@ -27,25 +29,83 @@ func NewRequest(method, url string) (*Request, error) {
return (*Request)(req), nil
}

// SetBasicAuth wraps http.Request's SetBasicAuth.
func (r *Request) SetBasicAuth(username, password string) {
((*http.Request)(r)).SetBasicAuth(username, password)
}

func (r *Request) SetBodyJson(data interface{}) error {
// SetBody encodes the body in the request. Optionally, it performs GZIP compression.
func (r *Request) SetBody(body interface{}, gzipCompress bool) error {
switch b := body.(type) {
case string:
if gzipCompress {
return r.setBodyGzip(b)
} else {
return r.setBodyString(b)
}
default:
if gzipCompress {
return r.setBodyGzip(body)
} else {
return r.setBodyJson(body)
}
}
}

// setBodyJson encodes the body as a struct to be marshaled via json.Marshal.
func (r *Request) setBodyJson(data interface{}) error {
body, err := json.Marshal(data)
if err != nil {
return err
}
r.SetBody(bytes.NewReader(body))
r.Header.Set("Content-Type", "application/json")
r.setBodyReader(bytes.NewReader(body))
return nil
}

func (r *Request) SetBodyString(body string) error {
return r.SetBody(strings.NewReader(body))
// setBodyString encodes the body as a string.
func (r *Request) setBodyString(body string) error {
return r.setBodyReader(strings.NewReader(body))
}

// setBodyGzip gzip's the body. It accepts both strings and structs as body.
// The latter will be encoded via json.Marshal.
func (r *Request) setBodyGzip(body interface{}) error {
switch b := body.(type) {
case string:
buf := new(bytes.Buffer)
w := gzip.NewWriter(buf)
if _, err := w.Write([]byte(b)); err != nil {
return err
}
if err := w.Close(); err != nil {
return err
}
r.Header.Add("Content-Encoding", "gzip")
r.Header.Add("Vary", "Accept-Encoding")
return r.setBodyReader(bytes.NewReader(buf.Bytes()))
default:
data, err := json.Marshal(b)
if err != nil {
return err
}
buf := new(bytes.Buffer)
w := gzip.NewWriter(buf)
if _, err := w.Write(data); err != nil {
return err
}
if err := w.Close(); err != nil {
return err
}
r.Header.Add("Content-Encoding", "gzip")
r.Header.Add("Vary", "Accept-Encoding")
r.Header.Set("Content-Type", "application/json")
return r.setBodyReader(bytes.NewReader(buf.Bytes()))
}
}

func (r *Request) SetBody(body io.Reader) error {
// setBodyReader writes the body from an io.Reader.
func (r *Request) setBodyReader(body io.Reader) error {
rc, ok := body.(io.ReadCloser)
if !ok && body != nil {
rc = ioutil.NopCloser(body)
Expand Down

0 comments on commit c3762bc

Please sign in to comment.