Refactor autoscaling test helpers (knative#9712)
* Refactor autoscaling test helpers

* Normalize timeouts, add comments

* Fix nit
markusthoemmes authored Oct 7, 2020
1 parent d0dcbd1 commit 5dfeb5f
Showing 4 changed files with 60 additions and 87 deletions.
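
At a glance, pieced together from the diffs below (a sketch, not text from the commit): the traffic generators lose their duration parameter and run until a stop channel closes, checkPodScale takes a done channel instead of a duration, and the single exported assertion, AssertAutoscaleUpToNumPods, takes both a done channel and a quick flag. A quick-mode caller modeled on TestAutoscaleUpDownUp would look like the following, assuming it sits in the same test/e2e package as the helpers; the upgrade tests further down pass a manually closed chan time.Time instead of time.After.

```go
// Sketch only, assembled from the signatures in the diff below; not code from
// this commit. Assumes the surrounding test/e2e package (SetupSvc, TestContext,
// containerConcurrency, targetUtilization, AssertAutoscaleUpToNumPods).
func TestQuickScaleUpSketch(t *testing.T) {
	ctx := SetupSvc(t, autoscaling.KPA, autoscaling.Concurrency, containerConcurrency, targetUtilization)

	// Quick mode: the time.After channel acts as the overall deadline, and the
	// assertion returns as soon as the pod count reaches the target band.
	AssertAutoscaleUpToNumPods(ctx, 1, 2, time.After(60*time.Second), true /* quick */)
}
```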
105 changes: 29 additions & 76 deletions test/e2e/autoscale.go
@@ -74,7 +74,6 @@ func getVegetaTarget(kubeClientset *kubernetes.Clientset, domain, endpointOverri
}, nil
}

var err error
// If the domain that the Route controller is configured to assign to Route.Status.Domain
// (the domainSuffix) is not resolvable, we need to retrieve the endpoint and spoof
// the Host in our requests.
@@ -96,7 +95,6 @@ func generateTraffic(
ctx *TestContext,
attacker *vegeta.Attacker,
pacer vegeta.Pacer,
duration time.Duration,
stopChan chan struct{}) error {

target, err := getVegetaTarget(
@@ -105,7 +103,8 @@ func generateTraffic(
return fmt.Errorf("error creating vegeta target: %w", err)
}

results := attacker.Attack(vegeta.NewStaticTargeter(target), pacer, duration, "load-test")
// The 0 duration means that the attack will only be controlled by the `Stop` function.
results := attacker.Attack(vegeta.NewStaticTargeter(target), pacer, 0, "load-test")
defer attacker.Stop()

var (
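
Aside, not part of the commit: a minimal standalone sketch of the vegeta behavior the new comment relies on. With an attack duration of 0 the attack never ends on its own; it stops only when Stop is called, which is what lets generateTraffic hand control to stopChan. The URL is a placeholder.

```go
package main

import (
	"fmt"
	"time"

	vegeta "github.com/tsenart/vegeta/v12/lib"
)

func main() {
	target := vegeta.Target{Method: "GET", URL: "http://example.com/"} // placeholder endpoint
	pacer := vegeta.ConstantPacer{Freq: 5, Per: time.Second}           // 5 requests per second
	attacker := vegeta.NewAttacker(vegeta.Timeout(0))                  // no per-request timeout

	// Duration 0: the attack only ends when Stop is called.
	results := attacker.Attack(vegeta.NewStaticTargeter(target), pacer, 0, "demo")

	go func() {
		time.Sleep(2 * time.Second)
		attacker.Stop() // results is closed once in-flight requests drain
	}()

	var total int
	for res := range results {
		total++
		_ = res // a real load generator would inspect res.Code and res.Error here
	}
	fmt.Println("requests sent:", total)
}
```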
@@ -142,35 +141,23 @@ func generateTraffic(
}
}

func generateTrafficAtFixedConcurrency(ctx *TestContext, concurrency int, duration time.Duration, stopChan chan struct{}) error {
func generateTrafficAtFixedConcurrency(ctx *TestContext, concurrency int, stopChan chan struct{}) error {
pacer := vegeta.ConstantPacer{} // Sends requests as quickly as possible, capped by MaxWorkers below.
attacker := vegeta.NewAttacker(vegeta.Timeout(duration), vegeta.Workers(uint64(concurrency)), vegeta.MaxWorkers(uint64(concurrency)))
attacker := vegeta.NewAttacker(
vegeta.Timeout(0), // No timeout is enforced at all.
vegeta.Workers(uint64(concurrency)),
vegeta.MaxWorkers(uint64(concurrency)))

ctx.t.Logf("Maintaining %d concurrent requests for %v.", concurrency, duration)
return generateTraffic(ctx, attacker, pacer, duration, stopChan)
ctx.t.Logf("Maintaining %d concurrent requests.", concurrency)
return generateTraffic(ctx, attacker, pacer, stopChan)
}

func generateTrafficAtFixedRPS(ctx *TestContext, rps int, duration time.Duration, stopChan chan struct{}) error {
func generateTrafficAtFixedRPS(ctx *TestContext, rps int, stopChan chan struct{}) error {
pacer := vegeta.ConstantPacer{Freq: rps, Per: time.Second}
attacker := vegeta.NewAttacker(vegeta.Timeout(duration))
attacker := vegeta.NewAttacker(vegeta.Timeout(0)) // No timeout is enforced at all.

ctx.t.Logf("Maintaining %v RPS requests for %v.", rps, duration)
return generateTraffic(ctx, attacker, pacer, duration, stopChan)
}

func validateEndpoint(t *testing.T, clients *test.Clients, names test.ResourceNames) error {
ctx := context.Background()
_, err := pkgTest.WaitForEndpointState(
ctx,
clients.KubeClient,
t.Logf,
names.URL,
v1test.RetryingRouteInconsistency(pkgTest.MatchesAllOf(pkgTest.IsStatusOK)),
"CheckingEndpointAfterUpdating",
test.ServingFlags.ResolvableDomain,
test.AddRootCAtoTransport(ctx, t.Logf, clients, test.ServingFlags.HTTPS),
)
return err
ctx.t.Logf("Maintaining %v RPS.", rps)
return generateTraffic(ctx, attacker, pacer, stopChan)
}

func toPercentageString(f float64) string {
@@ -217,7 +204,16 @@ func SetupSvc(t *testing.T, class, metric string, target int, targetUtilization
t.Fatalf("Failed to create initial Service: %v: %v", names.Service, err)
}

if err := validateEndpoint(t, clients, names); err != nil {
if _, err := pkgTest.WaitForEndpointState(
context.Background(),
clients.KubeClient,
t.Logf,
names.URL,
v1test.RetryingRouteInconsistency(pkgTest.MatchesAllOf(pkgTest.IsStatusOK)),
"CheckingEndpointAfterCreate",
test.ServingFlags.ResolvableDomain,
test.AddRootCAtoTransport(context.Background(), t.Logf, clients, test.ServingFlags.HTTPS),
); err != nil {
t.Fatalf("Error probing %s: %v", names.URL.Hostname(), err)
}

@@ -287,11 +283,7 @@ func numberOfReadyPods(ctx *TestContext) (float64, error) {
return float64(resources.ReadyAddressCount(eps)), nil
}

func checkPodScale(ctx *TestContext, targetPods, minPods, maxPods float64, duration time.Duration, quick bool) error {
return checkPodScaleWithDone(ctx, targetPods, minPods, maxPods, time.After(duration), quick)
}

func checkPodScaleWithDone(ctx *TestContext, targetPods, minPods, maxPods float64, done <-chan time.Time, quick bool) error {
func checkPodScale(ctx *TestContext, targetPods, minPods, maxPods float64, done <-chan time.Time, quick bool) error {
// Short-circuit traffic generation once we exit from the check logic.
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
@@ -340,30 +332,14 @@ func checkPodScaleWithDone(ctx *TestContext, targetPods, minPods, maxPods float6
}
}

// assertAutoscaleUpToNumPods asserts the number of pods gets scaled to targetPods.
// It supports two test modes: quick, and not quick.
// 1) Quick mode: succeeds when the number of pods meets targetPods.
// 2) Not Quick (sustaining) mode: succeeds when the number of pods gets scaled to targetPods and
// sustains there for the `duration`.
func assertAutoscaleUpToNumPods(ctx *TestContext, curPods, targetPods float64, duration time.Duration, quick bool) {
assertAutoscaleUpToNumPodsWithDurationAndDone(ctx, curPods, targetPods, duration, time.After(duration), quick)
}

// AssertAutoscaleUpToNumPodsWithDone asserts the number of pods gets scaled to targetPods and
// sustains there until the `done` channel sends a signal.
func AssertAutoscaleUpToNumPodsWithDone(ctx *TestContext, curPods, targetPods float64, done <-chan time.Time) {
// We use 10 hours here which should be suffient for the e2e tests.
assertAutoscaleUpToNumPodsWithDurationAndDone(ctx, curPods, targetPods, 10*time.Hour, done, false /* quick */)
}

// assertAutoscaleUpToNumPodsWithDurationAndDone asserts the number of pods gets scaled to targetPods.
// AssertAutoscaleUpToNumPods asserts the number of pods gets scaled to targetPods.
// It supports two test modes: quick, and not quick.
// 1) Quick mode: succeeds when the number of pods meets targetPods.
// 2) Not Quick (sustaining) mode: succeeds when the number of pods gets scaled to targetPods and
// sustains there until the `done` channel sends a signal.
// The given `duration` is how long the traffic will be generated. You must make sure that the signal
// from the given `done` channel will be sent within the `duration`.
func assertAutoscaleUpToNumPodsWithDurationAndDone(ctx *TestContext, curPods, targetPods float64, duration time.Duration, done <-chan time.Time, quick bool) {
func AssertAutoscaleUpToNumPods(ctx *TestContext, curPods, targetPods float64, done <-chan time.Time, quick bool) {
ctx.t.Helper()

// Relax the bounds to reduce the flakiness caused by sampling in the autoscaling algorithm.
@@ -376,41 +352,18 @@ func assertAutoscaleUpToNumPodsWithDurationAndDone(ctx *TestContext, curPods, ta
grp.Go(func() error {
switch ctx.metric {
case autoscaling.RPS:
return generateTrafficAtFixedRPS(ctx, int(targetPods*float64(ctx.targetValue)), duration, stopChan)
return generateTrafficAtFixedRPS(ctx, int(targetPods*float64(ctx.targetValue)), stopChan)
default:
return generateTrafficAtFixedConcurrency(ctx, int(targetPods*float64(ctx.targetValue)), duration, stopChan)
return generateTrafficAtFixedConcurrency(ctx, int(targetPods*float64(ctx.targetValue)), stopChan)
}
})

grp.Go(func() error {
defer close(stopChan)
return checkPodScaleWithDone(ctx, targetPods, minPods, maxPods, done, quick)
return checkPodScale(ctx, targetPods, minPods, maxPods, done, quick)
})

if err := grp.Wait(); err != nil {
ctx.t.Errorf("Error: %v", err)
}
}

// runAutoscaleUpCountPods is a test kernel to test the chosen autoscaler using the given
// metric tracks the given target.
func runAutoscaleUpCountPods(t *testing.T, class, metric string) {
target := containerConcurrency
if metric == autoscaling.RPS {
target = 10
}

ctx := SetupSvc(t, class, metric, target, targetUtilization)

ctx.t.Log("The autoscaler spins up additional replicas when traffic increases.")
// note: without the warm-up / gradual increase of load the test is retrieving a 503 (overload) from the envoy

// Increase workload for 2 replicas for 60s
// Assert the number of expected replicas is between n-1 and n+1, where n is the # of desired replicas for 60s.
// Assert the number of expected replicas is n and n+1 at the end of 60s, where n is the # of desired replicas.
assertAutoscaleUpToNumPods(ctx, 1, 2, 60*time.Second, true)
// Increase workload scale to 3 replicas, assert between [n-1, n+1] during scale up, assert between [n, n+1] after scaleup.
assertAutoscaleUpToNumPods(ctx, 2, 3, 60*time.Second, true)
// Increase workload scale to 4 replicas, assert between [n-1, n+1] during scale up, assert between [n, n+1] after scaleup.
assertAutoscaleUpToNumPods(ctx, 3, 4, 60*time.Second, true)
}
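
Aside (a sketch under stated assumptions, not code from the commit): the overall shape of the done-channel check loop that the refactored checkPodScale drives. The 2-second ticker and the short-circuit comment appear in the hunk above; observedPods and the flat bounds check here are simplified stand-ins for the real sampling logic in the collapsed part of the file.

```go
package main

import (
	"fmt"
	"time"
)

// watchScale polls a pod count until done fires, failing if the count ever
// leaves [minPods, maxPods]. Names and bounds handling are illustrative.
func watchScale(done <-chan time.Time, observedPods func() (float64, error), minPods, maxPods float64) error {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-done:
			// Sustaining mode succeeds once the done signal arrives without the
			// pod count having left the allowed band.
			return nil
		case <-ticker.C:
			got, err := observedPods()
			if err != nil {
				return err
			}
			if got < minPods || got > maxPods {
				return fmt.Errorf("pod count %v outside [%v, %v]", got, minPods, maxPods)
			}
		}
	}
}

func main() {
	// Toy usage: pretend the deployment sits at 3 ready pods for five seconds.
	err := watchScale(time.After(5 * time.Second), func() (float64, error) { return 3, nil }, 2, 4)
	fmt.Println("scale check result:", err)
}
```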
36 changes: 28 additions & 8 deletions test/e2e/autoscale_test.go
@@ -42,9 +42,9 @@ func TestAutoscaleUpDownUp(t *testing.T) {

ctx := SetupSvc(t, autoscaling.KPA, autoscaling.Concurrency, containerConcurrency, targetUtilization)

assertAutoscaleUpToNumPods(ctx, 1, 2, 60*time.Second, true)
AssertAutoscaleUpToNumPods(ctx, 1, 2, time.After(60*time.Second), true /* quick */)
assertScaleDown(ctx)
assertAutoscaleUpToNumPods(ctx, 0, 2, 60*time.Second, true)
AssertAutoscaleUpToNumPods(ctx, 0, 2, time.After(60*time.Second), true /* quick */)
}

func TestAutoscaleUpCountPods(t *testing.T) {
@@ -57,14 +57,37 @@ func TestRPSBasedAutoscaleUpCountPods(t *testing.T) {
runAutoscaleUpCountPods(t, autoscaling.KPA, autoscaling.RPS)
}

// runAutoscaleUpCountPods is a test kernel to test the chosen autoscaler using the given
// metric tracks the given target.
func runAutoscaleUpCountPods(t *testing.T, class, metric string) {
target := containerConcurrency
if metric == autoscaling.RPS {
target = 10
}

ctx := SetupSvc(t, class, metric, target, targetUtilization)

ctx.t.Log("The autoscaler spins up additional replicas when traffic increases.")
// note: without the warm-up / gradual increase of load the test is retrieving a 503 (overload) from the envoy

// Increase workload for 2 replicas for 60s
// Assert the number of expected replicas is between n-1 and n+1, where n is the # of desired replicas for 60s.
// Assert the number of expected replicas is n and n+1 at the end of 60s, where n is the # of desired replicas.
AssertAutoscaleUpToNumPods(ctx, 1, 2, time.After(60*time.Second), true /* quick */)
// Increase workload scale to 3 replicas, assert between [n-1, n+1] during scale up, assert between [n, n+1] after scaleup.
AssertAutoscaleUpToNumPods(ctx, 2, 3, time.After(60*time.Second), true /* quick */)
// Increase workload scale to 4 replicas, assert between [n-1, n+1] during scale up, assert between [n, n+1] after scaleup.
AssertAutoscaleUpToNumPods(ctx, 3, 4, time.After(60*time.Second), true /* quick */)
}

func TestAutoscaleSustaining(t *testing.T) {
// When traffic increases, a knative app should scale up and sustain the scale
// as long as the traffic sustains, despite whether it is switching modes between
// normal and panic.
t.Parallel()

ctx := SetupSvc(t, autoscaling.KPA, autoscaling.Concurrency, containerConcurrency, targetUtilization)
assertAutoscaleUpToNumPods(ctx, 1, 10, 2*time.Minute, false)
AssertAutoscaleUpToNumPods(ctx, 1, 10, time.After(2*time.Minute), false /* quick */)
}

func TestTargetBurstCapacity(t *testing.T) {
@@ -92,11 +115,8 @@ func TestTargetBurstCapacity(t *testing.T) {
defer grp.Wait()
defer close(stopCh)

// We'll terminate the test via stopCh.
const duration = time.Hour

grp.Go(func() error {
return generateTrafficAtFixedConcurrency(ctx, 7, duration, stopCh)
return generateTrafficAtFixedConcurrency(ctx, 7, stopCh)
})

// Wait for the activator endpoints to equalize.
@@ -106,7 +126,7 @@ func TestTargetBurstCapacity(t *testing.T) {

// Start second load generator.
grp.Go(func() error {
return generateTrafficAtFixedConcurrency(ctx, 5, duration, stopCh)
return generateTrafficAtFixedConcurrency(ctx, 5, stopCh)
})

// Wait for two stable pods.
2 changes: 1 addition & 1 deletion test/e2e/grpc_test.go
@@ -254,7 +254,7 @@ func assertGRPCAutoscaleUpToNumPods(ctx *TestContext, curPods, targetPods float6

grp.Go(func() error {
defer close(stopChan)
return checkPodScale(ctx, targetPods, minPods, maxPods, duration, true /* quick */)
return checkPodScale(ctx, targetPods, minPods, maxPods, time.After(duration), true /* quick */)
})

if err := grp.Wait(); err != nil {
4 changes: 2 additions & 2 deletions test/upgrade/autoscaler_test.go
@@ -57,7 +57,7 @@ func TestAutoscaleSustaining(t *testing.T) {
close(stopCh)
}()

e2e.AssertAutoscaleUpToNumPodsWithDone(ctx, 1, 5, stopCh)
e2e.AssertAutoscaleUpToNumPods(ctx, 1, 5, stopCh, false /* quick */)
}

func TestAutoscaleSustainingWithTBC(t *testing.T) {
@@ -79,5 +79,5 @@ func TestAutoscaleSustainingWithTBC(t *testing.T) {
close(stopCh)
}()

e2e.AssertAutoscaleUpToNumPodsWithDone(ctx, 1, 5, stopCh)
e2e.AssertAutoscaleUpToNumPods(ctx, 1, 5, stopCh, false /* quick */)
}
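
One more aside, not from the commit: the two ways the tests above produce the done argument. The e2e tests pass time.After for a fixed window, while the upgrade tests create a chan time.Time themselves and close it when they decide the sustaining phase is over (the exact trigger sits in the collapsed part of the diff). A receive on a closed channel returns immediately, so closing the channel ends the assertion. A self-contained illustration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// 1) Fixed window, as in the e2e tests: the channel fires once after the duration.
	var done <-chan time.Time = time.After(100 * time.Millisecond)
	<-done
	fmt.Println("time.After fired")

	// 2) Caller-controlled, as in the upgrade tests: the test closes the channel
	// itself when some external condition (here, a sleep) completes.
	stopCh := make(chan time.Time)
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(stopCh) // a receive on a closed channel returns the zero value immediately
	}()
	<-stopCh
	fmt.Println("stopCh closed")
}
```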
