This is an automated email from the ASF dual-hosted git repository.
BiteTheDDDDt pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new e08498eb559 [fix](pipeline) Fix wake up early without terminate call
(#63539)
e08498eb559 is described below
commit e08498eb5592bc04ef4af5340622349019bce543
Author: Pxl <[email protected]>
AuthorDate: Tue May 26 14:30:18 2026 +0800
[fix](pipeline) Fix wake up early without terminate call (#63539)
### What problem does this PR solve?
Issue Number: None
Related PR: #61679
Problem Summary: Backport #61679 to branch-4.0. A pipeline task can race
with a downstream early wake-up: the executing thread may read
`_wake_up_early` as false; another thread then sets `_wake_up_early` and
unblocks the task's finish dependencies; the executing thread then sees
`_is_pending_finish()` as false and finishes without ever calling the
operators' `terminate()`. For hash join build tasks, this can leave runtime
filter producers in `WAITING_FOR_SYNCED_SIZE`; during close/build,
`insert()` expects `WAITING_FOR_DATA` and reports an invalid runtime filter
producer state.
This change moves operator termination to after the pending-finish check,
so that the second read of `_wake_up_early` observes the early wake-up and
the operators are terminated before close.
### Release note
None
### Check List (For Author)
- Test: Unit Test
- `build-support/check-format.sh be/src/pipeline/pipeline_task.cpp
be/test/pipeline/pipeline_task_test.cpp`
- `ninja -C be/build_Release
src/exec/CMakeFiles/Exec.dir/pipeline/pipeline_task.cpp.o`
- `ninja -C be/ut_build_ASAN
src/exec/CMakeFiles/Exec.dir/pipeline/pipeline_task.cpp.o
test/CMakeFiles/doris_be_test.dir/exec/pipeline/pipeline_task_test.cpp.o
test/doris_be_test`
- `be/ut_build_ASAN/test/doris_be_test
--gtest_filter=PipelineTaskTest.TEST_TERMINATE_RACE_FIX
--gtest_print_time=true`
- Behavior changed: No
- Does this need documentation: No
---
be/src/pipeline/pipeline_task.cpp | 32 +++++++++++--
be/test/pipeline/pipeline_task_test.cpp | 79 +++++++++++++++++++++++++++++++++
2 files changed, 108 insertions(+), 3 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index dbe52cbabc0..afd40596241 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -409,15 +409,41 @@ Status PipelineTask::execute(bool* done) {
// If task is woke up early, we should terminate all operators, and
this task could be closed immediately.
if (_wake_up_early) {
- terminate();
- THROW_IF_ERROR(_root->terminate(_state));
- THROW_IF_ERROR(_sink->terminate(_state));
_eos = true;
*done = true;
} else if (_eos && !_spilling &&
(fragment_context->is_canceled() || !_is_pending_finish()))
{
+ // Debug point for testing the race condition fix: inject
set_wake_up_early() +
+ // terminate() here to simulate Thread B writing A then B between
Thread A's two
+ // reads of _wake_up_early.
+ DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
+ set_wake_up_early();
+ terminate();
+ });
*done = true;
}
+
+ // NOTE: The terminate() call is intentionally placed AFTER the
_is_pending_finish() check
+ // above, not before. This ordering is critical to avoid a race
condition:
+ //
+ // Pipeline::make_all_runnable() writes in this order:
+ // (A) set_wake_up_early() -> (B) terminate() [sets
finish_dep._always_ready]
+ //
+ // If we checked _wake_up_early (A) before _is_pending_finish() (B),
there would be a
+ // window where Thread A reads _wake_up_early=false, then Thread B
writes both A and B,
+ // then Thread A reads _is_pending_finish()=false (due to
_always_ready). Thread A would
+ // then set *done=true without ever calling operator terminate(),
causing close() to run
+ // on operators that were never properly terminated (e.g.
RuntimeFilterProducer still in
+ // WAITING_FOR_SYNCED_SIZE state when insert() is called).
+ //
+ // By reading _is_pending_finish() (B) before the second read of
_wake_up_early (A),
+ // if Thread A observes B's effect (_always_ready=true), it is
guaranteed to also observe
+ // A's effect (_wake_up_early=true) on this second read, ensuring
terminate() is called.
+ if (_wake_up_early) {
+ terminate();
+ THROW_IF_ERROR(_root->terminate(_state));
+ THROW_IF_ERROR(_sink->terminate(_state));
+ }
}};
const auto query_id = _state->query_id();
// If this task is already EOS and block is empty (which means we already
output all blocks),
diff --git a/be/test/pipeline/pipeline_task_test.cpp
b/be/test/pipeline/pipeline_task_test.cpp
index 8a8e16e3c4d..a629344363a 100644
--- a/be/test/pipeline/pipeline_task_test.cpp
+++ b/be/test/pipeline/pipeline_task_test.cpp
@@ -18,6 +18,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "common/config.h"
#include "common/status.h"
#include "dummy_task_queue.h"
#include "pipeline/dependency.h"
@@ -30,6 +31,7 @@
#include "testutil/mock/mock_thread_mem_tracker_mgr.h"
#include "testutil/mock/mock_workload_group_mgr.h"
#include "thrift_builder.h"
+#include "util/debug_points.h"
namespace doris::pipeline {
@@ -1186,4 +1188,81 @@ TEST_F(PipelineTaskTest, TEST_INJECT_SHARED_STATE) {
}
}
+// Test for the race condition fix between _wake_up_early and
_is_pending_finish().
+//
+// The race: Pipeline::make_all_runnable() writes in order (A)
set_wake_up_early -> (B) terminate()
+// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A
reads _wake_up_early=false
+// (A), then Thread B writes A and B, then Thread A reads
_is_pending_finish()=false (due to
+// _always_ready from B), Thread A would set *done=true without calling
operator terminate().
+//
+// The fix: terminate() is called after _is_pending_finish() in the Defer. So
if Thread A sees B's
+// effect (_always_ready=true), it must also see A's effect
(_wake_up_early=true) on the subsequent
+// read, ensuring terminate() is always called.
+//
+// This test uses a debug point injected into the else-if branch to simulate
the exact bad timing:
+// the debug point fires set_wake_up_early() + terminate() after
_is_pending_finish() returns false
+// (due to finish_dep being naturally unblocked) but before the second
_wake_up_early check.
+TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
+ auto num_instances = 1;
+ auto pip_id = 0;
+ auto task_id = 0;
+ auto pip = std::make_shared<Pipeline>(pip_id, num_instances,
num_instances);
+ {
+ OperatorPtr source_op;
+ source_op.reset(new DummyOperator());
+ EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+ int op_id = 1;
+ int node_id = 2;
+ int dest_id = 3;
+ DataSinkOperatorPtr sink_op;
+ sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+ EXPECT_TRUE(pip->set_sink(sink_op).ok());
+ }
+ auto profile = std::make_shared<RuntimeProfile>("Pipeline : " +
std::to_string(pip_id));
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>,
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
+ _runtime_state->resize_op_id_to_local_state(-1);
+ auto task = std::make_shared<PipelineTask>(pip, task_id,
_runtime_state.get(), _context,
+ profile.get(),
shared_state_map, task_id);
+ task->_exec_time_slice = 10'000'000'000ULL;
+ {
+ std::vector<TScanRangeParams> scan_range;
+ int sender_id = 0;
+ TDataSink tsink;
+ EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+ }
+ _query_ctx->get_execution_dependency()->set_ready();
+
+ auto* sink_finish_dep =
+
_runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
+ EXPECT_NE(sink_finish_dep, nullptr);
+ sink_finish_dep->block();
+
+ task->_operators.front()->cast<DummyOperator>()._eos = true;
+ {
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+ EXPECT_TRUE(task->_eos);
+ EXPECT_FALSE(done);
+ EXPECT_FALSE(task->_wake_up_early);
+ }
+
+ sink_finish_dep->set_ready();
+ config::enable_debug_points = true;
+
DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
+ {
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+ EXPECT_TRUE(task->_eos);
+ EXPECT_TRUE(done);
+ EXPECT_TRUE(task->_wake_up_early);
+
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
+ EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
+ }
+ DebugPoints::instance()->clear();
+ config::enable_debug_points = false;
+}
+
} // namespace doris::pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]