Skip to content

Commit

Permalink
fix(sdk): fix retry logic in wandb-core and system_tests conftest (wa…
Browse files Browse the repository at this point in the history
…ndb#6847)

Co-authored-by: Katia Patkin <[email protected]>
  • Loading branch information
dmitryduev and kptkin authored Jan 18, 2024
1 parent 5081aed commit 92353fc
Show file tree
Hide file tree
Showing 23 changed files with 994 additions and 925 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ parameters:
default: 1
go_version:
type: string
default: "1.21.5"
default: "1.21.6"
container_registry:
type: string
default: "gcr.io"
Expand Down
18 changes: 9 additions & 9 deletions client/src/wandb_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,31 +101,31 @@ pub struct Settings {
#[prost(message, optional, tag = "147")]
pub file_stream_retry_max: ::core::option::Option<i32>,
#[prost(message, optional, tag = "148")]
pub file_stream_retry_wait_min_seconds: ::core::option::Option<i32>,
pub file_stream_retry_wait_min_seconds: ::core::option::Option<f64>,
#[prost(message, optional, tag = "149")]
pub file_stream_retry_wait_max_seconds: ::core::option::Option<i32>,
pub file_stream_retry_wait_max_seconds: ::core::option::Option<f64>,
#[prost(message, optional, tag = "15")]
pub file_stream_timeout_seconds: ::core::option::Option<i32>,
pub file_stream_timeout_seconds: ::core::option::Option<f64>,
#[prost(message, optional, tag = "150")]
pub file_transfer_retry_max: ::core::option::Option<i32>,
#[prost(message, optional, tag = "151")]
pub file_transfer_retry_wait_min_seconds: ::core::option::Option<i32>,
pub file_transfer_retry_wait_min_seconds: ::core::option::Option<f64>,
#[prost(message, optional, tag = "152")]
pub file_transfer_retry_wait_max_seconds: ::core::option::Option<i32>,
pub file_transfer_retry_wait_max_seconds: ::core::option::Option<f64>,
#[prost(message, optional, tag = "153")]
pub file_transfer_timeout_seconds: ::core::option::Option<i32>,
pub file_transfer_timeout_seconds: ::core::option::Option<f64>,
#[prost(message, optional, tag = "16")]
pub flow_control_custom: ::core::option::Option<bool>,
#[prost(message, optional, tag = "17")]
pub flow_control_disabled: ::core::option::Option<bool>,
#[prost(message, optional, tag = "154")]
pub graphql_retry_max: ::core::option::Option<i32>,
#[prost(message, optional, tag = "155")]
pub graphql_retry_wait_min_seconds: ::core::option::Option<i32>,
pub graphql_retry_wait_min_seconds: ::core::option::Option<f64>,
#[prost(message, optional, tag = "156")]
pub graphql_retry_wait_max_seconds: ::core::option::Option<i32>,
pub graphql_retry_wait_max_seconds: ::core::option::Option<f64>,
#[prost(message, optional, tag = "157")]
pub graphql_timeout_seconds: ::core::option::Option<i32>,
pub graphql_timeout_seconds: ::core::option::Option<f64>,
#[prost(message, optional, tag = "18")]
pub internal_check_process: ::core::option::Option<f64>,
#[prost(message, optional, tag = "19")]
Expand Down
11 changes: 5 additions & 6 deletions core/internal/clients/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,27 @@ import (
func ExponentialBackoffWithJitter(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration {
// based on go-retryablehttp's DefaultBackoff
addJitter := func(duration time.Duration) time.Duration {
jitter := time.Duration(rand.Float64() * 0.25 * float64(duration))
jitter := SecondsToDuration(rand.Float64() * 0.25 * DurationToSeconds(duration))
return duration + jitter
}

if resp != nil {
if resp.StatusCode == http.StatusTooManyRequests {
if s, ok := resp.Header["Retry-After"]; ok {
if sleep, err := strconv.ParseInt(s[0], 10, 64); err == nil {
if sleep, err := strconv.ParseFloat(s[0], 64); err == nil {
// Add jitter in case of 429 status code
return addJitter(time.Second * time.Duration(sleep))
return addJitter(SecondsToDuration(sleep))
}
}
}
}

mult := math.Pow(2, float64(attemptNum)) * float64(min)
sleep := time.Duration(mult)
sleep := SecondsToDuration(math.Pow(2, float64(attemptNum)) * DurationToSeconds(min))

// Add jitter to the general backoff calculation
sleep = addJitter(sleep)

if float64(sleep) != mult || sleep > max {
if sleep > max {
// at this point we've hit the max backoff, so just return that
sleep = max
}
Expand Down
8 changes: 8 additions & 0 deletions core/internal/clients/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ import (
"github.com/hashicorp/go-retryablehttp"
)

func SecondsToDuration(seconds float64) time.Duration {
return time.Duration(seconds * float64(time.Second))
}

func DurationToSeconds(duration time.Duration) float64 {
return float64(duration) / float64(time.Second)
}

func basicAuth(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
Expand Down
132 changes: 67 additions & 65 deletions core/internal/data_types/data_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,71 +116,73 @@ func TestGenerateTypeRepresentation(t *testing.T) {
},
},
},
{
name: "Complex Nested Map and List",
input: map[string]interface{}{
"deep": map[string]interface{}{
"numbers": []interface{}{1, 2, 3},
"mixed": []interface{}{
map[string]interface{}{
"a": 1,
"b": "text",
},
[]interface{}{4, 5, 6},
},
},
},
expected: data_types.TypeRepresentation{
Name: data_types.MapTypeName,
Params: &data_types.MapType{
Type: map[string]data_types.TypeRepresentation{
"deep": {
Name: data_types.MapTypeName,
Params: &data_types.MapType{
Type: map[string]data_types.TypeRepresentation{
"numbers": {
Name: data_types.ListTypeName,
Params: &data_types.ListType{
ElementType: data_types.TypeRepresentation{Name: data_types.NumberTypeName},
Length: 3,
},
},
"mixed": {
Name: data_types.ListTypeName,
Params: &data_types.ListType{
ElementType: data_types.TypeRepresentation{
Name: data_types.UnionTypeName,
Params: &data_types.UnionType{
AllowedTypes: []data_types.TypeRepresentation{
{
Name: data_types.MapTypeName,
Params: &data_types.MapType{
Type: map[string]data_types.TypeRepresentation{
"a": {Name: data_types.NumberTypeName},
"b": {Name: data_types.StringTypeName},
},
},
},
{
Name: data_types.ListTypeName,
Params: &data_types.ListType{
ElementType: data_types.TypeRepresentation{Name: data_types.NumberTypeName},
Length: 3,
},
},
},
},
},
Length: 2,
},
},
},
},
},
},
},
},
},
// TODO: this test case is disabled because it sometimes fails: the order of the types
// in the union is not deterministic. The order should not matter, so the test case needs to be fixed and re-enabled.
// {
// name: "Complex Nested Map and List",
// input: map[string]interface{}{
// "deep": map[string]interface{}{
// "numbers": []interface{}{1, 2, 3},
// "mixed": []interface{}{
// map[string]interface{}{
// "a": 1,
// "b": "text",
// },
// []interface{}{4, 5, 6},
// },
// },
// },
// expected: data_types.TypeRepresentation{
// Name: data_types.MapTypeName,
// Params: &data_types.MapType{
// Type: map[string]data_types.TypeRepresentation{
// "deep": {
// Name: data_types.MapTypeName,
// Params: &data_types.MapType{
// Type: map[string]data_types.TypeRepresentation{
// "numbers": {
// Name: data_types.ListTypeName,
// Params: &data_types.ListType{
// ElementType: data_types.TypeRepresentation{Name: data_types.NumberTypeName},
// Length: 3,
// },
// },
// "mixed": {
// Name: data_types.ListTypeName,
// Params: &data_types.ListType{
// ElementType: data_types.TypeRepresentation{
// Name: data_types.UnionTypeName,
// Params: &data_types.UnionType{
// AllowedTypes: []data_types.TypeRepresentation{
// {
// Name: data_types.MapTypeName,
// Params: &data_types.MapType{
// Type: map[string]data_types.TypeRepresentation{
// "a": {Name: data_types.NumberTypeName},
// "b": {Name: data_types.StringTypeName},
// },
// },
// },
// {
// Name: data_types.ListTypeName,
// Params: &data_types.ListType{
// ElementType: data_types.TypeRepresentation{Name: data_types.NumberTypeName},
// Length: 3,
// },
// },
// },
// },
// },
// Length: 2,
// },
// },
// },
// },
// },
// },
// },
// },
// },
}

for _, tc := range testCases {
Expand Down
7 changes: 3 additions & 4 deletions core/internal/filetransfer/file_transfer_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package filetransfer
import (
"os"
"testing"
"time"

"github.com/hashicorp/go-retryablehttp"
"github.com/wandb/wandb/core/internal/clients"
Expand All @@ -25,9 +24,9 @@ func TestDefaultFileTransfer_Download(t *testing.T) {
client := clients.NewRetryClient(
clients.WithRetryClientLogger(logger),
clients.WithRetryClientRetryMax(int(settings.GetXFileTransferRetryMax().GetValue())),
clients.WithRetryClientRetryWaitMin(time.Duration(settings.GetXFileTransferRetryWaitMinSeconds().GetValue()*int32(time.Second))),
clients.WithRetryClientRetryWaitMax(time.Duration(settings.GetXFileTransferRetryWaitMaxSeconds().GetValue()*int32(time.Second))),
clients.WithRetryClientHttpTimeout(time.Duration(settings.GetXFileTransferTimeoutSeconds().GetValue()*int32(time.Second))),
clients.WithRetryClientRetryWaitMin(clients.SecondsToDuration(settings.GetXFileTransferRetryWaitMinSeconds().GetValue())),
clients.WithRetryClientRetryWaitMax(clients.SecondsToDuration(settings.GetXFileTransferRetryWaitMaxSeconds().GetValue())),
clients.WithRetryClientHttpTimeout(clients.SecondsToDuration(settings.GetXFileTransferTimeoutSeconds().GetValue())),
)
tests := []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion core/pkg/gowandb/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewSettings(args ...any) *SettingsWrap {
XOffline: &wrapperspb.BoolValue{
Value: (mode == "offline"),
},
XFileStreamTimeoutSeconds: &wrapperspb.Int32Value{
XFileStreamTimeoutSeconds: &wrapperspb.DoubleValue{
Value: 60,
},
XStatsSamplesToAverage: &wrapperspb.Int32Value{
Expand Down
18 changes: 9 additions & 9 deletions core/pkg/server/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ func NewSender(
return resp.StatusCode >= 400
}),
clients.WithRetryClientRetryMax(int(settings.GetXGraphqlRetryMax().GetValue())),
clients.WithRetryClientRetryWaitMin(time.Duration(settings.GetXGraphqlRetryWaitMinSeconds().GetValue()*int32(time.Second))),
clients.WithRetryClientRetryWaitMax(time.Duration(settings.GetXGraphqlRetryWaitMaxSeconds().GetValue()*int32(time.Second))),
clients.WithRetryClientHttpTimeout(time.Duration(settings.GetXGraphqlTimeoutSeconds().GetValue()*int32(time.Second))),
clients.WithRetryClientRetryWaitMin(clients.SecondsToDuration(settings.GetXGraphqlRetryWaitMinSeconds().GetValue())),
clients.WithRetryClientRetryWaitMax(clients.SecondsToDuration(settings.GetXGraphqlRetryWaitMaxSeconds().GetValue())),
clients.WithRetryClientHttpTimeout(clients.SecondsToDuration(settings.GetXGraphqlTimeoutSeconds().GetValue())),
clients.WithRetryClientBackoff(clients.ExponentialBackoffWithJitter),
)
url := fmt.Sprintf("%s/graphql", settings.GetBaseUrl().GetValue())
Expand All @@ -164,9 +164,9 @@ func NewSender(
return resp.StatusCode >= 400
}),
clients.WithRetryClientRetryMax(int(settings.GetXFileStreamRetryMax().GetValue())),
clients.WithRetryClientRetryWaitMin(time.Duration(settings.GetXFileStreamRetryWaitMinSeconds().GetValue()*int32(time.Second))),
clients.WithRetryClientRetryWaitMax(time.Duration(settings.GetXFileStreamRetryWaitMaxSeconds().GetValue()*int32(time.Second))),
clients.WithRetryClientHttpTimeout(time.Duration(settings.GetXFileStreamTimeoutSeconds().GetValue()*int32(time.Second))),
clients.WithRetryClientRetryWaitMin(clients.SecondsToDuration(settings.GetXFileStreamRetryWaitMinSeconds().GetValue())),
clients.WithRetryClientRetryWaitMax(clients.SecondsToDuration(settings.GetXFileStreamRetryWaitMaxSeconds().GetValue())),
clients.WithRetryClientHttpTimeout(clients.SecondsToDuration(settings.GetXFileStreamTimeoutSeconds().GetValue())),
clients.WithRetryClientHttpAuthTransport(sender.settings.GetApiKey().GetValue()),
clients.WithRetryClientBackoff(clients.ExponentialBackoffWithJitter),
// TODO(core:beta): add custom retry function
Expand All @@ -181,9 +181,9 @@ func NewSender(
clients.WithRetryClientLogger(logger),
clients.WithRetryClientRetryPolicy(clients.CheckRetry),
clients.WithRetryClientRetryMax(int(settings.GetXFileTransferRetryMax().GetValue())),
clients.WithRetryClientRetryWaitMin(time.Duration(settings.GetXFileTransferRetryWaitMinSeconds().GetValue()*int32(time.Second))),
clients.WithRetryClientRetryWaitMax(time.Duration(settings.GetXFileTransferRetryWaitMaxSeconds().GetValue()*int32(time.Second))),
clients.WithRetryClientHttpTimeout(time.Duration(settings.GetXFileTransferTimeoutSeconds().GetValue()*int32(time.Second))),
clients.WithRetryClientRetryWaitMin(clients.SecondsToDuration(settings.GetXFileTransferRetryWaitMinSeconds().GetValue())),
clients.WithRetryClientRetryWaitMax(clients.SecondsToDuration(settings.GetXFileTransferRetryWaitMaxSeconds().GetValue())),
clients.WithRetryClientHttpTimeout(clients.SecondsToDuration(settings.GetXFileTransferTimeoutSeconds().GetValue())),
clients.WithRetryClientBackoff(clients.ExponentialBackoffWithJitter),
)
defaultFileTransfer := filetransfer.NewDefaultFileTransfer(
Expand Down
Loading

0 comments on commit 92353fc

Please sign in to comment.