This is an automated email from the ASF dual-hosted git repository.

zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 596a60fa00 GH-47124: [C++][Dataset] Fix DatasetWriter deadlock on concurrent WriteRecordBatch (#47129)
596a60fa00 is described below

commit 596a60fa00d9d88bbe562602543474e8faf9ea5a
Author: gitmodimo <[email protected]>
AuthorDate: Wed Jul 30 16:40:36 2025 +0200

    GH-47124: [C++][Dataset] Fix DatasetWriter deadlock on concurrent WriteRecordBatch (#47129)
    
    ### Rationale for this change
    
    The throttle is accessed twice: once in Acquire and again through the returned future. As a result, current_value_ may not be increased because the throttle was applied, yet shortly afterwards the returned future may become finished. That leads to the issue described in #47124:
    
https://github.com/apache/arrow/blob/c8fe26898ce49c58514f511be58afddce176826b/cpp/src/arrow/dataset/dataset_writer.cc#L682-L684
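
    A minimal, self-contained sketch of that interleaving (hypothetical `SketchThrottle`/`Ticket` types standing in for `Throttle` and `Future<>`, with the race replayed deterministically on one thread rather than with real concurrency):

    ```cpp
    #include <cstdint>
    #include <iostream>
    #include <memory>

    // Stand-in for Future<>: a flag the caller polls instead of is_finished().
    struct Ticket {
      bool finished = true;
    };

    struct SketchThrottle {
      std::shared_ptr<Ticket> Acquire(uint64_t values) {
        if (current_value_ >= max_value_) {
          // Throttled: hand out an unfinished ticket and do NOT count `values`.
          backpressure_ = std::make_shared<Ticket>();
          backpressure_->finished = false;
          return backpressure_;
        }
        current_value_ += values;           // accounted for
        return std::make_shared<Ticket>();  // already finished
      }

      void Release(uint64_t values) {
        current_value_ -= values;
        if (backpressure_) backpressure_->finished = true;  // wake the waiter
      }

      uint64_t current_value_ = 0;
      uint64_t max_value_ = 5;
      std::shared_ptr<Ticket> backpressure_;
    };

    int main() {
      SketchThrottle throttle;
      throttle.Acquire(5);                      // fills the throttle
      auto backpressure = throttle.Acquire(3);  // throttled: 3 rows NOT counted
      throttle.Release(5);                      // a concurrent release finishes the ticket
      // The caller's separate "is it finished?" check now passes, so it proceeds
      // as if the 3 rows had been acquired, but current_value_ was never increased
      // for them; the accounting drifts and the writer can later deadlock.
      std::cout << std::boolalpha << backpressure->finished << std::endl;  // true
      return 0;
    }
    ```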
    
    ### What changes are included in this PR?
    
    Change the throttle API to return an optional (akin to [ThrottledAsyncTaskScheduler::Throttle](https://github.com/gitmodimo/arrow/blob/3ebe7ee1828793d0a619bcd773eb4d990ccb6b3c/cpp/src/arrow/util/async_util.h#L243)) and prevent the race.
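
    With the optional return, the decision whether backpressure applies is made once, inside Acquire, instead of being re-derived from the future's state after the fact. A condensed sketch of the caller-side shape from the diff (names as in DatasetWriterImpl, surrounding code elided, the Unthrottled() shortcut aside):

    ```cpp
    std::optional<Future<>> backpressure =
        writer_state_->rows_in_flight_throttle.Acquire(next_chunk->num_rows());
    if (backpressure) {
      // Backpressure applies: stop here and resume this write from
      // backpressure->Then(...) once the throttle frees up.
    } else {
      // std::nullopt: Acquire already accounted for the rows; keep writing.
    }
    ```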
    
    ### Are these changes tested?
    
    Yes
    
    ### Are there any user-facing changes?
    
    No
    
    * GitHub Issue: #47124
    
    Lead-authored-by: RafaƂ Hibner <[email protected]>
    Co-authored-by: gitmodimo <[email protected]>
    Co-authored-by: Rossi Sun <[email protected]>
    Signed-off-by: Rossi Sun <[email protected]>
---
 cpp/src/arrow/dataset/dataset_writer.cc      | 18 +++++++------
 cpp/src/arrow/dataset/dataset_writer_test.cc | 38 +++++++++++++++++++++++++++-
 2 files changed, 47 insertions(+), 9 deletions(-)

diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc
index a7807e30a6..f5104efb70 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -53,7 +53,7 @@ class Throttle {
 
   bool Unthrottled() const { return max_value_ <= 0; }
 
-  Future<> Acquire(uint64_t values) {
+  std::optional<Future<>> Acquire(uint64_t values) {
     if (Unthrottled()) {
       return Future<>::MakeFinished();
     }
@@ -61,10 +61,11 @@ class Throttle {
     if (current_value_ >= max_value_) {
       in_waiting_ = values;
       backpressure_ = Future<>::Make();
-    } else {
-      current_value_ += values;
+      return backpressure_;
     }
-    return backpressure_;
+    current_value_ += values;
+    DCHECK(backpressure_.is_finished());
+    return std::nullopt;
   }
 
   void Release(uint64_t values) {
@@ -662,7 +663,7 @@ class DatasetWriter::DatasetWriterImpl {
                                                        directory, prefix);
             }));
     std::shared_ptr<DatasetWriterDirectoryQueue> dir_queue = dir_queue_itr->second;
-    Future<> backpressure;
+    std::optional<Future<>> backpressure;
     while (batch) {
       // Keep opening new files until batch is done.
       std::shared_ptr<RecordBatch> remainder;
@@ -681,13 +682,13 @@ class DatasetWriter::DatasetWriterImpl {
       }
       backpressure =
           writer_state_->rows_in_flight_throttle.Acquire(next_chunk->num_rows());
-      if (!backpressure.is_finished()) {
+      if (backpressure) {
         EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued");
         break;
       }
       if (will_open_file) {
         backpressure = writer_state_->open_files_throttle.Acquire(1);
-        if (!backpressure.is_finished()) {
+        if (backpressure) {
           EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
           writer_state_->rows_in_flight_throttle.Release(next_chunk->num_rows());
           RETURN_NOT_OK(TryCloseLargestFile());
@@ -711,7 +712,8 @@ class DatasetWriter::DatasetWriterImpl {
     }
 
     if (batch) {
-      return backpressure.Then([this, batch, directory, prefix] {
+      DCHECK(backpressure);
+      return backpressure->Then([this, batch, directory, prefix] {
         return DoWriteRecordBatch(batch, directory, prefix);
       });
     }
diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc b/cpp/src/arrow/dataset/dataset_writer_test.cc
index cbc3d25fbb..2f34c21aec 100644
--- a/cpp/src/arrow/dataset/dataset_writer_test.cc
+++ b/cpp/src/arrow/dataset/dataset_writer_test.cc
@@ -20,6 +20,7 @@
 #include <chrono>
 #include <mutex>
 #include <optional>
+#include <thread>
 #include <vector>
 
 #include "arrow/array/builder_primitive.h"
@@ -232,7 +233,7 @@ class DatasetWriterTestFixture : public testing::Test {
   util::AsyncTaskScheduler* scheduler_;
   Future<> scheduler_finished_;
   FileSystemDatasetWriteOptions write_options_;
-  bool paused_{false};
+  std::atomic_bool paused_{false};
   uint64_t counter_ = 0;
 };
 
@@ -275,6 +276,41 @@ TEST_F(DatasetWriterTestFixture, BatchGreaterThanMaxRowsQueued) {
   ASSERT_EQ(paused_, false);
 }
 
+TEST_F(DatasetWriterTestFixture, BatchWriteConcurrent) {
+#ifndef ARROW_ENABLE_THREADING
+  GTEST_SKIP() << "Test requires threading support";
+#endif
+  auto dataset_writer = MakeDatasetWriter(/*max_rows=*/5);
+
+  for (int threads = 1; threads < 5; threads++) {
+    for (int iter = 2; iter <= 256; iter *= 2) {
+      for (int batch = 2; batch <= 64; batch *= 2) {
+        std::vector<std::thread> workers;
+        for (int i = 0; i < threads; ++i) {
+          workers.push_back(std::thread([&, i = i]() {
+            for (int j = 0; j < iter; ++j) {
+              while (paused_) {
+                SleepABit();
+              }
+              dataset_writer->WriteRecordBatch(MakeBatch(0, batch + i + 10 * j), "");
+            }
+          }));
+        }
+        for (std::thread& t : workers) {
+          if (t.joinable()) {
+            t.join();
+          }
+          while (paused_) {
+            SleepABit();
+          }
+        }
+      }
+    }
+  }
+  EndWriterChecked(dataset_writer.get());
+  ASSERT_EQ(paused_, false);
+}
+
 TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
   write_options_.max_rows_per_file = 10;
   write_options_.max_rows_per_group = 10;
