This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 540ccc606a1b681d090281ebf30c59c63515da25
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed Sep 11 18:06:04 2024 +0800

    [fix] use faststring in Block::serialize
---
 be/src/pipeline/dependency.h                       |  3 +-
 .../exec/partitioned_aggregation_sink_operator.h   |  2 +-
 .../partitioned_aggregation_source_operator.cpp    | 23 ++++++++++--
 .../exec/partitioned_hash_join_probe_operator.cpp  |  3 ++
 .../exec/partitioned_hash_join_sink_operator.cpp   | 42 +++++++++++-----------
 be/src/vec/core/block.cpp                          | 17 ++++-----
 6 files changed, 57 insertions(+), 33 deletions(-)
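
    Why faststring: std::string::resize() value-initializes every byte it
    adds, while the Kudu-derived faststring in be/src/util only bumps the
    logical length of already-reserved storage, so the (possibly large)
    serialization scratch buffer is not zero-filled before every byte of it
    gets overwritten anyway. That rationale is not stated in the commit
    message, so treat it as an assumption; the sketch below uses hypothetical
    helper names and an assumed header path, and only illustrates the
    difference in resize() behavior:

        #include <cstddef>
        #include <cstring>
        #include <string>

        #include "util/faststring.h"  // assumed path of Doris' Kudu-derived faststring

        // Stand-in for Block::serialize writing every byte of the scratch buffer.
        static void write_payload(char* dst, size_t n) { std::memset(dst, 'x', n); }

        void serialize_with_std_string(size_t content_uncompressed_size) {
            std::string scratch;
            scratch.resize(content_uncompressed_size);      // zero-fills the new bytes first
            write_payload(scratch.data(), scratch.size());  // ...which are all overwritten here
        }

        void serialize_with_faststring(size_t content_uncompressed_size) {
            faststring scratch;
            scratch.resize(content_uncompressed_size);      // grows capacity only; no per-byte fill
            write_payload(reinterpret_cast<char*>(scratch.data()),  // data() returns uint8_t*
                          scratch.size());
        }

    Because faststring is not a std::string, the protobuf column_values field
    is now filled from a (pointer, size) pair instead of std::move(column_values),
    as the block.cpp hunks below show.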

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 863458d3bde..8cb479ccbb0 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -454,7 +454,8 @@ struct PartitionedAggSharedState : public BasicSharedState,
     std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
 
     size_t get_partition_index(size_t hash_value) const {
-        return (hash_value >> (32 - partition_count_bits)) & max_partition_index;
+        // return (hash_value >> (32 - partition_count_bits)) & max_partition_index;
+        return hash_value % partition_count;
     }
 };
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index a7929337b63..756b686a5b3 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -346,6 +346,6 @@ private:
     friend class PartitionedAggSinkLocalState;
     std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
 
-    size_t _spill_partition_count_bits = 6;
+    size_t _spill_partition_count_bits = 5;
 };
 } // namespace doris::pipeline
\ No newline at end of file
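
    Note on the two partitioning tweaks above: _spill_partition_count_bits
    drops from 6 to 5, i.e. from 64 to 32 spill partitions (assuming
    partition_count is 1 << partition_count_bits, as the member names
    suggest), and get_partition_index now takes the hash modulo the partition
    count instead of the top bits of a 32-bit hash. A small, self-contained
    illustration of the two indexing schemes, with hypothetical constant
    names standing in for the shared-state members:

        #include <cstddef>

        constexpr size_t kPartitionCountBits = 5;                     // was 6 before this commit
        constexpr size_t kPartitionCount = 1 << kPartitionCountBits;  // 32 partitions
        constexpr size_t kMaxPartitionIndex = kPartitionCount - 1;    // 31

        // Old scheme: index from the high bits of a 32-bit hash value.
        size_t index_from_high_bits(size_t hash_value) {
            return (hash_value >> (32 - kPartitionCountBits)) & kMaxPartitionIndex;
        }

        // New scheme: plain modulo over the partition count.
        size_t index_from_modulo(size_t hash_value) {
            return hash_value % kPartitionCount;
        }
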
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index fa41723beba..bf7ec22793f 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -202,7 +202,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
 
     _is_merging = true;
     VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << _parent->node_id()
-               << " merge spilled agg data";
+               << ", task id: " << _state->task_id() << " merge spilled agg data";
 
     RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
     _spill_dependency->Dependency::block();
@@ -213,6 +213,9 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
     submit_timer.start();
     auto spill_func = [this, state, query_id, submit_timer] {
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+        MonotonicStopWatch execution_timer;
+        execution_timer.start();
+        size_t read_size = 0;
         Defer defer {[&]() {
             if (!_status.ok() || state->is_cancelled()) {
                 if (!_status.ok()) {
@@ -221,9 +224,13 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                                  << " merge spilled agg data error: " << _status;
                 }
                 _shared_state->close();
-            } else if (_shared_state->spill_partitions.empty()) {
+            } else {
                 VLOG_DEBUG << "query " << print_id(query_id) << " agg node " << _parent->node_id()
-                           << " merge spilled agg data finish";
+                           << ", task id: " << _state->task_id()
+                           << " merge spilled agg data finish, time used: "
+                           << (execution_timer.elapsed_time() / (1000L * 1000 * 1000))
+                           << "s, read size: " << read_size << ", "
+                           << _shared_state->spill_partitions.size() << " partitions left";
             }
             Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
             _is_merging = false;
@@ -256,6 +263,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
 
                     if (!block.empty()) {
                         has_agg_data = true;
+                        read_size += block.bytes();
                         _status = parent._agg_source_operator
                                           ->merge_with_serialized_key_helper<false>(
                                                   _runtime_state.get(), &block);
@@ -263,6 +271,15 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                     }
                 }
                 (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+
+                if (!has_agg_data) {
+                    VLOG_DEBUG << "query " << print_id(query_id) << " agg node "
+                               << _parent->node_id() << ", task id: " << _state->task_id()
+                               << " merge spilled agg data finish, time used: "
+                               << execution_timer.elapsed_time() << ", empty partition "
+                               << read_size << ", " << _shared_state->spill_partitions.size()
+                               << " partitions left";
+                }
             }
             _shared_state->spill_partitions.pop_front();
         }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f1ca4b8f7b5..49e170fe78f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -800,6 +800,9 @@ Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
                << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos;
 
     if (local_state._child_eos) {
+        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
+                   << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos
+                   << ", will not revoke size: " << revocable_mem_size(state);
         return Status::OK();
     }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 7e106db5358..fc227978d4c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -353,21 +353,20 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
         }
     }
 
+    std::unique_lock<std::mutex> lock(_spill_lock);
     if (_spilling_streams_count > 0) {
-        std::unique_lock<std::mutex> lock(_spill_lock);
-        if (_spilling_streams_count > 0) {
-            _spill_dependency->block();
-        } else if (_child_eos) {
-            LOG(INFO) << "hash join sink " << _parent->node_id() << " set_ready_to_read"
-                      << ", task id: " << state->task_id();
-            std::for_each(_shared_state->partitioned_build_blocks.begin(),
-                          _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
-                              if (block) {
-                                  COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
-                              }
-                          });
-            _dependency->set_ready_to_read();
-        }
+        _spill_dependency->block();
+    } else if (_child_eos) {
+        VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join sink "
+                   << _parent->node_id() << " set_ready_to_read"
+                   << ", task id: " << state->task_id();
+        std::for_each(_shared_state->partitioned_build_blocks.begin(),
+                      _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
+                          if (block) {
+                              COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
+                          }
+                      });
+        _dependency->set_ready_to_read();
     }
     return Status::OK();
 }
@@ -438,8 +437,9 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
         std::unique_lock<std::mutex> lock(_spill_lock);
         _spill_dependency->set_ready();
         if (_child_eos) {
-            LOG(INFO) << "hash join sink " << _parent->node_id() << " set_ready_to_read"
-                      << ", task id: " << state()->task_id();
+            VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << ", hash join sink "
+                       << _parent->node_id() << " set_ready_to_read"
+                       << ", task id: " << state()->task_id();
             std::for_each(_shared_state->partitioned_build_blocks.begin(),
                           _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
                               if (block) {
@@ -550,8 +550,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
     const auto need_to_spill = local_state._shared_state->need_to_spill;
     if (rows == 0) {
         if (eos) {
-            LOG(INFO) << "hash join sink " << node_id() << " sink eos, set_ready_to_read"
-                      << ", task id: " << state->task_id() << ", need spil: " << need_to_spill;
+            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink "
+                       << node_id() << " sink eos, set_ready_to_read"
+                       << ", task id: " << state->task_id() << ", need spil: " << need_to_spill;
 
             if (!need_to_spill) {
                 if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
@@ -610,8 +611,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                 local_state._shared_state->inner_runtime_state.get(), in_block, eos));
 
         if (eos) {
-            LOG(INFO) << "hash join sink " << node_id() << " sink eos, set_ready_to_read"
-                      << ", task id: " << state->task_id();
+            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink "
+                       << node_id() << " sink eos, set_ready_to_read"
+                       << ", task id: " << state->task_id();
             local_state._dependency->set_ready_to_read();
         }
     }
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index d4644fca489..6b6dfb483ce 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -88,7 +88,7 @@ Status Block::deserialize(const PBlock& pblock) {
     RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
 
     const char* buf = nullptr;
-    std::string compression_scratch;
+    faststring compression_scratch;
     if (pblock.compressed()) {
         // Decompress
         SCOPED_RAW_TIMER(&_decompress_time_ns);
@@ -111,11 +111,11 @@ Status Block::deserialize(const PBlock& pblock) {
             DCHECK(success) << "snappy::GetUncompressedLength failed";
             compression_scratch.resize(uncompressed_size);
             success = snappy::RawUncompress(compressed_data, compressed_size,
-                                            compression_scratch.data());
+                                            reinterpret_cast<char*>(compression_scratch.data()));
             DCHECK(success) << "snappy::RawUncompress failed";
         }
         _decompressed_bytes = uncompressed_size;
-        buf = compression_scratch.data();
+        buf = reinterpret_cast<char*>(compression_scratch.data());
     } else {
         buf = pblock.column_values().data();
     }
@@ -925,7 +925,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
 
     // serialize data values
     // when data type is HLL, content_uncompressed_size maybe larger than real size.
-    std::string column_values;
+    faststring column_values;
     try {
         // TODO: After support c++23, we should use resize_and_overwrite to replace resize
         column_values.resize(content_uncompressed_size);
@@ -935,13 +935,14 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
         LOG(WARNING) << msg;
         return Status::BufferAllocFailed(msg);
     }
-    char* buf = column_values.data();
+    char* buf = reinterpret_cast<char*>(column_values.data());
 
     for (const auto& c : *this) {
         buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version());
     }
     *uncompressed_bytes = content_uncompressed_size;
-    const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING;
+    const size_t serialize_bytes =
+            buf - reinterpret_cast<char*>(column_values.data()) + STREAMVBYTE_PADDING;
     *compressed_bytes = serialize_bytes;
     column_values.resize(serialize_bytes);
 
@@ -964,13 +965,13 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
             pblock->set_compressed(true);
             *compressed_bytes = compressed_size;
         } else {
-            pblock->set_column_values(std::move(column_values));
+            pblock->set_column_values(column_values.data(), column_values.size());
         }
 
         VLOG_ROW << "uncompressed size: " << content_uncompressed_size
                  << ", compressed size: " << compressed_size;
     } else {
-        pblock->set_column_values(std::move(column_values));
+        pblock->set_column_values(column_values.data(), column_values.size());
     }
     if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) {
         return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.",

