This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5571b1f93fb [improvement](spill) optimize the spilling logic of hash join operator (#32202)
5571b1f93fb is described below

commit 5571b1f93fbd342bc8fd3accda3f18048efaf4d2
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed Mar 20 08:20:40 2024 +0800

    [improvement](spill) optimize the spilling logic of hash join operator (#32202)
---
 .../exec/partitioned_hash_join_probe_operator.cpp  | 86 +++++++++++++++++++---
 .../exec/partitioned_hash_join_probe_operator.h    |  8 ++
 .../exec/partitioned_hash_join_sink_operator.cpp   | 33 ++++++++-
 .../exec/partitioned_hash_join_sink_operator.h     |  6 ++
 be/src/pipeline/pipeline_x/dependency.h            |  1 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  2 +-
 6 files changed, 121 insertions(+), 15 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f37f28d59c7..93ce24e8e72 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -58,6 +58,21 @@ Status 
PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
     _recovery_probe_blocks =
             ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT, 
"SpillAndPartition");
 
+    _spill_serialize_block_timer = ADD_CHILD_TIMER_WITH_LEVEL(
+            Base::profile(), "SpillSerializeBlockTime", "SpillAndPartition", 
1);
+    _spill_write_disk_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteDiskTime",
+                                                         "SpillAndPartition", 
1);
+    _spill_data_size = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteDataSize",
+                                                    TUnit::BYTES, 
"SpillAndPartition", 1);
+    _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteBlockCount",
+                                                      TUnit::UNIT, 
"SpillAndPartition", 1);
+    _spill_read_data_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillReadDataTime",
+                                                       "SpillAndPartition", 1);
+    _spill_deserialize_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillDeserializeTime",
+                                                         "SpillAndPartition", 
1);
+    _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillReadDataSize",
+                                                     TUnit::BYTES, 
"SpillAndPartition", 1);
+
     // Build phase
     _build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase");
     _build_rows_counter = ADD_CHILD_COUNTER(profile(), "BuildRows", 
TUnit::UNIT, "BuildPhase");
@@ -141,7 +156,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
                                                              uint32_t 
partition_index) {
     auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
     auto& mutable_block = partitioned_build_blocks[partition_index];
-    if (!mutable_block || mutable_block->rows() == 0) {
+    if (!mutable_block || mutable_block->rows() < state->batch_size()) {
         --_spilling_task_count;
         return Status::OK();
     }
@@ -153,6 +168,8 @@ Status 
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
                 _parent->id(), std::numeric_limits<int32_t>::max(),
                 std::numeric_limits<size_t>::max(), _runtime_profile.get()));
         RETURN_IF_ERROR(build_spilling_stream->prepare_spill());
+        
build_spilling_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
+                                                  _spill_data_size, 
_spill_write_disk_timer);
     }
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
@@ -191,18 +208,28 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
                 std::numeric_limits<int32_t>::max(), 
std::numeric_limits<size_t>::max(),
                 _runtime_profile.get()));
         RETURN_IF_ERROR(spilling_stream->prepare_spill());
+        spilling_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
+                                            _spill_data_size, 
_spill_write_disk_timer);
     }
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
             spilling_stream->get_spill_root_dir());
 
     auto& blocks = _probe_blocks[partition_index];
