Merge pull request kubernetes#88200 from liu-cong/benchmark
Make MetricCollector configurable for scheduler benchmark tests
k8s-ci-robot authored Feb 19, 2020
2 parents ddd6d66 + 7f56c75 commit d4c5637
Showing 2 changed files with 123 additions and 108 deletions.
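In short, the commit turns the hard-coded list of collected histogram metrics into a per-test-case option: testCase gains an optional MetricsCollectorConfig field, and a new testDataCollector interface lets the throughput and metrics collectors be started and drained uniformly. As a rough sketch (not part of this commit; the values are invented, only the field names come from the diff below), a test case overriding the collected metrics could look like:

var exampleCase = testCase{
	// Hypothetical sizes; MetricsCollectorConfig is the field added by this commit.
	Nodes:          nodeCase{Num: 500},
	InitPods:       podCase{Num: 500},
	PodsToSchedule: podCase{Num: 1000},
	MetricsCollectorConfig: &metricsCollectorConfig{
		Metrics: []string{"scheduler_e2e_scheduling_duration_seconds"},
	},
}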
179 changes: 83 additions & 96 deletions test/integration/scheduler_perf/scheduler_perf_test.go
@@ -19,13 +19,13 @@ package benchmark
import (
"fmt"
"io/ioutil"
"sync/atomic"
"testing"
"time"

v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog"
@@ -39,11 +39,13 @@ const (
)

var (
defaultMetrics = []string{
"scheduler_scheduling_algorithm_predicate_evaluation_seconds",
"scheduler_scheduling_algorithm_priority_evaluation_seconds",
"scheduler_binding_duration_seconds",
"scheduler_e2e_scheduling_duration_seconds",
defaultMetricsCollectorConfig = metricsCollectorConfig{
Metrics: []string{
"scheduler_scheduling_algorithm_predicate_evaluation_seconds",
"scheduler_scheduling_algorithm_priority_evaluation_seconds",
"scheduler_binding_duration_seconds",
"scheduler_e2e_scheduling_duration_seconds",
},
}
)

@@ -52,8 +54,8 @@ var (
//
// It specifies nodes and pods in the cluster before running the test. It also specifies the pods to
// schedule during the test. The config can be as simple as just specifying the number of nodes/pods, where
// default spec will be applied. It also allows the user to specify a pod spec template for more compicated
// test cases.
// default spec will be applied. It also allows the user to specify a pod spec template for more
// complicated test cases.
//
// It also specifies the metrics to be collected after the test. If nothing is specified, default metrics
// such as scheduling throughput and latencies will be collected.
@@ -68,6 +70,8 @@ type testCase struct {
PodsToSchedule podCase
// optional, feature gates to set before running the test
FeatureGates map[featuregate.Feature]bool
// optional, replaces default defaultMetricsCollectorConfig if supplied.
MetricsCollectorConfig *metricsCollectorConfig
}

type nodeCase struct {
@@ -100,6 +104,11 @@ type testParams struct {
NumPodsToSchedule int
}

type testDataCollector interface {
run(stopCh chan struct{})
collect() []DataItem
}

func BenchmarkPerfScheduling(b *testing.B) {
dataItems := DataItems{Version: "v1"}
tests := getSimpleTestCases(configFile)
@@ -119,119 +128,97 @@ func BenchmarkPerfScheduling(b *testing.B) {
}

func perfScheduling(test testCase, b *testing.B) []DataItem {
var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
if test.Nodes.NodeAllocatableStrategy != nil {
nodeStrategy = test.Nodes.NodeAllocatableStrategy
} else if test.Nodes.LabelNodePrepareStrategy != nil {
nodeStrategy = test.Nodes.LabelNodePrepareStrategy
} else if test.Nodes.UniqueNodeLabelStrategy != nil {
nodeStrategy = test.Nodes.UniqueNodeLabelStrategy
}

setupPodStrategy := getPodStrategy(test.InitPods)
testPodStrategy := getPodStrategy(test.PodsToSchedule)

var nodeSpec *v1.Node
if test.Nodes.NodeTemplatePath != nil {
nodeSpec = getNodeSpecFromFile(test.Nodes.NodeTemplatePath)
}

finalFunc, podInformer, clientset := mustSetupScheduler()
defer finalFunc()

var nodePreparer testutils.TestNodePreparer
if nodeSpec != nil {
nodePreparer = framework.NewIntegrationTestNodePreparerWithNodeSpec(
clientset,
[]testutils.CountToStrategy{{Count: test.Nodes.Num, Strategy: nodeStrategy}},
nodeSpec,
)
} else {
nodePreparer = framework.NewIntegrationTestNodePreparer(
clientset,
[]testutils.CountToStrategy{{Count: test.Nodes.Num, Strategy: nodeStrategy}},
"scheduler-perf-",
)
}

nodePreparer := getNodePreparer(test.Nodes, clientset)
if err := nodePreparer.PrepareNodes(); err != nil {
klog.Fatalf("%v", err)
}
defer nodePreparer.CleanupNodes()

config := testutils.NewTestPodCreatorConfig()
config.AddStrategy(setupNamespace, test.InitPods.Num, setupPodStrategy)
podCreator := testutils.NewTestPodCreator(clientset, config)
podCreator.CreatePods()

for {
scheduled, err := getScheduledPods(podInformer)
if err != nil {
klog.Fatalf("%v", err)
}
if len(scheduled) >= test.InitPods.Num {
break
}
klog.Infof("got %d existing pods, required: %d", len(scheduled), test.InitPods.Num)
time.Sleep(1 * time.Second)
}

scheduled := int32(0)
completedCh := make(chan struct{})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, cur interface{}) {
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)

if len(oldPod.Spec.NodeName) == 0 && len(curPod.Spec.NodeName) > 0 {
if atomic.AddInt32(&scheduled, 1) >= int32(test.PodsToSchedule.Num) {
completedCh <- struct{}{}
}
}
},
})
createPods(setupNamespace, test.InitPods, clientset)
waitNumPodsScheduled(test.InitPods.Num, podInformer)

// start benchmark
b.ResetTimer()

// Start measuring throughput
// Start test data collectors.
stopCh := make(chan struct{})
throughputCollector := newThroughputCollector(podInformer)
go throughputCollector.run(stopCh)
collectors := getTestDataCollectors(test, podInformer, b)
for _, collector := range collectors {
go collector.run(stopCh)
}

// Scheduling the main workload
config = testutils.NewTestPodCreatorConfig()
config.AddStrategy(testNamespace, test.PodsToSchedule.Num, testPodStrategy)
podCreator = testutils.NewTestPodCreator(clientset, config)
podCreator.CreatePods()
// Schedule the main workload
createPods(testNamespace, test.PodsToSchedule, clientset)
waitNumPodsScheduled(test.InitPods.Num+test.PodsToSchedule.Num, podInformer)

<-completedCh
close(stopCh)

// Note: without this line we're taking the overhead of defer() into account.
b.StopTimer()

setNameLabel := func(dataItem *DataItem) DataItem {
if dataItem.Labels == nil {
dataItem.Labels = map[string]string{}
}
dataItem.Labels["Name"] = b.Name()
return *dataItem
}

dataItems := []DataItem{
setNameLabel(throughputCollector.collect()),
}

for _, metric := range defaultMetrics {
dataItem := newMetricsCollector(metric).collect()
if dataItem == nil {
continue
}
dataItems = append(dataItems, setNameLabel(dataItem))
}

return dataItems

var dataItems []DataItem
for _, collector := range collectors {
dataItems = append(dataItems, collector.collect()...)
}
return dataItems
}

func waitNumPodsScheduled(num int, podInformer coreinformers.PodInformer) {
for {
scheduled, err := getScheduledPods(podInformer)
if err != nil {
klog.Fatalf("%v", err)
}
if len(scheduled) >= num {
break
}
klog.Infof("got %d existing pods, required: %d", len(scheduled), num)
time.Sleep(1 * time.Second)
}
}

func getTestDataCollectors(tc testCase, podInformer coreinformers.PodInformer, b *testing.B) []testDataCollector {
collectors := []testDataCollector{newThroughputCollector(podInformer, map[string]string{"Name": b.Name()})}
metricsCollectorConfig := defaultMetricsCollectorConfig
if tc.MetricsCollectorConfig != nil {
metricsCollectorConfig = *tc.MetricsCollectorConfig
}
collectors = append(collectors, newMetricsCollector(metricsCollectorConfig, map[string]string{"Name": b.Name()}))
return collectors
}

func getNodePreparer(nc nodeCase, clientset clientset.Interface) testutils.TestNodePreparer {
var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
if nc.NodeAllocatableStrategy != nil {
nodeStrategy = nc.NodeAllocatableStrategy
} else if nc.LabelNodePrepareStrategy != nil {
nodeStrategy = nc.LabelNodePrepareStrategy
} else if nc.UniqueNodeLabelStrategy != nil {
nodeStrategy = nc.UniqueNodeLabelStrategy
}

if nc.NodeTemplatePath != nil {
return framework.NewIntegrationTestNodePreparerWithNodeSpec(
clientset,
[]testutils.CountToStrategy{{Count: nc.Num, Strategy: nodeStrategy}},
getNodeSpecFromFile(nc.NodeTemplatePath),
)
}
return framework.NewIntegrationTestNodePreparer(
clientset,
[]testutils.CountToStrategy{{Count: nc.Num, Strategy: nodeStrategy}},
"scheduler-perf-",
)
}

func createPods(ns string, pc podCase, clientset clientset.Interface) {
strategy := getPodStrategy(pc)
config := testutils.NewTestPodCreatorConfig()
config.AddStrategy(ns, pc.Num, strategy)
podCreator := testutils.NewTestPodCreator(clientset, config)
podCreator.CreatePods()
}

func getPodStrategy(pc podCase) testutils.TestPodCreateStrategy {
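The testDataCollector interface introduced in this file is what keeps perfScheduling generic: the benchmark only starts and drains collectors, and the throughput and metrics collectors are the two implementations wired up here. A minimal sketch of an additional, hypothetical collector (not part of this commit) that reports the number of scheduled pods, assuming the getScheduledPods helper and the DataItem fields used elsewhere in this package:

type podCountCollector struct {
	podInformer coreinformers.PodInformer
	labels      map[string]string
}

func (c *podCountCollector) run(stopCh chan struct{}) {
	// Nothing to start ahead of the benchmark in this sketch.
}

func (c *podCountCollector) collect() []DataItem {
	scheduled, err := getScheduledPods(c.podInformer)
	if err != nil {
		klog.Error(err)
		return nil
	}
	return []DataItem{{
		Labels: c.labels,
		Data:   map[string]float64{"ScheduledPods": float64(len(scheduled))},
		Unit:   "pods",
	}}
}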
52 changes: 40 additions & 12 deletions test/integration/scheduler_perf/util.go
@@ -118,20 +118,42 @@ func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
return ioutil.WriteFile(destFile, b, 0644)
}

// metricsCollectorConfig is the config to be marshalled to YAML config file.
type metricsCollectorConfig struct {
Metrics []string
}

// metricsCollector collects metrics from legacyregistry.DefaultGatherer.Gather() endpoint.
// Currently only Histogram metrics are supported.
type metricsCollector struct {
metric string
metricsCollectorConfig
labels map[string]string
}

func newMetricsCollector(metric string) *metricsCollector {
func newMetricsCollector(config metricsCollectorConfig, labels map[string]string) *metricsCollector {
return &metricsCollector{
metric: metric,
metricsCollectorConfig: config,
labels: labels,
}
}

func (pc *metricsCollector) collect() *DataItem {
hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, pc.metric)
func (*metricsCollector) run(stopCh chan struct{}) {
// metricCollector doesn't need to start before the tests, so nothing to do here.
}

func (pc *metricsCollector) collect() []DataItem {
var dataItems []DataItem
for _, metric := range pc.Metrics {
dataItem := collectHistogram(metric, pc.labels)
if dataItem != nil {
dataItems = append(dataItems, *dataItem)
}
}
return dataItems
}

func collectHistogram(metric string, labels map[string]string) *DataItem {
hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, metric)
if err != nil {
klog.Error(err)
return nil
@@ -153,10 +175,13 @@ func (pc *metricsCollector) collect() *DataItem {

msFactor := float64(time.Second) / float64(time.Millisecond)

// Copy labels and add "Metric" label for this metric.
labelMap := map[string]string{"Metric": metric}
for k, v := range labels {
labelMap[k] = v
}
return &DataItem{
Labels: map[string]string{
"Metric": pc.metric,
},
Labels: labelMap,
Data: map[string]float64{
"Perc50": q50 * msFactor,
"Perc90": q90 * msFactor,
@@ -170,11 +195,13 @@ func (pc *metricsCollector) collect() *DataItem {
type throughputCollector struct {
podInformer coreinformers.PodInformer
schedulingThroughputs []float64
labels map[string]string
}

func newThroughputCollector(podInformer coreinformers.PodInformer) *throughputCollector {
func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[string]string) *throughputCollector {
return &throughputCollector{
podInformer: podInformer,
labels: labels,
}
}

@@ -205,8 +232,8 @@ func (tc *throughputCollector) run(stopCh chan struct{}) {
}
}

func (tc *throughputCollector) collect() *DataItem {
throughputSummary := &DataItem{}
func (tc *throughputCollector) collect() []DataItem {
throughputSummary := DataItem{Labels: tc.labels}
if length := len(tc.schedulingThroughputs); length > 0 {
sort.Float64s(tc.schedulingThroughputs)
sum := 0.0
@@ -225,5 +252,6 @@ func (tc *throughputCollector) collect() *DataItem {
}
throughputSummary.Unit = "pods/s"
}
return throughputSummary

return []DataItem{throughputSummary}
}
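For reference, a rough sketch of the reworked metrics collector in isolation (the benchmark name and metric choice are illustrative): each collected histogram becomes one DataItem whose labels are the caller-supplied labels plus a "Metric" entry, with Perc50/Perc90/Perc99 converted to milliseconds.

mc := newMetricsCollector(
	metricsCollectorConfig{Metrics: []string{"scheduler_e2e_scheduling_duration_seconds"}},
	map[string]string{"Name": "BenchmarkPerfScheduling/sample"},
)
// collect() returns one DataItem per metric that had histogram samples, e.g.
// Labels: {"Metric": "scheduler_e2e_scheduling_duration_seconds", "Name": "BenchmarkPerfScheduling/sample"}
// Data:   {"Perc50": ..., "Perc90": ..., "Perc99": ...} in milliseconds
items := mc.collect()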
