Skip to content

Commit

Permalink
application-grid-controller and application-grid-wrapper support k8s …
Browse files Browse the repository at this point in the history
…1.22 (superedge#371)

application-grid-controller and application-grid-wrapper support k8s 1.22 

Signed-off-by: 00pf00 <[email protected]>
  • Loading branch information
00pf00 authored Apr 21, 2022
1 parent 4da713e commit e81d211
Show file tree
Hide file tree
Showing 25 changed files with 28,906 additions and 26,415 deletions.
1 change: 1 addition & 0 deletions build/docker/application-grid-controller/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
From alpine:3.9

ADD application-grid-controller /usr/local/bin
COPY manifests /etc/superedge/application-grid-controller/manifests

ENTRYPOINT ["/usr/local/bin/application-grid-controller"]
3 changes: 3 additions & 0 deletions build/docker/application-grid-controller/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env bash

cp -r pkg/application-grid-controller/manifests "$DST_DIR"
17 changes: 8 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/dgraph-io/badger/v3 v3.2011.1
github.com/golang/protobuf v1.5.2
github.com/google/flatbuffers v1.12.1
github.com/google/flatbuffers v1.12.1 // indirect
github.com/google/uuid v1.1.2
github.com/googleapis/gnostic v0.5.1 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.14.6 // indirect
github.com/hashicorp/go-multierror v1.1.0
github.com/lithammer/dedent v1.1.0
github.com/moby/term v0.0.0-20200312100748-672ec06f55cd
github.com/moby/term v0.0.0-20201216013528-df9cb8a40635
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/pelletier/go-toml v1.2.0
github.com/pkg/errors v0.9.1
Expand All @@ -27,7 +27,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/tarscloud/gopractice v1.0.1
go.etcd.io/bbolt v1.3.5
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3
google.golang.org/grpc v1.29.1
Expand All @@ -42,7 +42,6 @@ require (
k8s.io/cluster-bootstrap v0.22.3
k8s.io/code-generator v0.22.3
k8s.io/component-base v0.22.3
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.8.0
k8s.io/kubernetes v1.19.14
k8s.io/system-validators v1.4.0 // indirect
Expand All @@ -57,16 +56,16 @@ replace (
go.etcd.io/etcd => go.etcd.io/etcd v0.5.0-alpha.5.0.20200329194405-dd816f0735f8
gopkg.in/yaml.v2 => gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v3 => gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
k8s.io/api => k8s.io/api v0.20.5
k8s.io/api => k8s.io/api v0.21.0
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.20.5
k8s.io/apimachinery => k8s.io/apimachinery v0.20.5
k8s.io/apiserver => k8s.io/apiserver v0.20.5
k8s.io/apimachinery => k8s.io/apimachinery v0.21.0
k8s.io/apiserver => k8s.io/apiserver v0.21.0
k8s.io/cli-runtime => k8s.io/cli-runtime v0.20.5
k8s.io/client-go => k8s.io/client-go v0.20.5
k8s.io/client-go => k8s.io/client-go v0.21.0
k8s.io/cloud-provider => k8s.io/cloud-provider v0.20.5
k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.20.5
k8s.io/code-generator => k8s.io/code-generator v0.20.5
k8s.io/component-base => k8s.io/component-base v0.20.5
k8s.io/component-base => k8s.io/component-base v0.21.0
k8s.io/component-helpers => k8s.io/component-helpers v0.20.5
k8s.io/controller-manager => k8s.io/controller-manager v0.20.5
k8s.io/cri-api => k8s.io/cri-api v0.20.5
Expand Down
95 changes: 74 additions & 21 deletions go.sum

Large diffs are not rendered by default.

12,621 changes: 0 additions & 12,621 deletions pkg/application-grid-controller/controller/common/deploymentgrid-crd.go

This file was deleted.

497 changes: 0 additions & 497 deletions pkg/application-grid-controller/controller/common/servicegrid-crd.go

This file was deleted.

13,028 changes: 0 additions & 13,028 deletions pkg/application-grid-controller/controller/common/statefulsetgrid-crd.go

This file was deleted.

13,657 changes: 13,657 additions & 0 deletions pkg/application-grid-controller/manifests/superedge.io_deploymentgrids.yaml

Large diffs are not rendered by default.

Large diffs are not rendered by default.

14,065 changes: 14,065 additions & 0 deletions pkg/application-grid-controller/manifests/superedge.io_statefulsetgrids.yaml

Large diffs are not rendered by default.

40 changes: 29 additions & 11 deletions pkg/application-grid-controller/prepare/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"context"
"github.com/superedge/superedge/pkg/application-grid-controller/controller/common"
"github.com/superedge/superedge/pkg/util/kubeclient"
"io/ioutil"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
"time"

"fmt"
apiext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -35,6 +36,12 @@ import (
"k8s.io/klog/v2"
)

const (
DeploymentGridCRDYaml = "/etc/superedge/application-grid-controller/manifests/superedge.io_deploymentgrids.yaml"
StatefulSetGridCRDYaml = "/etc/superedge/application-grid-controller/manifests/superedge.io_statefulsetgrids.yaml"
ServiceGridCRDYaml = "/etc/superedge/application-grid-controller/manifests/superedge.io_servicegrids.yaml"
)

type crdPreparator struct {
client clientset.Interface
}
Expand Down Expand Up @@ -84,11 +91,23 @@ func (p *crdPreparator) createOrUpdateCRD(gvk schema.GroupVersionKind) (*apiext.
// create specified GroupVersionKind edge CRD
switch gvk.Kind {
case common.DeploymentGridKind:
crdBytes, err = kubeclient.ParseString(common.DeploymentGridCRDYaml, map[string]interface{}{})
f, err := ioutil.ReadFile(DeploymentGridCRDYaml)
if err != nil {
return nil, err
}
crdBytes, err = kubeclient.ParseString(string(f), map[string]interface{}{})
case common.StatefulSetGridKind:
crdBytes, err = kubeclient.ParseString(common.StatefulSetGridCRDYaml, map[string]interface{}{})
f, err := ioutil.ReadFile(StatefulSetGridCRDYaml)
if err != nil {
return nil, err
}
crdBytes, err = kubeclient.ParseString(string(f), map[string]interface{}{})
case common.ServiceGridKind:
crdBytes, err = kubeclient.ParseString(common.ServiceGridCRDYaml, map[string]interface{}{})
f, err := ioutil.ReadFile(ServiceGridCRDYaml)
if err != nil {
return nil, err
}
crdBytes, err = kubeclient.ParseString(string(f), map[string]interface{}{})
default:
err = fmt.Errorf("Invalid edge group version kind resource %s", gvk.Kind)
}
Expand All @@ -101,24 +120,24 @@ func (p *crdPreparator) createOrUpdateCRD(gvk schema.GroupVersionKind) (*apiext.
return nil, err
}
// create or update relevant edge CRD
curCRD, err := p.client.ApiextensionsV1beta1().CustomResourceDefinitions().Get(context.TODO(), crd.Name, metav1.GetOptions{})
curCRD, err := p.client.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), crd.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
// try to create edge CRD
klog.V(4).Infof("Creating CRD %s", crd.Name)
if newCrd, err := p.client.ApiextensionsV1beta1().CustomResourceDefinitions().Create(context.TODO(), crd, metav1.CreateOptions{}); errors.IsAlreadyExists(err) {
return p.client.ApiextensionsV1beta1().CustomResourceDefinitions().Get(context.TODO(), crd.Name, metav1.GetOptions{})
if newCrd, err := p.client.ApiextensionsV1().CustomResourceDefinitions().Create(context.TODO(), crd, metav1.CreateOptions{}); errors.IsAlreadyExists(err) {
return p.client.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), crd.Name, metav1.GetOptions{})
} else if err != nil {
return nil, err
} else {
return newCrd, nil
}
}
// update edge CRD if necessary
if !equality.Semantic.DeepEqual(crd.Spec.Validation, curCRD.Spec.Validation) ||
if !equality.Semantic.DeepEqual(crd.Spec.Versions, curCRD.Spec.Versions) ||
!equality.Semantic.DeepEqual(crd.Spec.Versions, curCRD.Spec.Versions) {
curCRD.Spec = crd.Spec
klog.V(4).Infof("Updating CRD %s", crd.Name)
return p.client.ApiextensionsV1beta1().CustomResourceDefinitions().Update(context.TODO(), curCRD, metav1.UpdateOptions{})
return p.client.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), curCRD, metav1.UpdateOptions{})
}
return curCRD, nil
}
Expand All @@ -135,11 +154,10 @@ func (p *crdPreparator) waitCRD(name string) error {
}
first = false

