This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 16644eff7f8 [opt](load) optimize the performance of row distribution (#25546) 16644eff7f8 is described below commit 16644eff7f8c882ad330ed88e25b23cdcdcce7ed Author: zclllyybb <zhaochan...@selectdb.com> AuthorDate: Tue Nov 7 10:04:59 2023 +0800 [opt](load) optimize the performance of row distribution (#25546) For non-pipeline non-sinkv2: before: 14s now: 6s- For pipeline + sinkv2: before: 230ms *48 instances now: 38ms *48 instances --- be/src/exec/tablet_info.cpp | 64 +---------- be/src/exec/tablet_info.h | 78 ++++++++++++- be/src/vec/sink/vtablet_finder.cpp | 76 +++++++------ be/src/vec/sink/vtablet_finder.h | 13 ++- be/src/vec/sink/vtablet_sink_v2.cpp | 64 ++++++----- be/src/vec/sink/vtablet_sink_v2.h | 11 +- be/src/vec/sink/writer/vtablet_writer.cpp | 179 +++++++++++++++++------------- be/src/vec/sink/writer/vtablet_writer.h | 11 +- 8 files changed, 285 insertions(+), 211 deletions(-) diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 00ec0c2e581..90d43462581 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -17,7 +17,6 @@ #include "exec/tablet_info.h" -#include <butil/fast_rand.h> #include <gen_cpp/Descriptors_types.h> #include <gen_cpp/Exprs_types.h> #include <gen_cpp/Types_types.h> @@ -26,6 +25,7 @@ #include <stddef.h> #include <algorithm> +#include <memory> #include <ostream> #include <tuple> @@ -324,49 +324,20 @@ Status VOlapTablePartitionParam::init() { } } - _partitions_map.reset( - new std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>( - VOlapTablePartKeyComparator(_partition_slot_locs, _transformed_slot_locs))); + _partitions_map = std::make_unique< + std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>>( + VOlapTablePartKeyComparator(_partition_slot_locs, _transformed_slot_locs)); if (_t_param.__isset.distributed_columns) { for (auto& col : _t_param.distributed_columns) { RETURN_IF_ERROR(find_slot_locs(col, _distributed_slot_locs, "distributed")); } } - if (_distributed_slot_locs.empty()) { - _compute_tablet_index = [](BlockRow* key, - const VOlapTablePartition& partition) -> uint32_t { - if (partition.load_tablet_idx == -1) { - // load_to_single_tablet = false, just do random - return butil::fast_rand() % partition.num_buckets; - } - // load_to_single_tablet = ture, do round-robin - return partition.load_tablet_idx % partition.num_buckets; - }; - } else { - _compute_tablet_index = [this](BlockRow* key, - const VOlapTablePartition& partition) -> uint32_t { - uint32_t hash_val = 0; - for (int i = 0; i < _distributed_slot_locs.size(); ++i) { - auto slot_desc = _slots[_distributed_slot_locs[i]]; - auto& column = key->first->get_by_position(_distributed_slot_locs[i]).column; - auto val = column->get_data_at(key->second); - if (val.data != nullptr) { - hash_val = RawValue::zlib_crc32(val.data, val.size, slot_desc->type().type, - hash_val); - } else { - hash_val = HashUtil::zlib_crc_hash_null(hash_val); - } - } - return hash_val % partition.num_buckets; - }; - } // for both auto/non-auto partition table. _is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED; // initial partitions - for (int i = 0; i < _t_param.partitions.size(); ++i) { - const TOlapTablePartition& t_part = _t_param.partitions[i]; + for (const auto& t_part : _t_param.partitions) { VOlapTablePartition* part = nullptr; RETURN_IF_ERROR(generate_partition_from(t_part, part)); _partitions.emplace_back(part); @@ -385,26 +356,6 @@ Status VOlapTablePartitionParam::init() { return Status::OK(); } -bool VOlapTablePartitionParam::find_partition(BlockRow* block_row, - const VOlapTablePartition** partition) const { - // block_row is gave by inserting process. So try to use transformed index. - auto it = - _is_in_partition - ? _partitions_map->find(std::tuple {block_row->first, block_row->second, true}) - : _partitions_map->upper_bound( - std::tuple {block_row->first, block_row->second, true}); - // for list partition it might result in default partition - if (_is_in_partition) { - *partition = (it != _partitions_map->end()) ? it->second : _default_partition; - it = _partitions_map->end(); - } - if (it != _partitions_map->end() && - _part_contains(it->second, std::tuple {block_row->first, block_row->second, true})) { - *partition = it->second; - } - return (*partition != nullptr); -} - bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const { // start_key.second == -1 means only single partition @@ -413,11 +364,6 @@ bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part, !comparator(key, std::tuple {part->start_key.first, part->start_key.second, false}); } -uint32_t VOlapTablePartitionParam::find_tablet(BlockRow* block_row, - const VOlapTablePartition& partition) const { - return _compute_tablet_index(block_row, partition); -} - Status VOlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs, BlockRow* part_key) { for (int i = 0; i < t_exprs.size(); i++) { diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index ec12dcbfcd3..bb9fbd8bc60 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -17,6 +17,7 @@ #pragma once +#include <butil/fast_rand.h> #include <gen_cpp/Descriptors_types.h> #include <gen_cpp/descriptors.pb.h> @@ -33,6 +34,8 @@ #include "common/object_pool.h" #include "common/status.h" +#include "runtime/descriptors.h" +#include "runtime/raw_value.h" #include "vec/columns/column.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" @@ -162,9 +165,78 @@ public: int64_t version() const { return _t_param.version; } // return true if we found this block_row in partition - bool find_partition(BlockRow* block_row, const VOlapTablePartition** partition) const; + //TODO: use virtual function to refactor it + ALWAYS_INLINE bool find_partition(vectorized::Block* block, int row, + VOlapTablePartition*& partition) const { + auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true}) + : _partitions_map->upper_bound(std::tuple {block, row, true}); + // for list partition it might result in default partition + if (_is_in_partition) { + partition = (it != _partitions_map->end()) ? it->second : _default_partition; + it = _partitions_map->end(); + } + if (it != _partitions_map->end() && + _part_contains(it->second, std::tuple {block, row, true})) { + partition = it->second; + } + return (partition != nullptr); + } + + ALWAYS_INLINE void find_tablets( + vectorized::Block* block, const std::vector<uint32_t>& indexes, + const std::vector<VOlapTablePartition*>& partitions, + std::vector<uint32_t>& tablet_indexes /*result*/, + /*TODO: check if flat hash map will be better*/ + std::map<int64_t, int64_t>* partition_tablets_buffer = nullptr) const { + std::function<uint32_t(vectorized::Block*, uint32_t, const VOlapTablePartition&)> + compute_function; + if (!_distributed_slot_locs.empty()) { + //TODO: refactor by saving the hash values. then we can calculate in columnwise. + compute_function = [this](vectorized::Block* block, uint32_t row, + const VOlapTablePartition& partition) -> uint32_t { + uint32_t hash_val = 0; + for (unsigned short _distributed_slot_loc : _distributed_slot_locs) { + auto* slot_desc = _slots[_distributed_slot_loc]; + auto& column = block->get_by_position(_distributed_slot_loc).column; + auto val = column->get_data_at(row); + if (val.data != nullptr) { + hash_val = RawValue::zlib_crc32(val.data, val.size, slot_desc->type().type, + hash_val); + } else { + hash_val = HashUtil::zlib_crc_hash_null(hash_val); + } + } + return hash_val % partition.num_buckets; + }; + } else { // random distribution + compute_function = [](vectorized::Block* block, uint32_t row, + const VOlapTablePartition& partition) -> uint32_t { + if (partition.load_tablet_idx == -1) { + // load_to_single_tablet = false, just do random + return butil::fast_rand() % partition.num_buckets; + } + // load_to_single_tablet = ture, do round-robin + return partition.load_tablet_idx % partition.num_buckets; + }; + } - uint32_t find_tablet(BlockRow* block_row, const VOlapTablePartition& partition) const; + if (partition_tablets_buffer == nullptr) { + for (auto index : indexes) { + tablet_indexes[index] = compute_function(block, index, *partitions[index]); + } + } else { // use buffer + for (auto index : indexes) { + auto& partition_id = partitions[index]->id; + if (auto it = partition_tablets_buffer->find(partition_id); + it != partition_tablets_buffer->end()) { + tablet_indexes[index] = it->second; // tablet + } + // compute and save in buffer + (*partition_tablets_buffer)[partition_id] = tablet_indexes[index] = + compute_function(block, index, *partitions[index]); + } + } + } const std::vector<VOlapTablePartition*>& get_partitions() const { return _partitions; } @@ -193,8 +265,6 @@ private: Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos); - std::function<uint32_t(BlockRow*, const VOlapTablePartition&)> _compute_tablet_index; - // check if this partition contain this key bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const; diff --git a/be/src/vec/sink/vtablet_finder.cpp b/be/src/vec/sink/vtablet_finder.cpp index 421b3ebb11c..f01add4b22e 100644 --- a/be/src/vec/sink/vtablet_finder.cpp +++ b/be/src/vec/sink/vtablet_finder.cpp @@ -41,21 +41,24 @@ #include "vec/functions/simple_function_factory.h" namespace doris::vectorized { +Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int rows, + std::vector<VOlapTablePartition*>& partitions, + std::vector<uint32_t>& tablet_index, bool& stop_processing, + std::vector<bool>& skip, std::vector<int64_t>* miss_rows) { + for (int index = 0; index < rows; index++) { + _vpartition->find_partition(block, index, partitions[index]); + } + + std::vector<uint32_t> qualified_rows; + qualified_rows.reserve(rows); -Status OlapTabletFinder::find_tablet(RuntimeState* state, Block* block, int row_index, - const VOlapTablePartition** partition, uint32_t& tablet_index, - bool& stop_processing, bool& is_continue, - bool* missing_partition) { - Status status = Status::OK(); - *partition = nullptr; - tablet_index = 0; - BlockRow block_row; - block_row = {block, row_index}; - if (!_vpartition->find_partition(&block_row, partition)) { - if (missing_partition != nullptr) { // auto partition table - *missing_partition = true; - return status; - } else { + for (int row_index = 0; row_index < rows; row_index++) { + if (partitions[row_index] == nullptr) [[unlikely]] { + if (miss_rows != nullptr) { // auto partition table + miss_rows->push_back(row_index); // already reserve memory outside + skip[row_index] = true; + continue; + } RETURN_IF_ERROR(state->append_error_msg_to_file( []() -> std::string { return ""; }, [&]() -> std::string { @@ -70,33 +73,34 @@ Status OlapTabletFinder::find_tablet(RuntimeState* state, Block* block, int row_ if (stop_processing) { return Status::EndOfFile("Encountered unqualified data, stop processing"); } - is_continue = true; - return status; + skip[row_index] = true; + continue; } - } - if (!(*partition)->is_mutable) { - _num_immutable_partition_filtered_rows++; - is_continue = true; - return status; - } - if ((*partition)->num_buckets <= 0) { - std::stringstream ss; - ss << "num_buckets must be greater than 0, num_buckets=" << (*partition)->num_buckets; - return Status::InternalError(ss.str()); - } - _partition_ids.emplace((*partition)->id); - if (_find_tablet_mode != FindTabletMode::FIND_TABLET_EVERY_ROW) { - if (_partition_to_tablet_map.find((*partition)->id) == _partition_to_tablet_map.end()) { - tablet_index = _vpartition->find_tablet(&block_row, **partition); - _partition_to_tablet_map.emplace((*partition)->id, tablet_index); - } else { - tablet_index = _partition_to_tablet_map[(*partition)->id]; + if (!partitions[row_index]->is_mutable) [[unlikely]] { + _num_immutable_partition_filtered_rows++; + skip[row_index] = true; + continue; } + if (partitions[row_index]->num_buckets <= 0) [[unlikely]] { + std::stringstream ss; + ss << "num_buckets must be greater than 0, num_buckets=" + << partitions[row_index]->num_buckets; + return Status::InternalError(ss.str()); + } + + _partition_ids.emplace(partitions[row_index]->id); + + qualified_rows.push_back(row_index); + } + + if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) { + _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index); } else { - tablet_index = _vpartition->find_tablet(&block_row, **partition); + _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index, + &_partition_to_tablet_map); } - return status; + return Status::OK(); } } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h index 28d71c6a1e7..3426f7cb67d 100644 --- a/be/src/vec/sink/vtablet_finder.h +++ b/be/src/vec/sink/vtablet_finder.h @@ -18,10 +18,12 @@ #pragma once #include <map> +#include <unordered_set> #include "common/status.h" #include "exec/tablet_info.h" #include "util/bitmap.h" +#include "vec/common/hash_table/phmap_fwd_decl.h" #include "vec/core/block.h" namespace doris::vectorized { @@ -39,9 +41,10 @@ public: OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode) : _vpartition(vpartition), _find_tablet_mode(mode), _filter_bitmap(1024) {}; - Status find_tablet(RuntimeState* state, vectorized::Block* block, int row_index, - const VOlapTablePartition** partition, uint32_t& tablet_index, - bool& filtered, bool& is_continue, bool* missing_partition = nullptr); + Status find_tablets(RuntimeState* state, vectorized::Block* block, int rows, + std::vector<VOlapTablePartition*>& partitions, + std::vector<uint32_t>& tablet_index, bool& filtered, + std::vector<bool>& is_continue, std::vector<int64_t>* miss_rows = nullptr); bool is_find_tablet_every_sink() { return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK; @@ -55,7 +58,7 @@ public: bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; } - const std::set<int64_t>& partition_ids() { return _partition_ids; } + const vectorized::flat_hash_set<int64_t>& partition_ids() { return _partition_ids; } int64_t num_filtered_rows() const { return _num_filtered_rows; } @@ -69,7 +72,7 @@ private: VOlapTablePartitionParam* _vpartition; FindTabletMode _find_tablet_mode; std::map<int64_t, int64_t> _partition_to_tablet_map; - std::set<int64_t> _partition_ids; + vectorized::flat_hash_set<int64_t> _partition_ids; int64_t _num_filtered_rows = 0; int64_t _num_immutable_partition_filtered_rows = 0; diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index 6ca96ef118b..f38785f53f4 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -221,20 +221,32 @@ void VOlapTableSinkV2::_build_tablet_node_mapping() { } } -void VOlapTableSinkV2::_generate_rows_for_tablet(RowsForTablet& rows_for_tablet, - const VOlapTablePartition* partition, - uint32_t tablet_index, int row_idx) { - // Generate channel payload for sinking data to each tablet - for (const auto& index : partition->indexes) { - auto tablet_id = index.tablets[tablet_index]; - if (rows_for_tablet.count(tablet_id) == 0) { - Rows rows; - rows.partition_id = partition->id; - rows.index_id = index.index_id; - rows_for_tablet.insert({tablet_id, rows}); +void VOlapTableSinkV2::_generate_rows_for_tablet( + RowsForTablet& rows_for_tablet, const std::vector<VOlapTablePartition*>& partitions, + const std::vector<uint32_t>& tablet_indexes, const std::vector<bool>& skip, + size_t row_cnt) { + for (int row_idx = 0; row_idx < row_cnt; row_idx++) { + if (skip[row_idx]) { + continue; + } + + auto& partition = partitions[row_idx]; + auto& tablet_index = tablet_indexes[row_idx]; + + for (const auto& index : partition->indexes) { + auto tablet_id = index.tablets[tablet_index]; + auto it = rows_for_tablet.find(tablet_id); + if (it == rows_for_tablet.end()) { + Rows rows; + rows.partition_id = partition->id; + rows.index_id = index.index_id; + rows.row_idxes.reserve(row_cnt); + auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows}); + it = tmp_it; + } + it->second.row_idxes.push_back(row_idx); + _number_output_rows++; } - rows_for_tablet[tablet_id].row_idxes.push_back(row_idx); - _number_output_rows++; } } @@ -288,20 +300,22 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc _row_distribution_watch.start(); const auto num_rows = input_rows; const auto* __restrict filter_map = _block_convertor->filter_map(); - for (int i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && filter_map[i]) { - continue; - } - const VOlapTablePartition* partition = nullptr; - bool is_continue = false; - uint32_t tablet_index = 0; - RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block.get(), i, &partition, tablet_index, - stop_processing, is_continue)); - if (is_continue) { - continue; + + //reuse vars + _partitions.assign(num_rows, nullptr); + _skip.assign(num_rows, false); + _tablet_indexes.assign(num_rows, 0); + + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, _partitions, + _tablet_indexes, stop_processing, _skip)); + + if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + _skip[i] = _skip[i] || filter_map[i]; } - _generate_rows_for_tablet(rows_for_tablet, partition, tablet_index, i); } + _generate_rows_for_tablet(rows_for_tablet, _partitions, _tablet_indexes, _skip, num_rows); + _row_distribution_watch.stop(); // For each tablet, send its input_rows from block to delta writer diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index f70f74b9da6..42103fa03b1 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -60,6 +60,7 @@ #include "util/stopwatch.hpp" #include "vec/columns/column.h" #include "vec/common/allocator.h" +#include "vec/common/hash_table/phmap_fwd_decl.h" #include "vec/core/block.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" @@ -137,8 +138,9 @@ private: void _build_tablet_node_mapping(); void _generate_rows_for_tablet(RowsForTablet& rows_for_tablet, - const VOlapTablePartition* partition, uint32_t tablet_index, - int row_idx); + const std::vector<VOlapTablePartition*>& partitions, + const std::vector<uint32_t>& tablet_indexes, + const std::vector<bool>& skip, size_t row_cnt); Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id, const Rows& rows, const Streams& streams); @@ -184,6 +186,11 @@ private: int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; + // reuse for find_tablet + std::vector<VOlapTablePartition*> _partitions; + std::vector<bool> _skip; + std::vector<uint32_t> _tablet_indexes; + MonotonicStopWatch _row_distribution_watch; RuntimeProfile::Counter* _input_rows_counter = nullptr; diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 22005a9ac1c..d0720255695 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -44,6 +44,7 @@ #include <string> #include <unordered_map> #include <utility> +#include <vector> #include "olap/wal_manager.h" #include "util/runtime_profile.h" @@ -421,7 +422,6 @@ Status VNodeChannel::open_wait() { ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( open_closure->cntl.remote_side()); } - _cancelled = true; auto error_code = open_closure->cntl.ErrorCode(); auto error_text = open_closure->cntl.ErrorText(); @@ -1337,50 +1337,80 @@ Status VTabletWriter::_incremental_open_node_channel( return Status::OK(); } +// Generate channel payload for sinking data to differenct node channel +// Payload = std::pair<std::unique_ptr<vectorized::IColumn::Selector>, std::vector<int64_t>>; +// first = row_id, second = vector<tablet_id> void VTabletWriter::_generate_row_distribution_payload( - ChannelDistributionPayload& channel_to_payload, const VOlapTablePartition* partition, - uint32_t tablet_index, int row_idx, size_t row_cnt) { - // Generate channel payload for sinking data to differenct node channel - for (int j = 0; j < partition->indexes.size(); ++j) { - auto tid = partition->indexes[j].tablets[tablet_index]; - auto it = _channels[j]->_channels_by_tablet.find(tid); - DCHECK(it != _channels[j]->_channels_by_tablet.end()) - << "unknown tablet, tablet_id=" << tablet_index; - for (const auto& channel : it->second) { - if (channel_to_payload[j].count(channel.get()) < 1) { - channel_to_payload[j].insert( - {channel.get(), Payload {std::unique_ptr<vectorized::IColumn::Selector>( - new vectorized::IColumn::Selector()), - std::vector<int64_t>()}}); + ChannelDistributionPayload& channel_to_payload, + const std::vector<VOlapTablePartition*>& partitions, + const std::vector<uint32_t>& tablet_indexes, const std::vector<bool>& skip, + size_t row_cnt) { + for (int row_idx = 0; row_idx < row_cnt; row_idx++) { + if (skip[row_idx]) { + continue; + } + const auto& partition = partitions[row_idx]; + const auto& tablet_index = tablet_indexes[row_idx]; + + for (int index_num = 0; index_num < partition->indexes.size(); + ++index_num) { // partition->indexes = [index, tablets...] + + auto tablet_id = partition->indexes[index_num].tablets[tablet_index]; + auto it = _channels[index_num]->_channels_by_tablet.find( + tablet_id); // (tablet_id, VNodeChannel) where this tablet locate + + DCHECK(it != _channels[index_num]->_channels_by_tablet.end()) + << "unknown tablet, tablet_id=" << tablet_index; + + std::vector<std::shared_ptr<VNodeChannel>>& tablet_locations = it->second; + std::unordered_map<VNodeChannel*, Payload>& payloads_this_index = + channel_to_payload[index_num]; // payloads of this index in every node + + for (const auto& locate_node : tablet_locations) { + auto payload_it = + payloads_this_index.find(locate_node.get()); // <VNodeChannel*, Payload> + if (payload_it == payloads_this_index.end()) { + auto [tmp_it, _] = payloads_this_index.emplace( + locate_node.get(), + Payload {std::make_unique<vectorized::IColumn::Selector>(), + std::vector<int64_t>()}); + payload_it = tmp_it; + payload_it->second.first->reserve(row_cnt); + payload_it->second.second.reserve(row_cnt); + } + payload_it->second.first->push_back(row_idx); + payload_it->second.second.push_back(tablet_id); } - channel_to_payload[j][channel.get()].first->push_back(row_idx); - channel_to_payload[j][channel.get()].second.push_back(tid); + _number_output_rows++; } - _number_output_rows += row_cnt; } } Status VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized::Block* block, ChannelDistributionPayload& channel_to_payload, size_t num_rows, bool has_filtered_rows) { + // only need to calculate one value for single partition. + std::vector<VOlapTablePartition*> partitions(1, nullptr); + std::vector<bool> skip(1, false); + std::vector<uint32_t> tablet_indexes(1, 0); + bool stop_processing = false; + + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, 1, partitions, tablet_indexes, + stop_processing, skip)); + const VOlapTablePartition* partition = nullptr; uint32_t tablet_index = 0; - bool stop_processing = false; - for (int32_t i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) { - continue; - } - bool is_continue = false; - RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block, i, &partition, tablet_index, - stop_processing, is_continue)); - if (is_continue) { - continue; + for (size_t i = 0; i < num_rows; i++) { + if (!skip[i]) { + partition = partitions[i]; + tablet_index = tablet_indexes[i]; + break; } - break; } if (partition == nullptr) { return Status::OK(); } + for (int j = 0; j < partition->indexes.size(); ++j) { auto tid = partition->indexes[j].tablets[tablet_index]; auto it = _channels[j]->_channels_by_tablet.find(tid); @@ -1388,10 +1418,9 @@ Status VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized << "unknown tablet, tablet_id=" << tablet_index; int64_t row_cnt = 0; for (const auto& channel : it->second) { - if (channel_to_payload[j].count(channel.get()) < 1) { + if (!channel_to_payload[j].contains(channel.get())) { channel_to_payload[j].insert( - {channel.get(), Payload {std::unique_ptr<vectorized::IColumn::Selector>( - new vectorized::IColumn::Selector()), + {channel.get(), Payload {std::make_unique<vectorized::IColumn::Selector>(), std::vector<int64_t>()}}); } auto& selector = channel_to_payload[j][channel.get()].first; @@ -1535,10 +1564,15 @@ Status VTabletWriter::close(Status exec_status) { auto status = Status::OK(); // BE id -> add_batch method counter std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map; - int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0, actual_consume_ns = 0, - total_add_batch_exec_time_ns = 0, max_add_batch_exec_time_ns = 0, - total_wait_exec_time_ns = 0, max_wait_exec_time_ns = 0, total_add_batch_num = 0, - num_node_channels = 0; + int64_t serialize_batch_ns = 0; + int64_t queue_push_lock_ns = 0; + int64_t actual_consume_ns = 0; + int64_t total_add_batch_exec_time_ns = 0; + int64_t max_add_batch_exec_time_ns = 0; + int64_t total_wait_exec_time_ns = 0; + int64_t max_wait_exec_time_ns = 0; + int64_t total_add_batch_num = 0; + int64_t num_node_channels = 0; VNodeChannelStat channel_stat; for (const auto& index_channel : _channels) { @@ -1665,7 +1699,7 @@ Status VTabletWriter::close(Status exec_status) { [](const std::shared_ptr<VNodeChannel>& ch) { ch->clear_all_blocks(); }); } - if (_wal_writer.get() != nullptr) { + if (_wal_writer != nullptr) { static_cast<void>(_wal_writer->finalize()); } return _close_status; @@ -1703,7 +1737,7 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { SCOPED_RAW_TIMER(&_send_data_ns); // This is just for passing compilation. bool stop_processing = false; - std::vector<std::unordered_map<VNodeChannel*, Payload>> channel_to_payload; + ChannelDistributionPayload channel_to_payload; channel_to_payload.resize(_channels.size()); _tablet_finder->clear_for_new_batch(); _row_distribution_watch.start(); @@ -1737,34 +1771,30 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { missing_map.reserve(partition_col.column->size()); // try to find tablet and save missing value - for (int i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) { - continue; - } - const VOlapTablePartition* partition = nullptr; - bool is_continue = false; - uint32_t tablet_index = 0; - bool missing_this = false; - RETURN_IF_ERROR(_tablet_finder->find_tablet(_state, block.get(), i, &partition, - tablet_index, stop_processing, - is_continue, &missing_this)); - if (missing_this) { - missing_map.push_back(i); - } else { - _generate_row_distribution_payload(channel_to_payload, partition, tablet_index, - i, 1); + std::vector<VOlapTablePartition*> partitions(num_rows, nullptr); + std::vector<bool> skip(num_rows, false); + std::vector<uint32_t> tablet_indexes(num_rows, 0); + + //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time. + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, + tablet_indexes, stop_processing, skip, + &missing_map)); + + if (missing_map.empty()) { + // we don't calculate it distribution when have missing values + if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + skip[i] = skip[i] || _block_convertor->filter_map()[i]; + } } - } - missing_map.shrink_to_fit(); - - // for missing partition keys, calc the missing partition and save in _partitions_need_create - auto type = partition_col.type; - if (missing_map.size() > 0) { + _generate_row_distribution_payload(channel_to_payload, partitions, tablet_indexes, + skip, num_rows); + } else { // for missing partition keys, calc the missing partition and save in _partitions_need_create auto return_type = part_func->data_type(); // expose the data column vectorized::ColumnPtr range_left_col = block->get_by_position(result_idx).column; - if (auto* nullable = + if (const auto* nullable = check_and_get_column<vectorized::ColumnNullable>(*range_left_col)) { range_left_col = nullable->get_nested_column_ptr(); return_type = @@ -1786,23 +1816,20 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { return Status::NeedSendAgain(""); } // creating done } else { // not auto partition - for (int i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) { - continue; - } - const VOlapTablePartition* partition = nullptr; - bool is_continue = false; - uint32_t tablet_index = 0; - RETURN_IF_ERROR(_tablet_finder->find_tablet(_state, block.get(), i, &partition, - tablet_index, stop_processing, - is_continue)); - if (is_continue) { - continue; + std::vector<VOlapTablePartition*> partitions(num_rows, nullptr); + std::vector<bool> skip(num_rows, false); + std::vector<uint32_t> tablet_indexes(num_rows, 0); + + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, + tablet_indexes, stop_processing, skip)); + + if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + skip[i] = skip[i] || _block_convertor->filter_map()[i]; } - // each row - _generate_row_distribution_payload(channel_to_payload, partition, tablet_index, i, - 1); } + _generate_row_distribution_payload(channel_to_payload, partitions, tablet_indexes, skip, + num_rows); } } _row_distribution_watch.stop(); diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 4e95b444d79..c8d5d1c2ce9 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -553,10 +553,13 @@ private: using ChannelDistributionPayload = std::vector<std::unordered_map<VNodeChannel*, Payload>>; Status _init(RuntimeState* state, RuntimeProfile* profile); - // payload for each row - void _generate_row_distribution_payload(ChannelDistributionPayload& payload, - const VOlapTablePartition* partition, - uint32_t tablet_index, int row_idx, size_t row_cnt); + + // payload for every row + void _generate_row_distribution_payload(ChannelDistributionPayload& channel_to_payload, + const std::vector<VOlapTablePartition*>& partitions, + const std::vector<uint32_t>& tablet_indexes, + const std::vector<bool>& skip, size_t row_cnt); + Status _single_partition_generate(RuntimeState* state, vectorized::Block* block, ChannelDistributionPayload& channel_to_payload, size_t num_rows, bool has_filtered_rows); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org