Skip to content

Commit

Permalink
grpc: port messagesize interceptors and raise default client message …
Browse files Browse the repository at this point in the history
…size to 90mb (sourcegraph#640)
  • Loading branch information
ggilmore authored Aug 22, 2023
1 parent 993cfdb commit f75df3d
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 33 deletions.
66 changes: 44 additions & 22 deletions cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
sglog "github.com/sourcegraph/log"
"github.com/sourcegraph/zoekt/grpc/internalerrs"
"github.com/sourcegraph/zoekt/grpc/messagesize"
"go.uber.org/automaxprocs/maxprocs"
"golang.org/x/net/trace"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -1372,33 +1373,12 @@ func newServer(conf rootConfig) (*Server, error) {
}

logger := sglog.Scoped("zoektConfigurationGRPCClient", "")

gRPCConnectionOptions := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainStreamInterceptor(
internalActorStreamInterceptor(),
internalerrs.LoggingStreamClientInterceptor(logger),
internalerrs.PrometheusStreamClientInterceptor,
),
grpc.WithChainUnaryInterceptor(
internalActorUnaryInterceptor(),
internalerrs.LoggingUnaryClientInterceptor(logger),
internalerrs.PrometheusUnaryClientInterceptor,
),
grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
}

// This dialer is used to connect via gRPC to the Sourcegraph instance.
// This is done lazily, so we can provide the client to use regardless of
// whether we enabled gRPC or not initially.
cc, err := grpc.Dial(rootURL.Host, gRPCConnectionOptions...)
client, err := dialGRPCClient(rootURL.Host, logger)
if err != nil {
return nil, fmt.Errorf("initializing gRPC connection to %q: %w", rootURL.Host, err)
}

client := proto.NewZoektConfigurationServiceClient(cc)
opts = append(opts, WithGRPCClient(client))

sg = newSourcegraphClient(rootURL, conf.hostname, opts...)

} else {
Expand Down Expand Up @@ -1472,6 +1452,48 @@ func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
}
}

// defaultGRPCMessageReceiveSizeBytes is the default maximum message size, in bytes, that gRPC servers and clients are allowed to process.
// This can be overridden by providing custom Server/Dial options.
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB

// dialGRPCClient opens a gRPC connection to addr and returns a ZoektConfigurationService client bound to it.
//
// The connection uses insecure transport credentials, the internal actor / logging / Prometheus
// client interceptors, the default service configuration, and a default receive limit of
// defaultGRPCMessageReceiveSizeBytes. Options in additionalOpts are applied after these defaults.
//
// It returns an error if grpc.Dial fails.
func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
	transportCreds := grpc.WithTransportCredentials(insecure.NewCredentials())

	streamChain := grpc.WithChainStreamInterceptor(
		internalActorStreamInterceptor(),
		internalerrs.LoggingStreamClientInterceptor(logger),
		internalerrs.PrometheusStreamClientInterceptor,
	)

	unaryChain := grpc.WithChainUnaryInterceptor(
		internalActorUnaryInterceptor(),
		internalerrs.LoggingUnaryClientInterceptor(logger),
		internalerrs.PrometheusUnaryClientInterceptor,
	)

	dialOpts := []grpc.DialOption{
		transportCreds,
		streamChain,
		unaryChain,
		grpc.WithDefaultServiceConfig(defaultGRPCServiceConfigurationJSON),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultGRPCMessageReceiveSizeBytes)),
	}

	// Caller-supplied options come after the defaults above.
	dialOpts = append(dialOpts, additionalOpts...)

	// The environment-driven message size options go last on purpose: they are an
	// escape hatch, so they must win over every other client-specific option that
	// tweaks the message size. They are only present when the environment variable is set,
	// which gives a single, uniform size setting that's easy to reason about.
	dialOpts = append(dialOpts, messagesize.MustGetClientMessageSizeFromEnv()...)

	// This dialer is used to connect via gRPC to the Sourcegraph instance.
	// Dialing is lazy, so the client can be handed out regardless of whether
	// gRPC was enabled initially.
	conn, err := grpc.Dial(addr, dialOpts...)
	if err != nil {
		return nil, fmt.Errorf("dialing %q: %w", addr, err)
	}

	return proto.NewZoektConfigurationServiceClient(conn), nil
}

// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
Expand Down
41 changes: 30 additions & 11 deletions cmd/zoekt-webserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"github.com/sourcegraph/mountinfo"
"github.com/sourcegraph/zoekt/grpc/internalerrs"
"github.com/sourcegraph/zoekt/grpc/messagesize"
zoektgrpc "github.com/sourcegraph/zoekt/grpc/server"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/net/http2"
Expand Down Expand Up @@ -291,17 +292,8 @@ func main() {

logger := sglog.Scoped("ZoektWebserverGRPCServer", "The Zoekt Webserver GRPC Server")

grpcServer := grpc.NewServer(
grpc.ChainStreamInterceptor(
otelgrpc.StreamServerInterceptor(),
internalerrs.LoggingStreamServerInterceptor(logger),
),
grpc.ChainUnaryInterceptor(
otelgrpc.UnaryServerInterceptor(),
internalerrs.LoggingUnaryServerInterceptor(logger),
),
)
v1.RegisterWebserverServiceServer(grpcServer, zoektgrpc.NewServer(web.NewTraceAwareSearcher(s.Searcher)))
streamer := web.NewTraceAwareSearcher(s.Searcher)
grpcServer := newGRPCServer(logger, streamer)

handler = multiplexGRPC(grpcServer, handler)

Expand Down Expand Up @@ -636,6 +628,33 @@ func traceContext(ctx context.Context) sglog.TraceContext {
return sglog.TraceContext{}
}

// newGRPCServer builds a gRPC server with the OpenTelemetry and error-logging interceptors
// installed and registers the webserver service, backed by streamer, on it.
//
// Options in additionalOpts are applied after the interceptor options.
func newGRPCServer(logger sglog.Logger, streamer zoekt.Streamer, additionalOpts ...grpc.ServerOption) *grpc.Server {
	streamChain := grpc.ChainStreamInterceptor(
		otelgrpc.StreamServerInterceptor(),
		internalerrs.LoggingStreamServerInterceptor(logger),
	)

	unaryChain := grpc.ChainUnaryInterceptor(
		otelgrpc.UnaryServerInterceptor(),
		internalerrs.LoggingUnaryServerInterceptor(logger),
	)

	serverOpts := []grpc.ServerOption{streamChain, unaryChain}
	serverOpts = append(serverOpts, additionalOpts...)

	// The environment-driven message size options go last on purpose: they are an
	// escape hatch, so they must win over every other server-specific option that
	// tweaks the message size. They are only present when the environment variable is set,
	// which gives a single, uniform size setting that's easy to reason about.
	serverOpts = append(serverOpts, messagesize.MustGetServerMessageSizeFromEnv()...)

	grpcServer := grpc.NewServer(serverOpts...)
	v1.RegisterWebserverServiceServer(grpcServer, zoektgrpc.NewServer(streamer))

	return grpcServer
}

var (
metricWatchdogErrors = promauto.NewGauge(prometheus.GaugeOpts{
Name: "zoekt_webserver_watchdog_errors",
Expand Down
127 changes: 127 additions & 0 deletions grpc/messagesize/messagesize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package messagesize

import (
"fmt"
"math"
"os"

"google.golang.org/grpc"

"github.com/dustin/go-humanize"
)

// messageSizeDisabled is the sentinel used as the fallback for the message size
// environment variables, marking them as "not set".
var messageSizeDisabled = "message_size_disabled"

// Bounds enforced on any message size configured through the environment.
var (
	// 4 MB: There isn't a scenario where we'd want to dip below the default of 4MB.
	smallestAllowedMaxMessageSize uint64 = 4 * 1024 * 1024
	// This is the largest allowed value for the type accepted by the grpc.MaxSize[...] options.
	largestAllowedMaxMessageSize uint64 = math.MaxInt
)

// Raw environment values, read once when the package is initialized.
var (
	// envClientMessageSize sets the maximum message size for gRPC clients (ex: "40MB").
	envClientMessageSize = getEnv("GRPC_CLIENT_MAX_MESSAGE_SIZE", messageSizeDisabled)
	// envServerMessageSize sets the maximum message size for gRPC servers (ex: "40MB").
	envServerMessageSize = getEnv("GRPC_SERVER_MAX_MESSAGE_SIZE", messageSizeDisabled)
)

// MustGetClientMessageSizeFromEnv returns the grpc.DialOptions that cap both the send and
// receive message size for gRPC clients, based on the "GRPC_CLIENT_MAX_MESSAGE_SIZE"
// environment variable (ex: "40MB").
//
// It returns nil when the environment variable isn't set.
// It panics when the configured value is invalid (too small, too large, not parsable, etc.).
func MustGetClientMessageSizeFromEnv() []grpc.DialOption {
	rawLimit := envClientMessageSize
	if rawLimit == messageSizeDisabled {
		return nil
	}

	limitBytes, sizeErr := getMessageSizeBytesFromString(rawLimit, smallestAllowedMaxMessageSize, largestAllowedMaxMessageSize)
	if sizeErr != nil {
		panic(fmt.Sprintf("failed to get gRPC client message size: %s", sizeErr))
	}

	// The same limit is applied in both directions.
	callLimits := grpc.WithDefaultCallOptions(
		grpc.MaxCallRecvMsgSize(limitBytes),
		grpc.MaxCallSendMsgSize(limitBytes),
	)

	return []grpc.DialOption{callLimits}
}

// MustGetServerMessageSizeFromEnv returns the grpc.ServerOptions that cap both the send and
// receive message size for gRPC servers, based on the "GRPC_SERVER_MAX_MESSAGE_SIZE"
// environment variable (ex: "40MB").
//
// It returns nil when the environment variable isn't set.
// It panics when the configured value is invalid (too small, too large, not parsable, etc.).
func MustGetServerMessageSizeFromEnv() []grpc.ServerOption {
	rawLimit := envServerMessageSize
	if rawLimit == messageSizeDisabled {
		return nil
	}

	limitBytes, sizeErr := getMessageSizeBytesFromString(rawLimit, smallestAllowedMaxMessageSize, largestAllowedMaxMessageSize)
	if sizeErr != nil {
		panic(fmt.Sprintf("failed to get gRPC server message size: %s", sizeErr))
	}

	// The same limit is applied in both directions.
	recvLimit := grpc.MaxRecvMsgSize(limitBytes)
	sendLimit := grpc.MaxSendMsgSize(limitBytes)

	return []grpc.ServerOption{recvLimit, sendLimit}
}

// getMessageSizeBytesFromString parses rawSize (ex: "40MB") and returns the message size in
// bytes, provided it lies within the inclusive range [minSize, maxSize].
//
// If rawSize can't be parsed as a byte size, it returns a *parseError.
// If the parsed size is outside the allowed range, or is too large to be represented as an
// int, it returns a *sizeOutOfRangeError.
func getMessageSizeBytesFromString(rawSize string, minSize, maxSize uint64) (size int, err error) {
	sizeBytes, err := humanize.ParseBytes(rawSize)
	if err != nil {
		return 0, &parseError{
			rawSize: rawSize,
			err:     err,
		}
	}

	// The size is handed back as an int, so nothing above math.MaxInt may be accepted,
	// even if the caller asked for a larger upper bound. Without this clamp the
	// conversion below could silently overflow.
	if maxSize > math.MaxInt {
		maxSize = math.MaxInt
	}

	if sizeBytes < minSize || sizeBytes > maxSize {
		return 0, &sizeOutOfRangeError{
			size: humanize.IBytes(sizeBytes),
			min:  humanize.IBytes(minSize),
			max:  humanize.IBytes(maxSize),
		}
	}

	return int(sizeBytes), nil
}

// parseError reports that the environment variable's value could not be parsed as a byte size.
type parseError struct {
	// rawSize holds the size string exactly as it was given to the parser.
	rawSize string
	// err is the underlying failure reported while parsing rawSize.
	err error
}

// Error implements the error interface.
func (pe *parseError) Error() string {
	const format = "failed to parse %q as bytes: %s"

	return fmt.Sprintf(format, pe.rawSize, pe.err)
}

// Unwrap returns the underlying parse failure.
func (pe *parseError) Unwrap() error {
	return pe.err
}

// sizeOutOfRangeError reports that the environment variable's value lies outside of the allowed range.
type sizeOutOfRangeError struct {
	// size is the offending size, already formatted for display.
	size string
	// min is the lower bound of the allowed range, formatted for display.
	min string
	// max is the upper bound of the allowed range, formatted for display.
	max string
}

// Error implements the error interface.
func (re *sizeOutOfRangeError) Error() string {
	const format = "size %s is outside of allowed range [%s, %s]"

	return fmt.Sprintf(format, re.size, re.min, re.max)
}

// getEnv returns the value of the environment variable named key.
// defaultValue is returned only when the variable is not present at all; a variable
// that is set to the empty string yields the empty string.
func getEnv(key string, defaultValue string) string {
	if current, present := os.LookupEnv(key); present {
		return current
	}

	return defaultValue
}
97 changes: 97 additions & 0 deletions grpc/messagesize/messagesize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package messagesize

import (
"errors"
"math"
"testing"

"github.com/google/go-cmp/cmp"
)

// TestGetMessageSizeBytesFromString checks that getMessageSizeBytesFromString accepts sizes
// inside the allowed range (bounds included), and reports a parseError for unparsable input
// and a sizeOutOfRangeError for sizes outside the range.
func TestGetMessageSizeBytesFromString(t *testing.T) {
	const (
		fourMegaBytes = 4 * 1000 * 1000
		twoMegaBytes  = 2 * 1024 * 1024
	)

	// Sizes that must be accepted and returned unchanged.
	accepted := []struct {
		name    string
		raw     string
		minSize uint64
		maxSize uint64
		want    int
	}{
		{name: "8 MB", raw: "8MB", minSize: 0, maxSize: math.MaxInt, want: 8 * 1000 * 1000},
		{name: "just small enough", raw: "4MB", minSize: 0, maxSize: fourMegaBytes, want: fourMegaBytes},    // inside large-end of range
		{name: "just large enough", raw: "4MB", minSize: fourMegaBytes, maxSize: math.MaxInt, want: fourMegaBytes}, // inside low-end of range
	}
	for _, tc := range accepted {
		t.Run(tc.name, func(t *testing.T) {
			got, err := getMessageSizeBytesFromString(tc.raw, tc.minSize, tc.maxSize)
			if err != nil {
				t.Fatalf("unexpected error: %s", err)
			}

			if diff := cmp.Diff(tc.want, got); diff != "" {
				t.Fatalf("unexpected size (-want +got):\n%s", diff)
			}
		})
	}

	// Inputs that can't be parsed as a byte size at all.
	unparsable := []struct {
		name string
		raw  string
	}{
		{name: "invalid size", raw: "this-is-not-a-size"},
		{name: "empty", raw: ""},
	}
	for _, tc := range unparsable {
		t.Run(tc.name, func(t *testing.T) {
			_, err := getMessageSizeBytesFromString(tc.raw, 0, math.MaxInt)

			var target *parseError
			if !errors.As(err, &target) {
				t.Fatalf("expected parseError, got error %q", err)
			}
		})
	}

	// Well-formed sizes that fall outside the allowed range.
	outOfRange := []struct {
		name    string
		raw     string
		minSize uint64
		maxSize uint64
	}{
		{name: "too large", raw: "4MB", minSize: 0, maxSize: twoMegaBytes},           // above range
		{name: "too small", raw: "1MB", minSize: twoMegaBytes, maxSize: math.MaxInt}, // below range
	}
	for _, tc := range outOfRange {
		t.Run(tc.name, func(t *testing.T) {
			_, err := getMessageSizeBytesFromString(tc.raw, tc.minSize, tc.maxSize)

			var target *sizeOutOfRangeError
			if !errors.As(err, &target) {
				t.Fatalf("expected sizeOutOfRangeError, got error %q", err)
			}
		})
	}
}

0 comments on commit f75df3d

Please sign in to comment.