Skip to content

Commit

Permalink
ARROW-17853: [Python][CI] Timeout in test_dataset.py::test_write_data…
Browse files Browse the repository at this point in the history
…set_s3_put_only (apache#14257)

This reintroduces the dataset writer change with a fix that should prevent the deadlock.  The python test was unique in that the "create directory" step failed.  My change had made the files writers wait on this step and, if it failed, they were not being properly notified.

Authored-by: Weston Pace <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
  • Loading branch information
westonpace authored Oct 12, 2022
1 parent 20626f8 commit 704536b
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 129 deletions.
39 changes: 26 additions & 13 deletions cpp/src/arrow/dataset/dataset_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ class DatasetWriterFileQueue {
DatasetWriterState* writer_state)
: options_(options), schema_(schema), writer_state_(writer_state) {}

void Start(util::AsyncTaskScheduler* scheduler, const std::string& filename) {
void Start(std::shared_ptr<util::AsyncTaskScheduler> scheduler,
const std::string& filename) {
scheduler_ = scheduler;
// Because the scheduler runs one task at a time we know the writer will
// be opened before any attempt to write
Expand Down Expand Up @@ -212,7 +213,7 @@ class DatasetWriterFileQueue {
// is a 1-task FIFO we know this task will run at the very end and can
// add it now.
scheduler_->AddSimpleTask([this] { return DoFinish(); });
scheduler_->End();
scheduler_.reset();
return Status::OK();
}

Expand Down Expand Up @@ -247,7 +248,7 @@ class DatasetWriterFileQueue {
// point they are merged together and added to write_queue_
std::deque<std::shared_ptr<RecordBatch>> staged_batches_;
uint64_t rows_currently_staged_ = 0;
util::AsyncTaskScheduler* scheduler_ = nullptr;
std::shared_ptr<util::AsyncTaskScheduler> scheduler_ = nullptr;
};

struct WriteTask {
Expand Down Expand Up @@ -323,17 +324,17 @@ class DatasetWriterDirectoryQueue {
util::AsyncTaskScheduler::MakeThrottle(1);
util::AsyncTaskScheduler::Throttle* throttle_view = throttle.get();
auto file_finish_task = [self = this, file_queue = std::move(file_queue),
throttle = std::move(throttle)]() {
throttle = std::move(throttle)](Status) {
self->writer_state_->open_files_throttle.Release(1);
return Status::OK();
};
util::AsyncTaskScheduler* file_scheduler =
std::shared_ptr<util::AsyncTaskScheduler> file_scheduler =
scheduler_->MakeSubScheduler(std::move(file_finish_task), throttle_view);
if (init_task_) {
file_scheduler->AddSimpleTask(init_task_);
init_task_ = {};
if (init_future_.is_valid()) {
file_scheduler->AddSimpleTask(
[init_future = init_future_]() { return init_future; });
}
file_queue_view->Start(file_scheduler, filename);
file_queue_view->Start(std::move(file_scheduler), filename);
return file_queue_view;
}

Expand All @@ -343,21 +344,33 @@ class DatasetWriterDirectoryQueue {
if (directory_.empty() || !write_options_.create_dir) {
return;
}
init_future_ = Future<>::Make();
auto create_dir_cb = [this] {
return DeferNotOk(write_options_.filesystem->io_context().executor()->Submit(
[this]() { return write_options_.filesystem->CreateDir(directory_); }));
};
// We need to notify waiters whether the directory succeeded or failed.
auto notify_waiters_cb = [this] { init_future_.MarkFinished(); };
auto notify_waiters_on_err_cb = [this](const Status& err) {
init_future_.MarkFinished();
return err;
};
std::function<Future<>()> init_task;
if (write_options_.existing_data_behavior ==
ExistingDataBehavior::kDeleteMatchingPartitions) {
init_task_ = [this, create_dir_cb] {
init_task = [this, create_dir_cb, notify_waiters_cb, notify_waiters_on_err_cb] {
return write_options_.filesystem
->DeleteDirContentsAsync(directory_,
/*missing_dir_ok=*/true)
.Then(create_dir_cb);
.Then(create_dir_cb)
.Then(notify_waiters_cb, notify_waiters_on_err_cb);
};
} else {
init_task_ = [create_dir_cb] { return create_dir_cb(); };
init_task = [create_dir_cb, notify_waiters_cb, notify_waiters_on_err_cb] {
return create_dir_cb().Then(notify_waiters_cb, notify_waiters_on_err_cb);
};
}
scheduler_->AddSimpleTask(std::move(init_task));
}

static Result<std::unique_ptr<DatasetWriterDirectoryQueue>> Make(
Expand Down Expand Up @@ -388,7 +401,7 @@ class DatasetWriterDirectoryQueue {
std::shared_ptr<Schema> schema_;
const FileSystemDatasetWriteOptions& write_options_;
DatasetWriterState* writer_state_;
std::function<Future<>()> init_task_;
Future<> init_future_;
std::string current_filename_;
DatasetWriterFileQueue* latest_open_file_ = nullptr;
uint64_t rows_written_ = 0;
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/dataset/dataset_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,18 @@ TEST_F(DatasetWriterTestFixture, BasicFileDirectoryPrefix) {
AssertFilesCreated({"testdir/a/1_chunk-0.arrow"});
}

TEST_F(DatasetWriterTestFixture, DirectoryCreateFails) {
// This should fail to be created
write_options_.base_dir = "///doesnotexist";
EXPECT_OK_AND_ASSIGN(auto dataset_writer,
DatasetWriter::Make(write_options_, scheduler_.get()));
Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "a", "1_");
AssertFinished(queue_fut);
ASSERT_OK(dataset_writer->Finish());
scheduler_->End();
ASSERT_FINISHES_AND_RAISES(Invalid, scheduler_->OnFinished());
}

TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
write_options_.max_rows_per_file = 10;
write_options_.max_rows_per_group = 10;
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ class TeeNode : public compute::MapNode {
util::AsyncTaskScheduler::MakeThrottle(1);
util::AsyncTaskScheduler::Throttle* serial_throttle_view = serial_throttle.get();
serial_scheduler_ = plan_->async_scheduler()->MakeSubScheduler(
[owned_throttle = std::move(serial_throttle)]() { return Status::OK(); },
[owned_throttle = std::move(serial_throttle)](Status) { return Status::OK(); },
serial_throttle_view);
}

Expand Down Expand Up @@ -519,7 +519,7 @@ class TeeNode : public compute::MapNode {
MapNode::Finish(std::move(writer_finish_st));
return;
}
serial_scheduler_->End();
serial_scheduler_.reset();
MapNode::Finish(Status::OK());
}

Expand Down Expand Up @@ -581,7 +581,7 @@ class TeeNode : public compute::MapNode {
// We use a serial scheduler to submit tasks to the dataset writer. The dataset writer
// only returns an unfinished future when it needs backpressure. Using a serial
// scheduler here ensures we pause while we wait for backpressure to clear
util::AsyncTaskScheduler* serial_scheduler_;
std::shared_ptr<util::AsyncTaskScheduler> serial_scheduler_;
int32_t backpressure_counter_ = 0;
};

Expand Down
31 changes: 21 additions & 10 deletions cpp/src/arrow/dataset/scan_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,19 +264,22 @@ class ScanNode : public cp::ExecNode {
ScanState* state_view = scan_state.get();
// Finish callback keeps the scan state alive until all scan tasks done
struct StateHolder {
Status operator()() { return Status::OK(); }
Status operator()(Status) { return Status::OK(); }
std::unique_ptr<ScanState> scan_state;
};
util::AsyncTaskScheduler* frag_scheduler = scan_scheduler->MakeSubScheduler(
StateHolder{std::move(scan_state)}, node->batches_throttle_.get());
std::shared_ptr<util::AsyncTaskScheduler> frag_scheduler =
scan_scheduler->MakeSubScheduler(StateHolder{std::move(scan_state)},
node->batches_throttle_.get());
for (int i = 0; i < fragment_scanner->NumBatches(); i++) {
node->num_batches_.fetch_add(1);
frag_scheduler->AddTask(std::make_unique<ScanBatchTask>(node, state_view, i));
}
Future<> list_and_scan_node = frag_scheduler->OnFinished();
frag_scheduler->End();
// The "list fragments" task doesn't actually end until the fragments are
// all scanned. This allows us to enforce fragment readahead.
if (--node->list_tasks_ == 0) {
node->scan_scheduler_.reset();
}
return list_and_scan_node;
}

Expand Down Expand Up @@ -313,21 +316,24 @@ class ScanNode : public cp::ExecNode {
END_SPAN_ON_FUTURE_COMPLETION(span_, finished_);
AsyncGenerator<std::shared_ptr<Fragment>> frag_gen =
GetFragments(options_.dataset.get(), options_.filter);
util::AsyncTaskScheduler* scan_scheduler = plan_->async_scheduler()->MakeSubScheduler(
[this]() {
scan_scheduler_ = plan_->async_scheduler()->MakeSubScheduler(
[this](Status st) {
outputs_[0]->InputFinished(this, num_batches_.load());
finished_.MarkFinished();
return Status::OK();
},
fragments_throttle_.get());
plan_->async_scheduler()->AddAsyncGenerator<std::shared_ptr<Fragment>>(
std::move(frag_gen),
[this, scan_scheduler](const std::shared_ptr<Fragment>& fragment) {
scan_scheduler->AddTask(std::make_unique<ListFragmentTask>(this, fragment));
[this](const std::shared_ptr<Fragment>& fragment) {
list_tasks_++;
scan_scheduler_->AddTask(std::make_unique<ListFragmentTask>(this, fragment));
return Status::OK();
},
[scan_scheduler]() {
scan_scheduler->End();
[this](Status) {
if (--list_tasks_ == 0) {
scan_scheduler_.reset();
}
return Status::OK();
});
return Status::OK();
Expand All @@ -351,6 +357,11 @@ class ScanNode : public cp::ExecNode {
private:
ScanV2Options options_;
std::atomic<int> num_batches_{0};
// TODO(ARROW-17509) list_tasks_, and scan_scheduler_ are just
// needed to figure out when to end scan_scheduler_. In the future, we should not need
// to call end and these variables can go away.
std::atomic<int> list_tasks_{1};
std::shared_ptr<util::AsyncTaskScheduler> scan_scheduler_;
std::unique_ptr<util::AsyncTaskScheduler::Throttle> fragments_throttle_;
std::unique_ptr<util::AsyncTaskScheduler::Throttle> batches_throttle_;
};
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/util/async_generator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ TEST_P(MergedGeneratorTestFixture, MergedLimitedSubscriptions) {
AssertGeneratorExhausted(merged);
}

#ifndef ARROW_VALGRIND
TEST_P(MergedGeneratorTestFixture, MergedStress) {
constexpr int NGENERATORS = 10;
constexpr int NITEMS = 10;
Expand Down Expand Up @@ -739,6 +740,7 @@ TEST_P(MergedGeneratorTestFixture, MergedParallelStress) {
ASSERT_EQ(NITEMS * NGENERATORS, items.size());
}
}
#endif

TEST_P(MergedGeneratorTestFixture, MergedRecursion) {
// Regression test for an edge case in MergedGenerator. Ensure if
Expand Down
Loading

0 comments on commit 704536b

Please sign in to comment.