Skip to content

Commit

Permalink
fix: Interdomain NSM: client and endpoint on the same cluster node fa…
Browse files Browse the repository at this point in the history
…ils (networkservicemesh#2170)

* Fix: Interdomain NSM: client and endpoint on the same cluster node fails

Signed-off-by: Denis Tingajkin <[email protected]>

* fix linter issues

Signed-off-by: Denis Tingajkin <[email protected]>
  • Loading branch information
denis-tingaikin authored Jul 21, 2020
1 parent 4dd079f commit 804ad50
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 4 deletions.
58 changes: 56 additions & 2 deletions k8s/pkg/proxyregistryserver/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"os"
"strings"

"github.com/pkg/errors"

utils "github.com/networkservicemesh/networkservicemesh/utils/interdomain"

"github.com/sirupsen/logrus"
Expand All @@ -27,12 +29,14 @@ const (
type discoveryService struct {
cache registryserver.RegistryCache
clusterInfoService clusterinfo.ClusterInfoServer
nodeName string
}

func newDiscoveryService(cache registryserver.RegistryCache, clusterInfoService clusterinfo.ClusterInfoServer) *discoveryService {
return &discoveryService{
cache: cache,
clusterInfoService: clusterInfoService,
nodeName: os.Getenv("NODE_NAME"),
}
}

Expand Down Expand Up @@ -66,16 +70,18 @@ func (d *discoveryService) FindNetworkService(ctx context.Context, request *regi
if dErr != nil {
return nil, dErr
}

for _, nsm := range response.NetworkServiceManagers {
if url, urlErr := d.currentDomainNSMgrURL(ctx, d.clusterInfoService, nsm.Url); urlErr == nil && nsm.Url == url {
return d.handleLocalFindCase(response, url), nil
}
nsm.Name = fmt.Sprintf("%s@%s", nsm.Name, nsm.Url)
nsmURL := os.Getenv(ProxyNsmdAPIAddressEnv)
if strings.TrimSpace(nsmURL) == "" {
nsmURL = ProxyNsmdAPIAddressDefaults
}
nsm.Url = nsmURL
response.NetworkService.Name = originNetworkService
}
response.NetworkService.Name = originNetworkService

logrus.Infof("Received response: %v", response)
return response, nil
Expand Down Expand Up @@ -108,3 +114,51 @@ func (d *discoveryService) FindNetworkService(ctx context.Context, request *regi
}
return response, err
}

func (d *discoveryService) handleLocalFindCase(r *registry.FindNetworkServiceResponse, url string) *registry.FindNetworkServiceResponse {
logrus.Infof("Handle local node case for %v, url: %v", r, url)
var nsmgrs = make(map[string]*registry.NetworkServiceManager)
var endpoints []*registry.NetworkServiceEndpoint

for _, nsmgr := range r.NetworkServiceManagers {
if nsmgr.Url == url {
nsmgr.Name = d.nodeName
nsmgrs[nsmgr.Name] = nsmgr
}
}

r.NetworkServiceManagers = nsmgrs

normalizedURL := strings.ReplaceAll(url, ":", "_")

for _, nse := range r.NetworkServiceEndpoints {
if strings.Contains(nse.NetworkServiceManagerName, normalizedURL) {
nse.NetworkServiceManagerName = d.nodeName
endpoints = append(endpoints, nse)
}
}

r.NetworkServiceManagers = nsmgrs
r.NetworkServiceEndpoints = endpoints

return r
}

func (d *discoveryService) currentDomainNSMgrURL(ctx context.Context, clusterInfoService clusterinfo.ClusterInfoServer, u string) (string, error) {
nodeConfiguration, cErr := clusterInfoService.GetNodeIPConfiguration(ctx, &clusterinfo.NodeIPConfiguration{NodeName: d.nodeName})
if cErr != nil {
err := errors.Wrapf(cErr, "cannot get Network Service Manager's IP address: %s", cErr)
return "", err
}

externalIP := nodeConfiguration.ExternalIP
if externalIP == "" {
externalIP = nodeConfiguration.InternalIP
}

if idx := strings.Index(u, ":"); idx > -1 {
externalIP += u[idx:]
}

return externalIP, nil
}
96 changes: 96 additions & 0 deletions test/integration/interdomain_single_node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) 2020 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build basic

package integration

import (
"fmt"
"testing"

. "github.com/onsi/gomega"

"github.com/networkservicemesh/networkservicemesh/k8s/pkg/proxyregistryserver"
"github.com/networkservicemesh/networkservicemesh/test/kubetest"
"github.com/networkservicemesh/networkservicemesh/test/kubetest/pods"
)

func TestInterdomainSingleNode(t *testing.T) {
if testing.Short() {
t.Skip("Skip, please run without -short")
return
}

g := NewWithT(t)

k8ss := []*kubetest.ExtK8s{}
for i := 0; i < 1; i++ {

k8s, err := kubetest.NewK8s(g, true)
g.Expect(err).To(BeNil())

defer k8s.Cleanup()
defer k8s.SaveTestArtifacts(t)

k8ss = append(k8ss, &kubetest.ExtK8s{
K8s: k8s,
NodesSetup: nil,
})
}

nsmrsNode := &k8ss[0].K8s.GetNodesWait(2, defaultTimeout)[1]
nsmrsPod := kubetest.DeployNSMRS(k8ss[0].K8s, nsmrsNode, "nsmrs", defaultTimeout, pods.DefaultNSMRS())

nsmrsExternalIP, err := kubetest.GetNodeExternalIP(nsmrsNode)
if err != nil {
nsmrsExternalIP, err = kubetest.GetNodeInternalIP(nsmrsNode)
g.Expect(err).To(BeNil())
}
nsmrsInternalIP, err := kubetest.GetNodeInternalIP(nsmrsNode)
g.Expect(err).To(BeNil())

for i := 0; i < 1; i++ {
k8s := k8ss[i].K8s

nodesSetup, err := kubetest.SetupNodes(k8s, 1, defaultTimeout)
g.Expect(err).To(BeNil())

k8ss[i].NodesSetup = nodesSetup

pnsmdName := fmt.Sprintf("pnsmgr-%s", nodesSetup[0].Node.Name)
proxyNSMgrConfig := &pods.NSMgrPodConfig{
Variables: pods.DefaultProxyNSMD(),
Namespace: k8s.GetK8sNamespace(),
}
proxyNSMgrConfig.Variables[proxyregistryserver.NSMRSAddressEnv] = nsmrsInternalIP + ":80"
_, err = kubetest.DeployProxyNSMgrWithConfig(k8s, nodesSetup[0].Node, pnsmdName, defaultTimeout, proxyNSMgrConfig)
g.Expect(err).To(BeNil())

serviceCleanup := kubetest.RunProxyNSMgrService(k8s)
defer serviceCleanup()
}

_ = kubetest.DeployICMP(k8ss[0].K8s, k8ss[0].NodesSetup[0].Node, "icmp-responder-nse-1", defaultTimeout)
k8ss[0].K8s.WaitLogsContains(nsmrsPod, "nsmrs", "Registered NSE entry", defaultTimeout)

nscPodNode := kubetest.DeployNSCWithEnv(k8ss[0].K8s, k8ss[0].NodesSetup[0].Node, "nsc-1", defaultTimeout, map[string]string{
"CLIENT_LABELS": "app=icmp",
"CLIENT_NETWORK_SERVICE": fmt.Sprintf("icmp-responder@%s", nsmrsExternalIP),
})

kubetest.CheckNSC(k8ss[0].K8s, nscPodNode)
}
4 changes: 2 additions & 2 deletions test/kubetest/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,9 @@ func NewK8sWithoutRolesForConfig(g *WithT, prepare bool, kubeconfigPath string)

if prepare {
start := time.Now()
client.DeletePodsByName("nsmgr", "nsmd", "vppagent", "vpn", "icmp", "nsc", "source", "dest", "xcon", "spire-proxy", "nse", "prefix-service")
client.DeletePodsByName("nsmrs", "nsmgr", "nsmd", "vppagent", "vpn", "icmp", "nsc", "source", "dest", "xcon", "spire-proxy", "nse", "prefix-service")
client.CleanupCRDs()
client.CleanupServices("nsm-admission-webhook-svc", "jaeger")
client.CleanupServices("nsm-admission-webhook-svc", "jaeger", "pnsmgr")
client.CleanupDeployments()
client.CleanupMutatingWebhookConfigurations()
client.CleanupSecrets("nsm-admission-webhook-certs")
Expand Down
10 changes: 10 additions & 0 deletions test/kubetest/pods/proxynsmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ func ProxyNSMgrPodWithConfig(name string, node *v1.Node, config *NSMgrPodConfig)
ContainerPort: 5005,
},
},
Env: []v1.EnvVar{
{
Name: "NODE_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "spec.nodeName",
},
},
},
},
VolumeMounts: []v1.VolumeMount{spireVolumeMount()},
}),
},
Expand Down

0 comments on commit 804ad50

Please sign in to comment.