Skip to content

Commit

Permalink
refactor(orchestrator): edas executor (erda-project#6102)
Browse files Browse the repository at this point in the history
Signed-off-by: iutx <[email protected]>
  • Loading branch information
iutx authored Oct 25, 2023
1 parent eafea91 commit 0dd5da4
Show file tree
Hide file tree
Showing 20 changed files with 2,539 additions and 2,212 deletions.
2,178 changes: 205 additions & 1,973 deletions internal/tools/orchestrator/scheduler/executor/plugins/edas/edas.go

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/internal/tools/orchestrator/scheduler/events"
Expand All @@ -32,12 +31,14 @@ import (
)

func (e *EDAS) registerEventChanAndLocalStore(evCh chan *eventtypes.StatusEvent, stopCh chan struct{}, lstore *sync.Map) {
l := e.l.WithField("func", "registerEventChanAndLocalStore")

o11 := discover.Orchestrator()
eventAddr := "http://" + o11 + "/api/events/runtimes/actions/sync"

name := string(e.name)

logrus.Infof("edas registerEventChanAndLocalStore, name: %s", name)
l.Infof("edas registerEventChanAndLocalStore, name: %s", name)

// watch event handler for a specific etcd directory
syncRuntimeToLstore := func(key string, value interface{}, t storetypes.ChangeType) error {
Expand Down Expand Up @@ -72,19 +73,19 @@ func (e *EDAS) registerEventChanAndLocalStore(evCh chan *eventtypes.StatusEvent,
lstore.Store(runtimeName, *run)

default:
logrus.Errorf("unknown store type, try to skip, type: %s, name: %s", t, runtimeName)
l.Errorf("unknown store type, try to skip, type: %s, name: %s", t, runtimeName)
return nil
}

if strings.Contains(name, "EDAS") {
logrus.Infof("edas executor(%s) lstore stored key: %s", name, key)
l.Infof("edas executor(%s) lstore stored key: %s", name, key)
}
return nil
}

// Correspond the name of the registered executor and its event channel
getEvChanFn := func(executorName executortypes.Name) (chan *eventtypes.StatusEvent, chan struct{}, *sync.Map, error) {
logrus.Infof("in RegisterEvChan executor(%s)", name)
l.Infof("in RegisterEvChan executor(%s)", name)
if string(executorName) == name {
return evCh, stopCh, lstore, nil
}
Expand All @@ -96,10 +97,12 @@ func (e *EDAS) registerEventChanAndLocalStore(evCh chan *eventtypes.StatusEvent,

// Currently edas does not use an event-driven mechanism, so it uses polling to simulate each time
func (e *EDAS) WaitEvent(lstore *sync.Map, stopCh chan struct{}) {
l := e.l.WithField("func", "WaitEvent")

o11 := discover.Orchestrator()
eventAddr := "http://" + o11 + "/api/events/runtimes/actions/sync"

logrus.Infof("executor(%s) in waitEvent", e.name)
l.Infof("executor(%s) in waitEvent", e.name)

initStore := func(k string, v interface{}) error {
reKey := etcdKeyToMapKey(k)
Expand All @@ -117,7 +120,7 @@ func (e *EDAS) WaitEvent(lstore *sync.Map, stopCh chan struct{}) {

em := events.GetEventManager()
if err := em.MemEtcdStore.ForEach(context.Background(), "/dice/service/", apistructs.ServiceGroup{}, initStore); err != nil {
logrus.Errorf("executor(%s) foreach initStore error: %v", e.name, err)
l.Errorf("executor(%s) foreach initStore error: %v", e.name, err)
}

keys := make([]string, 0)
Expand All @@ -139,7 +142,7 @@ func (e *EDAS) WaitEvent(lstore *sync.Map, stopCh chan struct{}) {
go func() {
start := time.Now()
defer func() {
logrus.Infof("edas executor(%s) get status for key(%s) took %v", e.name, k.(string), time.Since(start))
l.Infof("edas executor(%s) get status for key(%s) took %v", e.name, k.(string), time.Since(start))
}()
_, err = e.Status(ctx, r)
c <- struct{}{}
Expand All @@ -150,7 +153,7 @@ func (e *EDAS) WaitEvent(lstore *sync.Map, stopCh chan struct{}) {
select {
case <-c:
if err != nil {
logrus.Errorf("executor(%s)'s key(%s) for edas get status error: %v", e.name, k, err)
l.Errorf("executor(%s)'s key(%s) for edas get status error: %v", e.name, k, err)
return true
}

Expand Down Expand Up @@ -190,21 +193,21 @@ func (e *EDAS) WaitEvent(lstore *sync.Map, stopCh chan struct{}) {
return true

case <-ctx.Done():
logrus.Errorf("executor(%s)'s key(%s) get status timeout", e.name, k)
l.Errorf("executor(%s)'s key(%s) get status timeout", e.name, k)
return true
}
}

for {
select {
case <-stopCh:
logrus.Errorf("edas executor(%s) got stop chan message", e.name)
l.Errorf("edas executor(%s) got stop chan message", e.name)
return
case <-time.After(10 * time.Second):
lstore.Range(f)
}

logrus.Infof("executor(%s) edas keys list: %v", e.name, keys)
l.Infof("executor(%s) edas keys list: %v", e.name, keys)
keys = nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,20 @@
package edas

import (
"context"
"net/http"
"reflect"
"testing"

"bou.ke/monkey"
api "github.com/aliyun/alibaba-cloud-sdk-go/services/edas"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
fakeclientset "k8s.io/client-go/kubernetes/fake"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/edas/types"
)

func Test_Set_Annotations(t *testing.T) {
type args struct {
name string
envs map[string]string
svcSpec *ServiceSpec
svcSpec *types.ServiceSpec
wantErr bool
}

svc := &ServiceSpec{
svc := &types.ServiceSpec{
Name: "fake-app",
Image: "busybox",
}
Expand Down Expand Up @@ -74,160 +63,3 @@ func Test_Set_Annotations(t *testing.T) {
})
}
}

func initClient() *api.Client {
c := &api.Client{}
monkey.PatchInstanceMethod(reflect.TypeOf(c), "StopK8sApplication", func(c *api.Client,
request *api.StopK8sApplicationRequest) (response *api.StopK8sApplicationResponse, err error) {
return &api.StopK8sApplicationResponse{
Code: http.StatusOK,
}, nil
})
monkey.PatchInstanceMethod(reflect.TypeOf(c), "DeleteK8sApplication", func(c *api.Client,
request *api.DeleteK8sApplicationRequest) (response *api.DeleteK8sApplicationResponse, err error) {
return &api.DeleteK8sApplicationResponse{
Code: http.StatusOK,
}, nil
})
return c
}

func TestDeleteAppByID(t *testing.T) {
c := initClient()
e := EDAS{
addr: "cn-hangzhou",
client: c,
}

defer monkey.UnpatchAll()

if err := e.deleteAppByID("app1"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}

func TestStopAppByID(t *testing.T) {
c := initClient()
e := EDAS{
addr: "cn-hangzhou",
client: c,
}

defer monkey.UnpatchAll()

if err := e.stopAppByID("app1"); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}

func TestStatus(t *testing.T) {
fc := fakeclientset.NewSimpleClientset(&appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "service-1-app-demo",
Namespace: defaultNamespace,
Labels: map[string]string{
"edas-domain": "edas-admin",
"edas.appname": "service-1-app-demo",
},
},
Status: appsv1.DeploymentStatus{
Conditions: []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: "True",
},
},
Replicas: 1,
ReadyReplicas: 1,
AvailableReplicas: 1,
UpdatedReplicas: 1,
},
})

e := EDAS{
cs: fc,
}

type args struct {
specObject interface{}
}

tests := []struct {
name string
args args
}{
{
name: "case1",
args: args{
specObject: apistructs.ServiceGroup{
Dice: apistructs.Dice{
ID: "1",
Type: "service",
Services: []apistructs.Service{
{
Name: "app-demo",
Image: "busybox",
},
},
},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// TODO: add more ut
_, err := e.Status(context.Background(), tt.args.specObject)
if err != nil {
t.Fatal(err)
}
})
}
}

func TestEDAS_GetDeploymentInfo(t *testing.T) {
// Create a FakeClient
client := fake.NewSimpleClientset()

// Define your test data
group := "test-group"
service := &apistructs.Service{
Name: "test-service",
}

// Create a test deployment
testDeployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-deployment",
Namespace: defaultNamespace,
Labels: map[string]string{
"edas-domain": "edas-admin",
"edas.oam.acname": composeEDASAppNameWithGroup(group, service.Name),
},
},
}

// Add the test deployment to the FakeClient
_, _ = client.AppsV1().Deployments(defaultNamespace).Create(context.Background(), testDeployment, metav1.CreateOptions{})

// Create an instance of your EDAS struct
edas := &EDAS{
cs: client,
}

// Test the function
deployment, err := edas.getDeploymentInfo(group, service)
assert.NoError(t, err)
assert.NotNil(t, deployment)
assert.Equal(t, testDeployment.Name, deployment.Name)

// Test case when deployment is not found
nonExistentService := &apistructs.Service{
Name: "non-existent-service",
}
deployment, err = edas.getDeploymentInfo(group, nonExistentService)
assert.Error(t, err)
assert.Nil(t, deployment)
assert.Contains(t, err.Error(), "failed to find deployment")
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package edas
package types

import (
"strings"
Expand Down

This file was deleted.

Loading

0 comments on commit 0dd5da4

Please sign in to comment.