Add nodePort support #332

Open · wants to merge 2 commits into master
USAGE.md — 3 changes: 2 additions & 1 deletion
@@ -42,7 +42,7 @@ Flags:
-n, --source-namespace string namespace of the source PVC
-p, --source-path string the filesystem path to migrate in the source PVC (default "/")
-a, --ssh-key-algorithm string ssh key algorithm to be used. Valid values are rsa,ed25519 (default "ed25519")
-s, --strategies strings the comma-separated list of strategies to be used in the given order (default [mnt2,svc,lbsvc])
-s, --strategies strings the comma-separated list of strategies to be used in the given order (default [mnt2,svc,lbsvc,nodePort])
-v, --version version for pv-migrate

Use "pv-migrate [command] --help" for more information about a command.
@@ -63,6 +63,7 @@ resources, serviceacccounts, additional annotations etc.
| `mnt2` | **Mount both** - Mounts both PVCs in a single pod and runs a regular rsync, without using SSH or the network. Only applicable if source and destination PVCs are in the same namespace and both can be mounted from a single pod. |
| `svc` | **Service** - Runs rsync+ssh over a Kubernetes Service (`ClusterIP`). Only applicable when source and destination PVCs are in the same Kubernetes cluster. |
| `lbsvc` | **Load Balancer Service** - Runs rsync+ssh over a Kubernetes Service of type `LoadBalancer`. Always applicable (will fail if `LoadBalancer` IP is not assigned for a long period). |
| `nodePort` | **Node Port Service** - Runs rsync+ssh over a Kubernetes Service of type `NodePort`. Applicable whenever the destination can reach the source cluster's node addresses on the allocated node port (see the usage sketch after this table). |
| `local` | **Local Transfer** - Runs sshd on both source and destination, then uses a combination of `kubectl port-forward` logic and an SSH reverse proxy to tunnel all the traffic over the client device (the device which runs pv-migrate, e.g. your laptop). Requires `ssh` command to be available on the client device. <br/><br/>Note that this strategy is **experimental** (and not enabled by default), potentially can put heavy load on both apiservers and is not as resilient as others. It is recommended for small amounts of data and/or when the only access to both clusters seems to be through `kubectl` (e.g. for air-gapped clusters, on jump hosts etc.). |
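
A minimal usage sketch, not part of this PR's diff: assuming the `migrate` subcommand and the flags listed above, the new strategy can be selected explicitly through `--strategies`; the namespace and PVC names below are placeholders.

```bash
# Restrict pv-migrate to the nodePort strategy only
# (source-ns, old-pvc and new-pvc are placeholder names).
pv-migrate migrate \
  --source-namespace source-ns \
  --strategies nodePort \
  old-pvc new-pvc
```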

## Examples
k8s/pod.go — 12 changes: 12 additions & 0 deletions
@@ -123,3 +123,15 @@ func waitForPodTermination(ctx context.Context, cli kubernetes.Interface,

return result, nil
}

// GetNodeAddress returns the address of the node hosting the pod that matches the given label selector.
// It is used by the nodePort strategy.
func GetNodeAddress(ctx context.Context, cli kubernetes.Interface, ns, labelSelector string) (string, error) {
pod, err := WaitForPod(ctx, cli, ns, labelSelector)
if err != nil {
return "", err
}

return pod.Status.HostIP, nil
}
k8s/service.go — 64 changes: 64 additions & 0 deletions
@@ -84,3 +84,67 @@ func GetServiceAddress(

return result, nil
}

// GetServiceNodePort returns the node port allocated to the given service.
// It is used by the nodePort strategy.
func GetServiceNodePort(
ctx context.Context,
cli kubernetes.Interface,
namespace string,
name string,
serviceTimeout time.Duration,
) (int, error) {
var result int

resCli := cli.CoreV1().Services(namespace)
fieldSelector := fields.OneTermEqualSelector(metav1.ObjectNameField, name).String()

ctx, cancel := context.WithTimeout(ctx, serviceTimeout)
defer cancel()

listWatch := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector

list, err := resCli.List(ctx, options)
if err != nil {
return nil, fmt.Errorf("failed to list services %s/%s: %w", namespace, name, err)
}

return list, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector

resWatch, err := resCli.Watch(ctx, options)
if err != nil {
return nil, fmt.Errorf("failed to watch services %s/%s: %w", namespace, name, err)
}

return resWatch, nil
},
}

if _, err := watchtools.UntilWithSync(ctx, listWatch, &corev1.Service{}, nil,
func(event watch.Event) (bool, error) {
res, ok := event.Object.(*corev1.Service)
if !ok {
return false, fmt.Errorf("unexpected type while watching service: %s/%s", namespace, name)
}

if res.Spec.Type != corev1.ServiceTypeNodePort {
return false, fmt.Errorf("the service must be a nodePort type %s/%s", namespace, name)
}

if len(res.Spec.Ports) > 0 {
result = int(res.Spec.Ports[0].NodePort)
return true, nil
}

return false, nil
}); err != nil {
return 0, fmt.Errorf("failed to get service %s/%s address: %w", namespace, name, err)
}

return result, nil
}
strategy/nodeportsvc.go — 166 changes: 166 additions & 0 deletions
@@ -0,0 +1,166 @@
package strategy

import (
"context"
"fmt"
"log/slog"

"github.com/utkuozdemir/pv-migrate/k8s"
"github.com/utkuozdemir/pv-migrate/migration"
"github.com/utkuozdemir/pv-migrate/rsync"
"github.com/utkuozdemir/pv-migrate/ssh"
)

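// NodePortSvc is the strategy that runs rsync+ssh over a NodePort service, connecting to the node that hosts the source sshd pod.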
type NodePortSvc struct{}

func (r *NodePortSvc) Run(ctx context.Context, attempt *migration.Attempt, logger *slog.Logger) error {
mig := attempt.Migration

sourceInfo := mig.SourceInfo
destInfo := mig.DestInfo
sourceNs := sourceInfo.Claim.Namespace
destNs := destInfo.Claim.Namespace
keyAlgorithm := mig.Request.KeyAlgorithm

logger.Info("🔑 Generating SSH key pair", "algorithm", keyAlgorithm)

publicKey, privateKey, err := ssh.CreateSSHKeyPair(keyAlgorithm)
if err != nil {
return fmt.Errorf("failed to create ssh key pair: %w", err)
}

privateKeyMountPath := "/tmp/id_" + keyAlgorithm

srcReleaseName := attempt.HelmReleaseNamePrefix + "-src"
destReleaseName := attempt.HelmReleaseNamePrefix + "-dest"
releaseNames := []string{srcReleaseName, destReleaseName}

doneCh := registerCleanupHook(attempt, releaseNames, logger)
defer cleanupAndReleaseHook(ctx, attempt, releaseNames, doneCh, logger)

err = installNodePortSvcOnSource(attempt, srcReleaseName, publicKey, srcMountPath, logger)
if err != nil {
return fmt.Errorf("failed to install on source: %w", err)
}

sourceKubeClient := attempt.Migration.SourceInfo.ClusterClient.KubeClient
svcName := srcReleaseName + "-sshd"

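// The node port allocation is watched with the same timeout that the LoadBalancer strategy uses for its service.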
nodePort, err := k8s.GetServiceNodePort(ctx, sourceKubeClient, sourceNs, svcName, mig.Request.LBSvcTimeout)
if err != nil {
return fmt.Errorf("failed to get service address: %w", err)
}
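// Find the node that hosts the sshd pod; the destination connects to that node's address.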
labelSelector := "app.kubernetes.io/component=sshd,app.kubernetes.io/instance=" + srcReleaseName
nodeAddress, err := k8s.GetNodeAddress(ctx, sourceKubeClient, sourceNs, labelSelector)
if err != nil {
return fmt.Errorf("failed to get node address: %w", err)
}

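// Use the node address as the ssh target unless an explicit destination host override was supplied.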
sshTargetHost := formatSSHTargetHost(nodeAddress)
if mig.Request.DestHostOverride != "" {
sshTargetHost = mig.Request.DestHostOverride
}

err = installNodePortSvcOnDest(attempt, destReleaseName, privateKey, privateKeyMountPath,
sshTargetHost, nodePort, srcMountPath, destMountPath, logger)
if err != nil {
return fmt.Errorf("failed to install on dest: %w", err)
}

showProgressBar := !attempt.Migration.Request.NoProgressBar
kubeClient := destInfo.ClusterClient.KubeClient
jobName := destReleaseName + "-rsync"

if err = k8s.WaitForJobCompletion(ctx, kubeClient, destNs, jobName, showProgressBar, logger); err != nil {
return fmt.Errorf("failed to wait for job completion: %w", err)
}

return nil
}

func installNodePortSvcOnSource(
attempt *migration.Attempt,
releaseName,
publicKey,
srcMountPath string,
logger *slog.Logger,
) error {
mig := attempt.Migration
sourceInfo := mig.SourceInfo
namespace := sourceInfo.Claim.Namespace

vals := map[string]interface{}{
"sshd": map[string]interface{}{
"enabled": true,
"namespace": namespace,
"publicKey": publicKey,
"service": map[string]interface{}{
"type": "NodePort",
},
"pvcMounts": []map[string]interface{}{
{
"name": sourceInfo.Claim.Name,
"readOnly": mig.Request.SourceMountReadOnly,
"mountPath": srcMountPath,
},
},
"affinity": sourceInfo.AffinityHelmValues,
},
}

return installHelmChart(attempt, sourceInfo, releaseName, vals, logger)
}

func installNodePortSvcOnDest(
attempt *migration.Attempt,
releaseName string,
privateKey string,
privateKeyMountPath string,
sshHost string,
sshPort int,
srcMountPath string,
destMountPath string,
logger *slog.Logger,
) error {
mig := attempt.Migration
destInfo := mig.DestInfo
namespace := destInfo.Claim.Namespace

srcPath := srcMountPath + "/" + mig.Request.Source.Path
destPath := destMountPath + "/" + mig.Request.Dest.Path
rsyncCmd := rsync.Cmd{
NoChown: mig.Request.NoChown,
Delete: mig.Request.DeleteExtraneousFiles,
SrcPath: srcPath,
DestPath: destPath,
SrcUseSSH: true,
SrcSSHHost: sshHost,
Port: sshPort,
Compress: mig.Request.Compress,
}
rsyncCmdStr, err := rsyncCmd.Build()
if err != nil {
return err
}

vals := map[string]interface{}{
"rsync": map[string]interface{}{
"enabled": true,
"namespace": namespace,
"privateKeyMount": true,
"privateKey": privateKey,
"privateKeyMountPath": privateKeyMountPath,
"sshRemoteHost": sshHost,
"pvcMounts": []map[string]interface{}{
{
"name": destInfo.Claim.Name,
"mountPath": destMountPath,
},
},
"command": rsyncCmdStr,
"affinity": destInfo.AffinityHelmValues,
},
}

return installHelmChart(attempt, destInfo, releaseName, vals, logger)
}
strategy/strategy.go — 22 changes: 12 additions & 10 deletions
@@ -24,10 +24,11 @@ import (
)

const (
Mnt2Strategy = "mnt2"
SvcStrategy = "svc"
LbSvcStrategy = "lbsvc"
LocalStrategy = "local"
Mnt2Strategy = "mnt2"
SvcStrategy = "svc"
LbSvcStrategy = "lbsvc"
LocalStrategy = "local"
NodePortSvcStrategy = "nodePort"

helmValuesYAMLIndent = 2

@@ -36,14 +37,15 @@ const (
)

var (
DefaultStrategies = []string{Mnt2Strategy, SvcStrategy, LbSvcStrategy}
AllStrategies = []string{Mnt2Strategy, SvcStrategy, LbSvcStrategy, LocalStrategy}
DefaultStrategies = []string{Mnt2Strategy, SvcStrategy, LbSvcStrategy, NodePortSvcStrategy}
AllStrategies = []string{Mnt2Strategy, SvcStrategy, LbSvcStrategy, LocalStrategy, NodePortSvcStrategy}

nameToStrategy = map[string]Strategy{
Mnt2Strategy: &Mnt2{},
SvcStrategy: &Svc{},
LbSvcStrategy: &LbSvc{},
LocalStrategy: &Local{},
Mnt2Strategy: &Mnt2{},
SvcStrategy: &Svc{},
LbSvcStrategy: &LbSvc{},
LocalStrategy: &Local{},
NodePortSvcStrategy: &NodePortSvc{},
}

helmProviders = getter.All(cli.New())