Skip to content

Commit

Permalink
Close importer data source before calling os.Exit. (kubevirt#1760)
Browse files Browse the repository at this point in the history
ImageIO transfers are especially sensitive to this, because disks remain
locked without the deferred Close if there are errors after an otherwise
successful transfer.

Signed-off-by: Matthew Arnold <[email protected]>
  • Loading branch information
mrnold authored Apr 21, 2021
1 parent 6a37c15 commit fceae7e
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 8 deletions.
7 changes: 6 additions & 1 deletion cmd/cdi-importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func main() {
finalCheckpoint, _ := util.ParseEnvVar(common.ImporterFinalCheckpoint, false)
preallocation, err := strconv.ParseBool(os.Getenv(common.Preallocation))
var preallocationApplied bool
var dp importer.DataSourceInterface

//Registry import currently support kubevirt content type only
if contentType != string(cdiv1.DataVolumeKubeVirt) && (source == controller.SourceRegistry || source == controller.SourceImageio) {
Expand Down Expand Up @@ -132,7 +133,6 @@ func main() {
os.Exit(1)
} else {
klog.V(1).Infoln("begin import process")
var dp importer.DataSourceInterface
switch source {
case controller.SourceHTTP:
dp, err = importer.NewHTTPDataSource(ep, acc, sec, certDir, cdiv1.DataVolumeContentType(contentType))
Expand Down Expand Up @@ -190,12 +190,14 @@ func main() {
if err != nil {
klog.Errorf("%+v", err)
if err == importer.ErrRequiresScratchSpace {
dp.Close()
os.Exit(common.ScratchSpaceNeededExitCode)
}
err = util.WriteTerminationMessage(fmt.Sprintf("Unable to process data: %+v", err))
if err != nil {
klog.Errorf("%+v", err)
}
dp.Close()
os.Exit(1)
}
preallocationApplied = processor.PreallocationApplied()
Expand All @@ -207,6 +209,9 @@ func main() {
err = util.WriteTerminationMessage(message)
if err != nil {
klog.Errorf("%+v", err)
if dp != nil {
dp.Close()
}
os.Exit(1)
}
klog.V(1).Infoln(message)
Expand Down
10 changes: 9 additions & 1 deletion pkg/importer/imageio-datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ func (is *ImageioDataSource) Close() error {
if itID, ok := is.imageTransfer.Id(); ok {
transfersService := is.connection.SystemService().ImageTransfersService()
_, err = transfersService.ImageTransferService(itID).Finalize().Send()
if err != nil {
_, cancelErr := transfersService.ImageTransferService(itID).Cancel().Send()
if cancelErr != nil {
klog.Errorf("Unable to finalize or cancel transfer! Disk may remain locked until inactivity timeout.\n%v\n%v", err, cancelErr)
}
}
}
}
if is.connection != nil {
Expand Down Expand Up @@ -241,7 +247,9 @@ func createImageioReader(ctx context.Context, ep string, accessKey string, secKe
func cancelTransfer(conn ConnectionInterface, it *ovirtsdk4.ImageTransfer) error {
var err error
if conn != nil && it != nil {
_, err = conn.SystemService().ImageTransfersService().ImageTransferService(it.MustId()).Cancel().Send()
if itID, ok := it.Id(); ok {
_, err = conn.SystemService().ImageTransfersService().ImageTransferService(itID).Cancel().Send()
}
}
return err
}
Expand Down
58 changes: 52 additions & 6 deletions pkg/importer/imageio-datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,24 +225,56 @@ var _ = Describe("Imageio pollprogress", func() {
})

var _ = Describe("Imageio cancel", func() {
It("should cancel transfer on SIGTERM", func() {
var (
ts *httptest.Server
tempDir string
err error
)

BeforeEach(func() {
newOvirtClientFunc = createMockOvirtClient
newTerminationChannel = createMockTerminationChannel
tempDir := createCert()
ts := createTestServer(imageDir)
tempDir = createCert()
ts = createTestServer(imageDir)
disk.SetTotalSize(1024)
disk.SetId("123")
it.SetPhase(ovirtsdk4.IMAGETRANSFERPHASE_TRANSFERRING)
it.SetTransferUrl(ts.URL)
it.SetId("123")
diskAvailable = true
diskCreateError = nil
})

_, err := NewImageioDataSource(ts.URL, "", "", tempDir, "")
AfterEach(func() {
newOvirtClientFunc = getOvirtClient
if tempDir != "" {
os.RemoveAll(tempDir)
}
ts.Close()
})

It("should cancel transfer on SIGTERM", func() {
_, err = NewImageioDataSource(ts.URL, "", "", tempDir, "")
Expect(err).ToNot(HaveOccurred())
mockTerminationChannel <- os.Interrupt
Expect(err).ToNot(HaveOccurred())
})

It("should cancel transfer when finalize fails", func() {
dp, err := NewImageioDataSource(ts.URL, "", "", tempDir, "")
Expect(err).ToNot(HaveOccurred())
cancelled := false
mockFinalizeHook = func() error {
return errors.New("Failing finalize")
}
mockCancelHook = func() error {
cancelled = true
return nil
}
err = dp.Close()
Expect(err).ToNot(HaveOccurred())
Expect(cancelled).To(Equal(true))
})
})

// MockOvirtClient is a mock minio client
Expand Down Expand Up @@ -306,6 +338,9 @@ func (conn *MockOvirtClient) ImageTransferService(string) ImageTransferServiceIn
}

func (conn *MockOvirtClient) Cancel() ImageTransferServiceCancelRequestInterface {
if mockCancelHook != nil {
mockCancelHook()
}
return &MockCancelService{
client: conn,
}
Expand All @@ -331,11 +366,19 @@ func (conn *MockAddService) Send() (ImageTransfersServiceAddResponseInterface, e
}

func (conn *MockCancelService) Send() (ImageTransferServiceCancelResponseInterface, error) {
return &MockImageTransferServiceCancelResponse{srv: nil}, nil
var err error
if mockCancelHook != nil {
err = mockCancelHook()
}
return &MockImageTransferServiceCancelResponse{srv: nil}, err
}

func (conn *MockFinalizeService) Send() (ImageTransferServiceFinalizeResponseInterface, error) {
return &MockImageTransferServiceFinalizeResponse{srv: nil}, nil
var err error
if mockFinalizeHook != nil {
err = mockFinalizeHook()
}
return &MockImageTransferServiceFinalizeResponse{srv: nil}, err
}

func (conn *MockImageTransfersServiceAddResponse) ImageTransfer() (*ovirtsdk4.ImageTransfer, bool) {
Expand All @@ -350,6 +393,9 @@ func (conn *MockOvirtClient) Close() error {
return nil
}

var mockCancelHook func() error
var mockFinalizeHook func() error

func failMockOvirtClient(ep string, accessKey string, secKey string) (ConnectionInterface, error) {
return nil, errors.New("Failed to create client")
}
Expand Down

0 comments on commit fceae7e

Please sign in to comment.