This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 17bc0cc7ee6 [fix](spill) inner local state should be initialized in
init phase (#47211)
17bc0cc7ee6 is described below
commit 17bc0cc7ee6e9a436c87ce164f0989d8759d1dd6
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Jan 24 05:17:30 2025 +0800
[fix](spill) inner local state should be initialized in init phase (#47211)
---
.../exec/partitioned_hash_join_sink_operator.cpp | 26 +++++++++++-----------
.../exec/partitioned_hash_join_sink_operator.h | 4 ++--
2 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 635b642cbf3..2df474f42eb 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -61,6 +61,7 @@ Status
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
_in_mem_rows_counter = ADD_COUNTER_WITH_LEVEL(profile(), "SpillInMemRow",
TUnit::UNIT, 1);
_memory_usage_reserved =
ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved",
TUnit::BYTES, 1);
+ RETURN_IF_ERROR(_setup_internal_operator(state));
return Status::OK();
}
@@ -70,7 +71,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState*
state) {
_shared_state->setup_shared_profile(_profile);
RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
- RETURN_IF_ERROR(p._setup_internal_operator(state));
+ _shared_state->inner_runtime_state->set_task(state->get_task());
for (uint32_t i = 0; i != p._partition_count; ++i) {
auto& spilling_stream = _shared_state->spilled_streams[i];
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
@@ -513,9 +514,7 @@ Status PartitionedHashJoinSinkOperatorX::open(RuntimeState*
state) {
return _inner_sink_operator->open(state);
}
-Status
PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState* state)
{
- auto& local_state = get_local_state(state);
-
+Status
PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState*
state) {
auto inner_runtime_state = RuntimeState::create_unique(
state->fragment_instance_id(), state->query_id(),
state->fragment_id(),
state->query_options(), TQueryGlobals {}, state->exec_env(),
state->get_query_ctx());
@@ -527,29 +526,30 @@ Status
PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState*
inner_runtime_state->resize_op_id_to_local_state(-1);
inner_runtime_state->set_runtime_filter_mgr(state->local_runtime_filter_mgr());
+ auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
auto inner_shared_state = std::dynamic_pointer_cast<HashJoinSharedState>(
- _inner_sink_operator->create_shared_state());
- LocalSinkStateInfo info {
- 0, local_state._internal_runtime_profile.get(), -1,
inner_shared_state.get(), {}, {}};
+ p._inner_sink_operator->create_shared_state());
+ LocalSinkStateInfo info {0, _internal_runtime_profile.get(), -1,
inner_shared_state.get(), {},
+ {}};
-
RETURN_IF_ERROR(_inner_sink_operator->setup_local_state(inner_runtime_state.get(),
info));
+
RETURN_IF_ERROR(p._inner_sink_operator->setup_local_state(inner_runtime_state.get(),
info));
auto* sink_local_state = inner_runtime_state->get_sink_local_state();
DCHECK(sink_local_state != nullptr);
LocalStateInfo state_info {
- local_state._internal_runtime_profile.get(), {},
inner_shared_state.get(), {}, 0};
+ _internal_runtime_profile.get(), {}, inner_shared_state.get(), {},
0};
RETURN_IF_ERROR(
-
_inner_probe_operator->setup_local_state(inner_runtime_state.get(),
state_info));
+
p._inner_probe_operator->setup_local_state(inner_runtime_state.get(),
state_info));
auto* probe_local_state =
-
inner_runtime_state->get_local_state(_inner_probe_operator->operator_id());
+
inner_runtime_state->get_local_state(p._inner_probe_operator->operator_id());
DCHECK(probe_local_state != nullptr);
RETURN_IF_ERROR(probe_local_state->open(state));
RETURN_IF_ERROR(sink_local_state->open(state));
/// Set these two values after all the work is ready.
- local_state._shared_state->inner_shared_state =
std::move(inner_shared_state);
- local_state._shared_state->inner_runtime_state =
std::move(inner_runtime_state);
+ _shared_state->inner_shared_state = std::move(inner_shared_state);
+ _shared_state->inner_runtime_state = std::move(inner_runtime_state);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 73955932427..492c98fa637 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -69,6 +69,8 @@ protected:
Status _finish_spilling();
+ Status _setup_internal_operator(RuntimeState* state);
+
friend class PartitionedHashJoinSinkOperatorX;
bool _child_eos {false};
@@ -140,8 +142,6 @@ public:
private:
friend class PartitionedHashJoinSinkLocalState;
- Status _setup_internal_operator(RuntimeState* state);
-
const TJoinDistributionType::type _join_distribution;
std::vector<TExpr> _build_exprs;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]