This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ef9559dbe63 [Chore](compile) add some compile_check_begin (#42333) ef9559dbe63 is described below commit ef9559dbe63cb1c3ae7f5a6b579125b3b5a6bdaf Author: Pxl <pxl...@qq.com> AuthorDate: Mon Oct 28 11:15:13 2024 +0800 [Chore](compile) add some compile_check_begin (#42333) ## Proposed changes add some compile_check_begin --- be/src/pipeline/dependency.cpp | 6 ++--- be/src/pipeline/dependency.h | 10 ++++---- be/src/pipeline/exec/exchange_sink_operator.cpp | 6 ++--- be/src/pipeline/exec/exchange_sink_operator.h | 8 +++--- be/src/pipeline/exec/hashjoin_probe_operator.cpp | 19 +++++++------- .../pipeline/exec/join/process_hash_table_probe.h | 10 ++++---- .../exec/join/process_hash_table_probe_impl.h | 30 ++++++++++++---------- be/src/pipeline/exec/set_sink_operator.cpp | 9 ++++--- be/src/pipeline/exec/set_sink_operator.h | 4 ++- be/src/pipeline/exec/set_source_operator.cpp | 4 +-- be/src/pipeline/exec/set_source_operator.h | 6 ++--- be/src/pipeline/local_exchange/local_exchanger.cpp | 19 +++++++------- be/src/pipeline/local_exchange/local_exchanger.h | 3 ++- be/src/pipeline/pipeline.h | 10 +++++--- be/src/pipeline/pipeline_fragment_context.cpp | 24 +++++++++++------ be/src/pipeline/task_queue.cpp | 4 +-- be/src/pipeline/task_queue.h | 9 ++++--- be/src/pipeline/task_scheduler.cpp | 6 ++--- be/src/pipeline/task_scheduler.h | 2 +- .../vec/common/hash_table/hash_table_set_build.h | 4 +-- be/src/vec/common/hash_table/join_hash_table.h | 14 +++++----- be/src/vec/sink/vdata_stream_sender.cpp | 6 ++--- be/src/vec/sink/vdata_stream_sender.h | 6 ++--- 23 files changed, 120 insertions(+), 99 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 1d450d164a1..8d82c340e2d 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -32,7 +32,7 @@ #include "vec/spill/spill_stream_manager.h" namespace doris::pipeline { - +#include "common/compile_check_begin.h" Dependency* BasicSharedState::create_source_dependency(int operator_id, int node_id, std::string name) { source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id, name + "_DEPENDENCY")); @@ -267,8 +267,8 @@ bool AggSharedState::do_limit_filter(vectorized::Block* block, size_t num_rows, need_computes.data()); } - auto set_computes_arr = [](auto* __restrict res, auto* __restrict computes, int rows) { - for (int i = 0; i < rows; ++i) { + auto set_computes_arr = [](auto* __restrict res, auto* __restrict computes, size_t rows) { + for (size_t i = 0; i < rows; ++i) { computes[i] = computes[i] == res[i]; } }; diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 8060ee8362d..a035d57a837 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -46,7 +46,7 @@ class VSlotRef; } // namespace doris::vectorized namespace doris::pipeline { - +#include "common/compile_check_begin.h" class Dependency; class PipelineTask; struct BasicSharedState; @@ -504,7 +504,7 @@ struct SpillSortSharedState : public BasicSharedState, ~SpillSortSharedState() override = default; // This number specifies the maximum size of sub blocks - static constexpr int SORT_BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024; + static constexpr size_t SORT_BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024; void update_spill_block_batch_row_count(const vectorized::Block* block) { auto rows = block->rows(); if (rows > 0 && 0 == avg_row_bytes) { @@ -525,7 +525,7 @@ struct SpillSortSharedState : public BasicSharedState, std::deque<vectorized::SpillStreamSPtr> sorted_streams; size_t avg_row_bytes = 0; - int spill_block_batch_row_count; + size_t spill_block_batch_row_count; }; struct UnionSharedState : public BasicSharedState { @@ -677,7 +677,7 @@ public: std::vector<vectorized::VExprContextSPtrs> child_exprs_lists; /// init in build side - int child_quantity; + size_t child_quantity; vectorized::VExprContextSPtrs build_child_exprs; std::vector<Dependency*> probe_finished_children_dependency; @@ -867,5 +867,5 @@ private: std::vector<std::atomic_int64_t> _queues_mem_usage; const int64_t _each_queue_limit; }; - +#include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 3f12b4458cd..a3b6f8da7e9 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -38,7 +38,7 @@ #include "vec/exprs/vexpr.h" namespace doris::pipeline { - +#include "common/compile_check_begin.h" Status ExchangeSinkLocalState::serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers) { return _parent->cast<ExchangeSinkOperatorX>().serialize_block(*this, src, dest, num_receivers); @@ -661,7 +661,7 @@ void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buf Status ExchangeSinkOperatorX::channel_add_rows( RuntimeState* state, std::vector<std::shared_ptr<vectorized::Channel>>& channels, - int num_channels, const uint32_t* __restrict channel_ids, int rows, + size_t num_channels, const uint32_t* __restrict channel_ids, size_t rows, vectorized::Block* block, bool eos) { std::vector<std::vector<uint32_t>> channel2rows; channel2rows.resize(num_channels); @@ -676,7 +676,7 @@ Status ExchangeSinkOperatorX::channel_add_rows( Status ExchangeSinkOperatorX::channel_add_rows_with_idx( RuntimeState* state, std::vector<std::shared_ptr<vectorized::Channel>>& channels, - int num_channels, std::vector<std::vector<uint32_t>>& channel2rows, + size_t num_channels, std::vector<std::vector<uint32_t>>& channel2rows, vectorized::Block* block, bool eos) { Status status = Status::OK(); for (int i = 0; i < num_channels; ++i) { diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 6b936d4b12c..141693eb820 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -178,7 +178,7 @@ private: */ std::vector<std::shared_ptr<Dependency>> _local_channels_dependency; std::unique_ptr<vectorized::PartitionerBase> _partitioner; - int _partition_count; + size_t _partition_count; std::shared_ptr<Dependency> _finish_dependency; @@ -234,12 +234,12 @@ private: Status channel_add_rows(RuntimeState* state, std::vector<std::shared_ptr<vectorized::Channel>>& channels, - int num_channels, const uint32_t* channel_ids, int rows, - vectorized::Block* block, bool eos); + size_t num_channels, const uint32_t* __restrict channel_ids, + size_t rows, vectorized::Block* block, bool eos); Status channel_add_rows_with_idx(RuntimeState* state, std::vector<std::shared_ptr<vectorized::Channel>>& channels, - int num_channels, + size_t num_channels, std::vector<std::vector<uint32_t>>& channel2rows, vectorized::Block* block, bool eos); RuntimeState* _state = nullptr; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 8ee041f5759..bb869ee3257 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -19,6 +19,7 @@ #include <string> +#include "common/cast_set.h" #include "common/logging.h" #include "pipeline/exec/operator.h" #include "runtime/descriptors.h" @@ -295,15 +296,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) { using HashTableCtxType = std::decay_t<decltype(arg)>; if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - st = process_hashtable_ctx - .template process<need_null_map_for_probe, ignore_null>( - arg, - need_null_map_for_probe - ? &local_state._null_map_column->get_data() - : nullptr, - mutable_join_block, &temp_block, - local_state._probe_block.rows(), _is_mark_join, - _have_other_join_conjunct); + st = process_hashtable_ctx.template process<need_null_map_for_probe, + ignore_null>( + arg, + need_null_map_for_probe + ? &local_state._null_map_column->get_data() + : nullptr, + mutable_join_block, &temp_block, + cast_set<uint32_t>(local_state._probe_block.rows()), + _is_mark_join, _have_other_join_conjunct); } else { st = Status::InternalError("uninited hash table"); } diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h index bf4a4d5763c..692b91f6a01 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe.h @@ -58,7 +58,7 @@ struct ProcessHashTableProbe { template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, vectorized::MutableBlock& mutable_block, vectorized::Block* output_block, - size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); + uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); // Only process the join with no other join conjunct, because of no other join conjunt // the output block struct is same with mutable block. we can do more opt on it and simplify @@ -68,7 +68,7 @@ struct ProcessHashTableProbe { bool with_other_conjuncts, bool is_mark_join> Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, vectorized::MutableBlock& mutable_block, vectorized::Block* output_block, - size_t probe_rows); + uint32_t probe_rows); // In the presence of other join conjunct, the process of join become more complicated. // each matching join column need to be processed by other join conjunct. so the struct of mutable block // and output block may be different @@ -93,7 +93,7 @@ struct ProcessHashTableProbe { /// For null aware join with other conjuncts, if the probe key of one row on left side is null, /// we should make this row match with all rows in build side. - size_t _process_probe_null_key(uint32_t probe_idx); + uint32_t _process_probe_null_key(uint32_t probe_idx); pipeline::HashJoinProbeLocalState* _parent = nullptr; const int _batch_size; @@ -138,8 +138,8 @@ struct ProcessHashTableProbe { RuntimeProfile::Counter* _probe_side_output_timer = nullptr; RuntimeProfile::Counter* _probe_process_hashtable_timer = nullptr; - int _right_col_idx; - int _right_col_len; + size_t _right_col_idx; + size_t _right_col_len; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h index 667c7a468d7..7fc639b47a4 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h @@ -19,6 +19,7 @@ #include <gen_cpp/PlanNodes_types.h> +#include "common/cast_set.h" #include "common/status.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "process_hash_table_probe.h" @@ -29,7 +30,7 @@ #include "vec/exprs/vexpr_context.h" namespace doris::pipeline { - +#include "common/compile_check_begin.h" template <int JoinOpType> ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState* parent, int batch_size) @@ -192,7 +193,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c vectorized::ConstNullMapPtr null_map, vectorized::MutableBlock& mutable_block, vectorized::Block* output_block, - size_t probe_rows) { + uint32_t probe_rows) { if (_right_col_len && !_build_block) { return Status::InternalError("build block is nullptr"); } @@ -216,7 +217,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c auto& mcol = mutable_block.mutable_columns(); const bool has_mark_join_conjunct = !_parent->_mark_join_conjuncts.empty(); - int current_offset = 0; + uint32_t current_offset = 0; if constexpr ((JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && with_other_conjuncts) { @@ -259,8 +260,9 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c with_other_conjuncts, is_mark_join, need_null_map_for_probe && ignore_null > (hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), - probe_index, build_index, probe_rows, _probe_indexs.data(), - _probe_visited, _build_indexs.data(), has_mark_join_conjunct); + probe_index, build_index, cast_set<int32_t>(probe_rows), + _probe_indexs.data(), _probe_visited, _build_indexs.data(), + has_mark_join_conjunct); probe_index = new_probe_idx; build_index = new_build_idx; current_offset = new_current_offset; @@ -304,12 +306,12 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c } template <int JoinOpType> -size_t ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t probe_index) { +uint32_t ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t probe_index) { const auto rows = _build_block->rows(); DCHECK_LT(_build_index_for_null_probe_key, rows); DCHECK_LT(0, _build_index_for_null_probe_key); - size_t matched_cnt = 0; + uint32_t matched_cnt = 0; for (; _build_index_for_null_probe_key < rows && matched_cnt < _batch_size; ++matched_cnt) { _probe_indexs[matched_cnt] = probe_index; _build_indexs[matched_cnt] = _build_index_for_null_probe_key++; @@ -503,7 +505,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl } SCOPED_TIMER(_parent->_process_other_join_conjunct_timer); - int orig_columns = output_block->columns(); + size_t orig_columns = output_block->columns(); vectorized::IColumn::Filter other_conjunct_filter(row_count, 1); { bool can_be_filter_all = false; @@ -678,7 +680,7 @@ Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx, vectorized::ConstNullMapPtr null_map, vectorized::MutableBlock& mutable_block, vectorized::Block* output_block, - size_t probe_rows, bool is_mark_join, + uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct) { Status res; std::visit( @@ -705,22 +707,22 @@ struct ExtractType<T(U)> { ProcessHashTableProbe<JoinOpType>::process<false, false, ExtractType<void(T)>::Type>( \ ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \ vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \ - size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ + uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ template Status \ ProcessHashTableProbe<JoinOpType>::process<false, true, ExtractType<void(T)>::Type>( \ ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \ vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \ - size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ + uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ template Status \ ProcessHashTableProbe<JoinOpType>::process<true, false, ExtractType<void(T)>::Type>( \ ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \ vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \ - size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ + uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ template Status \ ProcessHashTableProbe<JoinOpType>::process<true, true, ExtractType<void(T)>::Type>( \ ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \ vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \ - size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ + uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ \ template Status \ ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>( \ @@ -746,5 +748,5 @@ struct ExtractType<T(U)> { INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<true>)); \ INSTANTIATION(JoinOpType, (MethodOneString)); \ INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<false>)); - +#include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index e2f684d19f5..9a81333efae 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -24,6 +24,7 @@ #include "vec/core/materialize_block.h" namespace doris::pipeline { +#include "common/compile_check_begin.h" template <bool is_intersect> Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Block* in_block, @@ -87,14 +88,14 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block( vectorized::materialize_block_inplace(block); vectorized::ColumnRawPtrs raw_ptrs(_child_exprs.size()); RETURN_IF_ERROR(_extract_build_column(local_state, block, raw_ptrs, rows)); - + auto st = Status::OK(); std::visit( [&](auto&& arg) { using HashTableCtxType = std::decay_t<decltype(arg)>; if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { vectorized::HashTableBuild<HashTableCtxType, is_intersect> hash_table_build_process(&local_state, rows, raw_ptrs, state); - static_cast<void>(hash_table_build_process(arg, local_state._arena)); + st = hash_table_build_process(arg, local_state._arena); } else { LOG(FATAL) << "FATAL: uninited hash table"; __builtin_unreachable(); @@ -102,7 +103,7 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block( }, local_state._shared_state->hash_table_variants->method_variant); - return Status::OK(); + return st; } template <bool is_intersect> @@ -119,7 +120,7 @@ Status SetSinkOperatorX<is_intersect>::_extract_build_column( rows = is_all_const ? 1 : rows; for (size_t i = 0; i < _child_exprs.size(); ++i) { - int result_col_id = result_locs[i]; + size_t result_col_id = result_locs[i]; if (is_all_const) { block.get_by_position(result_col_id).column = diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 1c08eddc141..65c33795e5d 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -23,6 +23,7 @@ #include "operator.h" namespace doris { +#include "common/compile_check_begin.h" namespace vectorized { template <class HashTableContext, bool is_intersected> @@ -106,13 +107,14 @@ private: size_t& rows); const int _cur_child_id; - const int _child_quantity; + const size_t _child_quantity; // every child has its result expr list vectorized::VExprContextSPtrs _child_exprs; const bool _is_colocate; const std::vector<TExpr> _partition_exprs; using OperatorBase::_child; }; +#include "common/compile_check_end.h" } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index 278e2bb7014..58958462c2f 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -23,7 +23,7 @@ #include "pipeline/exec/operator.h" namespace doris::pipeline { - +#include "common/compile_check_begin.h" template <bool is_intersect> Status SetSourceLocalState<is_intersect>::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); @@ -115,7 +115,7 @@ template <typename HashTableContext> Status SetSourceOperatorX<is_intersect>::_get_data_in_hashtable( SetSourceLocalState<is_intersect>& local_state, HashTableContext& hash_table_ctx, vectorized::Block* output_block, const int batch_size, bool* eos) { - int left_col_len = local_state._left_table_data_types.size(); + size_t left_col_len = local_state._left_table_data_types.size(); hash_table_ctx.init_iterator(); auto& iter = hash_table_ctx.iterator; auto block_size = 0; diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h index 5157a2f9c97..ce3d0c52edf 100644 --- a/be/src/pipeline/exec/set_source_operator.h +++ b/be/src/pipeline/exec/set_source_operator.h @@ -26,7 +26,7 @@ namespace doris { class RuntimeState; namespace pipeline { - +#include "common/compile_check_begin.h" template <bool is_intersect> class SetSourceOperatorX; @@ -82,8 +82,8 @@ private: void _add_result_columns(SetSourceLocalState<is_intersect>& local_state, RowRefListWithFlags& value, int& block_size); - const int _child_quantity; + const size_t _child_quantity; }; - +#include "common/compile_check_end.h" } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 34b7fb503b5..da27a39772d 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -17,6 +17,7 @@ #include "pipeline/local_exchange/local_exchanger.h" +#include "common/cast_set.h" #include "common/status.h" #include "pipeline/exec/sort_sink_operator.h" #include "pipeline/exec/sort_source_operator.h" @@ -25,7 +26,7 @@ #include "vec/runtime/partitioner.h" namespace doris::pipeline { - +#include "common/compile_check_begin.h" template <typename BlockType> void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState& local_state, @@ -170,11 +171,11 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, LocalExchangeSinkLocalState& local_state) { - const auto rows = block->rows(); + const auto rows = cast_set<int32_t>(block->rows()); auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows); { local_state._partition_rows_histogram.assign(_num_partitions + 1, 0); - for (size_t i = 0; i < rows; ++i) { + for (int32_t i = 0; i < rows; ++i) { local_state._partition_rows_histogram[channel_ids[i]]++; } for (int32_t i = 1; i <= _num_partitions; ++i) { @@ -212,7 +213,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest */ const auto& map = local_state._parent->cast<LocalExchangeSinkOperatorX>() ._shuffle_idx_to_instance_idx; - new_block_wrapper->ref(map.size()); + new_block_wrapper->ref(cast_set<int>(map.size())); for (const auto& it : map) { DCHECK(it.second >= 0 && it.second < _num_partitions) << it.first << " : " << it.second << " " << _num_partitions; @@ -241,7 +242,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest } else { DCHECK(!bucket_seq_to_instance_idx.empty()); new_block_wrapper->ref(_num_partitions); - for (size_t i = 0; i < _num_partitions; i++) { + for (int i = 0; i < _num_partitions; i++) { uint32_t start = local_state._partition_rows_histogram[i]; uint32_t size = local_state._partition_rows_histogram[i + 1] - start; if (size > 0) { @@ -426,7 +427,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block local_state._shared_state->add_total_mem_usage(wrapper->data_block.allocated_bytes(), local_state._channel_id); wrapper->ref(_num_partitions); - for (size_t i = 0; i < _num_partitions; i++) { + for (int i = 0; i < _num_partitions; i++) { _enqueue_data_and_set_ready(i, local_state, {wrapper, {0, wrapper->data_block.rows()}}); } @@ -500,11 +501,11 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, LocalExchangeSinkLocalState& local_state) { - const auto rows = block->rows(); + const auto rows = cast_set<int32_t>(block->rows()); auto row_idx = std::make_shared<std::vector<uint32_t>>(rows); { local_state._partition_rows_histogram.assign(_num_partitions + 1, 0); - for (size_t i = 0; i < rows; ++i) { + for (int32_t i = 0; i < rows; ++i) { local_state._partition_rows_histogram[channel_ids[i]]++; } for (int32_t i = 1; i <= _num_partitions; ++i) { @@ -517,7 +518,7 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, local_state._partition_rows_histogram[channel_ids[i]]--; } } - for (size_t i = 0; i < _num_partitions; i++) { + for (int32_t i = 0; i < _num_partitions; i++) { const size_t start = local_state._partition_rows_histogram[i]; const size_t size = local_state._partition_rows_histogram[i + 1] - start; if (size > 0) { diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 01b55816ba8..b3731638cb3 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -21,6 +21,7 @@ #include "pipeline/exec/operator.h" namespace doris::pipeline { +#include "common/compile_check_begin.h" class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; @@ -351,5 +352,5 @@ private: std::atomic_bool _is_pass_through = false; std::atomic_int32_t _total_block = 0; }; - +#include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index ef0ae9e9a75..9554537ca16 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -25,12 +25,13 @@ #include <utility> #include <vector> +#include "common/cast_set.h" #include "common/status.h" #include "pipeline/exec/operator.h" #include "util/runtime_profile.h" namespace doris::pipeline { - +#include "common/compile_check_begin.h" class PipelineFragmentContext; class Pipeline; @@ -119,10 +120,11 @@ public: fmt::format_to(debug_string_buffer, "Pipeline [id: {}, _num_tasks: {}, _num_tasks_created: {}]", _pipeline_id, _num_tasks, _num_tasks_created); - for (size_t i = 0; i < _operators.size(); i++) { + for (int i = 0; i < _operators.size(); i++) { fmt::format_to(debug_string_buffer, "\n{}", _operators[i]->debug_string(i)); } - fmt::format_to(debug_string_buffer, "\n{}", _sink->debug_string(_operators.size())); + fmt::format_to(debug_string_buffer, "\n{}", + _sink->debug_string(cast_set<int>(_operators.size()))); return fmt::to_string(debug_string_buffer); } @@ -168,5 +170,5 @@ private: // Parallelism of parent pipeline. const int _num_tasks_of_parent; }; - +#include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index fd3baefa76f..ef856da5135 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -778,7 +778,8 @@ Status PipelineFragmentContext::_add_local_exchange_impl( std::max(cur_pipe->num_tasks(), _num_instances), use_global_hash_shuffle ? _total_instances : _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit - ? _runtime_state->query_options().local_exchange_free_blocks_limit + ? cast_set<int>( + _runtime_state->query_options().local_exchange_free_blocks_limit) : 0); break; case ExchangeType::BUCKET_HASH_SHUFFLE: @@ -786,21 +787,24 @@ Status PipelineFragmentContext::_add_local_exchange_impl( std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets, ignore_data_hash_distribution, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit - ? _runtime_state->query_options().local_exchange_free_blocks_limit + ? cast_set<int>( + _runtime_state->query_options().local_exchange_free_blocks_limit) : 0); break; case ExchangeType::PASSTHROUGH: shared_state->exchanger = PassthroughExchanger::create_unique( cur_pipe->num_tasks(), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit - ? _runtime_state->query_options().local_exchange_free_blocks_limit + ? cast_set<int>( + _runtime_state->query_options().local_exchange_free_blocks_limit) : 0); break; case ExchangeType::BROADCAST: shared_state->exchanger = BroadcastExchanger::create_unique( cur_pipe->num_tasks(), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit - ? _runtime_state->query_options().local_exchange_free_blocks_limit + ? cast_set<int>( + _runtime_state->query_options().local_exchange_free_blocks_limit) : 0); break; case ExchangeType::PASS_TO_ONE: @@ -809,13 +813,15 @@ Status PipelineFragmentContext::_add_local_exchange_impl( shared_state->exchanger = PassToOneExchanger::create_unique( cur_pipe->num_tasks(), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit - ? _runtime_state->query_options().local_exchange_free_blocks_limit + ? cast_set<int>(_runtime_state->query_options() + .local_exchange_free_blocks_limit) : 0); } else { shared_state->exchanger = BroadcastExchanger::create_unique( cur_pipe->num_tasks(), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit - ? _runtime_state->query_options().local_exchange_free_blocks_limit + ? cast_set<int>(_runtime_state->query_options() + .local_exchange_free_blocks_limit) : 0); } break; @@ -830,7 +836,8 @@ Status PipelineFragmentContext::_add_local_exchange_impl( shared_state->exchanger = LocalMergeSortExchanger::create_unique( sort_source, cur_pipe->num_tasks(), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit - ? _runtime_state->query_options().local_exchange_free_blocks_limit + ? cast_set<int>( + _runtime_state->query_options().local_exchange_free_blocks_limit) : 0); break; } @@ -838,7 +845,8 @@ Status PipelineFragmentContext::_add_local_exchange_impl( shared_state->exchanger = AdaptivePassthroughExchanger::create_unique( cur_pipe->num_tasks(), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit - ? _runtime_state->query_options().local_exchange_free_blocks_limit + ? cast_set<int>( + _runtime_state->query_options().local_exchange_free_blocks_limit) : 0); break; default: diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index ea9fb09e260..ade960650d7 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -27,7 +27,7 @@ #include "runtime/workload_group/workload_group.h" namespace doris::pipeline { - +#include "common/compile_check_begin.h" TaskQueue::~TaskQueue() = default; PipelineTask* SubTaskQueue::try_take(bool is_steal) { @@ -121,7 +121,7 @@ Status PriorityTaskQueue::push(PipelineTask* task) { // update empty queue's runtime, to avoid too high priority if (_sub_queues[level].empty() && - _queue_level_min_vruntime > _sub_queues[level].get_vruntime()) { + double(_queue_level_min_vruntime) > _sub_queues[level].get_vruntime()) { _sub_queues[level].adjust_runtime(_queue_level_min_vruntime); } diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index b389ebc2c51..c9e74248c7a 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -32,6 +32,7 @@ #include "pipeline_task.h" namespace doris::pipeline { +#include "common/compile_check_begin.h" class TaskQueue { public: @@ -70,11 +71,13 @@ public: // note: // runtime is the time consumed by the actual execution of the task // vruntime(means virtual runtime) = runtime / _level_factor - double get_vruntime() { return _runtime / _level_factor; } + double get_vruntime() { return double(_runtime) / _level_factor; } void inc_runtime(uint64_t delta_time) { _runtime += delta_time; } - void adjust_runtime(uint64_t vruntime) { this->_runtime = uint64_t(vruntime * _level_factor); } + void adjust_runtime(uint64_t vruntime) { + this->_runtime = uint64_t(double(vruntime) * _level_factor); + } bool empty() { return _queue.empty(); } @@ -150,5 +153,5 @@ private: std::atomic<uint32_t> _next_core = 0; std::atomic<bool> _closed; }; - +#include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 475d3a8065f..5a4f8819bcb 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -44,7 +44,7 @@ #include "vec/runtime/vdatetime_value.h" namespace doris::pipeline { - +#include "common/compile_check_begin.h" TaskScheduler::~TaskScheduler() { stop(); LOG(INFO) << "Task scheduler " << _name << " shutdown"; @@ -60,7 +60,7 @@ Status TaskScheduler::start() { .build(&_fix_thread_pool)); LOG_INFO("TaskScheduler set cores").tag("size", cores); _markers.resize(cores, true); - for (size_t i = 0; i < cores; ++i) { + for (int i = 0; i < cores; ++i) { RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); })); } return Status::OK(); @@ -97,7 +97,7 @@ void _close_task(PipelineTask* task, Status exec_status) { task->fragment_context()->close_a_pipeline(task->pipeline_id()); } -void TaskScheduler::_do_work(size_t index) { +void TaskScheduler::_do_work(int index) { while (_markers[index]) { auto* task = _task_queue->take(index); if (!task) { diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 9a20807ea26..6fc6ad8d6f2 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -71,6 +71,6 @@ private: std::string _name; CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; - void _do_work(size_t index); + void _do_work(int index); }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h b/be/src/vec/common/hash_table/hash_table_set_build.h index f9aeeeef14c..b90cafc0883 100644 --- a/be/src/vec/common/hash_table/hash_table_set_build.h +++ b/be/src/vec/common/hash_table/hash_table_set_build.h @@ -24,7 +24,7 @@ constexpr size_t CHECK_FRECUENCY = 65536; template <class HashTableContext, bool is_intersect> struct HashTableBuild { template <typename Parent> - HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, RuntimeState* state) + HashTableBuild(Parent* parent, size_t rows, ColumnRawPtrs& build_raw_ptrs, RuntimeState* state) : _rows(rows), _build_raw_ptrs(build_raw_ptrs), _state(state) {} Status operator()(HashTableContext& hash_table_ctx, Arena& arena) { @@ -50,7 +50,7 @@ struct HashTableBuild { } private: - const int _rows; + const size_t _rows; ColumnRawPtrs& _build_raw_ptrs; RuntimeState* _state = nullptr; }; diff --git a/be/src/vec/common/hash_table/join_hash_table.h b/be/src/vec/common/hash_table/join_hash_table.h index 317987541cd..485c5f7b3b2 100644 --- a/be/src/vec/common/hash_table/join_hash_table.h +++ b/be/src/vec/common/hash_table/join_hash_table.h @@ -142,7 +142,7 @@ public: JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { return _find_batch_right_semi_anti(keys, build_idx_map, probe_idx, probe_rows); } - return std::tuple {0, 0U, 0}; + return std::tuple {0, 0U, 0U}; } /** @@ -163,7 +163,7 @@ public: uint32_t* __restrict build_idxs, uint8_t* __restrict null_flags, bool picking_null_keys) { - auto matched_cnt = 0; + uint32_t matched_cnt = 0; const auto batch_size = max_batch_size; auto do_the_probe = [&]() { @@ -274,7 +274,7 @@ private: uint32_t* __restrict build_idxs) { static_assert(JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN); - auto matched_cnt = 0; + uint32_t matched_cnt = 0; const auto batch_size = max_batch_size; while (probe_idx < probe_rows && matched_cnt < batch_size) { @@ -300,14 +300,14 @@ private: } probe_idx++; } - return std::tuple {probe_idx, 0U, 0}; + return std::tuple {probe_idx, 0U, 0U}; } template <int JoinOpType, bool need_judge_null> auto _find_batch_left_semi_anti(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx, int probe_rows, uint32_t* __restrict probe_idxs) { - auto matched_cnt = 0; + uint32_t matched_cnt = 0; const auto batch_size = max_batch_size; while (probe_idx < probe_rows && matched_cnt < batch_size) { @@ -334,7 +334,7 @@ private: auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx, uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs, uint32_t* __restrict build_idxs) { - auto matched_cnt = 0; + uint32_t matched_cnt = 0; const auto batch_size = max_batch_size; auto do_the_probe = [&]() { @@ -405,7 +405,7 @@ private: uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs, bool& probe_visited, uint32_t* __restrict build_idxs) { - auto matched_cnt = 0; + uint32_t matched_cnt = 0; const auto batch_size = max_batch_size; auto do_the_probe = [&]() { diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 9405ed2e43e..ac820bcab29 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -230,7 +230,7 @@ Status Channel::close(RuntimeState* state) { BlockSerializer::BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool is_local) : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {} -Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int num_receivers, +Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t num_receivers, bool* serialized, bool eos, const std::vector<uint32_t>* rows) { if (_mutable_block == nullptr) { @@ -261,7 +261,7 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int nu return Status::OK(); } -Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) { +Status BlockSerializer::serialize_block(PBlock* dest, size_t num_receivers) { if (_mutable_block && _mutable_block->rows() > 0) { auto block = _mutable_block->to_block(); RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers)); @@ -272,7 +272,7 @@ Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) { return Status::OK(); } -Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int num_receivers) { +Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, size_t num_receivers) { SCOPED_TIMER(_parent->_serialize_batch_timer); dest->Clear(); size_t uncompressed_bytes = 0, compressed_bytes = 0; diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index da0ee22ac14..88bb804fd80 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -76,10 +76,10 @@ namespace vectorized { class BlockSerializer { public: BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool is_local = true); - Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, bool* serialized, + Status next_serialized_block(Block* src, PBlock* dest, size_t num_receivers, bool* serialized, bool eos, const std::vector<uint32_t>* rows = nullptr); - Status serialize_block(PBlock* dest, int num_receivers = 1); - Status serialize_block(const Block* src, PBlock* dest, int num_receivers = 1); + Status serialize_block(PBlock* dest, size_t num_receivers = 1); + Status serialize_block(const Block* src, PBlock* dest, size_t num_receivers = 1); MutableBlock* get_block() const { return _mutable_block.get(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org