jtanx opened a new issue, #46227: URL: https://github.com/apache/arrow/issues/46227
### Describe the bug, including details regarding any error messages, version, and platform.

We have been intermittently noticing deadlocks in a fairly simple graph that uses the aggregation node. We've had difficulty reliably reproducing the deadlock (the main thread ends up waiting indefinitely on the exec plan's finished future while all the other threads are idle). While trying to build a smaller test case, I noticed what appear to be some fairly significant threading issues in the implementation of the GroupByNode.

Refer to the attached test case, built against arrow at revision 18e8f50729c1f7e615f9647f2c4501efef0e7b26: [test.zip](https://github.com/user-attachments/files/19905554/test.zip)

It performs the following aggregation:

```c++
ac::TableSourceNodeOptions srcOpts(tb);
ac::AggregateNodeOptions aggOpts(
    {{"hash_min", nullptr, "avg_d_kbps", "min_avg_d_kbps"}},
    {"year", "devices"});
auto decl = ac::Declaration::Sequence({
    {"table_source", std::move(srcOpts)},
    {"aggregate", std::move(aggOpts)},
});
```

Running this under helgrind with `valgrind --log-file=helgrind.log --tool=helgrind --history-backtrace-size=100 ./testit`, it reports the following trace (among many others):

```
==42024== ----------------------------------------------------------------
==42024==
==42024== Possible data race during read of size 8 at 0x57DEA30 by thread #4
==42024== Locks held: none
==42024==    at 0x406E71: _M_ptr (unique_ptr.h:199)
==42024==    by 0x406E71: get (unique_ptr.h:470)
==42024==    by 0x406E71: operator bool (unique_ptr.h:487)
==42024==    by 0x406E71: arrow::acero::aggregate::GroupByNode::OutputResult(bool) (groupby_aggregate_node.cc:342)
==42024==    by 0x4073E7: arrow::acero::aggregate::GroupByNode::InputFinished(arrow::acero::ExecNode*, int) (groupby_aggregate_node.cc:396)
==42024==    by 0x38CEAF: operator() (source_node.cc:226)
==42024==    by 0x38CEAF: __invoke_impl<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::StartProducing()::<lambda(arrow::Result<int>)> mutable::<lambda()>&> (invoke.h:61)
==42024==    by 0x38CEAF: __invoke_r<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::StartProducing()::<lambda(arrow::Result<int>)> mutable::<lambda()>&> (invoke.h:116)
==42024==    by 0x38CEAF: std::_Function_handler<arrow::Status (), arrow::acero::(anonymous namespace)::SourceNode::StartProducing()::{lambda(arrow::Result<int>)#1}::operator()(arrow::Result<int>)::{lambda()#1}>::_M_invoke(std::_Any_data const&) (std_function.h:291)
==42024==    by 0x3730AF: operator() (std_function.h:591)
==42024==    by 0x3730AF: operator()<std::function<arrow::Status()>&> (future.h:150)
==42024==    by 0x3730AF: __invoke_impl<void, arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status()>&> (invoke.h:61)
==42024==    by 0x3730AF: __invoke<arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status()>&> (invoke.h:96)
==42024==    by 0x3730AF: __call<void, 0, 1> (functional:506)
==42024==    by 0x3730AF: operator()<> (functional:591)
==42024==    by 0x3730AF: arrow::internal::FnOnce<void ()>::FnImpl<std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>)> >::invoke() (functional.h:152)
==42024==    by 0x14B8F6C: operator() (functional.h:140)
==42024==    by 0x14B8F6C: WorkerLoop (thread_pool.cc:478)
==42024==    by 0x14B8F6C: operator() (thread_pool.cc:643)
==42024==    by 0x14B8F6C: __invoke_impl<void, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (invoke.h:61)
==42024==    by 0x14B8F6C: __invoke<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (invoke.h:96)
==42024==    by 0x14B8F6C: _M_invoke<0> (std_thread.h:292)
==42024==    by 0x14B8F6C: operator() (std_thread.h:299)
==42024==    by 0x14B8F6C: std::thread::_State_impl<std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::{lambda()#1}> > >::_M_run() (std_thread.h:244)
==42024==    by 0x4B69DB3: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.33)
==42024==    by 0x4854B7A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==42024==    by 0x4EAEAA3: start_thread (pthread_create.c:447)
==42024==    by 0x4F3BA33: clone (clone.S:100)
==42024==
==42024== This conflicts with a previous write of size 8 by thread #3
==42024== Locks held: none
==42024==    at 0x405C8A: reset (unique_ptr.h:209)
==42024==    by 0x405C8A: operator= (unique_ptr.h:191)
==42024==    by 0x405C8A: operator= (unique_ptr.h:243)
==42024==    by 0x405C8A: operator= (unique_ptr.h:414)
==42024==    by 0x405C8A: arrow::acero::aggregate::GroupByNode::InitLocalStateIfNeeded(arrow::acero::aggregate::GroupByNode::ThreadLocalState*) (groupby_aggregate_node.cc:428)
==42024==    by 0x407C45: arrow::acero::aggregate::GroupByNode::Consume(arrow::compute::ExecSpan) (groupby_aggregate_node.cc:218)
==42024==    by 0x408A2C: operator() (groupby_aggregate_node.cc:376)
==42024==    by 0x408A2C: HandleSegments<arrow::acero::aggregate::GroupByNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch)::<lambda(const arrow::compute::ExecBatch&, const arrow::compute::Segment&)> > (aggregate_internal.h:139)
==42024==    by 0x408A2C: arrow::acero::aggregate::GroupByNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch) (groupby_aggregate_node.cc:382)
==42024==    by 0x3943D5: operator() (source_node.cc:158)
==42024==    by 0x3943D5: __invoke_impl<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(const arrow::compute::ExecBatch&)::<lambda()>&> (invoke.h:61)
==42024==    by 0x3943D5: __invoke_r<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(const arrow::compute::ExecBatch&)::<lambda()>&> (invoke.h:116)
==42024==    by 0x3943D5: std::_Function_handler<arrow::Status (), arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::{lambda()#1}>::_M_invoke(std::_Any_data const&) (std_function.h:291)
==42024==    by 0x3730AF: operator() (std_function.h:591)
==42024==    by 0x3730AF: operator()<std::function<arrow::Status()>&> (future.h:150)
==42024==    by 0x3730AF: __invoke_impl<void, arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status()>&> (invoke.h:61)
==42024==    by 0x3730AF: __invoke<arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status()>&> (invoke.h:96)
==42024==    by 0x3730AF: __call<void, 0, 1> (functional:506)
==42024==    by 0x3730AF: operator()<> (functional:591)
==42024==    by 0x3730AF: arrow::internal::FnOnce<void ()>::FnImpl<std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>)> >::invoke() (functional.h:152)
==42024==    by 0x14B8F6C: operator() (functional.h:140)
==42024==    by 0x14B8F6C: WorkerLoop (thread_pool.cc:478)
==42024==    by 0x14B8F6C: operator() (thread_pool.cc:643)
==42024==    by 0x14B8F6C: __invoke_impl<void, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (invoke.h:61)
==42024==    by 0x14B8F6C: __invoke<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (invoke.h:96)
==42024==    by 0x14B8F6C: _M_invoke<0> (std_thread.h:292)
==42024==    by 0x14B8F6C: operator() (std_thread.h:299)
==42024==    by 0x14B8F6C: std::thread::_State_impl<std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::{lambda()#1}> > >::_M_run() (std_thread.h:244)
==42024==    by 0x4B69DB3: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.33)
==42024==    by 0x4854B7A: ??? (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==42024==    by 0x4EAEAA3: start_thread (pthread_create.c:447)
==42024==    by 0x4F3BA33: clone (clone.S:100)
==42024== Address 0x57dea30 is 0 bytes inside a block of size 352 alloc'd
==42024==    at 0x4849023: operator new(unsigned long) (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==42024==    by 0x40CF76: allocate (new_allocator.h:151)
==42024==    by 0x40CF76: allocate (alloc_traits.h:482)
==42024==    by 0x40CF76: _M_allocate (stl_vector.h:381)
==42024==    by 0x40CF76: _M_allocate (stl_vector.h:378)
==42024==    by 0x40CF76: std::vector<arrow::acero::aggregate::GroupByNode::ThreadLocalState, std::allocator<arrow::acero::aggregate::GroupByNode::ThreadLocalState> >::_M_default_append(unsigned long) (vector.tcc:663)
==42024==    by 0x40D1D2: resize (stl_vector.h:1016)
==42024==    by 0x40D1D2: arrow::acero::aggregate::GroupByNode::StartProducing() (aggregate_internal.h:289)
==42024==    by 0x32B8EA: operator() (exec_plan.cc:175)
==42024==    by 0x32B8EA: arrow::internal::FnOnce<arrow::Status (arrow::util::AsyncTaskScheduler*)>::FnImpl<arrow::acero::(anonymous namespace)::ExecPlanImpl::StartProducing()::{lambda(arrow::util::AsyncTaskScheduler*)#1}>::invoke(arrow::util::AsyncTaskScheduler*&&) (functional.h:152)
==42024==    by 0x143A479: operator() (functional.h:140)
==42024==    by 0x143A479: arrow::util::AsyncTaskScheduler::Make(arrow::internal::FnOnce<arrow::Status (arrow::util::AsyncTaskScheduler*)>, arrow::internal::FnOnce<void (arrow::Status const&)>, arrow::StopToken) (async_util.cc:471)
==42024==    by 0x329640: arrow::acero::(anonymous namespace)::ExecPlanImpl::StartProducing() (exec_plan.cc:193)
==42024==    by 0x332AAB: StartProducing (exec_plan.cc:439)
==42024==    by 0x332AAB: arrow::acero::(anonymous namespace)::DeclarationToTableImpl(arrow::acero::Declaration, arrow::acero::QueryOptions, arrow::internal::Executor*) (exec_plan.cc:662)
==42024==    by 0x333433: operator() (exec_plan.cc:787)
==42024==    by 0x333433: arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>::FnImpl<arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*)::{lambda(arrow::internal::Executor*)#1}>::invoke(arrow::internal::Executor*&&) (functional.h:152)
==42024==    by 0x342B55: operator() (functional.h:140)
==42024==    by 0x342B55: arrow::Future<std::shared_ptr<arrow::Table> >::SyncType arrow::internal::RunSynchronously<arrow::Future<std::shared_ptr<arrow::Table> >, std::shared_ptr<arrow::Table> >(arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>, bool) (thread_pool.h:587)
==42024==    by 0x334F63: arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*) (exec_plan.cc:789)
==42024==    by 0x320A8F: RunTest() (testit.cc:32)
==42024==    by 0x30B824: main (testit.cc:39)
==42024== Block was alloc'd by thread #1
==42024==
==42024== ----------------------------------------------------------------
```

Inspecting how [`InputReceived`](https://github.com/apache/arrow/blob/4d566e60c5a12f047e1c0f36b9733a56513a7d43/cpp/src/arrow/acero/groupby_aggregate_node.cc#L391) and [`InputFinished`](https://github.com/apache/arrow/blob/4d566e60c5a12f047e1c0f36b9733a56513a7d43/cpp/src/arrow/acero/groupby_aggregate_node.cc#L401) are implemented in the groupby node, along with what the source node does (it uses the task scheduler to call [`InputReceived()` on the downstream node](https://github.com/apache/arrow/blob/4d566e60c5a12f047e1c0f36b9733a56513a7d43/cpp/src/arrow/acero/source_node.cc#L134), and similarly for [`InputFinished()`](https://github.com/apache/arrow/blob/4d566e60c5a12f047e1c0f36b9733a56513a7d43/cpp/src/arrow/acero/source_node.cc#L224-L227)), I can see how this race can happen.
Presumably the task scheduler does not guarantee any ordering between `InputReceived()` and `InputFinished()`; furthermore, they can be called simultaneously from different threads. This implies that both of these methods in the groupby node must be thread-safe, which in turn implies that [`OutputResult`](https://github.com/apache/arrow/blob/4d566e60c5a12f047e1c0f36b9733a56513a7d43/cpp/src/arrow/acero/groupby_aggregate_node.cc#L346-L354) must also be thread-safe. But the way `OutputResult` is currently implemented clearly is not: it first modifies the thread-local state (potentially swapping entries; these entries could also be populated during this iteration if other threads have just spawned) and then merges the thread-local states. I think this explains the race above: `InputFinished` is being called from one thread while another thread is in the middle of `InputReceived`, and both end up in `OutputResult` when they should not.

This is also causing other issues, such as the execution plan being marked as finished even before the source node has called `InputFinished` on the aggregation node, which means that valgrind also observes `InputFinished` being called well after things have been destroyed.

So it looks like there is a clear issue here, but I am not familiar enough with the intended execution/threading model to say what the fix should be. Some suggestions here would be appreciated.

### Component(s)

C++

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org