Skip to content

Commit

Permalink
VDDK/ImageIO cleanup after importer termination. (kubevirt#1670)
Browse files Browse the repository at this point in the history
* Add signal catcher channel common to VDDK/ImageIO.

Signed-off-by: Matthew Arnold <[email protected]>

* Close nbdkit on VDDK importer termination.

Signed-off-by: Matthew Arnold <[email protected]>

* Unlock disk on ImageIO importer termination.

Signed-off-by: Matthew Arnold <[email protected]>

* Reset disk availability in new ImageIO unit test.

Signed-off-by: Matthew Arnold <[email protected]>
  • Loading branch information
mrnold authored Feb 23, 2021
1 parent 2b47638 commit aa7a800
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 1 deletion.
63 changes: 63 additions & 0 deletions pkg/importer/imageio-datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ func NewImageioDataSource(endpoint string, accessKey string, secKey string, cert
// We know this is a counting reader, so no need to check.
countingReader := imageioReader.(*util.CountingReader)
go imageioSource.pollProgress(countingReader, 10*time.Minute, time.Second)

terminationChannel := newTerminationChannel()
go func() {
<-terminationChannel
klog.Infof("Caught termination signal, closing ImageIO.")
err := cancelTransfer(conn, it)
if err != nil {
klog.Errorf("Error cancelling transfer: %v", err)
}
err = imageioSource.Close()
if err != nil {
klog.Errorf("Error closing source: %v", err)
}
}()

return imageioSource, nil
}

Expand Down Expand Up @@ -189,10 +204,12 @@ func createImageioReader(ctx context.Context, ep string, accessKey string, secKe
// Use the create client from http source.
client, err := createHTTPClient(certDir)
if err != nil {
cancelTransfer(conn, it)
return nil, uint64(0), it, conn, err
}
transferURL, available := it.TransferUrl()
if !available {
cancelTransfer(conn, it)
return nil, uint64(0), it, conn, errors.New("Error transfer url not available")
}

Expand All @@ -201,9 +218,11 @@ func createImageioReader(ctx context.Context, ep string, accessKey string, secKe

resp, err := client.Do(req)
if err != nil {
cancelTransfer(conn, it)
return nil, uint64(0), it, conn, errors.Wrap(err, "Sending request failed")
}
if resp.StatusCode != http.StatusOK {
cancelTransfer(conn, it)
return nil, uint64(0), it, conn, errors.Errorf("bad status: %s", resp.Status)
}

Expand All @@ -218,6 +237,15 @@ func createImageioReader(ctx context.Context, ep string, accessKey string, secKe
return countingReader, total, it, conn, nil
}

// cancelTransfer makes sure the disk is unlocked before shutting down importer
func cancelTransfer(conn ConnectionInterface, it *ovirtsdk4.ImageTransfer) error {
var err error
if conn != nil && it != nil {
_, err = conn.SystemService().ImageTransfersService().ImageTransferService(it.MustId()).Cancel().Send()
}
return err
}

func getTransfer(conn ConnectionInterface, diskID string) (*ovirtsdk4.ImageTransfer, uint64, error) {
disksService := conn.SystemService().DisksService()
diskService := disksService.DiskService(diskID)
Expand Down Expand Up @@ -366,9 +394,19 @@ type ImageTransfersServiceInterface interface {

// ImageTransferServiceInterface defines service methods
type ImageTransferServiceInterface interface {
Cancel() ImageTransferServiceCancelRequestInterface
Finalize() ImageTransferServiceFinalizeRequestInterface
}

// ImageTransferServiceCancelRequestInterface defines service methods
type ImageTransferServiceCancelRequestInterface interface {
Send() (ImageTransferServiceCancelResponseInterface, error)
}

// ImageTransferServiceCancelResponseInterface defines service methods
type ImageTransferServiceCancelResponseInterface interface {
}

// ImageTransferServiceFinalizeRequestInterface defines service methods
type ImageTransferServiceFinalizeRequestInterface interface {
Send() (ImageTransferServiceFinalizeResponseInterface, error)
Expand Down Expand Up @@ -449,6 +487,16 @@ type ImageTransfersServiceAddResponse struct {
srv *ovirtsdk4.ImageTransfersServiceAddResponse
}

// ImageTransferServiceCancelRequest wraps cancel request
type ImageTransferServiceCancelRequest struct {
srv *ovirtsdk4.ImageTransferServiceCancelRequest
}

// ImageTransferServiceCancelResponse wraps cancel response
type ImageTransferServiceCancelResponse struct {
srv *ovirtsdk4.ImageTransferServiceCancelResponse
}

// ImageTransferServiceFinalizeRequest warps finalize request
type ImageTransferServiceFinalizeRequest struct {
srv *ovirtsdk4.ImageTransferServiceFinalizeRequest
Expand Down Expand Up @@ -494,6 +542,14 @@ func (service *ImageTransfersServiceResponse) Send() (ImageTransfersServiceAddRe
}, err
}

// Send returns transfer cancel response
func (service *ImageTransferServiceCancelRequest) Send() (ImageTransferServiceCancelResponseInterface, error) {
resp, err := service.srv.Send()
return &ImageTransferServiceCancelResponse{
srv: resp,
}, err
}

// Send returns disk get response
func (service *ImageTransferServiceFinalizeRequest) Send() (ImageTransferServiceFinalizeResponseInterface, error) {
resp, err := service.srv.Send()
Expand Down Expand Up @@ -545,6 +601,13 @@ func (service *ImageTransfersService) ImageTransferService(id string) ImageTrans
}
}

// Cancel returns image service
func (service *ImageTransferService) Cancel() ImageTransferServiceCancelRequestInterface {
return &ImageTransferServiceCancelRequest{
srv: service.srv.Cancel(),
}
}

// Finalize returns image service
func (service *ImageTransferService) Finalize() ImageTransferServiceFinalizeRequestInterface {
return &ImageTransferServiceFinalizeRequest{
Expand Down
43 changes: 43 additions & 0 deletions pkg/importer/imageio-datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ var _ = Describe("Imageio reader", func() {

BeforeEach(func() {
newOvirtClientFunc = createMockOvirtClient
newTerminationChannel = createMockTerminationChannel
tempDir = createCert()
ts = createTestServer(imageDir)
disk.SetTotalSize(1024)
disk.SetId("123")
it.SetPhase(ovirtsdk4.IMAGETRANSFERPHASE_TRANSFERRING)
it.SetTransferUrl(ts.URL + "/" + cirrosFileName)
it.SetId("123")
diskCreateError = nil
diskAvailable = true
})
Expand Down Expand Up @@ -77,12 +79,14 @@ var _ = Describe("Imageio data source", func() {

BeforeEach(func() {
newOvirtClientFunc = createMockOvirtClient
newTerminationChannel = createMockTerminationChannel
tempDir = createCert()
ts = createTestServer(imageDir)
disk.SetTotalSize(1024)
disk.SetId("123")
it.SetPhase(ovirtsdk4.IMAGETRANSFERPHASE_TRANSFERRING)
it.SetTransferUrl(ts.URL)
it.SetId("123")
diskAvailable = true
diskCreateError = nil
})
Expand Down Expand Up @@ -220,6 +224,27 @@ var _ = Describe("Imageio pollprogress", func() {
})
})

var _ = Describe("Imageio cancel", func() {
It("should cancel transfer on SIGTERM", func() {
newOvirtClientFunc = createMockOvirtClient
newTerminationChannel = createMockTerminationChannel
tempDir := createCert()
ts := createTestServer(imageDir)
disk.SetTotalSize(1024)
disk.SetId("123")
it.SetPhase(ovirtsdk4.IMAGETRANSFERPHASE_TRANSFERRING)
it.SetTransferUrl(ts.URL)
it.SetId("123")
diskAvailable = true
diskCreateError = nil

_, err := NewImageioDataSource(ts.URL, "", "", tempDir, "")
Expect(err).ToNot(HaveOccurred())
mockTerminationChannel <- os.Interrupt
Expect(err).ToNot(HaveOccurred())
})
})

// MockOvirtClient is a mock minio client
type MockOvirtClient struct {
ep string
Expand All @@ -232,6 +257,10 @@ type MockAddService struct {
client *MockOvirtClient
}

type MockCancelService struct {
client *MockOvirtClient
}

type MockFinalizeService struct {
client *MockOvirtClient
}
Expand All @@ -240,6 +269,10 @@ type MockImageTransfersServiceAddResponse struct {
srv *ovirtsdk4.ImageTransfersServiceAddResponse
}

type MockImageTransferServiceCancelResponse struct {
srv *ovirtsdk4.ImageTransferServiceCancelResponse
}

type MockImageTransferServiceFinalizeResponse struct {
srv *ovirtsdk4.ImageTransferServiceFinalizeResponse
}
Expand Down Expand Up @@ -272,6 +305,12 @@ func (conn *MockOvirtClient) ImageTransferService(string) ImageTransferServiceIn
return conn
}

func (conn *MockOvirtClient) Cancel() ImageTransferServiceCancelRequestInterface {
return &MockCancelService{
client: conn,
}
}

func (conn *MockOvirtClient) Finalize() ImageTransferServiceFinalizeRequestInterface {
return &MockFinalizeService{
client: conn,
Expand All @@ -291,6 +330,10 @@ func (conn *MockAddService) Send() (ImageTransfersServiceAddResponseInterface, e
return &MockImageTransfersServiceAddResponse{srv: nil}, nil
}

func (conn *MockCancelService) Send() (ImageTransferServiceCancelResponseInterface, error) {
return &MockImageTransferServiceCancelResponse{srv: nil}, nil
}

func (conn *MockFinalizeService) Send() (ImageTransferServiceFinalizeResponseInterface, error) {
return &MockImageTransferServiceFinalizeResponse{srv: nil}, nil
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/importer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"io/ioutil"
"net/url"
"os"
"os/signal"
"path/filepath"
"syscall"

"github.com/pkg/errors"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -43,3 +45,13 @@ func CleanDir(dest string) error {
}
return nil
}

// GetTerminationChannel returns a channel that listens for SIGTERM
func GetTerminationChannel() <-chan os.Signal {
terminationChannel := make(chan os.Signal, 1)
signal.Notify(terminationChannel, os.Interrupt, syscall.SIGTERM)
return terminationChannel
}

// newTerminationChannel should be overriden for unit tests
var newTerminationChannel = GetTerminationChannel
8 changes: 8 additions & 0 deletions pkg/importer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,11 @@ var _ = Describe("Clean dir", func() {
Expect(0).To(Equal(len(dir)))
})
})

// For use in transfer cancellation unit tests, currently VDDK/ImageIO
var mockTerminationChannel chan os.Signal

func createMockTerminationChannel() <-chan os.Signal {
mockTerminationChannel = make(chan os.Signal, 1)
return mockTerminationChannel
}
15 changes: 14 additions & 1 deletion pkg/importer/vddk-datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,14 @@ func createVMwareClient(endpoint string, accessKey string, secKey string, thumbp
conn, err := govmomi.NewClient(ctx, vmwURL, true)
if err != nil {
klog.Errorf("Unable to connect to vCenter: %v", err)
cancel()
return nil, err
}

moref, vm, err := FindVM(ctx, conn, uuid)
if err != nil {
klog.Errorf("Unable to find MORef for VM with UUID %s!", uuid)
cancel()
return nil, err
}

Expand Down Expand Up @@ -857,6 +859,14 @@ func createVddkDataSource(endpoint string, accessKey string, secKey string, thum
Size: size,
VolumeMode: volumeMode,
}

terminationChannel := newTerminationChannel()
go func() {
<-terminationChannel
klog.Infof("Caught termination signal, closing nbdkit.")
source.Close()
}()

return source, nil
}

Expand All @@ -869,7 +879,10 @@ func (vs *VDDKDataSource) Info() (ProcessingPhase, error) {
// Close closes any readers or other open resources.
func (vs *VDDKDataSource) Close() error {
vs.NbdKit.Handle.Close()
return vs.NbdKit.Command.Process.Kill()
if vs.NbdKit.Command.Process != nil {
return vs.NbdKit.Command.Process.Signal(os.Interrupt)
}
return nil
}

// GetURL returns the url that the data processor can use when converting the data.
Expand Down
21 changes: 21 additions & 0 deletions pkg/importer/vddk-datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/md5"
"errors"
"net/url"
"os"
"os/exec"

libnbd "github.com/mrnold/go-libnbd"
Expand Down Expand Up @@ -46,6 +47,7 @@ var _ = Describe("VDDK data source", func() {
newVddkDataSink = createMockVddkDataSink
newVMwareClient = createMockVMwareClient
newNbdKitWrapper = createMockNbdKitWrapper
newTerminationChannel = createMockTerminationChannel
currentExport = defaultMockNbdExport()
currentVMwareFunctions = defaultMockVMwareFunctions()
})
Expand Down Expand Up @@ -320,6 +322,25 @@ var _ = Describe("VDDK data source", func() {
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("disk 'testdisk.vmdk' is not present in VM hardware config or snapshot list"))
})

It("should cancel transfer on SIGTERM", func() {
newVddkDataSource = createVddkDataSource
diskName := "testdisk.vmdk"

currentVMwareFunctions.Properties = func(ctx context.Context, ref types.ManagedObjectReference, property []string, result interface{}) error {
switch out := result.(type) {
case *mo.VirtualMachine:
if property[0] == "config.hardware.device" {
out.Config = createVirtualDiskConfig(diskName, 12345)
}
}
return nil
}
_, err := NewVDDKDataSource("http://vcenter.test", "user", "pass", "aa:bb:cc:dd", "1-2-3-4", diskName, "snapshot-1", "snapshot-2", "false", v1.PersistentVolumeFilesystem)
Expect(err).ToNot(HaveOccurred())
mockTerminationChannel <- os.Interrupt
Expect(err).ToNot(HaveOccurred())
})
})

type mockNbdOperations struct{}
Expand Down

0 comments on commit aa7a800

Please sign in to comment.