Skip to content

Commit

Permalink
Fix: Interdomain NSM: BulkRegisterNSE registers endpoints with the wr…
Browse files Browse the repository at this point in the history
…ong IP to NSMRS (networkservicemesh#2168)

* Potential fix for issue 2142

Signed-off-by: denis-tingajkin <[email protected]>

* add test for cover

Signed-off-by: denis-tingajkin <[email protected]>

* fix gke script issue

Signed-off-by: denis-tingajkin <[email protected]>

* fix nsc name

Signed-off-by: denis-tingajkin <[email protected]>
  • Loading branch information
denis-tingaikin authored Jun 29, 2020
1 parent d4855f5 commit 99fc16c
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 10 deletions.
6 changes: 5 additions & 1 deletion controlplane/pkg/nsmd/serviceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/networkservicemesh/networkservicemesh/utils"

"github.com/pkg/errors"

"github.com/sirupsen/logrus"
Expand All @@ -33,6 +35,8 @@ const (
// NsmDevicePluginEnv is the name of the env variable to configure enabled device plugin name
NsmDevicePluginEnv = "NSM_DEVICE_PLUGIN"
registryConnectTimeout = time.Second * 30
// PublicAPIAddressEnv sets nsmd public API address
PublicAPIAddressEnv utils.EnvVar = "NSMD_PUBLIC_API"
)

type apiRegistry struct {
Expand Down Expand Up @@ -147,7 +151,7 @@ func (impl *nsmdServiceRegistry) NsmRegistryClient(ctx context.Context) (registr
}

func (impl *nsmdServiceRegistry) GetPublicAPI() string {
return GetLocalIPAddress() + ":5001"
return PublicAPIAddressEnv.GetStringOrDefault(GetLocalIPAddress() + ":5001")
}

func (impl *nsmdServiceRegistry) DiscoveryClient(ctx context.Context) (registry.NetworkServiceDiscoveryClient, error) {
Expand Down
18 changes: 18 additions & 0 deletions k8s/pkg/proxyregistryserver/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,24 @@ func (rs *nseRegistryService) BulkRegisterNSE(srv registry.NetworkServiceRegistr
return err
}

nodeConfiguration, cErr := rs.clusterInfoService.GetNodeIPConfiguration(span.Context(), &clusterinfo.NodeIPConfiguration{NodeName: request.NetworkServiceManager.Name})
if cErr != nil {
err = errors.Wrapf(cErr, "cannot get Network Service Manager's IP address: %s", cErr)
logger.Errorf("%s: %v", NSRegistryForwarderLogPrefix, err)
return err
}

externalIP := nodeConfiguration.ExternalIP
if externalIP == "" {
externalIP = nodeConfiguration.InternalIP
}
// Swapping IP address to external (keep port)
url := request.NetworkServiceManager.Url
if idx := strings.Index(url, ":"); idx > -1 {
externalIP += url[idx:]
}
request.NetworkServiceManager.Url = externalIP

logger.Infof("%s: Forward BulkRegisterNSE request: %v", NSRegistryForwarderLogPrefix, request)
err = stream.Send(request)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion scripts/gke/install-gcloud-sdk.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ CLOUD_SDK_REPO="cloud-sdk-stretch"
export CLOUD_SDK_REPO
echo "deb http://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add -
apt-get update -y && apt-get install google-cloud-sdk -y
apt-get update -y --allow-unauthenticated && apt-get install google-cloud-sdk -y --allow-unauthenticated
122 changes: 122 additions & 0 deletions test/integration/floating_interdomain_few_clients_with_delay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) 2020 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build interdomain

package integration

import (
"fmt"
"os"
"testing"
"time"

"github.com/networkservicemesh/networkservicemesh/applications/nsmrs/pkg/serviceregistryserver"
"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/nsmd"

. "github.com/onsi/gomega"

"github.com/networkservicemesh/networkservicemesh/k8s/pkg/proxyregistryserver"
"github.com/networkservicemesh/networkservicemesh/test/kubetest"
"github.com/networkservicemesh/networkservicemesh/test/kubetest/pods"
)

func TestFloatingInterdomainFewClientsWithDelay(t *testing.T) {
if testing.Short() {
t.Skip("Skip, please run without -short")
return
}

g := NewWithT(t)
clustersCount := 2
k8ss := []*kubetest.ExtK8s{}
for i := 0; i < clustersCount; i++ {
kubeconfig := os.Getenv(fmt.Sprintf("KUBECONFIG_CLUSTER_%d", i+1))
g.Expect(len(kubeconfig)).ToNot(Equal(0))

k8s, err := kubetest.NewK8sForConfig(g, true, kubeconfig)
g.Expect(err).To(BeNil())

defer k8s.Cleanup()
defer k8s.SaveTestArtifacts(t)

k8ss = append(k8ss, &kubetest.ExtK8s{
K8s: k8s,
NodesSetup: nil,
})
}

nsmrsNode := &k8ss[clustersCount-1].K8s.GetNodesWait(2, defaultTimeout)[1]
nsmrsPod := kubetest.DeployNSMRS(k8ss[clustersCount-1].K8s, nsmrsNode, "nsmrs", defaultTimeout, map[string]string{
serviceregistryserver.NSEExpirationTimeoutEnv.Name(): "30s",
})

nsmrsExternalIP, err := kubetest.GetNodeExternalIP(nsmrsNode)
if err != nil {
nsmrsExternalIP, err = kubetest.GetNodeInternalIP(nsmrsNode)
g.Expect(err).To(BeNil())
}
nsmrsInternalIP, err := kubetest.GetNodeInternalIP(nsmrsNode)
g.Expect(err).To(BeNil())

for i := 0; i < clustersCount; i++ {
k8s := k8ss[i].K8s

nodesSetup, err := kubetest.SetupNodesConfig(k8s, 1, defaultTimeout, []*pods.NSMgrPodConfig{
{
Variables: map[string]string{
nsmd.NSETrackingIntervalSecondsEnv.Name(): "10s",
nsmd.PublicAPIAddressEnv.Name(): "127.0.0.1:5001",
},
Namespace: k8s.GetK8sNamespace(),
ForwarderVariables: kubetest.DefaultForwarderVariables(k8s.GetForwardingPlane()),
},
}, k8s.GetK8sNamespace())
g.Expect(err).To(BeNil())

k8ss[i].NodesSetup = nodesSetup

pnsmdName := fmt.Sprintf("pnsmgr-%s", nodesSetup[0].Node.Name)
proxyNSMgrConfig := &pods.NSMgrPodConfig{
Variables: pods.DefaultProxyNSMD(),
Namespace: k8s.GetK8sNamespace(),
}
proxyNSMgrConfig.Variables[proxyregistryserver.NSMRSAddressEnv] = nsmrsInternalIP + ":80"
proxyNSMgrConfig.Variables[nsmd.PublicAPIAddressEnv.Name()] = "127.0.0.1:5001"
_, err = kubetest.DeployProxyNSMgrWithConfig(k8s, nodesSetup[0].Node, pnsmdName, defaultTimeout, proxyNSMgrConfig)
g.Expect(err).To(BeNil())

serviceCleanup := kubetest.RunProxyNSMgrService(k8s)
defer serviceCleanup()
}

_ = kubetest.DeployICMP(k8ss[clustersCount-1].K8s, k8ss[clustersCount-1].NodesSetup[0].Node, "icmp-responder-nse-1", defaultTimeout)
k8ss[clustersCount-1].K8s.WaitLogsContains(nsmrsPod, "nsmrs", "Registered NSE entry", defaultTimeout)

nsc1 := kubetest.DeployNSCWithEnv(k8ss[0].K8s, k8ss[0].NodesSetup[0].Node, "nsc-1", defaultTimeout, map[string]string{
"CLIENT_LABELS": "app=icmp",
"CLIENT_NETWORK_SERVICE": fmt.Sprintf("icmp-responder@%s", nsmrsExternalIP),
})

kubetest.CheckNSC(k8ss[0].K8s, nsc1)
<-time.After(time.Second * 35)

nsc2 := kubetest.DeployNSCWithEnv(k8ss[0].K8s, k8ss[0].NodesSetup[0].Node, "nsc-2", defaultTimeout, map[string]string{
"CLIENT_LABELS": "app=icmp",
"CLIENT_NETWORK_SERVICE": fmt.Sprintf("icmp-responder@%s", nsmrsExternalIP),
})
kubetest.CheckNSC(k8ss[0].K8s, nsc2)
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func testFloatingInterdomainMonitor(t *testing.T, killPod string) {
}

nsmrsNode := &k8ss[1].K8s.GetNodesWait(2, defaultTimeout)[1]
nsmrsPod := kubetest.DeployNSMRS(k8ss[1].K8s, nsmrsNode, "nsmrs", defaultTimeout)
nsmrsPod := kubetest.DeployNSMRS(k8ss[1].K8s, nsmrsNode, "nsmrs", defaultTimeout, pods.DefaultNSMRS())

nsmrsExternalIP, err := kubetest.GetNodeExternalIP(nsmrsNode)
if err != nil {
Expand Down Expand Up @@ -105,12 +105,12 @@ func testFloatingInterdomainMonitor(t *testing.T, killPod string) {
switch killPod {
case "nsmrs":
k8ss[1].K8s.DeletePods(nsmrsPod)
nsmrsPod := kubetest.DeployNSMRS(k8ss[1].K8s, nsmrsNode, "nsmrs-recovered", defaultTimeout)
nsmrsPod := kubetest.DeployNSMRS(k8ss[1].K8s, nsmrsNode, "nsmrs-recovered", defaultTimeout, pods.DefaultNSMRS())
k8ss[1].K8s.WaitLogsContains(nsmrsPod, "nsmrs", "Registered NSE entry", defaultTimeout)
case "proxy-nsmgr":
k8ss[0].K8s.DeletePods(proxyNSMGRPod)
k8ss[1].K8s.DeletePods(nsmrsPod)
nsmrsPod := kubetest.DeployNSMRS(k8ss[1].K8s, nsmrsNode, "nsmrs", defaultTimeout)
nsmrsPod := kubetest.DeployNSMRS(k8ss[1].K8s, nsmrsNode, "nsmrs", defaultTimeout, pods.DefaultNSMRS())
startProxyNSMGRPod(g, pnsmdName+"-recovered", k8ss[0].K8s, k8ss[0].NodesSetup, nsmrsExternalIP)
k8ss[1].K8s.WaitLogsContains(nsmrsPod, "nsmrs", "Registered NSE entry", defaultTimeout)
}
Expand Down
2 changes: 1 addition & 1 deletion test/integration/interdomain_floating_interdomain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func testFloatingInterdomain(t *testing.T, clustersCount int) {
}

nsmrsNode := &k8ss[clustersCount-1].K8s.GetNodesWait(2, defaultTimeout)[1]
nsmrsPod := kubetest.DeployNSMRS(k8ss[clustersCount-1].K8s, nsmrsNode, "nsmrs", defaultTimeout)
nsmrsPod := kubetest.DeployNSMRS(k8ss[clustersCount-1].K8s, nsmrsNode, "nsmrs", defaultTimeout, pods.DefaultNSMRS())

nsmrsExternalIP, err := kubetest.GetNodeExternalIP(nsmrsNode)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions test/kubetest/pods/nsmrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ type NSMRSPodConfig struct {
}

// NSMRSPod - create NSMRS pod with default config
func NSMRSPod(name string, node *v1.Node) *v1.Pod {
func NSMRSPod(name string, node *v1.Node, variables map[string]string) *v1.Pod {
return NSMRSPodWithConfig(name, node, &NSMgrPodConfig{
Variables: DefaultNSMRS(),
Variables: variables,
})
}

Expand Down
4 changes: 2 additions & 2 deletions test/kubetest/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,9 @@ func RunProxyNSMgrService(k8s *K8s) func() {
}

// DeployNSMRS - Setup NSMRS on Cluster with default config
func DeployNSMRS(k8s *K8s, node *v1.Node, name string, timeout time.Duration) *v1.Pod {
func DeployNSMRS(k8s *K8s, node *v1.Node, name string, timeout time.Duration, variables map[string]string) *v1.Pod {
return deployNSMRS(k8s, nodeName(node), name, timeout,
pods.NSMRSPod(name, node),
pods.NSMRSPod(name, node, variables),
)
}

Expand Down

0 comments on commit 99fc16c

Please sign in to comment.