crd, err := p.client.ApiextensionsV1beta1().CustomResourceDefinitions().Get(context.TODO(), name, metav1.GetOptions{})
crd, err := p.client.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return false, err
}

for _, cond := range crd.Status.Conditions {
switch cond.Type {
case apiext.Established:
Expand Down
40 changes: 0 additions & 40 deletions pkg/application-grid-controller/prepare/crd_test.go

This file was deleted.

108 changes: 99 additions & 9 deletions pkg/application-grid-wrapper/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
"time"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discoveryv1 "k8s.io/api/discovery/v1"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -285,9 +286,9 @@ func (s *interceptorServer) interceptEndpointsRequest(handler http.Handler) http
})
}

func (s *interceptorServer) interceptEndpointSliceRequest(handler http.Handler) http.Handler {
func (s *interceptorServer) interceptEndpointSliceV1Request(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet || !strings.HasPrefix(r.URL.Path, "/apis/discovery.k8s.io/v1beta1/endpointslices") {
if r.Method != http.MethodGet || !strings.HasPrefix(r.URL.Path, "/apis/discovery.k8s.io/v1/endpointslices") {
handler.ServeHTTP(w, r)
return
}
Expand All @@ -301,17 +302,17 @@ func (s *interceptorServer) interceptEndpointSliceRequest(handler http.Handler)
return
}

encoder := scheme.Codecs.EncoderForVersion(info.Serializer, discovery.SchemeGroupVersion)
encoder := scheme.Codecs.EncoderForVersion(info.Serializer, discoveryv1.SchemeGroupVersion)
// list request
if queries.Get("watch") == "" {
w.Header().Set("Content-Type", info.MediaType)
allEndpointSlices := s.cache.GetEndpointSlice()
epsItems := make([]discovery.EndpointSlice, 0, len(allEndpointSlices))
allEndpointSlices := s.cache.GetEndpointSliceV1()
epsItems := make([]discoveryv1.EndpointSlice, 0, len(allEndpointSlices))
for _, eps := range allEndpointSlices {
epsItems = append(epsItems, *eps)
}

epsList := &discovery.EndpointSliceList{
epsList := &discoveryv1.EndpointSliceList{
Items: epsItems,
}

Expand Down Expand Up @@ -360,15 +361,104 @@ func (s *interceptorServer) interceptEndpointSliceRequest(handler http.Handler)
return
case <-timer.C:
return
case evt := <-s.endpointSliceWatchCh:
case evt := <-s.endpointSliceV1WatchCh:
klog.V(4).Infof("Send endpointSlice watch event: %+#v", evt)
err := e.Encode(&evt)
if err != nil {
klog.Errorf("can't encode watch event, %v", err)
return
}
if len(s.endpointSliceV1WatchCh) == 0 {
flusher.Flush()
}
}
}
})
}

