Skip to content

Commit

Permalink
Merge pull request kubernetes#93873 from roycaihw/storage-version/han…
Browse files Browse the repository at this point in the history
…dler

Apiserver updates storageversions API and filters certain write requests during bootstrap
  • Loading branch information
k8s-ci-robot authored Nov 10, 2020
2 parents 734889e + 23f77ce commit 4261200
Show file tree
Hide file tree
Showing 27 changed files with 910 additions and 116 deletions.
9 changes: 9 additions & 0 deletions cmd/kube-apiserver/app/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/features"
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
genericoptions "k8s.io/apiserver/pkg/server/options"
Expand Down Expand Up @@ -67,6 +68,14 @@ func createAggregatorConfig(
genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
genericConfig.RESTOptionsGetter = nil

if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
// Add StorageVersionPrecondition handler to aggregator-apiserver.
// The handler will block write requests to built-in resources until the
// target resources' storage versions are up-to-date.
genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition
}

// override genericConfig.AdmissionControl with kube-aggregator's scheme,
// because aggregator apiserver should use its own scheme to convert its own resources.
err := commandOptions.Admission.ApplyTo(
Expand Down
1 change: 1 addition & 0 deletions cmd/kube-apiserver/app/testing/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storageversion:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
Expand Down
84 changes: 46 additions & 38 deletions cmd/kube-apiserver/app/testing/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storageversion"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/cert"
Expand All @@ -49,9 +50,10 @@ type TearDownFunc func()
type TestServerInstanceOptions struct {
// DisableStorageCleanup Disable the automatic storage cleanup
DisableStorageCleanup bool

// Enable cert-auth for the kube-apiserver
EnableCertAuth bool
// Wrap the storage version interface of the created server's generic server.
StorageVersionWrapFunc func(storageversion.Manager) storageversion.Manager
}

// TestServer return values supplied by kube-test-ApiServer
Expand Down Expand Up @@ -182,6 +184,9 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig)
t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort)
server, err := app.CreateServerChain(completedOptions, stopCh)
if instanceOptions.StorageVersionWrapFunc != nil {
server.GenericAPIServer.StorageVersionManager = instanceOptions.StorageVersionWrapFunc(server.GenericAPIServer.StorageVersionManager)
}
if err != nil {
return result, fmt.Errorf("failed to create server chain: %v", err)
}
Expand All @@ -196,51 +201,54 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}
}(stopCh)

t.Logf("Waiting for /healthz to be ok...")

client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
if err != nil {
return result, fmt.Errorf("failed to create a client: %v", err)
}
// skip healthz check when we test the storage version manager poststart hook
if instanceOptions.StorageVersionWrapFunc == nil {
t.Logf("Waiting for /healthz to be ok...")

// wait until healthz endpoint returns ok
err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
if err != nil {
return result, fmt.Errorf("failed to create a client: %v", err)
}

result := client.CoreV1().RESTClient().Get().AbsPath("/healthz").Do(context.TODO())
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
return false, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
}
// wait until healthz endpoint returns ok
err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
}

// wait until default namespace is created
err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
result := client.CoreV1().RESTClient().Get().AbsPath("/healthz").Do(context.TODO())
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
return false, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
}

if _, err := client.CoreV1().Namespaces().Get(context.TODO(), "default", metav1.GetOptions{}); err != nil {
if !errors.IsNotFound(err) {
t.Logf("Unable to get default namespace: %v", err)
// wait until default namespace is created
err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
}
return false, nil

if _, err := client.CoreV1().Namespaces().Get(context.TODO(), "default", metav1.GetOptions{}); err != nil {
if !errors.IsNotFound(err) {
t.Logf("Unable to get default namespace: %v", err)
}
return false, nil
}
return true, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for default namespace to be created: %v", err)
}
return true, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for default namespace to be created: %v", err)
}

