From feb0cacddc49f4833b6440145cc60d9baa8b25e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Szymon=20Karpi=C5=84ski?= <34919255+szkarpinski@users.noreply.github.com> Date: Mon, 11 Dec 2023 10:37:49 +0100 Subject: [PATCH] Support checkpointing in Numpy reader (#5198) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR adds checkpointing support to FileLoader and fn.readers.numpy reader. Signed-off-by: Szymon Karpiński --- dali/operators/reader/fits_reader_gpu_op.h | 4 +- dali/operators/reader/fits_reader_op.h | 25 ++++-- dali/operators/reader/loader/file_loader.h | 48 ++++++++---- dali/operators/reader/loader/numpy_loader.cc | 6 +- dali/operators/reader/loader/numpy_loader.h | 3 +- dali/operators/reader/numpy_reader_gpu_op.cc | 5 +- dali/operators/reader/numpy_reader_gpu_op.h | 4 +- dali/operators/reader/numpy_reader_op.cc | 4 +- dali/operators/reader/numpy_reader_op.h | 17 ++-- .../checkpointing/test_dali_checkpointing.py | 77 +++++++++++++++++++ 10 files changed, 151 insertions(+), 42 deletions(-) diff --git a/dali/operators/reader/fits_reader_gpu_op.h b/dali/operators/reader/fits_reader_gpu_op.h index b619b031ae8..292a8bf6cb8 100644 --- a/dali/operators/reader/fits_reader_gpu_op.h +++ b/dali/operators/reader/fits_reader_gpu_op.h @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -35,7 +35,7 @@ class FitsReaderGPU : public FitsReader { using Operator::RunImpl; private: - USE_READER_OPERATOR_MEMBERS(GPUBackend, FitsFileWrapperGPU); + USE_READER_OPERATOR_MEMBERS(GPUBackend, FitsFileWrapperGPU, FitsFileWrapperGPU, true); }; } // namespace dali diff --git a/dali/operators/reader/fits_reader_op.h b/dali/operators/reader/fits_reader_op.h index 25cedb797df..b41b2c3e60b 100644 --- a/dali/operators/reader/fits_reader_op.h +++ b/dali/operators/reader/fits_reader_op.h @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
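For context, before the remaining hunks, here is a minimal sketch of the end-to-end flow this patch enables from Python. It assumes DALI's pipeline-level checkpointing API (the enable_checkpointing pipeline argument, Pipeline.checkpoint(), and the checkpoint constructor argument); the data path is a placeholder and none of this code is part of the patch itself.

import nvidia.dali.fn as fn
from nvidia.dali import pipeline_def

@pipeline_def(batch_size=4, num_threads=2, device_id=None, enable_checkpointing=True)
def numpy_pipeline(file_root):
    # With this patch, the reader's position (epoch and index) becomes part of the checkpoint.
    return fn.readers.numpy(device="cpu", file_root=file_root, shuffle_after_epoch=True)

pipe = numpy_pipeline("/path/to/npy_data")  # placeholder path
pipe.build()
for _ in range(10):
    pipe.run()

cpt = pipe.checkpoint()  # serialized pipeline state, including the numpy reader's position

# Later, possibly in another process: resume where the original pipeline left off.
restored = numpy_pipeline("/path/to/npy_data", checkpoint=cpt)
restored.build()
restored.run()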
@@ -27,22 +27,31 @@ namespace dali { template -class FitsReader : public DataReader { +class FitsReader : public DataReader { public: - explicit FitsReader(const OpSpec& spec) : DataReader(spec) {} + explicit FitsReader(const OpSpec& spec) : DataReader(spec) {} bool CanInferOutputs() const override { return true; } - USE_READER_OPERATOR_MEMBERS(Backend, Target); - using DataReader::GetCurrBatchSize; - using DataReader::GetSample; + // TODO(skarpinski) Debug fits reader and add checkpointing support + void SaveState(OpCheckpoint &cpt, AccessOrder order) override { + DALI_FAIL("Fits reader does not support checkpointing."); + } + + void RestoreState(const OpCheckpoint &cpt) override { + DALI_FAIL("Fits reader does not support checkpointing."); + } + + USE_READER_OPERATOR_MEMBERS(Backend, Target, Target, true); + using DataReader::GetCurrBatchSize; + using DataReader::GetSample; using Operator::spec_; bool SetupImpl(std::vector& output_desc, const Workspace& ws) override { // If necessary start prefetching thread and wait for a consumable batch - DataReader::SetupImpl(output_desc, ws); + DataReader::SetupImpl(output_desc, ws); int num_outputs = ws.NumOutput(); int num_samples = GetCurrBatchSize(); // samples here are synonymous with files @@ -103,7 +112,7 @@ class FitsReaderCPU : public FitsReader { using Operator::RunImpl; private: - USE_READER_OPERATOR_MEMBERS(CPUBackend, FitsFileWrapper); + USE_READER_OPERATOR_MEMBERS(CPUBackend, FitsFileWrapper, FitsFileWrapper, true); }; } // namespace dali diff --git a/dali/operators/reader/loader/file_loader.h b/dali/operators/reader/loader/file_loader.h index 333cbe87f76..b7da55ea2a2 100755 --- a/dali/operators/reader/loader/file_loader.h +++ b/dali/operators/reader/loader/file_loader.h @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2020-2021, 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -41,10 +41,10 @@ struct FileWrapper { template -class FileLoader : public Loader { +class FileLoader : public Loader { public: FileLoader(const OpSpec &spec, bool shuffle_after_epoch) - : Loader(spec), + : Loader(spec), file_filter_(spec.GetArgument("file_filter")), shuffle_after_epoch_(shuffle_after_epoch), current_index_(0), @@ -125,6 +125,9 @@ class FileLoader : public Loader { } DALI_ENFORCE(SizeImpl() > 0, "No files found."); + if (IsCheckpointingEnabled()) { + backup_files_ = files_; + } if (shuffle_) { // seeded with hardcoded value to get // the same sequence on every shard @@ -136,7 +139,7 @@ class FileLoader : public Loader { void Reset(bool wrap_to_shard) override { if (wrap_to_shard) { - current_index_ = start_index(shard_id_, num_shards_, SizeImpl()); + current_index_ = start_index(virtual_shard_id_, num_shards_, SizeImpl()); } else { current_index_ = 0; } @@ -144,26 +147,39 @@ class FileLoader : public Loader { current_epoch_++; if (shuffle_after_epoch_) { + if (IsCheckpointingEnabled()) { + // With checkpointing enabled dataset order must be easy to restore. + // Shuffling is run with different seed every epoch, so this doesn't + // reduce the randomness. 
+ files_ = backup_files_; + } std::mt19937 g(kDaliDataloaderSeed + current_epoch_); std::shuffle(files_.begin(), files_.end(), g); } } - using Loader::shard_id_; - using Loader::num_shards_; - using Loader::stick_to_shard_; - using Loader::shuffle_; - using Loader::dont_use_mmap_; - using Loader::initial_buffer_fill_; - using Loader::copy_read_data_; - using Loader::read_ahead_; - using Loader::MoveToNextShard; - using Loader::ShouldSkipImage; - using Loader::Size; - using Loader::PrepareEmptyTensor; + void RestoreStateImpl(const LoaderStateSnapshot &state) override { + current_epoch_ = state.current_epoch; + } + + using Loader::shard_id_; + using Loader::virtual_shard_id_; + using Loader::num_shards_; + using Loader::stick_to_shard_; + using Loader::shuffle_; + using Loader::dont_use_mmap_; + using Loader::initial_buffer_fill_; + using Loader::copy_read_data_; + using Loader::read_ahead_; + using Loader::MoveToNextShard; + using Loader::ShouldSkipImage; + using Loader::Size; + using Loader::PrepareEmptyTensor; + using Loader::IsCheckpointingEnabled; string file_list_, file_root_, file_filter_; vector files_; + vector backup_files_; bool has_files_arg_ = false; bool has_file_list_arg_ = false; diff --git a/dali/operators/reader/loader/numpy_loader.cc b/dali/operators/reader/loader/numpy_loader.cc index 3c1f15360f3..ecbaf5ee629 100644 --- a/dali/operators/reader/loader/numpy_loader.cc +++ b/dali/operators/reader/loader/numpy_loader.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -115,4 +115,8 @@ void NumpyLoader::ReadSample(NumpyFileWrapper& target) { target.fortran_order = header.fortran_order; } +void NumpyLoader::Skip() { + MoveToNextShard(++current_index_); +} + } // namespace dali diff --git a/dali/operators/reader/loader/numpy_loader.h b/dali/operators/reader/loader/numpy_loader.h index 0961786e36a..b627d0bcf79 100755 --- a/dali/operators/reader/loader/numpy_loader.h +++ b/dali/operators/reader/loader/numpy_loader.h @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -100,6 +100,7 @@ class NumpyLoader : public FileLoader { // we want to make it possible to override this function as well void ReadSample(NumpyFileWrapper& target) override; + void Skip() override; private: detail::NumpyHeaderCache header_cache_; diff --git a/dali/operators/reader/numpy_reader_gpu_op.cc b/dali/operators/reader/numpy_reader_gpu_op.cc index 83e1e65028e..245a300a7f7 100644 --- a/dali/operators/reader/numpy_reader_gpu_op.cc +++ b/dali/operators/reader/numpy_reader_gpu_op.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
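The FileLoader change above is what makes the shuffling state cheap to checkpoint: the unshuffled list is preserved in backup_files_, and each epoch's order is recomputed from the epoch number alone, so a snapshot only needs to record current_epoch plus the position within the epoch. The following pure-Python model illustrates the scheme; it is not DALI code, and the seed constant is a stand-in for kDaliDataloaderSeed.

import random

K_DALI_DATALOADER_SEED = 12345  # stand-in value; only its role as a fixed base seed matters

def epoch_order(backup_files, epoch):
    # files_ = backup_files_ : start from the canonical order every epoch,
    # then shuffle with an epoch-dependent seed, as in the C++ hunk above.
    order = list(backup_files)
    random.Random(K_DALI_DATALOADER_SEED + epoch).shuffle(order)
    return order

files = [f"sample_{i:03d}.npy" for i in range(8)]
print(epoch_order(files, 3) == epoch_order(files, 3))  # True: order is fully determined by the epoch
print(epoch_order(files, 4))                            # a different epoch gets an independent shuffle

Restoring therefore reduces to setting current_epoch (RestoreStateImpl above) and replaying the per-epoch shuffle.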
@@ -40,6 +40,7 @@ NumpyReaderGPU::NumpyReaderGPU(const OpSpec& spec) // init loader bool shuffle_after_epoch = spec.GetArgument("shuffle_after_epoch"); loader_ = InitLoader(spec, std::vector(), shuffle_after_epoch); + this->SetInitialSnapshot(); kmgr_transpose_.Resize(1); } @@ -54,7 +55,7 @@ void NumpyReaderGPU::Prefetch() { // We actually prepare the next batch DomainTimeRange tr("[DALI][NumpyReaderGPU] Prefetch #" + to_string(curr_batch_producer_), DomainTimeRange::kRed); - DataReader::Prefetch(); + DataReader::Prefetch(); auto &curr_batch = prefetched_batch_queue_[curr_batch_producer_]; auto &curr_tensor_list = prefetched_batch_tensors_[curr_batch_producer_]; diff --git a/dali/operators/reader/numpy_reader_gpu_op.h b/dali/operators/reader/numpy_reader_gpu_op.h index c39a32d5454..860d8ae832f 100644 --- a/dali/operators/reader/numpy_reader_gpu_op.h +++ b/dali/operators/reader/numpy_reader_gpu_op.h @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -70,7 +70,7 @@ class NumpyReaderGPU : gds::GDSLazyInit, public NumpyReader::RunImpl; - USE_READER_OPERATOR_MEMBERS(GPUBackend, NumpyFileWrapperGPU); + USE_READER_OPERATOR_MEMBERS(GPUBackend, NumpyFileWrapperGPU, NumpyFileWrapperGPU, true); private: using TransposeKernel = kernels::TransposeGPU; diff --git a/dali/operators/reader/numpy_reader_op.cc b/dali/operators/reader/numpy_reader_op.cc index 1b4eeb52df4..e86c7e4a5ad 100644 --- a/dali/operators/reader/numpy_reader_op.cc +++ b/dali/operators/reader/numpy_reader_op.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -244,7 +244,7 @@ void NumpyReaderCPU::Prefetch() { // We actually prepare the next batch DomainTimeRange tr("[DALI][NumpyReaderCPU] Prefetch #" + to_string(curr_batch_producer_), DomainTimeRange::kRed); - DataReader::Prefetch(); + DataReader::Prefetch(); if (!dont_use_mmap_) return; diff --git a/dali/operators/reader/numpy_reader_op.h b/dali/operators/reader/numpy_reader_op.h index e4fa2589327..318c5c528ee 100644 --- a/dali/operators/reader/numpy_reader_op.h +++ b/dali/operators/reader/numpy_reader_op.h @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
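The Skip() override added to NumpyLoader above appears to let the restore path fast-forward the loader to the checkpointed position without opening any files. The toy model below, with hypothetical names, only illustrates why a no-I/O skip matters; it is not DALI's loader interface.

class ToyLoader:
    """Minimal stand-in for a file loader: it only tracks a position in a file list."""

    def __init__(self, files):
        self.files = files
        self.current_index = 0

    def read_sample(self):
        # In the real loader this is where the .npy file would be opened and parsed.
        sample = ("loaded", self.files[self.current_index])
        self.current_index += 1
        return sample

    def skip(self):
        # Counterpart of the Skip() override: move past a sample without any I/O.
        self.current_index += 1


def fast_forward(loader, samples_consumed_before_checkpoint):
    # Restoring replays only the positions, not the data.
    for _ in range(samples_consumed_before_checkpoint):
        loader.skip()


loader = ToyLoader([f"s{i:02d}.npy" for i in range(100)])
fast_forward(loader, 37)
print(loader.read_sample())  # ('loaded', 's37.npy')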
@@ -32,10 +32,10 @@ namespace dali { template -class NumpyReader : public DataReader { +class NumpyReader : public DataReader { public: explicit NumpyReader(const OpSpec& spec) - : DataReader(spec), + : DataReader(spec), slice_attr_(spec, "roi_start", "rel_roi_start", "roi_end", "rel_roi_end", "roi_shape", "rel_roi_shape", "roi_axes", nullptr) { out_of_bounds_policy_ = GetOutOfBoundsPolicy(spec); @@ -48,14 +48,14 @@ class NumpyReader : public DataReader { return true; } - USE_READER_OPERATOR_MEMBERS(Backend, Target); - using DataReader::GetCurrBatchSize; - using DataReader::GetSample; + USE_READER_OPERATOR_MEMBERS(Backend, Target, Target, true); + using DataReader::GetCurrBatchSize; + using DataReader::GetSample; using Operator::spec_; bool SetupImpl(std::vector& output_desc, const Workspace &ws) override { // If necessary start prefetching thread and wait for a consumable batch - DataReader::SetupImpl(output_desc, ws); + DataReader::SetupImpl(output_desc, ws); int batch_size = GetCurrBatchSize(); const auto& file_0 = GetSample(0); @@ -167,6 +167,7 @@ class NumpyReaderCPU : public NumpyReader { } loader_ = InitLoader(spec, shuffle_after_epoch, use_o_direct_, o_direct_alignm_, o_direct_read_len_alignm_); + this->SetInitialSnapshot(); } ~NumpyReaderCPU() override; void Prefetch() override; @@ -176,7 +177,7 @@ class NumpyReaderCPU : public NumpyReader { using Operator::RunImpl; private: - USE_READER_OPERATOR_MEMBERS(CPUBackend, NumpyFileWrapper); + USE_READER_OPERATOR_MEMBERS(CPUBackend, NumpyFileWrapper, NumpyFileWrapper, true); bool dont_use_mmap_ = false; bool use_o_direct_ = false; diff --git a/dali/test/python/checkpointing/test_dali_checkpointing.py b/dali/test/python/checkpointing/test_dali_checkpointing.py index 6d9a6716973..deffa122f73 100644 --- a/dali/test/python/checkpointing/test_dali_checkpointing.py +++ b/dali/test/python/checkpointing/test_dali_checkpointing.py @@ -16,6 +16,7 @@ import nvidia.dali.fn as fn import nvidia.dali.types as types import os +import shutil import webdataset_base import numpy as np from nvidia.dali.pipeline import pipeline_def @@ -25,6 +26,7 @@ from nose.plugins.attrib import attr from dataclasses import dataclass from nvidia.dali import tfrecord as tfrec +from reader.test_numpy import is_gds_supported data_root = get_dali_extra_path() images_dir = os.path.join(data_root, "db", "single", "jpeg") @@ -568,6 +570,81 @@ def test_nemo_asr_reader( manifest.close() +# device, +# num_epochs, batch_size, shard_id, num_shards, +# random_shuffle, shuffle_after_epoch, stick_to_shard, pad_last_batch, +# iters_into_epoch, initial_fill +@params( + ("cpu", 0, 1, 0, 1, False, False, False, False, None), + ("cpu", 5, 2, 4, 7, False, False, False, True, 1), + ("cpu", 4, 4, 0, 2, False, False, True, False, 2), + ("cpu", 3, 8, 4, 6, False, False, True, True, 3), + ("cpu", 6, 1, 2, 3, False, True, False, False, 4), + ("cpu", 5, 2, 2, 5, False, True, False, True, 3), + ("cpu", 4, 4, 3, 4, True, False, False, False, 2), + ("cpu", 3, 8, 1, 4, True, False, False, True, 1), + ("cpu", 2, 1, 1, 2, True, False, True, False, None), + ("cpu", 0, 2, 0, 1, True, False, True, True, 2), + *( + [ + ("gpu", 2, 1, 1, 2, False, False, False, False, None), + ("gpu", 5, 2, 0, 5, False, False, False, True, 1), + ("gpu", 3, 4, 2, 3, False, False, True, False, 2), + ("gpu", 6, 8, 3, 5, False, False, True, True, 3), + ("gpu", 7, 1, 1, 4, False, True, False, False, 4), + ("gpu", 3, 2, 2, 4, False, True, False, True, 3), + ("gpu", 3, 4, 2, 5, True, False, False, False, 2), + ("gpu", 4, 8, 0, 2, True, 
False, False, True, 1), + ("gpu", 1, 1, 2, 3, True, False, True, False, None), + ("gpu", 0, 2, 0, 2, True, False, True, True, 2), + ] + if is_gds_supported() + else [] + ), +) +def test_numpy_reader( + device, + num_epochs, + batch_size, + shard_id, + num_shards, + random_shuffle, + shuffle_after_epoch, + stick_to_shard, + pad_last_batch, + iters_into_epoch=None, + initial_fill=1024, +): + numpy_dir = os.path.join(data_root, "db", "3D", "MRI", "Knee", "npy_2d_slices", "STU00001") + + # GDS doesn't support overlayfs, so we need to use runner's scratch + gds_data_root = "/scratch/" + if not os.path.isdir(gds_data_root): + gds_data_root = os.getcwd() + "/scratch/" + if not os.path.isdir(gds_data_root): + os.mkdir(gds_data_root) + assert os.path.isdir(gds_data_root) + + with tempfile.TemporaryDirectory(prefix=gds_data_root) as test_data_root: + shutil.copytree(numpy_dir, os.path.join(test_data_root, "numpy")) + + check_reader_checkpointing( + fn.readers.numpy, + num_epochs, + batch_size, + iters_into_epoch, + device=device, + file_root=os.path.join(test_data_root, "numpy"), + pad_last_batch=pad_last_batch, + random_shuffle=random_shuffle, + shuffle_after_epoch=shuffle_after_epoch, + shard_id=shard_id, + num_shards=num_shards, + stick_to_shard=stick_to_shard, + initial_fill=initial_fill, + ) + + @attr("pytorch") @params( (1, 3, 0, 1, True, False, False),
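The new test delegates to check_reader_checkpointing, which is defined earlier in this module and not shown in the diff. The general pattern it relies on is: run a checkpointing-enabled pipeline for a number of iterations, snapshot it, restore a second pipeline from the snapshot, and verify that both produce identical batches from then on. The sketch below shows that pattern for fn.readers.numpy under the same API assumptions as the earlier example; it is an illustration, not the helper's actual implementation.

import numpy as np
import nvidia.dali.fn as fn
from nvidia.dali import pipeline_def

@pipeline_def(batch_size=2, num_threads=2, device_id=None, enable_checkpointing=True)
def reader_pipeline(file_root):
    return fn.readers.numpy(device="cpu", file_root=file_root, pad_last_batch=True)

def compare_after_restore(file_root, warmup_iters=5, check_iters=3):
    pipe = reader_pipeline(file_root)
    pipe.build()
    for _ in range(warmup_iters):
        pipe.run()
    cpt = pipe.checkpoint()

    restored = reader_pipeline(file_root, checkpoint=cpt)
    restored.build()
    for _ in range(check_iters):
        (a,), (b,) = pipe.run(), restored.run()
        for s1, s2 in zip(a, b):  # compare sample by sample
            assert np.array_equal(np.asarray(s1), np.asarray(s2))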