+    auto& partitioned_block = _partitioned_blocks[partition_index];
+    if (partitioned_block && partitioned_block->rows() >= state->batch_size()) 
{
+        blocks.emplace_back(partitioned_block->to_block());
+        partitioned_block.reset();
+    }
 
     if (!blocks.empty()) {
         return spill_io_pool->submit_func([state, &blocks, &spilling_stream, 
this] {
             (void)state; // avoid ut compile error
             SCOPED_ATTACH_TASK(state);
-            for (auto& block : blocks) {
+            COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
+            while (!blocks.empty()) {
+                auto block = std::move(blocks.back());
+                blocks.pop_back();
                 if (_spill_status_ok) {
                     auto st = spilling_stream->spill_block(block, false);
                     if (!st.ok()) {
@@ -217,8 +244,6 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
                 }
             }
 
-            COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
-            blocks.clear();
             --_spilling_task_count;
 
             if (_spilling_task_count == 0) {
@@ -241,6 +266,8 @@ Status 
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
     if (build_spilling_stream) {
         build_spilling_stream->end_spill(Status::OK());
         RETURN_IF_ERROR(build_spilling_stream->spill_eof());
+        build_spilling_stream->set_read_counters(_spill_read_data_time, 
_spill_deserialize_time,
+                                                 _spill_read_bytes);
     }
 
     auto& probe_spilling_stream = _probe_spilling_streams[partition_index];
@@ -248,6 +275,8 @@ Status 
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
     if (probe_spilling_stream) {
         probe_spilling_stream->end_spill(Status::OK());
         RETURN_IF_ERROR(probe_spilling_stream->spill_eof());
+        probe_spilling_stream->set_read_counters(_spill_read_data_time, 
_spill_deserialize_time,
+                                                 _spill_read_bytes);
     }
 
     return Status::OK();
@@ -259,6 +288,8 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     auto& spilled_stream = _shared_state->spilled_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
+        LOG(INFO) << "no data need to recovery for partition: " << 
partition_index
+                  << ", node id: " << _parent->id() << ", task id: " << 
state->task_id();
         return Status::OK();
     }
 
@@ -288,6 +319,7 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
                 continue;
             }
 
+            DCHECK_EQ(mutable_block->columns(), block.columns());
             if (mutable_block->empty()) {
                 *mutable_block = std::move(block);
             } else {
@@ -301,6 +333,7 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
             }
         }
 
+        LOG(INFO) << "recovery data done for partition: " << 
spilled_stream->get_spill_dir();
         
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
         spilled_stream.reset();
         _dependency->set_ready();
@@ -350,6 +383,7 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
         }
 
         if (eos) {
+            LOG(INFO) << "recovery probe data done: " << 
spilled_stream->get_spill_dir();
             
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
             spilled_stream.reset();
         }
@@ -401,10 +435,17 @@ Status PartitionedHashJoinProbeOperatorX::init(const 
TPlanNode& tnode, RuntimeSt
     return _probe_operator->init(tnode_, state);
 }
 Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) {
-    RETURN_IF_ERROR(OperatorXBase::prepare(state));
+    // here do NOT call `OperatorXBase::prepare(state)`
+    // RETURN_IF_ERROR(OperatorXBase::prepare(state));
+    for (auto& conjunct : _conjuncts) {
+        RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
+    }
+
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, 
intermediate_row_desc()));
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, 
*_intermediate_row_desc));
     RETURN_IF_ERROR(_probe_operator->set_child(_child_x));
-    RETURN_IF_ERROR(_probe_operator->set_child(_build_side_child));
+    DCHECK(_build_side_child != nullptr);
+    _probe_operator->set_build_side_child(_build_side_child);
     RETURN_IF_ERROR(_sink_operator->set_child(_build_side_child));
     RETURN_IF_ERROR(_probe_operator->prepare(state));
     RETURN_IF_ERROR(_sink_operator->prepare(state));
@@ -524,6 +565,9 @@ Status 
PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
         partitioned_block.reset();
     }
     RETURN_IF_ERROR(_sink_operator->sink(local_state._runtime_state.get(), 
&block, true));
+    LOG(INFO) << "internal build operator finished, node id: " << id()
+              << ", task id: " << state->task_id()
+              << ", partition: " << local_state._partition_cursor;
     return Status::OK();
 }
 
