Skip to content

Commit

Permalink
Code duplicate cleanups (networkservicemesh#1823)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <[email protected]>
  • Loading branch information
haiodo authored and edwarnicke committed Nov 5, 2019
1 parent 8dcb3ff commit 7dd40ca
Show file tree
Hide file tree
Showing 30 changed files with 238 additions and 532 deletions.
5 changes: 3 additions & 2 deletions controlplane/pkg/api/nsm/nsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ package nsm
import (
"time"

"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/properties"

"github.com/networkservicemesh/networkservicemesh/controlplane/api/networkservice"

"github.com/networkservicemesh/networkservicemesh/controlplane/api/connection"
"github.com/networkservicemesh/networkservicemesh/controlplane/api/crossconnect"
"github.com/networkservicemesh/networkservicemesh/controlplane/api/nsm"
"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/model"
"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/plugins"
"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/serviceregistry"
Expand Down Expand Up @@ -85,7 +86,7 @@ type MonitorManager interface {

//NetworkServiceManager - hold useful nsm structures
type NetworkServiceManager interface {
GetHealProperties() *nsm.Properties
GetHealProperties() *properties.Properties
WaitForForwarder(ctx context.Context, duration time.Duration) error
RemoteConnectionLost(ctx context.Context, clientConnection ClientConnection)
NotifyRenamedEndpoint(nseOldName, nseNewName string)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package local
package common

import (
"context"
Expand All @@ -24,23 +24,23 @@ import (

"github.com/networkservicemesh/networkservicemesh/controlplane/api/connection"
"github.com/networkservicemesh/networkservicemesh/controlplane/api/networkservice"
"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/common"
)

// CompositeNetworkService is the base service composition struct
type CompositeNetworkService struct {
services []networkservice.NetworkServiceServer
services []networkservice.NetworkServiceServer
factoryName string
}

// Request implements a dummy request handler
func (cns *CompositeNetworkService) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*connection.Connection, error) {
if len(cns.services) == 0 {
return request.Connection, nil
}
ctx = WithNext(ctx, &nextEndpoint{composite: cns, index: 0})
ctx = WithNext(ctx, &nextEndpoint{factoryName: cns.factoryName, composite: cns, index: 0})

if opentracing.IsGlobalTracerRegistered() {
ctx = common.WithOriginalSpan(ctx, spanhelper.GetSpanHelper(ctx))
ctx = WithOriginalSpan(ctx, spanhelper.GetSpanHelper(ctx))
}
return cns.services[0].Request(ctx, request)
}
Expand All @@ -55,8 +55,9 @@ func (cns *CompositeNetworkService) Close(ctx context.Context, connection *conne
}

// NewCompositeService creates a new composed endpoint
func NewCompositeService(services ...networkservice.NetworkServiceServer) networkservice.NetworkServiceServer {
func NewCompositeService(factoryName string, services ...networkservice.NetworkServiceServer) networkservice.NetworkServiceServer {
return &CompositeNetworkService{
services: services,
factoryName: factoryName,
services: services,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package local
package common

import (
"context"
Expand All @@ -23,18 +23,17 @@ import (
"github.com/networkservicemesh/networkservicemesh/controlplane/api/connection"
"github.com/networkservicemesh/networkservicemesh/controlplane/api/crossconnect"
"github.com/networkservicemesh/networkservicemesh/controlplane/api/networkservice"
"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/common"
)

// ConnectionService makes basic Mechanism selection for the incoming connection
type сrossConnectService struct {
}

func (cce *сrossConnectService) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*connection.Connection, error) {
logger := common.Log(ctx)
endpointConnection := common.EndpointConnection(ctx)
endpoint := common.Endpoint(ctx)
clientConnection := common.ModelConnection(ctx)
logger := Log(ctx)
endpointConnection := EndpointConnection(ctx)
endpoint := Endpoint(ctx)
clientConnection := ModelConnection(ctx)

if endpointConnection == nil || endpoint == nil || clientConnection == nil {
err := errors.Errorf("endpoint connection/Endpoint/ClientConnection should be specified with context")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package local
package common

import (
"github.com/golang/protobuf/ptypes/empty"
Expand All @@ -21,7 +21,6 @@ import (

"github.com/networkservicemesh/networkservicemesh/controlplane/api/connection"
"github.com/networkservicemesh/networkservicemesh/controlplane/api/networkservice"
"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/common"
"github.com/networkservicemesh/networkservicemesh/sdk/monitor/connectionmonitor"
)

Expand All @@ -37,7 +36,7 @@ func NewMonitorService(monitor connectionmonitor.MonitorServer) networkservice.N
}

func (srv *monitorService) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*connection.Connection, error) {
ctx = common.WithConnectionMonitor(ctx, srv.monitor)
ctx = WithConnectionMonitor(ctx, srv.monitor)

conn, err := ProcessNext(ctx, request)
if err == nil {
Expand All @@ -50,8 +49,10 @@ func (srv *monitorService) Close(ctx context.Context, connection *connection.Con
logrus.Infof("Closing connection: %v", connection)

// Pass model connection with context
ctx = common.WithConnectionMonitor(ctx, srv.monitor)
ctx = WithConnectionMonitor(ctx, srv.monitor)
conn, err := ProcessClose(ctx, connection)

// We send update if conn != nil
if conn != nil {
srv.monitor.Delete(ctx, connection)
}
Expand Down
25 changes: 13 additions & 12 deletions controlplane/pkg/local/next.go → controlplane/pkg/common/next.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package local
// Package common - define a common set of services for both local/remote chains
package common

import (
"context"
Expand All @@ -24,15 +25,15 @@ import (

"github.com/networkservicemesh/networkservicemesh/controlplane/api/connection"
"github.com/networkservicemesh/networkservicemesh/controlplane/api/networkservice"
"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/common"
"github.com/networkservicemesh/networkservicemesh/utils/typeutils"
)

const nextKey common.ContextKeyType = "Next"
const nextKey ContextKeyType = "Next"

type nextEndpoint struct {
composite *CompositeNetworkService
index int
composite *CompositeNetworkService
index int
factoryName string
}

// WithNext -
Expand Down Expand Up @@ -62,7 +63,7 @@ func ProcessNext(ctx context.Context, request *networkservice.NetworkServiceRequ
return request.Connection, nil
}

// ProcessClose - performs a next close operation on chain if defined
// ProcessClose - perform a next close operation on chain if defined
func ProcessClose(ctx context.Context, connection *connection.Connection) (*empty.Empty, error) {
if Next(ctx) != nil {
return Next(ctx).Close(ctx, connection)
Expand All @@ -72,17 +73,17 @@ func ProcessClose(ctx context.Context, connection *connection.Connection) (*empt

func (n *nextEndpoint) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*connection.Connection, error) {
if n.index+1 < len(n.composite.services) {
ctx = WithNext(ctx, &nextEndpoint{composite: n.composite, index: n.index + 1})
ctx = WithNext(ctx, &nextEndpoint{factoryName: n.factoryName, composite: n.composite, index: n.index + 1})
} else {
ctx = WithNext(ctx, nil)
}

span := spanhelper.FromContext(ctx, fmt.Sprintf("Local.%s.Request", typeutils.GetTypeName(n.composite.services[n.index])))
span := spanhelper.FromContext(ctx, fmt.Sprintf("%s.%s.Request", n.factoryName, typeutils.GetTypeName(n.composite.services[n.index])))
defer span.Finish()
logger := span.Logger()
ctx = span.Context()

ctx = common.WithLog(ctx, logger)
ctx = WithLog(ctx, logger)
span.LogObject("request", request)

// Actually call the next
Expand All @@ -95,18 +96,18 @@ func (n *nextEndpoint) Request(ctx context.Context, request *networkservice.Netw

func (n *nextEndpoint) Close(ctx context.Context, connection *connection.Connection) (*empty.Empty, error) {
if n.index+1 < len(n.composite.services) {
ctx = WithNext(ctx, &nextEndpoint{composite: n.composite, index: n.index + 1})
ctx = WithNext(ctx, &nextEndpoint{factoryName: n.factoryName, composite: n.composite, index: n.index + 1})
} else {
ctx = WithNext(ctx, nil)
}
// Create a new span
span := spanhelper.FromContext(ctx, fmt.Sprintf("Local.%s.Close", typeutils.GetTypeName(n.composite.services[n.index])))
span := spanhelper.FromContext(ctx, fmt.Sprintf("%s.%s.Close", n.factoryName, typeutils.GetTypeName(n.composite.services[n.index])))
defer span.Finish()
ctx = span.Context()

// Make sure we log to span
logger := span.Logger()
ctx = common.WithLog(ctx, logger)
ctx = WithLog(ctx, logger)

span.LogObject("request", connection)
rv, err := n.composite.services[n.index].Close(ctx, connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package local
package common

import (
"context"
Expand All @@ -21,7 +21,6 @@ import (

"github.com/networkservicemesh/networkservicemesh/controlplane/api/connection"
"github.com/networkservicemesh/networkservicemesh/controlplane/api/networkservice"
"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/common"
)

// requestValidator -
Expand All @@ -32,7 +31,7 @@ func (cce *requestValidator) Request(ctx context.Context, request *networkservic
err := request.IsValid()

if err != nil {
common.Log(ctx).Error(err)
Log(ctx).Error(err)
return nil, err
}
return ProcessNext(ctx, request)
Expand Down
17 changes: 11 additions & 6 deletions controlplane/pkg/local/connection_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ func (cce *connectionService) Request(ctx context.Context, request *networkservi
span := spanhelper.GetSpanHelper(ctx)
span.Logger().Infof("Received request from client to connect to NetworkService: %v", request)

workspaceName := common.WorkspaceName(ctx)

id := request.GetRequestConnection().GetId()
span.LogValue("connection-id", id)

workspaceName := common.WorkspaceName(ctx)
span.LogValue("workspace", workspaceName)

// We need to take updated connection in case of updates
Expand Down Expand Up @@ -86,7 +86,7 @@ func (cce *connectionService) Request(ctx context.Context, request *networkservi
clientConnection.Request = request
ctx = common.WithModelConnection(ctx, clientConnection)

conn, err := ProcessNext(ctx, request)
conn, err := common.ProcessNext(ctx, request)
if err != nil {
if !healing {
// In case of error we need to remove it from model
Expand Down Expand Up @@ -132,12 +132,13 @@ func (cce *connectionService) createClientConnection(ctx context.Context, reques
}

func (cce *connectionService) Close(ctx context.Context, connection *connection.Connection) (*empty.Empty, error) {
logrus.Infof("Closing connection: %v", connection)
logger := common.Log(ctx)
logger.Infof("Closing connection: %v", connection)

clientConnection := cce.model.GetClientConnection(connection.GetId())
if clientConnection == nil {
err := errors.Errorf("there is no such client connection %v", connection)
logrus.Error(err)
logger.Error(err)
return nil, err
}
if clientConnection.ConnectionState == model.ClientConnectionClosing {
Expand All @@ -151,9 +152,13 @@ func (cce *connectionService) Close(ctx context.Context, connection *connection.
// Pass model connection with context
ctx = common.WithModelConnection(ctx, clientConnection)

_, err := ProcessClose(ctx, connection)
_, err := common.ProcessClose(ctx, connection)

if err != nil {
logger.Error(err)
}
cce.model.DeleteClientConnection(ctx, clientConnection.GetID())

// Return empty to send update
return &empty.Empty{}, err
}
25 changes: 15 additions & 10 deletions controlplane/pkg/local/endpoint_selector_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ import (
"github.com/networkservicemesh/networkservicemesh/controlplane/api/networkservice"
pluginapi "github.com/networkservicemesh/networkservicemesh/controlplane/api/plugins"
"github.com/networkservicemesh/networkservicemesh/controlplane/api/registry"
unifiednsm "github.com/networkservicemesh/networkservicemesh/controlplane/pkg/api/nsm"
nsm "github.com/networkservicemesh/networkservicemesh/controlplane/pkg/api/nsm"
"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/common"
"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/model"
"github.com/networkservicemesh/networkservicemesh/controlplane/pkg/plugins"
)

// ConnectionService makes basic Mechanism selection for the incoming connection
type endpointSelectorService struct {
nseManager unifiednsm.NetworkServiceEndpointManager
nseManager nsm.NetworkServiceEndpointManager
pluginRegistry plugins.PluginRegistry
}

Expand Down Expand Up @@ -76,6 +76,7 @@ func (cce *endpointSelectorService) Request(ctx context.Context, request *networ
if clientConnection.ConnectionState == model.ClientConnectionHealing && !requestNSEOnUpdate {
return cce.checkUpdateConnectionContext(ctx, request, clientConnection)
}

// 7.1 try find NSE and do a Request to it.
var lastError error
ignoreEndpoints := common.IgnoredEndpoints(ctx)
Expand All @@ -96,9 +97,11 @@ func (cce *endpointSelectorService) Request(ctx context.Context, request *networ
if err = cce.checkTimeout(parentCtx, span); err != nil {
return nil, err
}

// 7.1.7 perform request to NSE/remote NSMD/NSE
ctx = common.WithEndpoint(ctx, endpoint)
// Perform passing execution to next chain element.
conn, err := ProcessNext(ctx, newRequest)
conn, err := common.ProcessNext(ctx, newRequest)

// 7.1.8 in case of error we put NSE into ignored list to check another one.
if err != nil {
Expand Down Expand Up @@ -155,14 +158,16 @@ func (cce *endpointSelectorService) checkNSEUpdateIsRequired(ctx context.Context
requestNSEOnUpdate = true

// Just close, since client connection already passed with context.
_, err := ProcessClose(ctx, request.GetConnection())
// Network service is closing, we need to close remote NSM and re-program local one.
if err != nil {
if _, err := common.ProcessClose(ctx, request.GetConnection()); err != nil {
logger.Errorf("NSM:(4.1) Error during close of NSE during Request.Upgrade %v Existing connection: %v error %v", request, clientConnection, err)
}
} else {
// 4.2 Check if NSE is still required, if some more context requests are different.
requestNSEOnUpdate = cce.checkNeedNSERequest(logger, request.Connection, clientConnection, dp)
if requestNSEOnUpdate {
logger.Infof("Context is different, NSE request is required")
}
}
}
return requestNSEOnUpdate
Expand Down Expand Up @@ -244,13 +249,13 @@ func (cce *endpointSelectorService) findMechanism(mechanismPreferences []*connec
}

func (cce *endpointSelectorService) Close(ctx context.Context, connection *connection.Connection) (*empty.Empty, error) {
return ProcessClose(ctx, connection)
return common.ProcessClose(ctx, connection)
}

func (cce *endpointSelectorService) checkUpdateConnectionContext(ctx context.Context, request *networkservice.NetworkServiceRequest, clientConnection *model.ClientConnection) (*connection.Connection, error) {
logger := common.Log(ctx)
// We do not need to do request to endpoint and just need to update all stuff.
// 7.2 We do not need to access NSE, since all parameters are same.
logger := common.Log(ctx)
clientConnection.Xcon.Source.Mechanism = request.Connection.GetMechanism()
clientConnection.Xcon.Source.State = connection.State_UP

Expand All @@ -259,8 +264,8 @@ func (cce *endpointSelectorService) checkUpdateConnectionContext(ctx context.Con
err = errors.Errorf("NSM:(7.3) Failed to update source connection context: %v", err)

// Just close since client connection is already passed with context
if _, closeErr := ProcessClose(ctx, request.GetConnection()); closeErr != nil {
logger.Errorf("Close error: %v", closeErr)
if _, closeErr := common.ProcessClose(ctx, request.GetConnection()); closeErr != nil {
logger.Errorf("Failed to perform close: %v", closeErr)
}
return nil, err
}
Expand Down Expand Up @@ -304,7 +309,7 @@ func (cce *endpointSelectorService) checkTimeout(ctx context.Context, span spanh
}

// NewEndpointSelectorService - creates a service to select endpoint
func NewEndpointSelectorService(nseManager unifiednsm.NetworkServiceEndpointManager, pluginRegistry plugins.PluginRegistry) networkservice.NetworkServiceServer {
func NewEndpointSelectorService(nseManager nsm.NetworkServiceEndpointManager, pluginRegistry plugins.PluginRegistry) networkservice.NetworkServiceServer {
return &endpointSelectorService{
nseManager: nseManager,
pluginRegistry: pluginRegistry,
Expand Down
Loading

0 comments on commit 7dd40ca

Please sign in to comment.