This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 540ccc606a1b681d090281ebf30c59c63515da25
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed Sep 11 18:06:04 2024 +0800

    [fix] use faststring in Block::serialize
---
 be/src/pipeline/dependency.h                       |  3 +-
 .../exec/partitioned_aggregation_sink_operator.h   |  2 +-
 .../partitioned_aggregation_source_operator.cpp    | 23 ++++++++++--
 .../exec/partitioned_hash_join_probe_operator.cpp  |  3 ++
 .../exec/partitioned_hash_join_sink_operator.cpp   | 42 +++++++++++-----------
 be/src/vec/core/block.cpp                          | 17 ++++-----
 6 files changed, 57 insertions(+), 33 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 863458d3bde..8cb479ccbb0 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -454,7 +454,8 @@ struct PartitionedAggSharedState : public BasicSharedState,
     std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
 
     size_t get_partition_index(size_t hash_value) const {
-        return (hash_value >> (32 - partition_count_bits)) & max_partition_index;
+        // return (hash_value >> (32 - partition_count_bits)) & max_partition_index;
+        return hash_value % partition_count;
     }
 };
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index a7929337b63..756b686a5b3 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -346,6 +346,6 @@ private:
     friend class PartitionedAggSinkLocalState;
 
     std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
-    size_t _spill_partition_count_bits = 6;
+    size_t _spill_partition_count_bits = 5;
 };
 } // namespace doris::pipeline
\ No newline at end of file
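The two hunks above switch spilled-partition routing in PartitionedAggSharedState from the high bits of a 32-bit hash to a plain modulo, and lower _spill_partition_count_bits from 6 to 5 (presumably 64 down to 32 spill partitions). Below is a minimal sketch of the two schemes; the relationships partition_count = 1 << partition_count_bits and max_partition_index = partition_count - 1 are assumptions based on the member names in the diff, not code taken from the commit.

    #include <cstddef>

    // Trimmed-down illustration of the routing change in dependency.h; the
    // field relationships below are assumptions, not code from the commit.
    struct PartitionRouterSketch {
        size_t partition_count_bits = 5; // was 6 in the sink operator before the patch
        size_t partition_count = size_t(1) << partition_count_bits;
        size_t max_partition_index = partition_count - 1;

        // Old routing (now commented out): take the high bits of a 32-bit
        // hash value, then mask into [0, max_partition_index].
        size_t index_from_high_bits(size_t hash_value) const {
            return (hash_value >> (32 - partition_count_bits)) & max_partition_index;
        }

        // New routing: modulo over the partition count, which uses every bit
        // of the hash instead of only the top bits of a 32-bit value.
        size_t index_from_modulo(size_t hash_value) const {
            return hash_value % partition_count;
        }
    };

The modulo form costs an integer division per lookup but no longer assumes the incoming hash value is confined to 32 bits.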
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index fa41723beba..bf7ec22793f 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -202,7 +202,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
     _is_merging = true;
     VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << _parent->node_id()
-               << " merge spilled agg data";
+               << ", task id: " << _state->task_id() << " merge spilled agg data";
     RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
     _spill_dependency->Dependency::block();
 
@@ -213,6 +213,9 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
     submit_timer.start();
     auto spill_func = [this, state, query_id, submit_timer] {
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+        MonotonicStopWatch execution_timer;
+        execution_timer.start();
+        size_t read_size = 0;
         Defer defer {[&]() {
             if (!_status.ok() || state->is_cancelled()) {
                 if (!_status.ok()) {
@@ -221,9 +224,13 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                                << " merge spilled agg data error: " << _status;
                 }
                 _shared_state->close();
-            } else if (_shared_state->spill_partitions.empty()) {
+            } else {
                 VLOG_DEBUG << "query " << print_id(query_id) << " agg node " << _parent->node_id()
-                           << " merge spilled agg data finish";
+                           << ", task id: " << _state->task_id()
+                           << " merge spilled agg data finish, time used: "
+                           << (execution_timer.elapsed_time() / (1000L * 1000 * 1000))
+                           << "s, read size: " << read_size << ", "
+                           << _shared_state->spill_partitions.size() << " partitions left";
             }
             Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
             _is_merging = false;
@@ -256,6 +263,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
 
                     if (!block.empty()) {
                         has_agg_data = true;
+                        read_size += block.bytes();
                         _status = parent._agg_source_operator
                                           ->merge_with_serialized_key_helper<false>(
                                                   _runtime_state.get(), &block);
@@ -263,6 +271,15 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                     }
                 }
                 (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+
+                if (!has_agg_data) {
+                    VLOG_DEBUG << "query " << print_id(query_id) << " agg node "
+                               << _parent->node_id() << ", task id: " << _state->task_id()
+                               << " merge spilled agg data finish, time used: "
+                               << execution_timer.elapsed_time() << ", empty partition "
+                               << read_size << ", " << _shared_state->spill_partitions.size()
+                               << " partitions left";
+                }
             }
             _shared_state->spill_partitions.pop_front();
         }
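The source-operator hunks above time the merge of spilled partitions with a MonotonicStopWatch and accumulate read_size, then report both from the Defer callback so the numbers are logged whether the task finishes normally or bails out early; since the patch divides elapsed_time() by 1000L * 1000 * 1000 to print seconds, the watch evidently reports nanoseconds. A rough standard-C++ sketch of that log-on-scope-exit pattern, with ScopeExit standing in for Doris' Defer and the spill I/O simulated:

    #include <chrono>
    #include <cstddef>
    #include <iostream>
    #include <utility>

    // Minimal scope-exit helper standing in for Doris' Defer: the callback
    // runs when the enclosing scope unwinds, on success and error paths alike.
    template <typename F>
    class ScopeExit {
    public:
        explicit ScopeExit(F fn) : fn_(std::move(fn)) {}
        ~ScopeExit() { fn_(); }

    private:
        F fn_;
    };

    void merge_spilled_partitions_sketch() {
        const auto start = std::chrono::steady_clock::now();
        size_t read_size = 0;

        ScopeExit log_on_exit([&] {
            const auto elapsed = std::chrono::steady_clock::now() - start;
            std::cout << "merge spilled agg data finish, time used: "
                      << std::chrono::duration_cast<std::chrono::seconds>(elapsed).count()
                      << "s, read size: " << read_size << "\n";
        });

        // ... read and merge spilled blocks; each one adds its byte size ...
        read_size += 4096; // placeholder for block.bytes()
    }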
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f1ca4b8f7b5..49e170fe78f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -800,6 +800,9 @@ Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
                << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos;
 
     if (local_state._child_eos) {
+        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
+                   << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos
+                   << ", will not revoke size: " << revocable_mem_size(state);
         return Status::OK();
     }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 7e106db5358..fc227978d4c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -353,21 +353,20 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
         }
     }
 
+    std::unique_lock<std::mutex> lock(_spill_lock);
     if (_spilling_streams_count > 0) {
-        std::unique_lock<std::mutex> lock(_spill_lock);
-        if (_spilling_streams_count > 0) {
-            _spill_dependency->block();
-        } else if (_child_eos) {
-            LOG(INFO) << "hash join sink " << _parent->node_id() << " set_ready_to_read"
-                      << ", task id: " << state->task_id();
-            std::for_each(_shared_state->partitioned_build_blocks.begin(),
-                          _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
-                              if (block) {
-                                  COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
-                              }
-                          });
-            _dependency->set_ready_to_read();
-        }
+        _spill_dependency->block();
+    } else if (_child_eos) {
+        VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join sink "
+                   << _parent->node_id() << " set_ready_to_read"
+                   << ", task id: " << state->task_id();
+        std::for_each(_shared_state->partitioned_build_blocks.begin(),
+                      _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
+                          if (block) {
+                              COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
+                          }
+                      });
+        _dependency->set_ready_to_read();
     }
     return Status::OK();
 }
@@ -438,8 +437,9 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
     std::unique_lock<std::mutex> lock(_spill_lock);
     _spill_dependency->set_ready();
     if (_child_eos) {
-        LOG(INFO) << "hash join sink " << _parent->node_id() << " set_ready_to_read"
-                  << ", task id: " << state()->task_id();
+        VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << ", hash join sink "
+                   << _parent->node_id() << " set_ready_to_read"
+                   << ", task id: " << state()->task_id();
         std::for_each(_shared_state->partitioned_build_blocks.begin(),
                       _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
                           if (block) {
@@ -550,8 +550,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
     const auto need_to_spill = local_state._shared_state->need_to_spill;
     if (rows == 0) {
         if (eos) {
-            LOG(INFO) << "hash join sink " << node_id() << " sink eos, set_ready_to_read"
-                      << ", task id: " << state->task_id() << ", need spil: " << need_to_spill;
+            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink "
+                       << node_id() << " sink eos, set_ready_to_read"
+                       << ", task id: " << state->task_id() << ", need spil: " << need_to_spill;
 
             if (!need_to_spill) {
                 if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
@@ -610,8 +611,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                 local_state._shared_state->inner_runtime_state.get(), in_block, eos));
 
         if (eos) {
-            LOG(INFO) << "hash join sink " << node_id() << " sink eos, set_ready_to_read"
-                      << ", task id: " << state->task_id();
+            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink "
+                       << node_id() << " sink eos, set_ready_to_read"
+                       << ", task id: " << state->task_id();
             local_state._dependency->set_ready_to_read();
         }
     }
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index d4644fca489..6b6dfb483ce 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -88,7 +88,7 @@ Status Block::deserialize(const PBlock& pblock) {
     RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
 
     const char* buf = nullptr;
-    std::string compression_scratch;
+    faststring compression_scratch;
     if (pblock.compressed()) {
         // Decompress
         SCOPED_RAW_TIMER(&_decompress_time_ns);
@@ -111,11 +111,11 @@ Status Block::deserialize(const PBlock& pblock) {
             DCHECK(success) << "snappy::GetUncompressedLength failed";
             compression_scratch.resize(uncompressed_size);
             success = snappy::RawUncompress(compressed_data, compressed_size,
-                                            compression_scratch.data());
+                                            reinterpret_cast<char*>(compression_scratch.data()));
             DCHECK(success) << "snappy::RawUncompress failed";
         }
         _decompressed_bytes = uncompressed_size;
-        buf = compression_scratch.data();
+        buf = reinterpret_cast<char*>(compression_scratch.data());
     } else {
         buf = pblock.column_values().data();
     }
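In Block::deserialize above, and in Block::serialize below, the std::string scratch buffers become faststring. Its data() returns uint8_t* rather than char*, which is what the added reinterpret_casts are for, and it can no longer be std::move'd into the protobuf field, so the patch switches to the copying set_column_values(pointer, size) setter. A small self-contained sketch of that pattern; PBlockLike and the std::vector<uint8_t> are stand-ins for the real PBlock message and faststring, which are not reproduced here:

    #include <cstdint>
    #include <cstring>
    #include <string>
    #include <vector>

    // Stand-in for the generated PBlock setter: protobuf bytes fields expose a
    // copying set_xxx(pointer, size) overload, which replaces std::move here.
    struct PBlockLike {
        std::string column_values;
        void set_column_values(const void* data, size_t size) {
            column_values.assign(static_cast<const char*>(data), size);
        }
    };

    int main() {
        // faststring keeps its bytes as uint8_t (a std::vector<uint8_t> stands
        // in for it), so serializers that write through char* need a cast.
        std::vector<uint8_t> column_values(16);
        char* buf = reinterpret_cast<char*>(column_values.data());
        std::memcpy(buf, "0123456789abcdef", 16);

        PBlockLike pblock;
        // Hand the buffer to the protobuf message by (pointer, size).
        pblock.set_column_values(column_values.data(), column_values.size());
        return pblock.column_values.size() == 16 ? 0 : 1;
    }

The likely motivation is that resizing the large scratch buffer as a faststring avoids std::string::resize's zero-initialization, at the price of one extra copy into the protobuf field on the uncompressed path.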
@@ -925,7 +925,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
 
     // serialize data values
     // when data type is HLL, content_uncompressed_size maybe larger than real size.
-    std::string column_values;
+    faststring column_values;
     try {
         // TODO: After support c++23, we should use resize_and_overwrite to replace resize
         column_values.resize(content_uncompressed_size);
@@ -935,13 +935,14 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
         LOG(WARNING) << msg;
         return Status::BufferAllocFailed(msg);
     }
-    char* buf = column_values.data();
+    char* buf = reinterpret_cast<char*>(column_values.data());
 
     for (const auto& c : *this) {
         buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version());
     }
     *uncompressed_bytes = content_uncompressed_size;
-    const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING;
+    const size_t serialize_bytes =
+            buf - reinterpret_cast<char*>(column_values.data()) + STREAMVBYTE_PADDING;
     *compressed_bytes = serialize_bytes;
     column_values.resize(serialize_bytes);
@@ -964,13 +965,13 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
             pblock->set_compressed(true);
             *compressed_bytes = compressed_size;
         } else {
-            pblock->set_column_values(std::move(column_values));
+            pblock->set_column_values(column_values.data(), column_values.size());
         }
 
         VLOG_ROW << "uncompressed size: " << content_uncompressed_size
                  << ", compressed size: " << compressed_size;
     } else {
-        pblock->set_column_values(std::move(column_values));
+        pblock->set_column_values(column_values.data(), column_values.size());
     }
     if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) {
         return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.",

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org