@@ -615,18 +659,25 @@ bool 
PartitionedHashJoinProbeOperatorX::need_data_from_children(RuntimeState* st
 size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* 
state) const {
     auto& local_state = get_local_state(state);
     size_t mem_size = 0;
+    uint32_t spilling_start = local_state._child_eos ? 
local_state._partition_cursor + 1 : 0;
+    DCHECK_GE(spilling_start, local_state._partition_cursor);
 
     auto& partitioned_build_blocks = 
local_state._shared_state->partitioned_build_blocks;
     auto& probe_blocks = local_state._probe_blocks;
-    for (uint32_t i = local_state._partition_cursor + 1; i < _partition_count; 
++i) {
+    for (uint32_t i = spilling_start; i < _partition_count; ++i) {
         auto& build_block = partitioned_build_blocks[i];
-        if (build_block && build_block->rows() > 0) {
+        if (build_block && build_block->rows() >= state->batch_size()) {
             mem_size += build_block->allocated_bytes();
         }
 
         for (auto& block : probe_blocks[i]) {
             mem_size += block.allocated_bytes();
         }
+
+        auto& partitioned_block = local_state._partitioned_blocks[i];
+        if (partitioned_block && partitioned_block->rows() >= 
state->batch_size()) {
+            mem_size += partitioned_block->allocated_bytes();
+        }
     }
     return mem_size;
 }
@@ -634,14 +685,16 @@ size_t 
PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
 Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, 
bool& wait_for_io) {
     auto& local_state = get_local_state(state);
     wait_for_io = false;
-    if (_partition_count > (local_state._partition_cursor + 1)) {
-        local_state._spilling_task_count =
-                (_partition_count - local_state._partition_cursor - 1) * 2;
+    uint32_t spilling_start = local_state._child_eos ? 
local_state._partition_cursor + 1 : 0;
+    DCHECK_GE(spilling_start, local_state._partition_cursor);
+
+    if (_partition_count > spilling_start) {
+        local_state._spilling_task_count = (_partition_count - spilling_start) 
* 2;
     } else {
         return Status::OK();
     }
 
-    for (uint32_t i = local_state._partition_cursor + 1; i < _partition_count; 
++i) {
+    for (uint32_t i = spilling_start; i < _partition_count; ++i) {
         RETURN_IF_ERROR(local_state.spill_build_block(state, i));
         RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i));
     }
@@ -657,6 +710,14 @@ Status 
PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo
 }
 
 bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* 
state) const {
+    auto& local_state = get_local_state(state);
+
+    if (local_state._shared_state->need_to_spill) {
+        const auto revocable_size = revocable_mem_size(state);
+        const auto min_revocable_size = state->min_revocable_mem();
+        return revocable_size > min_revocable_size;
+    }
+
     auto sys_mem_available = MemInfo::sys_mem_available();
     auto sys_mem_warning_water_mark = 
doris::MemInfo::sys_mem_available_warning_water_mark();
 
@@ -692,6 +753,7 @@ Status 
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
             bool wait_for_io = false;
             RETURN_IF_ERROR(_revoke_memory(state, wait_for_io));
             if (wait_for_io) {
+                local_state._shared_state->need_to_spill = true;
                 return Status::OK();
             }
         }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 27c4a9e0bb2..adbaf19314f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -100,6 +100,14 @@ private:
     RuntimeProfile::Counter* _recovery_probe_rows = nullptr;
     RuntimeProfile::Counter* _recovery_probe_blocks = nullptr;
 
+    RuntimeProfile::Counter* _spill_read_data_time = nullptr;
+    RuntimeProfile::Counter* _spill_deserialize_time = nullptr;
+    RuntimeProfile::Counter* _spill_read_bytes = nullptr;
+    RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
+    RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
+    RuntimeProfile::Counter* _spill_data_size = nullptr;
+    RuntimeProfile::Counter* _spill_block_count = nullptr;
+
     RuntimeProfile::Counter* _build_phase_label = nullptr;
     RuntimeProfile::Counter* _build_rows_counter = nullptr;
     RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 5cbf2b15ec1..20b3531c0cd 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -36,6 +36,11 @@ Status 
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
     _partition_timer = ADD_TIMER(profile(), "PartitionTime");
     _partition_shuffle_timer = ADD_TIMER(profile(), "PartitionShuffleTime");
 
+    _spill_serialize_block_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"SpillSerializeBlockTime", 1);
+    _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"SpillWriteDiskTime", 1);
+    _spill_data_size = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteDataSize", 
TUnit::BYTES, 1);
+    _spill_block_count = ADD_COUNTER_WITH_LEVEL(profile(), 
"SpillWriteBlockCount", TUnit::UNIT, 1);
+
     return _partitioner->prepare(state, p._child_x->row_desc());
 }
 
@@ -51,7 +56,8 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
         vectorized::SpillStreamSPtr& spilling_stream = 
_shared_state->spilled_streams[i];
         auto& mutable_block = _shared_state->partitioned_build_blocks[i];
 
