jtanx opened a new issue, #46227:
URL: https://github.com/apache/arrow/issues/46227

   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   We have been intermittently noticing deadlocks in a fairly simple graph that 
uses the aggregation node. We've had difficulty in reliably reproducing the 
deadlock (where the main thread ends up indefinitely waiting on the exec plan's 
finished future while all the other threads are idle).
   
   In trying to construct a smaller test case, I noticed what appear to be some significant threading issues in the implementation of the GroupByNode.
   
   Referring to the attached test case, built with arrow at revision 18e8f50729c1f7e615f9647f2c4501efef0e7b26:
   [test.zip](https://github.com/user-attachments/files/19905554/test.zip)
   
   This performs the following aggregation:
   
   ```c++
   ac::TableSourceNodeOptions srcOpts(tb);
   ac::AggregateNodeOptions aggOpts(
       {{"hash_min", nullptr, "avg_d_kbps", "min_avg_d_kbps"}},
       {"year", "devices"});

   auto decl = ac::Declaration::Sequence({
       {"table_source", std::move(srcOpts)},
       {"aggregate", std::move(aggOpts)},
   });
   ```
   
   Running this under helgrind with `valgrind --log-file=helgrind.log 
--tool=helgrind --history-backtrace-size=100 ./testit`, it reports the 
following trace (among many others):
   
   ```
   ==42024== ----------------------------------------------------------------
   ==42024==
   ==42024== Possible data race during read of size 8 at 0x57DEA30 by thread #4
   ==42024== Locks held: none
   ==42024==    at 0x406E71: _M_ptr (unique_ptr.h:199)
   ==42024==    by 0x406E71: get (unique_ptr.h:470)
   ==42024==    by 0x406E71: operator bool (unique_ptr.h:487)
   ==42024==    by 0x406E71: arrow::acero::aggregate::GroupByNode::OutputResult(bool) (groupby_aggregate_node.cc:342)
   ==42024==    by 0x4073E7: arrow::acero::aggregate::GroupByNode::InputFinished(arrow::acero::ExecNode*, int) (groupby_aggregate_node.cc:396)
   ==42024==    by 0x38CEAF: operator() (source_node.cc:226)
   ==42024==    by 0x38CEAF: __invoke_impl<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::StartProducing()::<lambda(arrow::Result<int>)> mutable::<lambda()>&> (invoke.h:61)
   ==42024==    by 0x38CEAF: __invoke_r<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::StartProducing()::<lambda(arrow::Result<int>)> mutable::<lambda()>&> (invoke.h:116)
   ==42024==    by 0x38CEAF: std::_Function_handler<arrow::Status (), arrow::acero::(anonymous namespace)::SourceNode::StartProducing()::{lambda(arrow::Result<int>)#1}::operator()(arrow::Result<int>)::{lambda()#1}>::_M_invoke(std::_Any_data const&) (std_function.h:291)
   ==42024==    by 0x3730AF: operator() (std_function.h:591)
   ==42024==    by 0x3730AF: operator()<std::function<arrow::Status()>&> (future.h:150)
   ==42024==    by 0x3730AF: __invoke_impl<void, arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status()>&> (invoke.h:61)
   ==42024==    by 0x3730AF: __invoke<arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status()>&> (invoke.h:96)
   ==42024==    by 0x3730AF: __call<void, 0, 1> (functional:506)
   ==42024==    by 0x3730AF: operator()<> (functional:591)
   ==42024==    by 0x3730AF: arrow::internal::FnOnce<void ()>::FnImpl<std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>)> >::invoke() (functional.h:152)
   ==42024==    by 0x14B8F6C: operator() (functional.h:140)
   ==42024==    by 0x14B8F6C: WorkerLoop (thread_pool.cc:478)
   ==42024==    by 0x14B8F6C: operator() (thread_pool.cc:643)
   ==42024==    by 0x14B8F6C: __invoke_impl<void, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (invoke.h:61)
   ==42024==    by 0x14B8F6C: __invoke<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (invoke.h:96)
   ==42024==    by 0x14B8F6C: _M_invoke<0> (std_thread.h:292)
   ==42024==    by 0x14B8F6C: operator() (std_thread.h:299)
   ==42024==    by 0x14B8F6C: std::thread::_State_impl<std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::{lambda()#1}> > >::_M_run() (std_thread.h:244)
   ==42024==    by 0x4B69DB3: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.33)
   ==42024==    by 0x4854B7A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
   ==42024==    by 0x4EAEAA3: start_thread (pthread_create.c:447)
   ==42024==    by 0x4F3BA33: clone (clone.S:100)
   ==42024==
   ==42024== This conflicts with a previous write of size 8 by thread #3
   ==42024== Locks held: none
   ==42024==    at 0x405C8A: reset (unique_ptr.h:209)
   ==42024==    by 0x405C8A: operator= (unique_ptr.h:191)
   ==42024==    by 0x405C8A: operator= (unique_ptr.h:243)
   ==42024==    by 0x405C8A: operator= (unique_ptr.h:414)
   ==42024==    by 0x405C8A: arrow::acero::aggregate::GroupByNode::InitLocalStateIfNeeded(arrow::acero::aggregate::GroupByNode::ThreadLocalState*) (groupby_aggregate_node.cc:428)
   ==42024==    by 0x407C45: arrow::acero::aggregate::GroupByNode::Consume(arrow::compute::ExecSpan) (groupby_aggregate_node.cc:218)
   ==42024==    by 0x408A2C: operator() (groupby_aggregate_node.cc:376)
   ==42024==    by 0x408A2C: HandleSegments<arrow::acero::aggregate::GroupByNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch)::<lambda(const arrow::compute::ExecBatch&, const arrow::compute::Segment&)> > (aggregate_internal.h:139)
   ==42024==    by 0x408A2C: arrow::acero::aggregate::GroupByNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch) (groupby_aggregate_node.cc:382)
   ==42024==    by 0x3943D5: operator() (source_node.cc:158)
   ==42024==    by 0x3943D5: __invoke_impl<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(const arrow::compute::ExecBatch&)::<lambda()>&> (invoke.h:61)
   ==42024==    by 0x3943D5: __invoke_r<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(const arrow::compute::ExecBatch&)::<lambda()>&> (invoke.h:116)
   ==42024==    by 0x3943D5: std::_Function_handler<arrow::Status (), arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::{lambda()#1}>::_M_invoke(std::_Any_data const&) (std_function.h:291)
   ==42024==    by 0x3730AF: operator() (std_function.h:591)
   ==42024==    by 0x3730AF: operator()<std::function<arrow::Status()>&> (future.h:150)
   ==42024==    by 0x3730AF: __invoke_impl<void, arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status()>&> (invoke.h:61)
   ==42024==    by 0x3730AF: __invoke<arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status()>&> (invoke.h:96)
   ==42024==    by 0x3730AF: __call<void, 0, 1> (functional:506)
   ==42024==    by 0x3730AF: operator()<> (functional:591)
   ==42024==    by 0x3730AF: arrow::internal::FnOnce<void ()>::FnImpl<std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>)> >::invoke() (functional.h:152)
   ==42024==    by 0x14B8F6C: operator() (functional.h:140)
   ==42024==    by 0x14B8F6C: WorkerLoop (thread_pool.cc:478)
   ==42024==    by 0x14B8F6C: operator() (thread_pool.cc:643)
   ==42024==    by 0x14B8F6C: __invoke_impl<void, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (invoke.h:61)
   ==42024==    by 0x14B8F6C: __invoke<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (invoke.h:96)
   ==42024==    by 0x14B8F6C: _M_invoke<0> (std_thread.h:292)
   ==42024==    by 0x14B8F6C: operator() (std_thread.h:299)
   ==42024==    by 0x14B8F6C: std::thread::_State_impl<std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::{lambda()#1}> > >::_M_run() (std_thread.h:244)
   ==42024==    by 0x4B69DB3: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.33)
   ==42024==    by 0x4854B7A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
   ==42024==    by 0x4EAEAA3: start_thread (pthread_create.c:447)
   ==42024==    by 0x4F3BA33: clone (clone.S:100)
   ==42024==  Address 0x57dea30 is 0 bytes inside a block of size 352 alloc'd
   ==42024==    at 0x4849023: operator new(unsigned long) (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
   ==42024==    by 0x40CF76: allocate (new_allocator.h:151)
   ==42024==    by 0x40CF76: allocate (alloc_traits.h:482)
   ==42024==    by 0x40CF76: _M_allocate (stl_vector.h:381)
   ==42024==    by 0x40CF76: _M_allocate (stl_vector.h:378)
   ==42024==    by 0x40CF76: std::vector<arrow::acero::aggregate::GroupByNode::ThreadLocalState, std::allocator<arrow::acero::aggregate::GroupByNode::ThreadLocalState> >::_M_default_append(unsigned long) (vector.tcc:663)
   ==42024==    by 0x40D1D2: resize (stl_vector.h:1016)
   ==42024==    by 0x40D1D2: arrow::acero::aggregate::GroupByNode::StartProducing() (aggregate_internal.h:289)
   ==42024==    by 0x32B8EA: operator() (exec_plan.cc:175)
   ==42024==    by 0x32B8EA: arrow::internal::FnOnce<arrow::Status (arrow::util::AsyncTaskScheduler*)>::FnImpl<arrow::acero::(anonymous namespace)::ExecPlanImpl::StartProducing()::{lambda(arrow::util::AsyncTaskScheduler*)#1}>::invoke(arrow::util::AsyncTaskScheduler*&&) (functional.h:152)
   ==42024==    by 0x143A479: operator() (functional.h:140)
   ==42024==    by 0x143A479: arrow::util::AsyncTaskScheduler::Make(arrow::internal::FnOnce<arrow::Status (arrow::util::AsyncTaskScheduler*)>, arrow::internal::FnOnce<void (arrow::Status const&)>, arrow::StopToken) (async_util.cc:471)
   ==42024==    by 0x329640: arrow::acero::(anonymous namespace)::ExecPlanImpl::StartProducing() (exec_plan.cc:193)
   ==42024==    by 0x332AAB: StartProducing (exec_plan.cc:439)
   ==42024==    by 0x332AAB: arrow::acero::(anonymous namespace)::DeclarationToTableImpl(arrow::acero::Declaration, arrow::acero::QueryOptions, arrow::internal::Executor*) (exec_plan.cc:662)
   ==42024==    by 0x333433: operator() (exec_plan.cc:787)
   ==42024==    by 0x333433: arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>::FnImpl<arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*)::{lambda(arrow::internal::Executor*)#1}>::invoke(arrow::internal::Executor*&&) (functional.h:152)
   ==42024==    by 0x342B55: operator() (functional.h:140)
   ==42024==    by 0x342B55: arrow::Future<std::shared_ptr<arrow::Table> >::SyncType arrow::internal::RunSynchronously<arrow::Future<std::shared_ptr<arrow::Table> >, std::shared_ptr<arrow::Table> >(arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>, bool) (thread_pool.h:587)
   ==42024==    by 0x334F63: arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*) (exec_plan.cc:789)
   ==42024==    by 0x320A8F: RunTest() (testit.cc:32)
   ==42024==    by 0x30B824: main (testit.cc:39)
   ==42024==  Block was alloc'd by thread #1
   ==42024==
   ==42024== ----------------------------------------------------------------
   ```
   
   Inspecting how [`InputReceived`](https://github.com/apache/arrow/blob/4d566e60c5a12f047e1c0f36b9733a56513a7d43/cpp/src/arrow/acero/groupby_aggregate_node.cc#L391) and [`InputFinished`](https://github.com/apache/arrow/blob/4d566e60c5a12f047e1c0f36b9733a56513a7d43/cpp/src/arrow/acero/groupby_aggregate_node.cc#L401) are implemented in the groupby node, along with what the source node does (it uses the task scheduler to call [`InputReceived()` on the downstream node](https://github.com/apache/arrow/blob/4d566e60c5a12f047e1c0f36b9733a56513a7d43/cpp/src/arrow/acero/source_node.cc#L134), and similarly for [`InputFinished()`](https://github.com/apache/arrow/blob/4d566e60c5a12f047e1c0f36b9733a56513a7d43/cpp/src/arrow/acero/source_node.cc#L224-L227)), I can see how this race can happen.
   
   Presumably the task scheduler does not guarantee any ordering between InputReceived() and InputFinished(); furthermore, they can be called simultaneously from different threads. This implies that both of these methods on the groupby node must be thread-safe, which in turn implies that [OutputResult](https://github.com/apache/arrow/blob/4d566e60c5a12f047e1c0f36b9733a56513a7d43/cpp/src/arrow/acero/groupby_aggregate_node.cc#L346-L354) must also be thread-safe. But the way `OutputResult` is currently implemented is clearly not: it first modifies the thread-local state (potentially swapping entries, and these entries could also be populated during that iteration if other threads have only just spawned) and then merges the thread-local states.
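   To make the conflicting pair of accesses concrete, here is the pattern in miniature (all names below are illustrative, not Arrow's): one thread lazily initializes a per-thread state slot via `unique_ptr` assignment while another walks all slots to merge them, so the unguarded `operator bool` read races with the reset. A mutex over both paths is the bluntest possible fix; I'm not suggesting Acero should literally do this, it just shows the two call paths that need to exclude each other:

   ```cpp
   #include <iostream>
   #include <memory>
   #include <mutex>
   #include <thread>
   #include <vector>

   struct LocalState {
     int count = 0;
   };

   class States {
    public:
     explicit States(int n) : states_(n) {}

     // cf. InitLocalStateIfNeeded: lazily creates the slot, then updates it.
     void InitIfNeeded(int i) {
       std::lock_guard<std::mutex> lock(mutex_);
       if (!states_[i]) states_[i] = std::make_unique<LocalState>();
       states_[i]->count++;
     }

     // cf. OutputResult's merge loop: reads every slot's pointer.
     int Merge() {
       std::lock_guard<std::mutex> lock(mutex_);
       int total = 0;
       for (auto& s : states_) {
         if (s) total += s->count;  // this is the read helgrind flagged
       }
       return total;
     }

    private:
     std::mutex mutex_;  // serializes init against merge
     std::vector<std::unique_ptr<LocalState>> states_;
   };

   int main() {
     States states(4);
     std::vector<std::thread> threads;
     for (int i = 0; i < 4; ++i) {
       threads.emplace_back([&states, i] { states.InitIfNeeded(i); });
     }
     for (auto& t : threads) t.join();
     std::cout << states.Merge() << "\n";
   }
   ```

   Without the mutex, the pointer read in `Merge()` and the assignment in `InitIfNeeded()` are exactly the read/write pair reported in the trace above.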
   
   I think this explains the above race: InputFinished is called from one thread while another thread is still in the middle of InputReceived, and both end up in OutputResult when they should not. This also causes other issues, such as the execution plan being marked as finished before the source node has even called InputFinished on the aggregation node, which means valgrind also observes InputFinished being called well after things have been destroyed.
   
   So it looks like there is a clear issue here, but I am not familiar enough with the intended execution/threading model to say what the fix should be. Some suggestions would be appreciated.
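   In case it helps frame the discussion, here is a sketch (illustrative names only, not a proposed patch, and not Arrow's actual API) of one common way to make an InputReceived()/InputFinished() pair race-free without ordering guarantees from the scheduler: count processed batches atomically, and let whichever call observes completion last trigger the output step, deduplicated by a compare-exchange so it runs exactly once:

   ```cpp
   #include <atomic>
   #include <iostream>
   #include <thread>
   #include <vector>

   class ToyGroupBy {
    public:
     void InputReceived() {
       // ... consume the batch into per-thread state here ...
       if (++processed_ == total_.load()) MaybeOutput();
     }

     void InputFinished(int total_batches) {
       total_.store(total_batches);
       if (processed_.load() == total_batches) MaybeOutput();
     }

     int outputs() const { return outputs_.load(); }

    private:
     void MaybeOutput() {
       // compare_exchange guarantees a single winner even when InputReceived()
       // and InputFinished() observe completion simultaneously.
       bool expected = false;
       if (output_started_.compare_exchange_strong(expected, true)) {
         // ... merge per-thread states and emit the result, exactly once ...
         ++outputs_;
       }
     }

     std::atomic<int> processed_{0};
     std::atomic<int> total_{-1};  // -1 until InputFinished() announces it
     std::atomic<bool> output_started_{false};
     std::atomic<int> outputs_{0};
   };

   int main() {
     constexpr int kBatches = 64;
     ToyGroupBy node;
     std::vector<std::thread> workers;
     for (int i = 0; i < kBatches; ++i) {
       workers.emplace_back([&node] { node.InputReceived(); });
     }
     std::thread finisher([&node] { node.InputFinished(kBatches); });
     for (auto& t : workers) t.join();
     finisher.join();
     std::cout << node.outputs() << "\n";
   }
   ```

   With this shape the output step can only ever run on one thread, after the last batch has been consumed, regardless of how the scheduler interleaves the calls; whether something like this fits Acero's intended model is exactly what I'd like guidance on.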
   
   ### Component(s)
   
   C++