// from here the caller must call tearDown
Expand Down
47 changes: 45 additions & 2 deletions pkg/apis/apiserverinternal/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ func validateServerStorageVersion(ssv apiserverinternal.ServerStorageVersion, fl
for _, msg := range apimachineryvalidation.NameIsDNSSubdomain(ssv.APIServerID, false) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("apiServerID"), ssv.APIServerID, msg))
}
if errs := utilvalidation.IsDNS1035Label(ssv.EncodingVersion); len(errs) > 0 {
if errs := isValidAPIVersion(ssv.EncodingVersion); len(errs) > 0 {
allErrs = append(allErrs, field.Invalid(fldPath.Child("encodingVersion"), ssv.EncodingVersion, strings.Join(errs, ",")))
}

found := false
for i, dv := range ssv.DecodableVersions {
if errs := utilvalidation.IsDNS1035Label(dv); len(errs) > 0 {
if errs := isValidAPIVersion(dv); len(errs) > 0 {
allErrs = append(allErrs, field.Invalid(fldPath.Child("decodableVersions").Index(i), dv, strings.Join(errs, ",")))
}
if dv == ssv.EncodingVersion {
Expand Down Expand Up @@ -158,3 +158,46 @@ func validateStorageVersionCondition(conditions []apiserverinternal.StorageVersi
}
return allErrs
}

const dns1035LabelFmt string = "[a-z]([-a-z0-9]*[a-z0-9])?"
const dns1035LabelErrMsg string = "a DNS-1035 label, which must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character"

// isValidAPIVersion tests whether the value passed is a valid apiVersion. A
// valid apiVersion contains a version string that matches DNS_LABEL format,
// with an optional group/ prefix, where the group string matches DNS_SUBDOMAIN
// format. If the value is not valid, a list of error strings is returned.
// Otherwise an empty list (or nil) is returned.
func isValidAPIVersion(apiVersion string) []string {
var errs []string
parts := strings.Split(apiVersion, "/")
var version string
switch len(parts) {
case 1:
version = parts[0]
case 2:
var group string
group, version = parts[0], parts[1]
if len(group) == 0 {
errs = append(errs, "group part: "+utilvalidation.EmptyError())
} else if msgs := utilvalidation.IsDNS1123Subdomain(group); len(msgs) != 0 {
errs = append(errs, prefixEach(msgs, "group part: ")...)
}
default:
return append(errs, "an apiVersion is "+utilvalidation.RegexError(dns1035LabelErrMsg, dns1035LabelFmt, "my-name", "abc-123")+
" with an optional DNS subdomain prefix and '/' (e.g. 'example.com/MyVersion')")
}

if len(version) == 0 {
errs = append(errs, "version part: "+utilvalidation.EmptyError())
} else if msgs := utilvalidation.IsDNS1035Label(version); len(msgs) != 0 {
errs = append(errs, prefixEach(msgs, "version part: ")...)
}
return errs
}

func prefixEach(msgs []string, prefix string) []string {
for i := range msgs {
msgs[i] = prefix + msgs[i]
}
return msgs
}
56 changes: 56 additions & 0 deletions pkg/apis/apiserverinternal/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,62 @@ func TestValidateServerStorageVersion(t *testing.T) {
},
expectedErr: "",
},
{
ssv: apiserverinternal.ServerStorageVersion{
APIServerID: "fea",
EncodingVersion: "mygroup.com/v2",
DecodableVersions: []string{"v1alpha1", "v1", "mygroup.com/v2"},
},
expectedErr: "",
},
{
ssv: apiserverinternal.ServerStorageVersion{
APIServerID: "fea",
EncodingVersion: "mygroup.com/v2",
DecodableVersions: []string{"mygroup.com/v2", "/v3"},
},
expectedErr: `[].decodableVersions[1]: Invalid value: "/v3": group part: must be non-empty`,
},
{
ssv: apiserverinternal.ServerStorageVersion{
APIServerID: "fea",
EncodingVersion: "mygroup.com/v2",
DecodableVersions: []string{"mygroup.com/v2", "mygroup.com/"},
},
expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup.com/": version part: must be non-empty`,
},
{
ssv: apiserverinternal.ServerStorageVersion{
APIServerID: "fea",
EncodingVersion: "/v3",
DecodableVersions: []string{"mygroup.com/v2", "/v3"},
},
expectedErr: `[].encodingVersion: Invalid value: "/v3": group part: must be non-empty`,
},
{
ssv: apiserverinternal.ServerStorageVersion{
APIServerID: "fea",
EncodingVersion: "v1",
DecodableVersions: []string{"v1", "mygroup_com/v2"},
},
expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup_com/v2": group part: a lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character (e.g. 'example.com', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*')`,
},
{
ssv: apiserverinternal.ServerStorageVersion{
APIServerID: "fea",
EncodingVersion: "v1",
DecodableVersions: []string{"v1", "mygroup.com/v2_"},
},
expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup.com/v2_": version part: a DNS-1035 label must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', or 'abc-123', regex used for validation is '[a-z]([-a-z0-9]*[a-z0-9])?')`,
},
{
ssv: apiserverinternal.ServerStorageVersion{
APIServerID: "fea",
EncodingVersion: "v1",
DecodableVersions: []string{"v1", "mygroup.com/v2/myresource"},
},
expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup.com/v2/myresource": an apiVersion is a DNS-1035 label, which must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', or 'abc-123', regex used for validation is '[a-z]([-a-z0-9]*[a-z0-9])?') with an optional DNS subdomain prefix and '/' (e.g. 'example.com/MyVersion')`,
},
}

for _, tc := range cases {
Expand Down
21 changes: 17 additions & 4 deletions pkg/registry/rbac/rest/storage_rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
rbacapiv1 "k8s.io/api/rbac/v1"
rbacapiv1alpha1 "k8s.io/api/rbac/v1alpha1"
rbacapiv1beta1 "k8s.io/api/rbac/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -160,6 +161,14 @@ type PolicyData struct {
ClusterRoleBindingsToSplit map[string]rbacapiv1.ClusterRoleBinding
}

func isConflictOrServiceUnavailable(err error) bool {
return errors.IsConflict(err) || errors.IsServiceUnavailable(err)
}

func retryOnConflictOrServiceUnavailable(backoff wait.Backoff, fn func() error) error {
return retry.OnError(backoff, isConflictOrServiceUnavailable, fn)
}

func (p *PolicyData) EnsureRBACPolicy() genericapiserver.PostStartHookFunc {
return func(hookContext genericapiserver.PostStartHookContext) error {
// initializing roles is really important. On some e2e runs, we've seen cases where etcd is down when the server
Expand Down Expand Up @@ -206,7 +215,8 @@ func (p *PolicyData) EnsureRBACPolicy() genericapiserver.PostStartHookFunc {
Client: reconciliation.ClusterRoleModifier{Client: clientset.ClusterRoles()},
Confirm: true,
}
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// ServiceUnavailble error is returned when the API server is blocked by storage version updates
err := retryOnConflictOrServiceUnavailable(retry.DefaultBackoff, func() error {
result, err := opts.Run()
if err != nil {
return err
Expand Down Expand Up @@ -234,7 +244,8 @@ func (p *PolicyData) EnsureRBACPolicy() genericapiserver.PostStartHookFunc {
Client: reconciliation.ClusterRoleBindingClientAdapter{Client: clientset.ClusterRoleBindings()},
Confirm: true,
}
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// ServiceUnavailble error is returned when the API server is blocked by storage version updates
err := retryOnConflictOrServiceUnavailable(retry.DefaultBackoff, func() error {
result, err := opts.Run()
if err != nil {
return err
Expand Down Expand Up @@ -265,7 +276,8 @@ func (p *PolicyData) EnsureRBACPolicy() genericapiserver.PostStartHookFunc {
Client: reconciliation.RoleModifier{Client: clientset, NamespaceClient: coreclientset.Namespaces()},
Confirm: true,
}
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// ServiceUnavailble error is returned when the API server is blocked by storage version updates
err := retryOnConflictOrServiceUnavailable(retry.DefaultBackoff, func() error {
result, err := opts.Run()
if err != nil {
return err
Expand Down Expand Up @@ -295,7 +307,8 @@ func (p *PolicyData) EnsureRBACPolicy() genericapiserver.PostStartHookFunc {
Client: reconciliation.RoleBindingClientAdapter{Client: clientset, NamespaceClient: coreclientset.Namespaces()},
Confirm: true,
}
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// ServiceUnavailble error is returned when the API server is blocked by storage version updates
err := retryOnConflictOrServiceUnavailable(retry.DefaultBackoff, func() error {
result, err := opts.Run()
if err != nil {
return err
Expand Down
11 changes: 8 additions & 3 deletions pkg/registry/scheduling/rest/storage_scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,16 @@ func AddSystemPriorityClasses() genericapiserver.PostStartHookFunc {
if err != nil {
if apierrors.IsNotFound(err) {
_, err := schedClientSet.PriorityClasses().Create(context.TODO(), pc, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return false, err
} else {
if err == nil || apierrors.IsAlreadyExists(err) {
klog.Infof("created PriorityClass %s with value %v", pc.Name, pc.Value)
continue
}
// ServiceUnavailble error is returned when the API server is blocked by storage version updates
if apierrors.IsServiceUnavailable(err) {
klog.Infof("going to retry, unable to create PriorityClass %s: %v", pc.Name, err)
return false, nil
}
return false, err
} else {
// Unable to get the priority class for reasons other than "not found".
klog.Warningf("unable to get PriorityClass %v: %v. Retrying...", pc.Name, err)
Expand Down
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/endpoints/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/endpoints/warning:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storageversion:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/component-base/version:go_default_library",
"//vendor/github.com/emicklei/go-restful:go_default_library",
Expand Down
Loading

0 comments on commit 4261200

Please sign in to comment.