From 704536b48d7d93c3be83a05a9db3293d4f655d83 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Wed, 12 Oct 2022 11:24:38 -0700
Subject: [PATCH] ARROW-17853: [Python][CI] Timeout in
 test_dataset.py::test_write_dataset_s3_put_only (#14257)

This reintroduces the dataset writer change, with a fix that should
prevent the deadlock.  The Python test was unique in that its "create
directory" step failed.  My change had made the file writers wait on
this step and, if it failed, they were not being properly notified.

Authored-by: Weston Pace
Signed-off-by: Weston Pace
---
 cpp/src/arrow/dataset/dataset_writer.cc      |  39 +++--
 cpp/src/arrow/dataset/dataset_writer_test.cc |  12 ++
 cpp/src/arrow/dataset/file_base.cc           |   6 +-
 cpp/src/arrow/dataset/scan_node.cc           |  31 ++--
 cpp/src/arrow/util/async_generator_test.cc   |   2 +
 cpp/src/arrow/util/async_util.cc             | 124 ++++++++++----
 cpp/src/arrow/util/async_util.h              |  64 +++----
 cpp/src/arrow/util/async_util_test.cc        | 165 ++++++++++++++-----
 8 files changed, 314 insertions(+), 129 deletions(-)
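For context, the deadlock described above reduces to a future that is completed
only on the success path.  The sketch below is illustrative only (it is not code
from this patch) and assumes nothing beyond Arrow's Future<> and Status types:

    // Illustrative sketch only: the shape of the hang fixed by this patch.
    #include "arrow/status.h"
    #include "arrow/util/future.h"

    using arrow::Future;
    using arrow::Status;

    int main() {
      Future<> init = Future<>::Make();  // stands in for init_future_ below

      // A file writer task waits on the init step before doing any work.
      Future<> writer = init.Then([] { /* open and write the file */ });

      Status create_dir = Status::IOError("create directory failed");
      if (create_dir.ok()) {
        init.MarkFinished();  // BUG: waiters are notified only on success...
      }
      // writer.Wait();  // ...so this would block forever once create_dir failed
      return 0;
    }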
diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc
index 844f1fe2698cf..220bdad23daf6 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -131,7 +131,8 @@ class DatasetWriterFileQueue {
                          DatasetWriterState* writer_state)
       : options_(options), schema_(schema), writer_state_(writer_state) {}
 
-  void Start(util::AsyncTaskScheduler* scheduler, const std::string& filename) {
+  void Start(std::shared_ptr<util::AsyncTaskScheduler> scheduler,
+             const std::string& filename) {
     scheduler_ = scheduler;
     // Because the scheduler runs one task at a time we know the writer will
     // be opened before any attempt to write
@@ -212,7 +213,7 @@ class DatasetWriterFileQueue {
     // is a 1-task FIFO we know this task will run at the very end and can
    // add it now.
     scheduler_->AddSimpleTask([this] { return DoFinish(); });
-    scheduler_->End();
+    scheduler_.reset();
     return Status::OK();
   }
 
@@ -247,7 +248,7 @@ class DatasetWriterFileQueue {
   // point they are merged together and added to write_queue_
   std::deque<std::shared_ptr<RecordBatch>> staged_batches_;
   uint64_t rows_currently_staged_ = 0;
-  util::AsyncTaskScheduler* scheduler_ = nullptr;
+  std::shared_ptr<util::AsyncTaskScheduler> scheduler_ = nullptr;
 };
 
 struct WriteTask {
@@ -323,17 +324,17 @@ class DatasetWriterDirectoryQueue {
         util::AsyncTaskScheduler::MakeThrottle(1);
     util::AsyncTaskScheduler::Throttle* throttle_view = throttle.get();
     auto file_finish_task = [self = this, file_queue = std::move(file_queue),
-                             throttle = std::move(throttle)]() {
+                             throttle = std::move(throttle)](Status) {
       self->writer_state_->open_files_throttle.Release(1);
       return Status::OK();
     };
-    util::AsyncTaskScheduler* file_scheduler =
+    std::shared_ptr<util::AsyncTaskScheduler> file_scheduler =
         scheduler_->MakeSubScheduler(std::move(file_finish_task), throttle_view);
-    if (init_task_) {
-      file_scheduler->AddSimpleTask(init_task_);
-      init_task_ = {};
+    if (init_future_.is_valid()) {
+      file_scheduler->AddSimpleTask(
+          [init_future = init_future_]() { return init_future; });
     }
-    file_queue_view->Start(file_scheduler, filename);
+    file_queue_view->Start(std::move(file_scheduler), filename);
     return file_queue_view;
   }
 
@@ -343,21 +344,33 @@ class DatasetWriterDirectoryQueue {
     if (directory_.empty() || !write_options_.create_dir) {
       return;
     }
+    init_future_ = Future<>::Make();
     auto create_dir_cb = [this] {
       return DeferNotOk(write_options_.filesystem->io_context().executor()->Submit(
           [this]() { return write_options_.filesystem->CreateDir(directory_); }));
     };
+    // We need to notify waiters whether the directory succeeded or failed.
+    auto notify_waiters_cb = [this] { init_future_.MarkFinished(); };
+    auto notify_waiters_on_err_cb = [this](const Status& err) {
+      init_future_.MarkFinished();
+      return err;
+    };
+    std::function<Future<>()> init_task;
     if (write_options_.existing_data_behavior ==
         ExistingDataBehavior::kDeleteMatchingPartitions) {
-      init_task_ = [this, create_dir_cb] {
+      init_task = [this, create_dir_cb, notify_waiters_cb, notify_waiters_on_err_cb] {
         return write_options_.filesystem
             ->DeleteDirContentsAsync(directory_, /*missing_dir_ok=*/true)
-            .Then(create_dir_cb);
+            .Then(create_dir_cb)
+            .Then(notify_waiters_cb, notify_waiters_on_err_cb);
       };
     } else {
-      init_task_ = [create_dir_cb] { return create_dir_cb(); };
+      init_task = [create_dir_cb, notify_waiters_cb, notify_waiters_on_err_cb] {
+        return create_dir_cb().Then(notify_waiters_cb, notify_waiters_on_err_cb);
+      };
     }
+    scheduler_->AddSimpleTask(std::move(init_task));
   }
 
   static Result<std::unique_ptr<DatasetWriterDirectoryQueue>> Make(
@@ -388,7 +401,7 @@ class DatasetWriterDirectoryQueue {
   std::shared_ptr<Schema> schema_;
   const FileSystemDatasetWriteOptions& write_options_;
   DatasetWriterState* writer_state_;
-  std::function<Future<>()> init_task_;
+  Future<> init_future_;
   std::string current_filename_;
   DatasetWriterFileQueue* latest_open_file_ = nullptr;
   uint64_t rows_written_ = 0;
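The hunk above repairs that shape: the init task marks init_future_ finished on
both the success and the error path, and re-raises the error so the scheduler
still aborts.  A minimal standalone sketch of the same pattern (CreateDirAsync
is a hypothetical stand-in for the filesystem call):

    // Sketch of the notify-on-both-paths pattern; CreateDirAsync is hypothetical.
    #include "arrow/status.h"
    #include "arrow/util/future.h"

    using arrow::Future;
    using arrow::Status;

    Future<> CreateDirAsync() {  // stand-in for the async "create directory" step
      return Future<>::MakeFinished(Status::IOError("create directory failed"));
    }

    Future<> RunInitTask(Future<> init_future) {
      auto notify_waiters_cb = [init_future]() mutable { init_future.MarkFinished(); };
      auto notify_waiters_on_err_cb = [init_future](const Status& err) mutable {
        init_future.MarkFinished();  // wake the waiters no matter what...
        return err;                  // ...but keep the failure visible to the caller
      };
      return CreateDirAsync().Then(notify_waiters_cb, notify_waiters_on_err_cb);
    }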
diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc b/cpp/src/arrow/dataset/dataset_writer_test.cc
index 6c9c292739399..e27f98dc6ef60 100644
--- a/cpp/src/arrow/dataset/dataset_writer_test.cc
+++ b/cpp/src/arrow/dataset/dataset_writer_test.cc
@@ -234,6 +234,18 @@ TEST_F(DatasetWriterTestFixture, BasicFileDirectoryPrefix) {
   AssertFilesCreated({"testdir/a/1_chunk-0.arrow"});
 }
 
+TEST_F(DatasetWriterTestFixture, DirectoryCreateFails) {
+  // This should fail to be created
+  write_options_.base_dir = "///doesnotexist";
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer,
+                       DatasetWriter::Make(write_options_, scheduler_.get()));
+  Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "a", "1_");
+  AssertFinished(queue_fut);
+  ASSERT_OK(dataset_writer->Finish());
+  scheduler_->End();
+  ASSERT_FINISHES_AND_RAISES(Invalid, scheduler_->OnFinished());
+}
+
 TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
   write_options_.max_rows_per_file = 10;
   write_options_.max_rows_per_group = 10;
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index a4aaaee99e914..bd19c99a52eef 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -484,7 +484,7 @@ class TeeNode : public compute::MapNode {
         util::AsyncTaskScheduler::MakeThrottle(1);
     util::AsyncTaskScheduler::Throttle* serial_throttle_view = serial_throttle.get();
     serial_scheduler_ = plan_->async_scheduler()->MakeSubScheduler(
-        [owned_throttle = std::move(serial_throttle)]() { return Status::OK(); },
+        [owned_throttle = std::move(serial_throttle)](Status) { return Status::OK(); },
         serial_throttle_view);
   }
 
@@ -519,7 +519,7 @@ class TeeNode : public compute::MapNode {
       MapNode::Finish(std::move(writer_finish_st));
       return;
     }
-    serial_scheduler_->End();
+    serial_scheduler_.reset();
     MapNode::Finish(Status::OK());
   }
 
@@ -581,7 +581,7 @@ class TeeNode : public compute::MapNode {
   // We use a serial scheduler to submit tasks to the dataset writer.  The dataset writer
   // only returns an unfinished future when it needs backpressure.  Using a serial
   // scheduler here ensures we pause while we wait for backpressure to clear
-  util::AsyncTaskScheduler* serial_scheduler_;
+  std::shared_ptr<util::AsyncTaskScheduler> serial_scheduler_;
   int32_t backpressure_counter_ = 0;
 };
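TeeNode's serial_scheduler_ is the capacity-1 throttle idiom: with a single
permit, a task that returns an unfinished future (the dataset writer signaling
backpressure) holds the permit, and everything queued behind it waits.  An
illustrative sketch of that idiom under the new shared_ptr API (not code from
the patch; Example is a hypothetical caller):

    // Sketch of the capacity-1 throttle idiom; Example is hypothetical.
    #include <memory>
    #include <utility>

    #include "arrow/status.h"
    #include "arrow/util/async_util.h"
    #include "arrow/util/future.h"

    using arrow::Future;
    using arrow::Status;
    using arrow::util::AsyncTaskScheduler;

    void Example(AsyncTaskScheduler* parent) {
      auto throttle = AsyncTaskScheduler::MakeThrottle(1);
      AsyncTaskScheduler::Throttle* view = throttle.get();
      // The finish callback keeps the throttle alive as long as the sub-scheduler.
      std::shared_ptr<AsyncTaskScheduler> serial = parent->MakeSubScheduler(
          [owned = std::move(throttle)](Status) { return Status::OK(); }, view);
      // Each task holds the single permit until its future finishes, so a slow
      // (backpressured) write pauses everything queued behind it.
      serial->AddSimpleTask([] { return Future<>::MakeFinished(); });
      serial.reset();  // dropping the last reference replaces the old End() call
    }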
diff --git a/cpp/src/arrow/dataset/scan_node.cc b/cpp/src/arrow/dataset/scan_node.cc
index da397312b553e..31470bf988221 100644
--- a/cpp/src/arrow/dataset/scan_node.cc
+++ b/cpp/src/arrow/dataset/scan_node.cc
@@ -264,19 +264,22 @@ class ScanNode : public cp::ExecNode {
     ScanState* state_view = scan_state.get();
     // Finish callback keeps the scan state alive until all scan tasks done
     struct StateHolder {
-      Status operator()() { return Status::OK(); }
+      Status operator()(Status) { return Status::OK(); }
       std::unique_ptr<ScanState> scan_state;
     };
-    util::AsyncTaskScheduler* frag_scheduler = scan_scheduler->MakeSubScheduler(
-        StateHolder{std::move(scan_state)}, node->batches_throttle_.get());
+    std::shared_ptr<util::AsyncTaskScheduler> frag_scheduler =
+        scan_scheduler->MakeSubScheduler(StateHolder{std::move(scan_state)},
+                                         node->batches_throttle_.get());
     for (int i = 0; i < fragment_scanner->NumBatches(); i++) {
       node->num_batches_.fetch_add(1);
       frag_scheduler->AddTask(std::make_unique<ScanBatchTask>(node, state_view, i));
     }
     Future<> list_and_scan_node = frag_scheduler->OnFinished();
-    frag_scheduler->End();
     // The "list fragments" task doesn't actually end until the fragments are
     // all scanned.  This allows us to enforce fragment readahead.
+    if (--node->list_tasks_ == 0) {
+      node->scan_scheduler_.reset();
+    }
     return list_and_scan_node;
   }
 
@@ -313,8 +316,8 @@ class ScanNode : public cp::ExecNode {
     END_SPAN_ON_FUTURE_COMPLETION(span_, finished_);
     AsyncGenerator<std::shared_ptr<Fragment>> frag_gen =
         GetFragments(options_.dataset.get(), options_.filter);
-    util::AsyncTaskScheduler* scan_scheduler = plan_->async_scheduler()->MakeSubScheduler(
-        [this]() {
+    scan_scheduler_ = plan_->async_scheduler()->MakeSubScheduler(
+        [this](Status st) {
           outputs_[0]->InputFinished(this, num_batches_.load());
           finished_.MarkFinished();
           return Status::OK();
@@ -322,12 +325,15 @@ class ScanNode : public cp::ExecNode {
         fragments_throttle_.get());
     plan_->async_scheduler()->AddAsyncGenerator<std::shared_ptr<Fragment>>(
         std::move(frag_gen),
-        [this, scan_scheduler](const std::shared_ptr<Fragment>& fragment) {
-          scan_scheduler->AddTask(std::make_unique<ListFragmentTask>(this, fragment));
+        [this](const std::shared_ptr<Fragment>& fragment) {
+          list_tasks_++;
+          scan_scheduler_->AddTask(std::make_unique<ListFragmentTask>(this, fragment));
           return Status::OK();
         },
-        [scan_scheduler]() {
-          scan_scheduler->End();
+        [this](Status) {
+          if (--list_tasks_ == 0) {
+            scan_scheduler_.reset();
+          }
           return Status::OK();
         });
     return Status::OK();
@@ -351,6 +357,11 @@ class ScanNode : public cp::ExecNode {
  private:
   ScanV2Options options_;
   std::atomic<int> num_batches_{0};
+  // TODO(ARROW-17509) list_tasks_ and scan_scheduler_ are just
+  // needed to figure out when to end scan_scheduler_.  In the future, we should not
+  // need to call end and these variables can go away.
+  std::atomic<int> list_tasks_{1};
+  std::shared_ptr<util::AsyncTaskScheduler> scan_scheduler_;
   std::unique_ptr<util::AsyncTaskScheduler::Throttle> fragments_throttle_;
   std::unique_ptr<util::AsyncTaskScheduler::Throttle> batches_throttle_;
 };
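The list_tasks_ counter above is a count-starts-at-one idiom: the counter
begins at 1 for the reference held by the listing generator's finish callback,
each listed fragment adds one, and whichever decrement reaches zero releases
scan_scheduler_.  A generic sketch of the idiom (stand-in types, not code from
the patch):

    // Sketch of the count-starts-at-one idiom; Resource/Owner are stand-ins.
    #include <atomic>
    #include <memory>

    struct Resource {};  // stands in for scan_scheduler_

    struct Owner {
      // Starts at 1: the "listing finished" callback owns the initial count.
      std::atomic<int> list_tasks{1};
      std::shared_ptr<Resource> resource = std::make_shared<Resource>();

      void OnFragmentListed() { list_tasks++; }   // one more outstanding task
      void OnTaskDone() {                         // a task (or the listing) finished
        if (--list_tasks == 0) resource.reset();  // the last one out releases it
      }
    };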
diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc
index 4d86df34b8f92..37718f743fffc 100644
--- a/cpp/src/arrow/util/async_generator_test.cc
+++ b/cpp/src/arrow/util/async_generator_test.cc
@@ -705,6 +705,7 @@ TEST_P(MergedGeneratorTestFixture, MergedLimitedSubscriptions) {
   AssertGeneratorExhausted(merged);
 }
 
+#ifndef ARROW_VALGRIND
 TEST_P(MergedGeneratorTestFixture, MergedStress) {
   constexpr int NGENERATORS = 10;
   constexpr int NITEMS = 10;
@@ -739,6 +740,7 @@ TEST_P(MergedGeneratorTestFixture, MergedParallelStress) {
     ASSERT_EQ(NITEMS * NGENERATORS, items.size());
   }
 }
+#endif
 
 TEST_P(MergedGeneratorTestFixture, MergedRecursion) {
   // Regression test for an edge case in MergedGenerator.  Ensure if
diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc
index e11f299ba0789..d5b1388da8822 100644
--- a/cpp/src/arrow/util/async_util.cc
+++ b/cpp/src/arrow/util/async_util.cc
@@ -98,13 +98,51 @@ class FifoQueue : public AsyncTaskScheduler::Queue {
   std::list<std::unique_ptr<Task>> tasks_;
 };
 
+class AlreadyFailedScheduler : public AsyncTaskScheduler {
+ public:
+  explicit AlreadyFailedScheduler(Status failure_reason,
+                                  FnOnce<Status(Status)> finish_callback)
+      : failure_reason_(std::move(failure_reason)),
+        finish_callback_(std::move(finish_callback)) {}
+  ~AlreadyFailedScheduler() override {
+    std::ignore = std::move(finish_callback_)(failure_reason_);
+  }
+  bool AddTask(std::unique_ptr<Task> task) override { return false; }
+  void End() override {
+    Status::UnknownError("Do not call End on a sub-scheduler.").Abort();
+  }
+  Future<> OnFinished() const override {
+    Status::UnknownError(
+        "You should not rely on sub-scheduler's OnFinished.  Use a "
+        "finished callback when creating the sub-scheduler instead")
+        .Abort();
+  }
+  std::shared_ptr<AsyncTaskScheduler> MakeSubScheduler(
+      FnOnce<Status(Status)> finish_callback, Throttle* throttle,
+      std::unique_ptr<Queue> queue) override {
+    return AlreadyFailedScheduler::Make(failure_reason_, std::move(finish_callback));
+  }
+  static std::unique_ptr<AlreadyFailedScheduler> Make(
+      Status failure, FnOnce<Status(Status)> finish_callback) {
+    DCHECK(!failure.ok());
+    return std::make_unique<AlreadyFailedScheduler>(std::move(failure),
+                                                    std::move(finish_callback));
+  }
+  // This is deleted when ended so there is no possible way for this to return true
+  bool IsEnded() override { return false; }
+
+ private:
+  Status failure_reason_;
+  FnOnce<Status(Status)> finish_callback_;
+};
+
 class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
  public:
   using Task = AsyncTaskScheduler::Task;
   using Queue = AsyncTaskScheduler::Queue;
 
   AsyncTaskSchedulerImpl(AsyncTaskSchedulerImpl* parent, std::unique_ptr<Queue> queue,
-                         Throttle* throttle, FnOnce<Status()> finish_callback)
+                         Throttle* throttle, FnOnce<Status(Status)> finish_callback)
       : AsyncTaskScheduler(),
         queue_(std::move(queue)),
         throttle_(throttle),
@@ -128,6 +166,9 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
           Status::UnknownError("AsyncTaskScheduler abandoned before completion"),
           std::move(lk));
     }
+    if (state_ != State::kEnded) {
+      End(/*from_destructor=*/true);
+    }
     finished_.Wait();
   }
 
@@ -172,16 +213,27 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
     return true;
   }
 
-  AsyncTaskScheduler* MakeSubScheduler(FnOnce<Status()> finish_callback,
-                                       Throttle* throttle,
-                                       std::unique_ptr<Queue> queue) override {
-    std::unique_ptr<AsyncTaskSchedulerImpl> owned_child =
-        std::make_unique<AsyncTaskSchedulerImpl>(this, std::move(queue), throttle,
-                                                 std::move(finish_callback));
-    AsyncTaskScheduler* child = owned_child.get();
+  bool IsEnded() override {
+    std::lock_guard<std::mutex> lk(mutex_);
+    return state_ == State::kEnded;
+  }
+
+  std::shared_ptr<AsyncTaskScheduler> MakeSubScheduler(
+      FnOnce<Status(Status)> finish_callback, Throttle* throttle,
+      std::unique_ptr<Queue> queue) override {
+    AsyncTaskSchedulerImpl* child;
     std::list<std::unique_ptr<AsyncTaskSchedulerImpl>>::iterator child_itr;
     {
       std::lock_guard<std::mutex> lk(mutex_);
+      DCHECK_NE(state_, State::kEnded)
+          << "Attempt to create a sub-scheduler on an ended parent.";
+      if (state_ != State::kRunning) {
+        return AlreadyFailedScheduler::Make(maybe_error_, std::move(finish_callback));
+      }
+      std::unique_ptr<AsyncTaskSchedulerImpl> owned_child =
+          std::make_unique<AsyncTaskSchedulerImpl>(this, std::move(queue), throttle,
+                                                   std::move(finish_callback));
+      child = owned_child.get();
       running_tasks_++;
       sub_schedulers_.push_back(std::move(owned_child));
       child_itr = --sub_schedulers_.end();
     }
@@ -190,22 +242,17 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
 
     struct Finalizer {
       void operator()(const Status& st) {
         std::unique_lock<std::mutex> lk(self->mutex_);
-        FnOnce<Status()> finish_callback;
+        FnOnce<Status(Status)> finish_callback =
+            std::move((*child_itr)->finish_callback_);
+        self->sub_schedulers_.erase(child_itr);
+        lk.unlock();
+        Status finish_st = std::move(finish_callback)(st);
+        lk.lock();
+        self->running_tasks_--;
         if (!st.ok()) {
-          self->running_tasks_--;
           self->AbortUnlocked(st, std::move(lk));
           return;
-        } else {
-          // We only eagerly erase the sub-scheduler on a successful completion.  This is
-          // because, if the sub-scheduler aborted, then the caller of MakeSubScheduler
-          // might still be planning to call End
-          finish_callback = std::move((*child_itr)->finish_callback_);
-          self->sub_schedulers_.erase(child_itr);
         }
-        lk.unlock();
-        Status finish_st = std::move(finish_callback)();
-        lk.lock();
-        self->running_tasks_--;
         if (!finish_st.ok()) {
           self->AbortUnlocked(finish_st, std::move(lk));
           return;
@@ -221,14 +268,16 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
     };
     child->OnFinished().AddCallback(Finalizer{this, child_itr});
-    return child;
+    return CreateEndingHolder(child);
   }
 
-  void End() override {
-    std::unique_lock<std::mutex> lk(mutex_);
-    if (state_ == State::kAborted) {
-      return;
+  void End() override { End(/*from_destructor=*/false); }
+
+  void End(bool from_destructor) {
+    if (!from_destructor && finish_callback_) {
+      Status::UnknownError("Do not call End on a sub-scheduler.").Abort();
     }
+    std::unique_lock<std::mutex> lk(mutex_);
     state_ = State::kEnded;
     if (running_tasks_ == 0 && (!queue_ || queue_->Empty())) {
       lk.unlock();
@@ -239,6 +288,16 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
 
   Future<> OnFinished() const override { return finished_; }
 
  private:
+  std::shared_ptr<AsyncTaskScheduler> CreateEndingHolder(
+      AsyncTaskSchedulerImpl* target) {
+    struct SchedulerEnder {
+      void operator()(AsyncTaskSchedulerImpl* scheduler) {
+        scheduler->End(/*from_destructor=*/true);
+      }
+    };
+    return std::shared_ptr<AsyncTaskScheduler>(target, SchedulerEnder());
+  }
+
   void ContinueTasksUnlocked(std::unique_lock<std::mutex>* lk) {
     while (!queue_->Empty()) {
       int next_cost = std::min(queue_->Peek().cost(), throttle_->Capacity());
@@ -267,8 +326,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
   }
 
   bool IsFullyFinished() {
-    return state_ != State::kRunning && (!queue_ || queue_->Empty()) &&
-           running_tasks_ == 0;
+    return state_ == State::kEnded && (!queue_ || queue_->Empty()) && running_tasks_ == 0;
   }
 
   bool OnTaskFinished(const Status& st, int task_cost) {
@@ -307,8 +365,8 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
     }
     Result<Future<>> submit_result = (*task)(this);
     if (!submit_result.ok()) {
-      global_abort_->store(true);
       std::unique_lock<std::mutex> lk(mutex_);
+      global_abort_->store(true);
       running_tasks_--;
       AbortUnlocked(submit_result.status(), std::move(lk));
       return false;
@@ -337,9 +395,11 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
         maybe_error_ = st;
       }
     }
-    if (running_tasks_ == 0) {
+    if (running_tasks_ == 0 && state_ == State::kEnded) {
+      lk.unlock();
+      finished_.MarkFinished(maybe_error_);
+    } else {
       lk.unlock();
-      finished_.MarkFinished(std::move(maybe_error_));
     }
   }
 
@@ -353,7 +413,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
 
   std::unique_ptr<Queue> queue_;
   Throttle* throttle_;
-  FnOnce<Status()> finish_callback_;
+  FnOnce<Status(Status)> finish_callback_;
 
   Future<> finished_ = Future<>::Make();
   int running_tasks_ = 0;
@@ -375,7 +435,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
 
 std::unique_ptr<AsyncTaskScheduler> AsyncTaskScheduler::Make(
     Throttle* throttle, std::unique_ptr<Queue> queue) {
   return std::make_unique<AsyncTaskSchedulerImpl>(nullptr, std::move(queue), throttle,
-                                                  FnOnce<Status()>());
+                                                  FnOnce<Status(Status)>());
 }
 
 }  // namespace util
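CreateEndingHolder above hands out a shared_ptr whose deleter calls End()
rather than delete: the pointer controls when the sub-scheduler ends, while
the object itself remains owned by the parent's sub_schedulers_ list.  A
generic sketch of that idiom (stand-in types, not code from the patch):

    // Sketch of the "deleter ends, does not delete" idiom; Worker is a stand-in.
    #include <iostream>
    #include <memory>

    struct Worker {
      void End() { std::cout << "ended\n"; }
    };

    std::shared_ptr<Worker> MakeEndingHolder(Worker* target) {
      // The shared_ptr decides *when* End runs, not the object's lifetime;
      // the real owner frees the object later.
      return std::shared_ptr<Worker>(target, [](Worker* w) { w->End(); });
    }

    int main() {
      Worker w;  // owned elsewhere (here: the stack)
      { auto holder = MakeEndingHolder(&w); }  // prints "ended" at scope exit
      return 0;
    }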
diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h
index e636eb805dec5..e0b6cfc699127 100644
--- a/cpp/src/arrow/util/async_util.h
+++ b/cpp/src/arrow/util/async_util.h
@@ -69,7 +69,7 @@ namespace util {
 /// the final task future.
 ///
 /// It is also possible to limit the number of concurrent tasks the scheduler will
-/// execute. This is done by setting a task limit. The task limit initially assumes all
+/// execute. This is done by setting a throttle. The throttle initially assumes all
 /// tasks are equal but a custom cost can be supplied when scheduling a task (e.g. based
 /// on the total I/O cost of the task, or the expected RAM utilization of the task)
 ///
@@ -88,7 +88,7 @@ class ARROW_EXPORT AsyncTaskScheduler {
   /// Destructor for AsyncTaskScheduler
   ///
   /// If a scheduler is not in the ended state when it is destroyed then it
-  /// will enter an aborted state.
+  /// will abort with an error and enter the ended state.
   ///
   /// The destructor will block until all submitted tasks have finished.
   virtual ~AsyncTaskScheduler() = default;
@@ -170,14 +170,14 @@ class ARROW_EXPORT AsyncTaskScheduler {
   /// If the scheduler is in an aborted state this call will return false and the task
   /// will never be run.  This is harmless and does not need to be guarded against.
   ///
-  /// If the scheduler is in an ended state then this call will cause an abort.  This
-  /// represents a logic error in the program and should be avoidable.
+  /// If the scheduler is in an ended state then this call will cause a program abort.
+  /// This represents a logic error in the program and should be avoidable.
   ///
   /// If there are no limits on the number of concurrent tasks then the submit function
   /// will be run immediately.
   ///
-  /// Otherwise, if there is a limit to the number of concurrent tasks, then this task
-  /// will be inserted into the scheduler's queue and submitted when there is space.
+  /// Otherwise, if there is a throttle, and it is full, then this task will be inserted
+  /// into the scheduler's queue and submitted when there is space.
   ///
   /// The return value for this call can usually be ignored.  There is little harm in
   /// attempting to add tasks to an aborted scheduler.  It is only included for callers
@@ -204,39 +204,41 @@ class ARROW_EXPORT AsyncTaskScheduler {
   template <typename T>
   bool AddAsyncGenerator(std::function<Future<T>()> generator,
                          std::function<Status(const T&)> visitor,
-                         FnOnce<Status()> finish_callback) {
-    AsyncTaskScheduler* generator_scheduler =
+                         FnOnce<Status(Status)> finish_callback) {
+    std::shared_ptr<AsyncTaskScheduler> generator_scheduler =
         MakeSubScheduler(std::move(finish_callback));
     struct State {
-      State(std::function<Future<T>()> generator, std::function<Status(const T&)> visitor)
-          : generator(std::move(generator)), visitor(std::move(visitor)) {}
+      State(std::function<Future<T>()> generator, std::function<Status(const T&)> visitor,
+            std::shared_ptr<AsyncTaskScheduler> scheduler)
+          : generator(std::move(generator)),
+            visitor(std::move(visitor)),
+            scheduler(std::move(scheduler)) {}
       std::function<Future<T>()> generator;
       std::function<Status(const T&)> visitor;
+      std::shared_ptr<AsyncTaskScheduler> scheduler;
     };
-    std::unique_ptr<State> state_holder =
-        std::make_unique<State>(std::move(generator), std::move(visitor));
+    std::unique_ptr<State> state_holder = std::make_unique<State>(
+        std::move(generator), std::move(visitor), generator_scheduler);
     struct SubmitTask : public Task {
       explicit SubmitTask(std::unique_ptr<State> state_holder)
           : state_holder(std::move(state_holder)) {}
       struct SubmitTaskCallback {
-        SubmitTaskCallback(AsyncTaskScheduler* scheduler,
-                           std::unique_ptr<State> state_holder)
-            : scheduler(scheduler), state_holder(std::move(state_holder)) {}
+        explicit SubmitTaskCallback(std::unique_ptr<State> state_holder)
+            : state_holder(std::move(state_holder)) {}
         Status operator()(const T& item) {
           if (IsIterationEnd(item)) {
-            scheduler->End();
             return Status::OK();
           }
           ARROW_RETURN_NOT_OK(state_holder->visitor(item));
-          scheduler->AddTask(std::make_unique<SubmitTask>(std::move(state_holder)));
+          state_holder->scheduler->AddTask(
+              std::make_unique<SubmitTask>(std::move(state_holder)));
           return Status::OK();
         }
-        AsyncTaskScheduler* scheduler;
         std::unique_ptr<State> state_holder;
       };
       Result<Future<>> operator()(AsyncTaskScheduler* scheduler) {
         Future<T> next = state_holder->generator();
-        return next.Then(SubmitTaskCallback(scheduler, std::move(state_holder)));
+        return next.Then(SubmitTaskCallback(std::move(state_holder)));
       }
       std::unique_ptr<State> state_holder;
     };
@@ -259,7 +261,8 @@ class ARROW_EXPORT AsyncTaskScheduler {
   }
   /// Signal that tasks are done being added
   ///
-  /// If the scheduler is in an aborted state then this call will have no effect.
+  /// If the scheduler is in an aborted state then this call will have no effect
+  /// except (if there are no running tasks) potentially finishing the scheduler.
   ///
   /// Otherwise, this will transition the scheduler into the ended state.  Once all
   /// remaining tasks have finished the OnFinished future will be marked completed.
   ///
@@ -268,13 +271,6 @@ class ARROW_EXPORT AsyncTaskScheduler {
   /// attempt to do so will cause an abort.
   virtual void End() = 0;
   /// A future that will be finished after End is called and all tasks have completed
-  ///
-  /// This is the same future that is returned by End() but calling this method does
-  /// not indicate that top level tasks are done being added. End() must still be called
-  /// at some point or the future returned will never finish.
-  ///
-  /// This is a utility method for workflows where the finish future needs to be
-  /// referenced before all top level tasks have been queued.
   virtual Future<> OnFinished() const = 0;
 
   /// Create a sub-scheduler for tracking a subset of tasks
   ///
@@ -292,15 +288,16 @@ class ARROW_EXPORT AsyncTaskScheduler {
   ///
   /// If either the parent scheduler or the sub-scheduler encounter an error
   /// then they will both enter an aborted state (this is a shared state).
-  /// Finish callbacks will not be run when the scheduler is aborted.
+  /// Finish callbacks will always be run, and only after the sub-scheduler
+  /// has been ended and all of its ongoing tasks have completed.
   ///
   /// The parent scheduler will not complete until the sub-scheduler's
   /// tasks (and finish callback) have all executed.
   ///
   /// A sub-scheduler can share the same throttle as its parent but it
   /// can also have its own unique throttle.
-  virtual AsyncTaskScheduler* MakeSubScheduler(
-      FnOnce<Status()> finish_callback, Throttle* throttle = NULLPTR,
+  virtual std::shared_ptr<AsyncTaskScheduler> MakeSubScheduler(
+      FnOnce<Status(Status)> finish_callback, Throttle* throttle = NULLPTR,
       std::unique_ptr<Queue> queue = NULLPTR) = 0;
 
   /// Construct a scheduler
   ///
@@ -311,6 +308,13 @@ class ARROW_EXPORT AsyncTaskScheduler {
   /// The default (nullptr) will use a FIFO queue if there is a throttle.
   static std::unique_ptr<AsyncTaskScheduler> Make(
       Throttle* throttle = NULLPTR, std::unique_ptr<Queue> queue = NULLPTR);
+
+  /// Check to see if the scheduler is currently ended
+  ///
+  /// This method is primarily for testing purposes and won't normally need to be
+  /// called to use the scheduler.  Note that a return value of false is not conclusive
+  /// as the scheduler may end immediately after the call.
+  virtual bool IsEnded() = 0;
 };
 
 }  // namespace util
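With the header changes above, callers migrate from explicit End() calls to
scope-based lifetime.  A sketch of typical usage of the revised API (RunGroup
is a hypothetical caller, mirroring the tests below):

    // Sketch of the revised sub-scheduler usage; RunGroup is hypothetical.
    #include <memory>

    #include "arrow/status.h"
    #include "arrow/util/async_util.h"
    #include "arrow/util/future.h"

    using arrow::Future;
    using arrow::Status;
    using arrow::util::AsyncTaskScheduler;

    void RunGroup(AsyncTaskScheduler* parent) {
      {
        std::shared_ptr<AsyncTaskScheduler> sub = parent->MakeSubScheduler(
            // The finish callback now receives the group's final Status and runs
            // unconditionally, even if the group failed or never ran a task.
            [](Status) { return Status::OK(); });
        sub->AddSimpleTask([] { return Future<>::MakeFinished(); });
      }  // no sub->End(): calling End() on a sub-scheduler now aborts
    }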
diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc
index ea03b5431dffd..5131a467c6f4a 100644
--- a/cpp/src/arrow/util/async_util_test.cc
+++ b/cpp/src/arrow/util/async_util_test.cc
@@ -82,11 +82,11 @@ TEST(AsyncTaskScheduler, Abandoned) {
   // submit any pending tasks.
   bool submitted_task_finished = false;
   bool pending_task_submitted = false;
+  AsyncTaskScheduler* weak_scheduler;
   std::unique_ptr<AsyncTaskScheduler::Throttle> throttle =
       AsyncTaskScheduler::MakeThrottle(1);
   Future<> finished_fut;
   Future<> first_task = Future<>::Make();
-  AsyncTaskScheduler* weak_scheduler;
   std::thread delete_scheduler_thread;
   {
     std::unique_ptr<AsyncTaskScheduler> scheduler =
@@ -113,10 +113,7 @@ TEST(AsyncTaskScheduler, Abandoned) {
   }
   // Here we are waiting for the scheduler to enter aborted state.  Once aborted the
   // scheduler will no longer accept new tasks and will return false.
-  BusyWait(10, [&] {
-    SleepABit();
-    return !weak_scheduler->AddSimpleTask([] { return Future<>::MakeFinished(); });
-  });
+  BusyWait(10, [&] { return weak_scheduler->IsEnded(); });
   // Now that the scheduler deletion is in progress we should be able to finish the
   // first task and be confident the second task should not be submitted.
   first_task.MarkFinished();
@@ -157,56 +154,138 @@ TEST(AsyncTaskScheduler, TaskFailsAfterEnd) {
   ASSERT_FINISHES_AND_RAISES(Invalid, scheduler->OnFinished());
 }
 
+FnOnce<Status(Status)> EmptyFinishCallback() {
+  return [](Status) { return Status::OK(); };
+}
+
+#ifndef ARROW_VALGRIND
+TEST(AsyncTaskScheduler, FailingTaskStress) {
+  // Test many tasks failing at the same time
+  constexpr int kNumTasks = 256;
+  for (int i = 0; i < kNumTasks; i++) {
+    std::unique_ptr<AsyncTaskScheduler> scheduler = AsyncTaskScheduler::Make();
+    ASSERT_TRUE(scheduler->AddSimpleTask([] { return SleepABitAsync(); }));
+    ASSERT_TRUE(scheduler->AddSimpleTask(
+        [] { return SleepABitAsync().Then([]() { return Status::Invalid("XYZ"); }); }));
+    scheduler->End();
+    ASSERT_FINISHES_AND_RAISES(Invalid, scheduler->OnFinished());
+  }
+  for (int i = 0; i < kNumTasks; i++) {
+    std::unique_ptr<AsyncTaskScheduler> scheduler = AsyncTaskScheduler::Make();
+    {
+      std::shared_ptr<AsyncTaskScheduler> sub_scheduler =
+          scheduler->MakeSubScheduler(EmptyFinishCallback());
+      ASSERT_TRUE(sub_scheduler->AddSimpleTask([] { return SleepABitAsync(); }));
+      ASSERT_TRUE(sub_scheduler->AddSimpleTask(
+          [] { return SleepABitAsync().Then([]() { return Status::Invalid("XYZ"); }); }));
+    }
+    scheduler->End();
+    ASSERT_FINISHES_AND_RAISES(Invalid, scheduler->OnFinished());
+  }
+  // Test many schedulers failing at the same time (also a mixture of failing due
+  // to global abort racing with local task failure)
+  constexpr int kNumSchedulers = 32;
+  for (int i = 0; i < kNumTasks; i++) {
+    std::unique_ptr<AsyncTaskScheduler> scheduler = AsyncTaskScheduler::Make();
+    std::vector<Future<>> tests;
+    for (int i = 0; i < kNumSchedulers; i++) {
+      tests.push_back(SleepABitAsync().Then([&] {
+        std::shared_ptr<AsyncTaskScheduler> sub_scheduler =
+            scheduler->MakeSubScheduler(EmptyFinishCallback());
+        std::ignore = sub_scheduler->AddSimpleTask([] {
+          return SleepABitAsync().Then([]() { return Status::Invalid("XYZ"); });
+        });
+      }));
+    }
+    AllComplete(std::move(tests)).Wait();
+    scheduler->End();
+    ASSERT_FINISHES_AND_RAISES(Invalid, scheduler->OnFinished());
+  }
+}
+#endif
+
 TEST(AsyncTaskScheduler, SubSchedulerFinishCallback) {
   bool finish_callback_ran = false;
   std::unique_ptr<AsyncTaskScheduler> scheduler = AsyncTaskScheduler::Make();
-  AsyncTaskScheduler* sub_scheduler = scheduler->MakeSubScheduler([&] {
-    finish_callback_ran = true;
-    return Status::OK();
-  });
-  ASSERT_FALSE(finish_callback_ran);
-  sub_scheduler->End();
+  {
+    std::shared_ptr<AsyncTaskScheduler> sub_scheduler =
+        scheduler->MakeSubScheduler([&](Status) {
+          finish_callback_ran = true;
+          return Status::OK();
+        });
+    ASSERT_FALSE(finish_callback_ran);
+  }
   ASSERT_TRUE(finish_callback_ran);
   scheduler->End();
   ASSERT_FINISHES_OK(scheduler->OnFinished());
+
+  // Finish callback should run even if sub scheduler never starts any tasks
+  finish_callback_ran = false;
+  scheduler = AsyncTaskScheduler::Make();
+  ASSERT_TRUE(scheduler->AddSimpleTask([] { return Status::Invalid("XYZ"); }));
+  {
+    std::shared_ptr<AsyncTaskScheduler> sub_scheduler =
+        scheduler->MakeSubScheduler([&](Status) {
+          finish_callback_ran = true;
+          return Status::OK();
+        });
+  }
+  scheduler->End();
+  ASSERT_FINISHES_AND_RAISES(Invalid, scheduler->OnFinished());
+  ASSERT_TRUE(finish_callback_ran);
+
+  // Finish callback should run even if scheduler aborts
+  finish_callback_ran = false;
+  scheduler = AsyncTaskScheduler::Make();
+  {
+    std::shared_ptr<AsyncTaskScheduler> sub_scheduler =
+        scheduler->MakeSubScheduler([&](Status) {
+          finish_callback_ran = true;
+          return Status::OK();
+        });
+
+    ASSERT_TRUE(sub_scheduler->AddSimpleTask([] { return Status::Invalid("XYZ"); }));
+  }
+  ASSERT_TRUE(finish_callback_ran);
+  scheduler->End();
+  ASSERT_FINISHES_AND_RAISES(Invalid, scheduler->OnFinished());
 }
 
 TEST(AsyncTaskScheduler, SubSchedulerFinishAbort) {
   bool finish_callback_ran = false;
   std::unique_ptr<AsyncTaskScheduler> scheduler = AsyncTaskScheduler::Make();
-  AsyncTaskScheduler* sub_scheduler = scheduler->MakeSubScheduler([&] {
-    finish_callback_ran = true;
-    return Status::Invalid("XYZ");
-  });
-  ASSERT_FALSE(finish_callback_ran);
-  sub_scheduler->End();
+  {
+    std::shared_ptr<AsyncTaskScheduler> sub_scheduler =
+        scheduler->MakeSubScheduler([&](Status) {
+          finish_callback_ran = true;
+          return Status::Invalid("XYZ");
+        });
+    ASSERT_FALSE(finish_callback_ran);
+  }
   ASSERT_TRUE(finish_callback_ran);
   scheduler->End();
   ASSERT_FINISHES_AND_RAISES(Invalid, scheduler->OnFinished());
 }
 
-FnOnce<Status()> EmptyFinishCallback() {
-  return [] { return Status::OK(); };
-}
-
 TEST(AsyncTaskScheduler, SubSchedulerNoticesParentAbort) {
   std::unique_ptr<AsyncTaskScheduler> parent = AsyncTaskScheduler::Make();
   std::unique_ptr<AsyncTaskScheduler::Throttle> child_throttle =
       AsyncTaskScheduler::MakeThrottle(1);
-  AsyncTaskScheduler* child =
-      parent->MakeSubScheduler(EmptyFinishCallback(), child_throttle.get());
-
-  Future<> task = Future<>::Make();
-  bool was_submitted = false;
-  ASSERT_TRUE(child->AddSimpleTask([task] { return task; }));
-  ASSERT_TRUE(child->AddSimpleTask([&was_submitted] {
-    was_submitted = true;
-    return Future<>::MakeFinished();
-  }));
-  ASSERT_TRUE(parent->AddSimpleTask([] { return Status::Invalid("XYZ"); }));
-  ASSERT_FALSE(child->AddSimpleTask([task] { return task; }));
-  task.MarkFinished();
-  child->End();
+  {
+    std::shared_ptr<AsyncTaskScheduler> child =
+        parent->MakeSubScheduler(EmptyFinishCallback(), child_throttle.get());
+
+    Future<> task = Future<>::Make();
+    bool was_submitted = false;
+    ASSERT_TRUE(child->AddSimpleTask([task] { return task; }));
+    ASSERT_TRUE(child->AddSimpleTask([&was_submitted] {
+      was_submitted = true;
+      return Future<>::MakeFinished();
+    }));
+    ASSERT_TRUE(parent->AddSimpleTask([] { return Status::Invalid("XYZ"); }));
+    ASSERT_FALSE(child->AddSimpleTask([task] { return task; }));
+    task.MarkFinished();
+  }
   parent->End();
   ASSERT_FINISHES_AND_RAISES(Invalid, parent->OnFinished());
 }
 
@@ -215,10 +294,12 @@ TEST(AsyncTaskScheduler, SubSchedulerNoTasks) {
   // An unended sub-scheduler should keep the parent scheduler unfinished even if
   // there are no tasks.
   std::unique_ptr<AsyncTaskScheduler> parent = AsyncTaskScheduler::Make();
-  AsyncTaskScheduler* child = parent->MakeSubScheduler(EmptyFinishCallback());
-  parent->End();
-  AssertNotFinished(parent->OnFinished());
-  child->End();
+  {
+    std::shared_ptr<AsyncTaskScheduler> child =
+        parent->MakeSubScheduler(EmptyFinishCallback());
+    parent->End();
+    AssertNotFinished(parent->OnFinished());
+  }
   ASSERT_FINISHES_OK(parent->OnFinished());
 }
 
@@ -389,6 +470,7 @@ TEST(AsyncTaskScheduler, PurgeUnsubmitted) {
   ASSERT_FALSE(was_submitted);
 }
 
+#ifndef ARROW_VALGRIND
 TEST(AsyncTaskScheduler, FifoStress) {
   // Regresses an issue where adding a task, when the throttle was
   // just cleared, could lead to the added task being run immediately,
@@ -410,6 +492,7 @@ TEST(AsyncTaskScheduler, FifoStress) {
       EXPECT_TRUE(middle_task_run);
       return Future<>::MakeFinished();
     });
+    task_group->End();
   }
 }
 
@@ -459,12 +542,11 @@ TEST(AsyncTaskScheduler, ScanningStress) {
     auto scan_batch = [&] { batches_scanned++; };
     auto submit_scan = [&]() { return SleepABitAsync().Then(scan_batch); };
     auto list_fragment = [&]() {
-      AsyncTaskScheduler* batch_scheduler =
+      std::shared_ptr<AsyncTaskScheduler> batch_scheduler =
           listing_scheduler->MakeSubScheduler(EmptyFinishCallback(), batch_limit.get());
       for (int i = 0; i < kBatchesPerFragment; i++) {
        ASSERT_TRUE(batch_scheduler->AddSimpleTask(submit_scan));
      }
-      batch_scheduler->End();
      if (++fragments_scanned == kNumFragments) {
        listing_scheduler->End();
      }
@@ -477,6 +559,7 @@ TEST(AsyncTaskScheduler, ScanningStress) {
     ASSERT_EQ(kExpectedBatchesScanned, batches_scanned.load());
   }
 }
+#endif
 
 class TaskWithPriority : public AsyncTaskScheduler::Task {
  public: