Skip to content

Commit

Permalink
Merge pull request superedge#86 from chenkaiyue/endpoint-update
Browse files Browse the repository at this point in the history
Edge autonomy enhancement: Endpoints can be updated when disconnect with apiserver
  • Loading branch information
chenkaiyue authored Apr 27, 2021
2 parents 2e0056f + eac4350 commit 8a12b22
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 60 deletions.
27 changes: 17 additions & 10 deletions cmd/application-grid-wrapper/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ import (
)

type Options struct {
CAFile string
KeyFile string
CertFile string
KubeConfig string
BindAddress string
InsecureMode bool
HostName string
WrapperInCluster bool
NotifyChannelSize int
Debug bool
CAFile string
KeyFile string
CertFile string
KubeConfig string
BindAddress string
InsecureMode bool
HostName string
WrapperInCluster bool
NotifyChannelSize int
Debug bool
ServiceAutonomyEnhancementOption ServiceAutonomyEnhancementOptions
}

func NewGridWrapperOptions() *Options {
Expand All @@ -39,6 +40,11 @@ func NewGridWrapperOptions() *Options {
InsecureMode: true,
NotifyChannelSize: 100,
WrapperInCluster: true,
ServiceAutonomyEnhancementOption: ServiceAutonomyEnhancementOptions{
Enabled: false,
UpdateInterval: 30,
NeighborStatusSvc: "http://localhost:51005/localinfo",
},
}
}

Expand All @@ -56,4 +62,5 @@ func (sc *Options) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&sc.NotifyChannelSize, "notify-channel-size", sc.NotifyChannelSize,
"channel size for service and endpoints sent")
fs.BoolVar(&sc.Debug, "debug", sc.Debug, "enable pprof handler")
fs.Var(&sc.ServiceAutonomyEnhancementOption, "service-autonomy-enhancement", "service-autonomy-enhancement")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package options

import (
"fmt"
"strconv"
"strings"
)

type ServiceAutonomyEnhancementOptions struct {
Enabled bool
NeighborStatusSvc string
UpdateInterval int
}

func (p ServiceAutonomyEnhancementOptions) Name() string {
return "ServiceAutonomyEnhancement"
}

func (p *ServiceAutonomyEnhancementOptions) Set(s string) error {
var err error

for _, para := range strings.Split(s, ",") {
if len(para) == 0 {
continue
}
arr := strings.Split(para, "=")
trimkey := strings.TrimSpace(arr[0])
switch trimkey {
case "address":
(*p).NeighborStatusSvc = strings.TrimSpace(arr[1])
case "interval":
interval, _ := strconv.Atoi(strings.TrimSpace(arr[1]))
(*p).UpdateInterval = interval
case "enabled":
enabled, _ := strconv.ParseBool(strings.TrimSpace(arr[1]))
(*p).Enabled = enabled
}
}
return err
}

func (p *ServiceAutonomyEnhancementOptions) String() string {
return fmt.Sprintf("%#v", *p)
}