func (s *interceptorServer) interceptEndpointSliceV1Beta1Request(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet || !strings.HasPrefix(r.URL.Path, "/apis/discovery.k8s.io/v1beta1/endpointslices") {
handler.ServeHTTP(w, r)
return
}

queries := r.URL.Query()
acceptType := r.Header.Get("Accept")
info, found := s.parseAccept(acceptType, s.mediaSerializer)
if !found {
klog.Errorf("can't find %s serializer", acceptType)
w.WriteHeader(http.StatusBadRequest)
return
}

encoder := scheme.Codecs.EncoderForVersion(info.Serializer, discoveryv1beta1.SchemeGroupVersion)
// list request
if queries.Get("watch") == "" {
w.Header().Set("Content-Type", info.MediaType)
allEndpointSlices := s.cache.GetEndpointSliceV1Beta1()
epsItems := make([]discoveryv1beta1.EndpointSlice, 0, len(allEndpointSlices))
for _, eps := range allEndpointSlices {
epsItems = append(epsItems, *eps)
}

epsList := &discoveryv1beta1.EndpointSliceList{
Items: epsItems,
}

err := encoder.Encode(epsList, w)
if err != nil {
klog.Errorf("can't marshal endpointSlice list, %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

return
}

if len(s.endpointSliceWatchCh) == 0 {
// watch request
timeoutSecondsStr := r.URL.Query().Get("timeoutSeconds")
timeout := time.Minute
if timeoutSecondsStr != "" {
timeout, _ = time.ParseDuration(fmt.Sprintf("%ss", timeoutSecondsStr))
}

timer := time.NewTimer(timeout)
defer timer.Stop()

flusher, ok := w.(http.Flusher)
if !ok {
klog.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

e := restclientwatch.NewEncoder(
streaming.NewEncoder(info.StreamSerializer.Framer.NewFrameWriter(w),
scheme.Codecs.EncoderForVersion(info.StreamSerializer, v1.SchemeGroupVersion)),
encoder)
if info.MediaType == runtime.ContentTypeProtobuf {
w.Header().Set("Content-Type", runtime.ContentTypeProtobuf+";stream=watch")
} else {
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
}
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()
for {
select {
case <-r.Context().Done():
return
case <-timer.C:
return
case evt := <-s.endpointSliceV1Beta1WatchCh:
klog.V(4).Infof("Send endpointSlice watch event: %+#v", evt)
err := e.Encode(&evt)
if err != nil {
klog.Errorf("can't encode watch event, %v", err)
return
}
if len(s.endpointSliceV1WatchCh) == 0 {
flusher.Flush()
}
}
Expand Down
Loading

0 comments on commit e81d211

Please sign in to comment.