-        if (!mutable_block || mutable_block->rows() == 0) {
+        if (!mutable_block || mutable_block->rows() < state->batch_size()) {
+            --_spilling_streams_count;
             continue;
         }
 
@@ -61,6 +67,8 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
                     _parent->id(), std::numeric_limits<int32_t>::max(),
                     std::numeric_limits<size_t>::max(), _profile));
             RETURN_IF_ERROR(spilling_stream->prepare_spill());
+            spilling_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
+                                                _spill_data_size, 
_spill_write_disk_timer);
         }
 
         auto* spill_io_pool =
@@ -79,9 +87,14 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
     }
 
     if (_spilling_streams_count > 0) {
+        _shared_state->need_to_spill = true;
         std::unique_lock<std::mutex> lock(_spill_lock);
         if (_spilling_streams_count > 0) {
             _dependency->block();
+        } else if (_child_eos) {
+            LOG(INFO) << "sink eos, set_ready_to_read, node id: " << 
_parent->id()
+                      << ", task id: " << state->task_id();
+            _dependency->set_ready_to_read();
         }
     }
     return Status::OK();
@@ -108,6 +121,11 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
     if (_spilling_streams_count == 0) {
         std::unique_lock<std::mutex> lock(_spill_lock);
         _dependency->set_ready();
+        if (_child_eos) {
+            LOG(INFO) << "sink eos, set_ready_to_read, node id: " << 
_parent->id()
+                      << ", task id: " << state()->task_id();
+            _dependency->set_ready_to_read();
+        }
     }
 }
 
@@ -157,6 +175,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
         return local_state._spill_status;
     }
 
+    local_state._child_eos = eos;
+
     const auto rows = in_block->rows();
 
     if (rows > 0) {
@@ -190,9 +210,18 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
             partitioned_blocks[i]->add_rows(in_block, 
&(partition_indexes[i][0]),
                                             &(partition_indexes[i][count]));
         }
+
+        if (local_state._shared_state->need_to_spill) {
+            const auto revocable_size = revocable_mem_size(state);
+            if (revocable_size > state->min_revocable_mem()) {
+                return local_state.revoke_memory(state);
+            }
+        }
     }
 
     if (eos) {
+        LOG(INFO) << "sink eos, set_ready_to_read, node id: " << id()
+                  << ", task id: " << state->task_id();
         local_state._dependency->set_ready_to_read();
     }
 
@@ -207,7 +236,7 @@ size_t 
PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state)
     size_t mem_size = 0;
     for (uint32_t i = 0; i != _partition_count; ++i) {
         auto& block = partitioned_blocks[i];
-        if (block) {
+        if (block && block->rows() >= state->batch_size()) {
             mem_size += block->allocated_bytes();
         }
     }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 152b36459b3..f2d5ca3e140 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -61,6 +61,8 @@ protected:
     std::atomic<bool> _spill_status_ok {true};
     std::mutex _spill_lock;
 
+    bool _child_eos {false};
+
     Status _spill_status;
     std::mutex _spill_status_lock;
 
@@ -68,6 +70,10 @@ protected:
 
     RuntimeProfile::Counter* _partition_timer = nullptr;
     RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
+    RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
+    RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
+    RuntimeProfile::Counter* _spill_data_size = nullptr;
+    RuntimeProfile::Counter* _spill_block_count = nullptr;
 };
 
 class PartitionedHashJoinSinkOperatorX
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index db2b0341fe4..7815d4a9ce0 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -578,6 +578,7 @@ struct HashJoinSharedState : public JoinSharedState {
 struct PartitionedHashJoinSharedState : public HashJoinSharedState {
     std::vector<std::unique_ptr<vectorized::MutableBlock>> 
partitioned_build_blocks;
     std::vector<vectorized::SpillStreamSPtr> spilled_streams;
+    bool need_to_spill = false;
 };
 
 struct NestedLoopJoinSharedState : public JoinSharedState {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1df715176ac..ecb7023be87 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1031,7 +1031,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
                                        tnode.hash_join_node.is_broadcast_join;
         const auto enable_join_spill = _runtime_state->enable_join_spill();
         if (enable_join_spill && !is_broadcast_join) {
-            const uint32_t partition_count = 16;
+            const uint32_t partition_count = 32;
             op.reset(new PartitionedHashJoinProbeOperatorX(pool, tnode, 
next_operator_id(), descs,
                                                            partition_count));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to