func (i *ServiceAutonomyEnhancementOptions) Type() string {
return "ServiceAutonomyEnhancement"
}
5 changes: 3 additions & 2 deletions cmd/application-grid-wrapper/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ func NewWrapperProxyCommand() *cobra.Command {
verflag.PrintAndExitIfRequested()
util.PrintFlags(cmd.Flags())

server := server.NewInterceptorServer(o.KubeConfig, o.HostName, o.WrapperInCluster, o.NotifyChannelSize)
server := server.NewInterceptorServer(o.KubeConfig, o.HostName, o.WrapperInCluster,
o.NotifyChannelSize, o.ServiceAutonomyEnhancementOption)
if server == nil {
return
}

if err := server.Run(o.Debug, o.BindAddress, o.InsecureMode,
o.CAFile, o.CertFile, o.KeyFile); err != nil {
o.CAFile, o.CertFile, o.KeyFile, o.ServiceAutonomyEnhancementOption); err != nil {
klog.Errorf("fail to start server, %v", err)
return
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/application-grid-controller/apis/superedge.io/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ type DeploymentGrid struct {
}

type DeploymentGridSpec struct {
GridUniqKey string `json:"gridUniqKey,omitempty"`
Template appv1.DeploymentSpec `json:"template,omitempty"`
TemplatePool map[string]appv1.DeploymentSpec `json:"templatePool,omitempty"`
Templates map[string]string `json:"templates,omitempty"`
DefaultTemplateName string `json:"defaultTemplateName,omitempty"`
AutoDeleteUnusedTemplate bool `json:"autoDeleteUnusedTemplate,omitempty"`
GridUniqKey string `json:"gridUniqKey,omitempty"`
Template appv1.DeploymentSpec `json:"template,omitempty"`
TemplatePool map[string]appv1.DeploymentSpec `json:"templatePool,omitempty"`
Templates map[string]string `json:"templates,omitempty"`
DefaultTemplateName string `json:"defaultTemplateName,omitempty"`
AutoDeleteUnusedTemplate bool `json:"autoDeleteUnusedTemplate,omitempty"`
}

type DeploymentGridStatusList struct {
Expand All @@ -73,12 +73,12 @@ type StatefulSetGrid struct {
}

type StatefulSetGridSpec struct {
GridUniqKey string `json:"gridUniqKey,omitempty"`
Template appv1.StatefulSetSpec `json:"template,omitempty"`
TemplatePool map[string]appv1.StatefulSetSpec `json:"templatePool,omitempty"`
Templates map[string]string `json:"templates,omitempty"`
DefaultTemplateName string `json:"defaultTemplateName,omitempty"`
AutoDeleteUnusedTemplate bool `json:"autoDeleteUnusedTemplate,omitempty"`
GridUniqKey string `json:"gridUniqKey,omitempty"`
Template appv1.StatefulSetSpec `json:"template,omitempty"`
TemplatePool map[string]appv1.StatefulSetSpec `json:"templatePool,omitempty"`
Templates map[string]string `json:"templates,omitempty"`
DefaultTemplateName string `json:"defaultTemplateName,omitempty"`
AutoDeleteUnusedTemplate bool `json:"autoDeleteUnusedTemplate,omitempty"`
}

type StatefulSetGridStatusList struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/application-grid-controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,4 @@ func (c *ControllerConfig) Run(stop <-chan struct{}) {
go c.DeploymentInformer.Informer().Run(stop)
go c.StatefulSetInformer.Informer().Run(stop)
go c.NodeInformer.Informer().Run(stop)
}
}
75 changes: 63 additions & 12 deletions pkg/application-grid-wrapper/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"github.com/superedge/superedge/cmd/application-grid-wrapper/app/options"
"github.com/superedge/superedge/pkg/edge-health/data"
"io/ioutil"
"k8s.io/apimachinery/pkg/util/wait"
"net/http"
"time"

Expand All @@ -44,14 +48,15 @@ import (
)

type interceptorServer struct {
restConfig *rest.Config
cache storage.Cache
serviceWatchCh <-chan watch.Event
endpointsWatchCh <-chan watch.Event
mediaSerializer []runtime.SerializerInfo
restConfig *rest.Config
cache storage.Cache
serviceWatchCh <-chan watch.Event
endpointsWatchCh <-chan watch.Event
mediaSerializer []runtime.SerializerInfo
serviceAutonomyEnhancementAddress string
}

func NewInterceptorServer(kubeconfig string, hostName string, wrapperInCluster bool, channelSize int) *interceptorServer {
func NewInterceptorServer(kubeconfig string, hostName string, wrapperInCluster bool, channelSize int, serviceAutonomyEnhancement options.ServiceAutonomyEnhancementOptions) *interceptorServer {
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
klog.Errorf("can't build rest config, %v", err)
Expand All @@ -62,17 +67,18 @@ func NewInterceptorServer(kubeconfig string, hostName string, wrapperInCluster b
endpointsCh := make(chan watch.Event, channelSize)

server := &interceptorServer{
restConfig: restConfig,
cache: storage.NewStorageCache(hostName, wrapperInCluster, serviceCh, endpointsCh),
serviceWatchCh: serviceCh,
endpointsWatchCh: endpointsCh,
mediaSerializer: scheme.Codecs.SupportedMediaTypes(),
restConfig: restConfig,
cache: storage.NewStorageCache(hostName, wrapperInCluster, serviceAutonomyEnhancement.Enabled, serviceCh, endpointsCh),
serviceWatchCh: serviceCh,
endpointsWatchCh: endpointsCh,
mediaSerializer: scheme.Codecs.SupportedMediaTypes(),
serviceAutonomyEnhancementAddress: serviceAutonomyEnhancement.NeighborStatusSvc,
}

return server
}

func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {
func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string, serviceAutonomyEnhancement options.ServiceAutonomyEnhancementOptions) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -82,6 +88,12 @@ func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, c
return err
}

klog.Infof("Start to run GetLocalInfo client")

if serviceAutonomyEnhancement.Enabled {
go wait.Until(s.NodeStatusAcquisition, time.Duration(serviceAutonomyEnhancement.UpdateInterval)*time.Second, ctx.Done())
}

klog.Infof("Start to run interceptor server")
/* filter
*/
Expand Down Expand Up @@ -178,3 +190,42 @@ func (s *interceptorServer) buildFilterChains(debug bool) http.Handler {

return handler
}

func (s *interceptorServer) NodeStatusAcquisition() {
client := http.Client{Timeout: 5 * time.Second}
klog.V(4).Infof("serviceAutonomyEnhancementAddress is %s", s.serviceAutonomyEnhancementAddress)
req, err := http.NewRequest("GET", s.serviceAutonomyEnhancementAddress, nil)
if err != nil {
s.cache.ClearLocalNodeInfo()
klog.Errorf("Get local node info: new request err: %v", err)
return
}

res, err := client.Do(req)
if err != nil {
s.cache.ClearLocalNodeInfo()
klog.Errorf("Get local node info: do request err: %v", err)
return
}
defer func() {
if res != nil {
res.Body.Close()
}
}()

if res.StatusCode != http.StatusOK {
klog.Errorf("Get local node info: httpResponse.StatusCode!=200, is %d", res.StatusCode)
s.cache.ClearLocalNodeInfo()
return
}

var localNodeInfo map[string]data.ResultDetail
if err := json.NewDecoder(res.Body).Decode(&localNodeInfo); err != nil {
klog.Errorf("Get local node info: Decode err: %v", err)
s.cache.ClearLocalNodeInfo()
return
} else {
s.cache.SetLocalNodeInfo(localNodeInfo)
}
return
}
63 changes: 48 additions & 15 deletions pkg/application-grid-wrapper/storage/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package storage

import (
"k8s.io/klog"
"sync"

"github.com/superedge/superedge/pkg/edge-health/data"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -28,14 +30,16 @@ import (

type storageCache struct {
// hostName is the nodeName of node which application-grid-wrapper deploys on
hostName string
wrapperInCluster bool
hostName string
wrapperInCluster bool
serviceAutonomyEnhancementEnabled bool

// mu lock protect the following map structure
mu sync.RWMutex
servicesMap map[types.NamespacedName]*serviceContainer
endpointsMap map[types.NamespacedName]*endpointsContainer
nodesMap map[types.NamespacedName]*nodeContainer
mu sync.RWMutex
servicesMap map[types.NamespacedName]*serviceContainer
endpointsMap map[types.NamespacedName]*endpointsContainer
nodesMap map[types.NamespacedName]*nodeContainer
localNodeInfo map[string]data.ResultDetail

// service watch channel
serviceChan chan<- watch.Event
Expand Down Expand Up @@ -63,15 +67,17 @@ type endpointsContainer struct {

var _ Cache = &storageCache{}

func NewStorageCache(hostName string, wrapperInCluster bool, serviceNotifier, endpointsNotifier chan watch.Event) *storageCache {
func NewStorageCache(hostName string, wrapperInCluster, serviceAutonomyEnhancementEnabled bool, serviceNotifier, endpointsNotifier chan watch.Event) *storageCache {
msc := &storageCache{
hostName: hostName,
wrapperInCluster: wrapperInCluster,
servicesMap: make(map[types.NamespacedName]*serviceContainer),
endpointsMap: make(map[types.NamespacedName]*endpointsContainer),
nodesMap: make(map[types.NamespacedName]*nodeContainer),
serviceChan: serviceNotifier,
endpointsChan: endpointsNotifier,
hostName: hostName,
wrapperInCluster: wrapperInCluster,
serviceAutonomyEnhancementEnabled: serviceAutonomyEnhancementEnabled,
servicesMap: make(map[types.NamespacedName]*serviceContainer),
endpointsMap: make(map[types.NamespacedName]*endpointsContainer),
nodesMap: make(map[types.NamespacedName]*nodeContainer),
serviceChan: serviceNotifier,
endpointsChan: endpointsNotifier,
localNodeInfo: make(map[string]data.ResultDetail),
}

return msc
Expand Down Expand Up @@ -135,11 +141,38 @@ func (sc *storageCache) GetNode(hostName string) *v1.Node {
return nil
}

func (sc *storageCache) GetLocalNodeInfo() map[string]data.ResultDetail {
sc.mu.RLock()
defer sc.mu.RUnlock()
return sc.localNodeInfo
}

func (sc *storageCache) ClearLocalNodeInfo() {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.localNodeInfo = make(map[string]data.ResultDetail)
}

func (sc *storageCache) SetLocalNodeInfo(info map[string]data.ResultDetail) {
klog.V(4).Infof("Set local node info %#v", info)
sc.mu.Lock()
sc.localNodeInfo = info

// update endpoints
changedEps := sc.rebuildEndpointsMap()

sc.mu.Unlock()

for _, eps := range changedEps {
sc.endpointsChan <- eps
}
}

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {
evts := make([]watch.Event, 0)
for name, endpointsContainer := range sc.endpointsMap {
newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)
newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.localNodeInfo, sc.wrapperInCluster, sc.serviceAutonomyEnhancementEnabled)
if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/application-grid-wrapper/storage/endpoints_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (eh *endpointsHandler) add(endpoints *v1.Endpoints) {

endpointsKey := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
klog.Infof("Adding endpoints %v", endpointsKey)
newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.wrapperInCluster)
newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.localNodeInfo, sc.wrapperInCluster, sc.serviceAutonomyEnhancementEnabled)
sc.endpointsMap[endpointsKey] = &endpointsContainer{
endpoints: endpoints,
modified: newEps,
Expand Down Expand Up @@ -63,7 +63,7 @@ func (eh *endpointsHandler) update(endpoints *v1.Endpoints) {
return
}
endpointsContainer.endpoints = endpoints
newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.wrapperInCluster)
newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.localNodeInfo, sc.wrapperInCluster, sc.serviceAutonomyEnhancementEnabled)
changed := !apiequality.Semantic.DeepEqual(endpointsContainer.modified, newEps)
if changed {
endpointsContainer.modified = newEps
Expand Down
Loading

0 comments on commit 8a12b22

Please sign in to comment.