Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamic host volumes: change env vars, fixup auto-delete #24943

Merged
merged 6 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie

// set up dynamic host volume manager
c.hostVolumeManager = hvm.NewHostVolumeManager(logger, hvm.Config{
PluginDir: cfg.HostVolumePluginDir,
SharedMountDir: cfg.AllocMountsDir,
PluginDir: c.GetConfig().HostVolumePluginDir,
VolumesDir: c.GetConfig().HostVolumesDir,
NodePool: c.Node().NodePool,
StateMgr: c.stateDB,
UpdateNodeVols: c.batchNodeUpdates.updateNodeFromHostVolume,
})
Expand Down Expand Up @@ -702,6 +703,13 @@ func (c *Client) init() error {

c.stateDB = db

// Ensure host_volumes_dir config is not empty.
if conf.HostVolumesDir == "" {
conf = c.UpdateConfig(func(c *config.Config) {
c.HostVolumesDir = filepath.Join(conf.StateDir, "host_volumes")
})
}

// Ensure the alloc mounts dir exists if we are configured with a custom path.
if conf.AllocMountsDir != "" {
if err := os.MkdirAll(conf.AllocMountsDir, 0o711); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ type Config struct {
// HostVolumes is a map of the configured host volumes by name.
HostVolumes map[string]*structs.ClientHostVolumeConfig

// HostVolumesDir is the suggested directory for plugins to put volumes.
// Volume plugins may ignore this suggestion, but we provide this default.
HostVolumesDir string

// HostVolumePluginDir is the directory with dynamic host volume plugins.
HostVolumePluginDir string

Expand Down
12 changes: 6 additions & 6 deletions client/fingerprint/dynamic_host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (h *DynamicHostVolumePluginFingerprint) Fingerprint(request *FingerprintReq
return nil
}

plugins, err := GetHostVolumePluginVersions(h.logger, pluginDir)
plugins, err := GetHostVolumePluginVersions(h.logger, pluginDir, request.Node.NodePool)
if err != nil {
if os.IsNotExist(err) {
h.logger.Debug("plugin dir does not exist", "dir", pluginDir)
Expand Down Expand Up @@ -74,10 +74,10 @@ func (h *DynamicHostVolumePluginFingerprint) Periodic() (bool, time.Duration) {
return false, 0
}

// GetHostVolumePluginVersions finds all the executable files on disk
// that respond to a Version call (arg $1 = 'version' / env $OPERATION = 'version')
// The return map's keys are plugin IDs, and the values are version strings.
func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string]string, error) {
// GetHostVolumePluginVersions finds all the executable files on disk that
// respond to a `fingerprint` call. The return map's keys are plugin IDs,
// and the values are version strings.
func GetHostVolumePluginVersions(log hclog.Logger, pluginDir, nodePool string) (map[string]string, error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nodepool isn't really needed in this context, because fingerprint calls don't include it, but I pass it through here anyway to avoid potential confusion in the future if someone does want to add it to fingerprint env.

files, err := helper.FindExecutableFiles(pluginDir)
if err != nil {
return nil, err
Expand All @@ -97,7 +97,7 @@ func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string

log := log.With("plugin_id", file)

p, err := hvm.NewHostVolumePluginExternal(log, pluginDir, file, "")
p, err := hvm.NewHostVolumePluginExternal(log, pluginDir, file, "", nodePool)
if err != nil {
log.Warn("error getting plugin", "error", err)
return
Expand Down
4 changes: 2 additions & 2 deletions client/host_volume_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestHostVolume(t *testing.T) {
StateMgr: client.stateDB,
UpdateNodeVols: client.updateNodeFromHostVol,
PluginDir: "/no/ext/plugins",
SharedMountDir: tmp,
VolumesDir: tmp,
})
client.hostVolumeManager = manager
hostPathCreate := filepath.Join(tmp, "test-vol-id-1")
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestHostVolume(t *testing.T) {
StateMgr: client.stateDB,
UpdateNodeVols: client.updateNodeFromHostVol,
PluginDir: "/no/ext/plugins",
SharedMountDir: "host_volume_endpoint_test.go",
VolumesDir: "host_volume_endpoint_test.go",
})

req := &cstructs.ClientHostVolumeCreateRequest{
Expand Down
130 changes: 72 additions & 58 deletions client/hostvolumemanager/host_volume_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ import (

const (
// environment variables for external plugins

EnvOperation = "DHV_OPERATION"
EnvHostPath = "DHV_HOST_PATH"
EnvNodeID = "DHV_NODE_ID"
EnvVolumesDir = "DHV_VOLUMES_DIR"
EnvPluginDir = "DHV_PLUGIN_DIR"
EnvCreatedPath = "DHV_CREATED_PATH"
EnvNamespace = "DHV_NAMESPACE"
EnvVolumeName = "DHV_VOLUME_NAME"
EnvVolumeID = "DHV_VOLUME_ID"
EnvNodeID = "DHV_NODE_ID"
EnvNodePool = "DHV_NODE_POOL"
EnvCapacityMin = "DHV_CAPACITY_MIN_BYTES"
EnvCapacityMax = "DHV_CAPACITY_MAX_BYTES"
EnvPluginDir = "DHV_PLUGIN_DIR"
EnvParameters = "DHV_PARAMETERS"
)

Expand Down Expand Up @@ -62,10 +66,10 @@ const HostVolumePluginMkdirVersion = "0.0.1"
var _ HostVolumePlugin = &HostVolumePluginMkdir{}

// HostVolumePluginMkdir is a plugin that creates a directory within the
// specified TargetPath. It is built-in to Nomad, so is always available.
// specified VolumesDir. It is built-in to Nomad, so is always available.
type HostVolumePluginMkdir struct {
ID string
TargetPath string
VolumesDir string

log hclog.Logger
}
Expand All @@ -80,7 +84,7 @@ func (p *HostVolumePluginMkdir) Fingerprint(_ context.Context) (*PluginFingerpri
func (p *HostVolumePluginMkdir) Create(_ context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {

path := filepath.Join(p.TargetPath, req.ID)
path := filepath.Join(p.VolumesDir, req.ID)
log := p.log.With(
"operation", "create",
"volume_id", req.ID,
Expand All @@ -102,7 +106,7 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
return nil, err
}

err := os.Mkdir(path, 0o700)
err := os.MkdirAll(path, 0o700)
if err != nil {
log.Debug("error with plugin", "error", err)
return nil, err
Expand All @@ -113,7 +117,7 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
}

func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {
path := filepath.Join(p.TargetPath, req.ID)
path := filepath.Join(p.VolumesDir, req.ID)
log := p.log.With(
"operation", "delete",
"volume_id", req.ID,
Expand All @@ -135,7 +139,7 @@ var _ HostVolumePlugin = &HostVolumePluginExternal{}
// NewHostVolumePluginExternal returns an external host volume plugin
// if the specified executable exists on disk.
func NewHostVolumePluginExternal(log hclog.Logger,
pluginDir, filename, targetPath string) (*HostVolumePluginExternal, error) {
pluginDir, filename, volumesDir, nodePool string) (*HostVolumePluginExternal, error) {
// this should only be called with already-detected executables,
// but we'll double-check it anyway, so we can provide a tidy error message
// if it has changed between fingerprinting and execution.
Expand All @@ -153,8 +157,9 @@ func NewHostVolumePluginExternal(log hclog.Logger,
return &HostVolumePluginExternal{
ID: filename,
Executable: executable,
TargetPath: targetPath,
VolumesDir: volumesDir,
PluginDir: pluginDir,
NodePool: nodePool,
log: log,
}, nil
}
Expand All @@ -166,16 +171,17 @@ func NewHostVolumePluginExternal(log hclog.Logger,
type HostVolumePluginExternal struct {
ID string
Executable string
TargetPath string
VolumesDir string
PluginDir string
NodePool string

log hclog.Logger
}

// Fingerprint calls the executable with the following parameters:
// arguments: fingerprint
// arguments: $1=fingerprint
// environment:
// DHV_OPERATION=fingerprint
// - DHV_OPERATION=fingerprint
//
// Response should be valid JSON on stdout, with a "version" key, e.g.:
// {"version": "0.0.1"}
Expand Down Expand Up @@ -205,21 +211,23 @@ func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFing
}

// Create calls the executable with the following parameters:
// arguments: create {path to create}
// arguments: $1=create
// environment:
// DHV_OPERATION=create
// DHV_HOST_PATH={path to create}
// DHV_NODE_ID={Nomad node ID}
// DHV_VOLUME_NAME={name from the volume specification}
// DHV_VOLUME_ID={Nomad volume ID}
// DHV_CAPACITY_MIN_BYTES={capacity_min from the volume spec}
// DHV_CAPACITY_MAX_BYTES={capacity_max from the volume spec}
// DHV_PARAMETERS={json of parameters from the volume spec}
// DHV_PLUGIN_DIR={path to directory containing plugins}
// - DHV_OPERATION=create
// - DHV_VOLUMES_DIR={directory to put the volume in}
// - DHV_PLUGIN_DIR={path to directory containing plugins}
// - DHV_NAMESPACE={volume namespace}
// - DHV_VOLUME_NAME={name from the volume specification}
// - DHV_VOLUME_ID={volume ID generated by Nomad}
// - DHV_NODE_ID={Nomad node ID}
// - DHV_NODE_POOL={Nomad node pool}
// - DHV_CAPACITY_MIN_BYTES={capacity_min from the volume spec, expressed in bytes}
// - DHV_CAPACITY_MAX_BYTES={capacity_max from the volume spec, expressed in bytes}
// - DHV_PARAMETERS={stringified json of parameters from the volume spec}
//
// Response should be valid JSON on stdout with "path" and "bytes", e.g.:
// {"path": $HOST_PATH, "bytes": 50000000}
// "path" must be provided to confirm that the requested path is what was
// {"path": "/path/that/was/created", "bytes": 50000000}
// "path" must be provided to confirm the requested path is what was
// created by the plugin. "bytes" is the actual size of the volume created
// by the plugin; if excluded, it will default to 0.
//
Expand All @@ -233,14 +241,22 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
return nil, fmt.Errorf("error marshaling volume pramaters: %w", err)
}
envVars := []string{
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvOperation, "create"),
fmt.Sprintf("%s=%s", EnvVolumesDir, p.VolumesDir),
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
fmt.Sprintf("%s=%s", EnvNodePool, p.NodePool),
// values from volume spec
fmt.Sprintf("%s=%s", EnvNamespace, req.Namespace),
fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
fmt.Sprintf("%s=%s", EnvVolumeID, req.ID),
fmt.Sprintf("%s=%d", EnvCapacityMin, req.RequestedCapacityMinBytes),
fmt.Sprintf("%s=%d", EnvCapacityMax, req.RequestedCapacityMaxBytes),
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvParameters, params),
}

stdout, _, err := p.runPlugin(ctx, "create", req.ID, envVars)
log := p.log.With("volume_name", req.Name, "volume_id", req.ID)
stdout, _, err := p.runPlugin(ctx, log, "create", envVars)
if err != nil {
return nil, fmt.Errorf("error creating volume %q with plugin %q: %w", req.ID, p.ID, err)
}
Expand All @@ -253,20 +269,22 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
// an error here after the plugin has done who-knows-what.
return nil, err
}
// TODO: validate returned host path
return &pluginResp, nil
}

// Delete calls the executable with the following parameters:
// arguments: delete {path to create}
// arguments: $1=delete
// environment:
// DHV_OPERATION=delete
// DHV_HOST_PATH={path to create}
// DHV_NODE_ID={Nomad node ID}
// DHV_VOLUME_NAME={name from the volume specification}
// DHV_VOLUME_ID={Nomad volume ID}
// DHV_PARAMETERS={json of parameters from the volume spec}
// DHV_PLUGIN_DIR={path to directory containing plugins}
// - DHV_OPERATION=delete
// - DHV_CREATED_PATH={path that `create` returned}
// - DHV_VOLUMES_DIR={directory that volumes should be put in}
// - DHV_PLUGIN_DIR={path to directory containing plugins}
// - DHV_NAMESPACE={volume namespace}
// - DHV_VOLUME_NAME={name from the volume specification}
// - DHV_VOLUME_ID={volume ID generated by Nomad}
// - DHV_NODE_ID={Nomad node ID}
// - DHV_NODE_POOL={Nomad node pool}
// - DHV_PARAMETERS={stringified json of parameters from the volume spec}
//
// Response on stdout is discarded.
//
Expand All @@ -280,42 +298,38 @@ func (p *HostVolumePluginExternal) Delete(ctx context.Context,
return fmt.Errorf("error marshaling volume pramaters: %w", err)
}
envVars := []string{
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvOperation, "delete"),
fmt.Sprintf("%s=%s", EnvVolumesDir, p.VolumesDir),
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
fmt.Sprintf("%s=%s", EnvNodePool, p.NodePool),
// from create response
fmt.Sprintf("%s=%s", EnvCreatedPath, req.HostPath),
// values from volume spec
fmt.Sprintf("%s=%s", EnvNamespace, req.Namespace),
fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
fmt.Sprintf("%s=%s", EnvVolumeID, req.ID),
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvParameters, params),
}

_, _, err = p.runPlugin(ctx, "delete", req.ID, envVars)
log := p.log.With("volume_name", req.Name, "volume_id", req.ID)
_, _, err = p.runPlugin(ctx, log, "delete", envVars)
if err != nil {
return fmt.Errorf("error deleting volume %q with plugin %q: %w", req.ID, p.ID, err)
}
return nil
}

// runPlugin executes the... executable with these additional env vars:
// DHV_OPERATION={op}
// DHV_HOST_PATH={path to create}
// DHV_VOLUME_ID={Nomad volume ID}
// DHV_PLUGIN_DIR={path to directory containing plugins}
func (p *HostVolumePluginExternal) runPlugin(ctx context.Context,
op, volID string, env []string) (stdout, stderr []byte, err error) {
// runPlugin executes the... executable
func (p *HostVolumePluginExternal) runPlugin(ctx context.Context, log hclog.Logger,
op string, env []string) (stdout, stderr []byte, err error) {

path := filepath.Join(p.TargetPath, volID)
log := p.log.With(
"operation", op,
"volume_id", volID,
"path", path)
log = log.With("operation", op)
log.Debug("running plugin")

// set up plugin execution
cmd := exec.CommandContext(ctx, p.Executable, op, path)

cmd.Env = append([]string{
fmt.Sprintf("%s=%s", EnvOperation, op),
fmt.Sprintf("%s=%s", EnvHostPath, path),
fmt.Sprintf("%s=%s", EnvVolumeID, volID),
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
}, env...)
cmd := exec.CommandContext(ctx, p.Executable, op)
cmd.Env = env

stdout, stderr, err = runCommand(cmd)

Expand Down
Loading
Loading