Skip to content

Commit

Permalink
fix: fix the bytes encode/decode for redis cache (ContentSquare#153)
Browse files Browse the repository at this point in the history
* fix: fix the bytes encode/decode for redis cache

Using `string(data)` to convert the byte array to string introduces error in json marshal/unmarshal,
hence causes error when returning cached response from redis.

The reason is `Unmarshal` function in
`encode/json` would replace invalid UTF-8 or invalid UTF-16 pairs with `U+FFFD`, therefore the
`payload` string in `redisCachePayload` will actually change after json marshal/unmarshal since the
byte array may contain invalid UTF-8/UTF-16 byte, the length of payload will thereby change,
resulting the http server to find the declared length in header `Content-Length` mismatches the
actual length of payload.

The fix is to base64-encode/decode the byte array to string, thereby
eliminates invalid UTF-8/UTF-16 bytes.

* fix: add test case about encode/decode the cached value

add test cases for base64 encode/decode the cached value

* fix: adjust the waiting time of `queue_overflow_for_user` case to pass ci

minimize the waiting time between two consecutive requests
  • Loading branch information
wangxinalex authored Apr 3, 2022
1 parent 5b23001 commit 6cfac12
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
/docs/.output
/docs/.nuxt
/docs/static/sw.js
.idea
11 changes: 9 additions & 2 deletions cache/redis_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"github.com/contentsquare/chproxy/config"
"github.com/contentsquare/chproxy/log"
Expand Down Expand Up @@ -118,13 +119,18 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) {
log.Errorf("Not able to fetch TTL for: %s ", key)
}

decoded, err := base64.StdEncoding.DecodeString(payload.Payload)
if err != nil {
log.Errorf("failed to decode payload: %s , due to: %v ", payload.Payload, err)
return nil, ErrMissing
}
value := &CachedData{
ContentMetadata: ContentMetadata{
Length: payload.Length,
Type: payload.Type,
Encoding: payload.Encoding,
},
Data: bytes.NewReader([]byte(payload.Payload)),
Data: bytes.NewReader(decoded),
Ttl: ttl,
}

Expand All @@ -137,8 +143,9 @@ func (r *redisCache) Put(reader io.Reader, contentMetadata ContentMetadata, key
return 0, err
}

encoded := base64.StdEncoding.EncodeToString(data)
payload := &redisCachePayload{
Length: contentMetadata.Length, Type: contentMetadata.Type, Encoding: contentMetadata.Encoding, Payload: string(data),
Length: contentMetadata.Length, Type: contentMetadata.Type, Encoding: contentMetadata.Encoding, Payload: encoded,
}

marshalled, err := json.Marshal(payload)
Expand Down
62 changes: 60 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"compress/gzip"
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/contentsquare/chproxy/cache"
"io"
Expand All @@ -19,9 +21,9 @@ import (
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/contentsquare/chproxy/config"
"github.com/contentsquare/chproxy/log"
"github.com/alicebob/miniredis/v2"
)

var testDir = "./temp-test-data"
Expand Down Expand Up @@ -365,7 +367,7 @@ func TestServe(t *testing.T) {
str, err := redisClient.Get(key.String())
checkErr(t, err)

if !strings.Contains(str, "Ok") || !strings.Contains(str, "text/plain") || !strings.Contains(str, "charset=utf-8") {
if !strings.Contains(str, base64.StdEncoding.EncodeToString([]byte("Ok."))) || !strings.Contains(str, "text/plain") || !strings.Contains(str, "charset=utf-8") {
t.Fatalf("result from cache query is wrong: %s", str)
}

Expand All @@ -376,6 +378,57 @@ func TestServe(t *testing.T) {
},
startHTTP,
},
{
"http requests with caching in redis (testcase for base64 encoding/decoding)",
"testdata/http.cache.redis.yml",
func(t *testing.T) {
redisClient.FlushAll()
q := "SELECT 1 FORMAT TabSeparatedWithNamesAndTypes"
req, err := http.NewRequest("GET", "http://127.0.0.1:9090?query="+url.QueryEscape(q), nil)
checkErr(t, err)

resp := httpRequest(t, req, http.StatusOK)
checkHttpResponse(t, resp, string(bytesWithInvalidUTFPairs))
resp2 := httpRequest(t, req, http.StatusOK)
// if we do not use base64 to encode/decode the cached payload, EOF error will be thrown here.
checkHttpResponse(t, resp2, string(bytesWithInvalidUTFPairs))
keys := redisClient.Keys()
if len(keys) != 1 {
t.Fatalf("unexpected amount of keys in redis: %v", len(keys))
}

// check cached response
key := &cache.Key{
Query: []byte(q),
AcceptEncoding: "gzip",
Version: cache.Version,
}
str, err := redisClient.Get(key.String())
checkErr(t, err)

type redisCachePayload struct {
Length int64 `json:"l"`
Type string `json:"t"`
Encoding string `json:"enc"`
Payload string `json:"payload"`
}

var unMarshaledPayload redisCachePayload
err = json.Unmarshal([]byte(str), &unMarshaledPayload)
checkErr(t, err)
if unMarshaledPayload.Payload != base64.StdEncoding.EncodeToString(bytesWithInvalidUTFPairs) {
t.Fatalf("result from cache query is wrong: %s", str)
}
decoded, err := base64.StdEncoding.DecodeString(unMarshaledPayload.Payload)
checkErr(t, err)

if unMarshaledPayload.Length != int64(len(decoded)) {
t.Fatalf("the declared length %d and actual length %d is not same", unMarshaledPayload.Length, len(decoded))
}
},
startHTTP,
},

{
"http gzipped POST request",
"testdata/http.cache.yml",
Expand Down Expand Up @@ -706,6 +759,9 @@ func fakeCHHandler(w http.ResponseWriter, r *http.Request) {
fakeCHState.sleep()

fmt.Fprint(w, "bar")
case "SELECT 1 FORMAT TabSeparatedWithNamesAndTypes":
w.WriteHeader(http.StatusOK)
w.Write(bytesWithInvalidUTFPairs)
default:
if strings.Contains(string(query), killQueryPattern) {
fakeCHState.kill()
Expand All @@ -715,6 +771,8 @@ func fakeCHHandler(w http.ResponseWriter, r *http.Request) {
}
}

var bytesWithInvalidUTFPairs = []byte{239, 191, 189, 1, 32, 50, 239, 191}

var fakeCHState = &stateCH{
syncCH: make(chan struct{}),
}
Expand Down
4 changes: 2 additions & 2 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,9 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
p.users["default"].maxConcurrentQueries = 1
p.users["default"].queueCh = make(chan struct{}, 1)
go makeHeavyRequest(p, time.Millisecond*20)
time.Sleep(time.Millisecond * 5)
time.Sleep(time.Millisecond * 1) // in case ci runner is slow
go makeHeavyRequest(p, time.Millisecond*20)
time.Sleep(time.Millisecond * 5)
time.Sleep(time.Millisecond * 1)
return makeHeavyRequest(p, time.Millisecond*20)
},
},
Expand Down

0 comments on commit 6cfac12

Please sign in to comment.