This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 16644eff7f8 [opt](load) optimize the performance of row distribution 
(#25546)
16644eff7f8 is described below

commit 16644eff7f8c882ad330ed88e25b23cdcdcce7ed
Author: zclllyybb <zhaochan...@selectdb.com>
AuthorDate: Tue Nov 7 10:04:59 2023 +0800

    [opt](load) optimize the performance of row distribution (#25546)
    
    For non-pipeline non-sinkv2:
    before: 14s
    now: 6s-
    For pipeline + sinkv2:
    before: 230ms *48 instances
    now: 38ms *48 instances
---
 be/src/exec/tablet_info.cpp               |  64 +----------
 be/src/exec/tablet_info.h                 |  78 ++++++++++++-
 be/src/vec/sink/vtablet_finder.cpp        |  76 +++++++------
 be/src/vec/sink/vtablet_finder.h          |  13 ++-
 be/src/vec/sink/vtablet_sink_v2.cpp       |  64 ++++++-----
 be/src/vec/sink/vtablet_sink_v2.h         |  11 +-
 be/src/vec/sink/writer/vtablet_writer.cpp | 179 +++++++++++++++++-------------
 be/src/vec/sink/writer/vtablet_writer.h   |  11 +-
 8 files changed, 285 insertions(+), 211 deletions(-)

diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 00ec0c2e581..90d43462581 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -17,7 +17,6 @@
 
 #include "exec/tablet_info.h"
 
-#include <butil/fast_rand.h>
 #include <gen_cpp/Descriptors_types.h>
 #include <gen_cpp/Exprs_types.h>
 #include <gen_cpp/Types_types.h>
@@ -26,6 +25,7 @@
 #include <stddef.h>
 
 #include <algorithm>
+#include <memory>
 #include <ostream>
 #include <tuple>
 
@@ -324,49 +324,20 @@ Status VOlapTablePartitionParam::init() {
         }
     }
 
-    _partitions_map.reset(
-            new std::map<BlockRowWithIndicator, VOlapTablePartition*, 
VOlapTablePartKeyComparator>(
-                    VOlapTablePartKeyComparator(_partition_slot_locs, 
_transformed_slot_locs)));
+    _partitions_map = std::make_unique<
+            std::map<BlockRowWithIndicator, VOlapTablePartition*, 
VOlapTablePartKeyComparator>>(
+            VOlapTablePartKeyComparator(_partition_slot_locs, 
_transformed_slot_locs));
     if (_t_param.__isset.distributed_columns) {
         for (auto& col : _t_param.distributed_columns) {
             RETURN_IF_ERROR(find_slot_locs(col, _distributed_slot_locs, 
"distributed"));
         }
     }
-    if (_distributed_slot_locs.empty()) {
-        _compute_tablet_index = [](BlockRow* key,
-                                   const VOlapTablePartition& partition) -> 
uint32_t {
-            if (partition.load_tablet_idx == -1) {
-                // load_to_single_tablet = false, just do random
-                return butil::fast_rand() % partition.num_buckets;
-            }
-            // load_to_single_tablet = ture, do round-robin
-            return partition.load_tablet_idx % partition.num_buckets;
-        };
-    } else {
-        _compute_tablet_index = [this](BlockRow* key,
-                                       const VOlapTablePartition& partition) 
-> uint32_t {
-            uint32_t hash_val = 0;
-            for (int i = 0; i < _distributed_slot_locs.size(); ++i) {
-                auto slot_desc = _slots[_distributed_slot_locs[i]];
-                auto& column = 
key->first->get_by_position(_distributed_slot_locs[i]).column;
-                auto val = column->get_data_at(key->second);
-                if (val.data != nullptr) {
-                    hash_val = RawValue::zlib_crc32(val.data, val.size, 
slot_desc->type().type,
-                                                    hash_val);
-                } else {
-                    hash_val = HashUtil::zlib_crc_hash_null(hash_val);
-                }
-            }
-            return hash_val % partition.num_buckets;
-        };
-    }
 
     // for both auto/non-auto partition table.
     _is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;
 
     // initial partitions
-    for (int i = 0; i < _t_param.partitions.size(); ++i) {
-        const TOlapTablePartition& t_part = _t_param.partitions[i];
+    for (const auto& t_part : _t_param.partitions) {
         VOlapTablePartition* part = nullptr;
         RETURN_IF_ERROR(generate_partition_from(t_part, part));
         _partitions.emplace_back(part);
@@ -385,26 +356,6 @@ Status VOlapTablePartitionParam::init() {
     return Status::OK();
 }
 
-bool VOlapTablePartitionParam::find_partition(BlockRow* block_row,
-                                              const VOlapTablePartition** 
partition) const {
-    // block_row is gave by inserting process. So try to use transformed index.
-    auto it =
-            _is_in_partition
-                    ? _partitions_map->find(std::tuple {block_row->first, 
block_row->second, true})
-                    : _partitions_map->upper_bound(
-                              std::tuple {block_row->first, block_row->second, 
true});
-    // for list partition it might result in default partition
-    if (_is_in_partition) {
-        *partition = (it != _partitions_map->end()) ? it->second : 
_default_partition;
-        it = _partitions_map->end();
-    }
-    if (it != _partitions_map->end() &&
-        _part_contains(it->second, std::tuple {block_row->first, 
block_row->second, true})) {
-        *partition = it->second;
-    }
-    return (*partition != nullptr);
-}
-
 bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
                                               BlockRowWithIndicator key) const 
{
     // start_key.second == -1 means only single partition
@@ -413,11 +364,6 @@ bool 
VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
            !comparator(key, std::tuple {part->start_key.first, 
part->start_key.second, false});
 }
 
-uint32_t VOlapTablePartitionParam::find_tablet(BlockRow* block_row,
-                                               const VOlapTablePartition& 
partition) const {
-    return _compute_tablet_index(block_row, partition);
-}
-
 Status VOlapTablePartitionParam::_create_partition_keys(const 
std::vector<TExprNode>& t_exprs,
                                                         BlockRow* part_key) {
     for (int i = 0; i < t_exprs.size(); i++) {
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index ec12dcbfcd3..bb9fbd8bc60 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <butil/fast_rand.h>
 #include <gen_cpp/Descriptors_types.h>
 #include <gen_cpp/descriptors.pb.h>
 
@@ -33,6 +34,8 @@
 
 #include "common/object_pool.h"
 #include "common/status.h"
+#include "runtime/descriptors.h"
+#include "runtime/raw_value.h"
 #include "vec/columns/column.h"
 #include "vec/core/block.h"
 #include "vec/core/column_with_type_and_name.h"
@@ -162,9 +165,78 @@ public:
     int64_t version() const { return _t_param.version; }
 
     // return true if we found this block_row in partition
-    bool find_partition(BlockRow* block_row, const VOlapTablePartition** 
partition) const;
+    //TODO: use virtual function to refactor it
+    ALWAYS_INLINE bool find_partition(vectorized::Block* block, int row,
+                                      VOlapTablePartition*& partition) const {
+        auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, 
row, true})
+                                   : _partitions_map->upper_bound(std::tuple 
{block, row, true});
+        // for list partition it might result in default partition
+        if (_is_in_partition) {
+            partition = (it != _partitions_map->end()) ? it->second : 
_default_partition;
+            it = _partitions_map->end();
+        }
+        if (it != _partitions_map->end() &&
+            _part_contains(it->second, std::tuple {block, row, true})) {
+            partition = it->second;
+        }
+        return (partition != nullptr);
+    }
+
+    ALWAYS_INLINE void find_tablets(
+            vectorized::Block* block, const std::vector<uint32_t>& indexes,
+            const std::vector<VOlapTablePartition*>& partitions,
+            std::vector<uint32_t>& tablet_indexes /*result*/,
+            /*TODO: check if flat hash map will be better*/
+            std::map<int64_t, int64_t>* partition_tablets_buffer = nullptr) 
const {
+        std::function<uint32_t(vectorized::Block*, uint32_t, const 
VOlapTablePartition&)>
+                compute_function;
+        if (!_distributed_slot_locs.empty()) {
+            //TODO: refactor by saving the hash values. then we can calculate 
in columnwise.
+            compute_function = [this](vectorized::Block* block, uint32_t row,
+                                      const VOlapTablePartition& partition) -> 
uint32_t {
+                uint32_t hash_val = 0;
+                for (unsigned short _distributed_slot_loc : 
_distributed_slot_locs) {
+                    auto* slot_desc = _slots[_distributed_slot_loc];
+                    auto& column = 
block->get_by_position(_distributed_slot_loc).column;
+                    auto val = column->get_data_at(row);
+                    if (val.data != nullptr) {
+                        hash_val = RawValue::zlib_crc32(val.data, val.size, 
slot_desc->type().type,
+                                                        hash_val);
+                    } else {
+                        hash_val = HashUtil::zlib_crc_hash_null(hash_val);
+                    }
+                }
+                return hash_val % partition.num_buckets;
+            };
+        } else { // random distribution
+            compute_function = [](vectorized::Block* block, uint32_t row,
+                                  const VOlapTablePartition& partition) -> 
uint32_t {
+                if (partition.load_tablet_idx == -1) {
+                    // load_to_single_tablet = false, just do random
+                    return butil::fast_rand() % partition.num_buckets;
+                }
+                // load_to_single_tablet = true, do round-robin
+                return partition.load_tablet_idx % partition.num_buckets;
+            };
+        }
 
-    uint32_t find_tablet(BlockRow* block_row, const VOlapTablePartition& 
partition) const;
+        if (partition_tablets_buffer == nullptr) {
+            for (auto index : indexes) {
+                tablet_indexes[index] = compute_function(block, index, 
*partitions[index]);
+            }
+        } else { // use buffer
+            for (auto index : indexes) {
+                auto& partition_id = partitions[index]->id;
+                if (auto it = partition_tablets_buffer->find(partition_id);
+                    it != partition_tablets_buffer->end()) {
+                    tablet_indexes[index] = it->second; // tablet
+                }
+                // compute and save in buffer
+                (*partition_tablets_buffer)[partition_id] = 
tablet_indexes[index] =
+                        compute_function(block, index, *partitions[index]);
+            }
+        }
+    }
 
     const std::vector<VOlapTablePartition*>& get_partitions() const { return 
_partitions; }
 
@@ -193,8 +265,6 @@ private:
 
     Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, 
uint16_t pos);
 
-    std::function<uint32_t(BlockRow*, const VOlapTablePartition&)> 
_compute_tablet_index;
-
     // check if this partition contain this key
     bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) 
const;
 
diff --git a/be/src/vec/sink/vtablet_finder.cpp 
b/be/src/vec/sink/vtablet_finder.cpp
index 421b3ebb11c..f01add4b22e 100644
--- a/be/src/vec/sink/vtablet_finder.cpp
+++ b/be/src/vec/sink/vtablet_finder.cpp
@@ -41,21 +41,24 @@
 #include "vec/functions/simple_function_factory.h"
 
 namespace doris::vectorized {
+Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int 
rows,
+                                      std::vector<VOlapTablePartition*>& 
partitions,
+                                      std::vector<uint32_t>& tablet_index, 
bool& stop_processing,
+                                      std::vector<bool>& skip, 
std::vector<int64_t>* miss_rows) {
+    for (int index = 0; index < rows; index++) {
+        _vpartition->find_partition(block, index, partitions[index]);
+    }
+
+    std::vector<uint32_t> qualified_rows;
+    qualified_rows.reserve(rows);
 
-Status OlapTabletFinder::find_tablet(RuntimeState* state, Block* block, int 
row_index,
-                                     const VOlapTablePartition** partition, 
uint32_t& tablet_index,
-                                     bool& stop_processing, bool& is_continue,
-                                     bool* missing_partition) {
-    Status status = Status::OK();
-    *partition = nullptr;
-    tablet_index = 0;
-    BlockRow block_row;
-    block_row = {block, row_index};
-    if (!_vpartition->find_partition(&block_row, partition)) {
-        if (missing_partition != nullptr) { // auto partition table
-            *missing_partition = true;
-            return status;
-        } else {
+    for (int row_index = 0; row_index < rows; row_index++) {
+        if (partitions[row_index] == nullptr) [[unlikely]] {
+            if (miss_rows != nullptr) {          // auto partition table
+                miss_rows->push_back(row_index); // already reserve memory 
outside
+                skip[row_index] = true;
+                continue;
+            }
             RETURN_IF_ERROR(state->append_error_msg_to_file(
                     []() -> std::string { return ""; },
                     [&]() -> std::string {
@@ -70,33 +73,34 @@ Status OlapTabletFinder::find_tablet(RuntimeState* state, 
Block* block, int row_
             if (stop_processing) {
                 return Status::EndOfFile("Encountered unqualified data, stop 
processing");
             }
-            is_continue = true;
-            return status;
+            skip[row_index] = true;
+            continue;
         }
-    }
-    if (!(*partition)->is_mutable) {
-        _num_immutable_partition_filtered_rows++;
-        is_continue = true;
-        return status;
-    }
-    if ((*partition)->num_buckets <= 0) {
-        std::stringstream ss;
-        ss << "num_buckets must be greater than 0, num_buckets=" << 
(*partition)->num_buckets;
-        return Status::InternalError(ss.str());
-    }
-    _partition_ids.emplace((*partition)->id);
-    if (_find_tablet_mode != FindTabletMode::FIND_TABLET_EVERY_ROW) {
-        if (_partition_to_tablet_map.find((*partition)->id) == 
_partition_to_tablet_map.end()) {
-            tablet_index = _vpartition->find_tablet(&block_row, **partition);
-            _partition_to_tablet_map.emplace((*partition)->id, tablet_index);
-        } else {
-            tablet_index = _partition_to_tablet_map[(*partition)->id];
+        if (!partitions[row_index]->is_mutable) [[unlikely]] {
+            _num_immutable_partition_filtered_rows++;
+            skip[row_index] = true;
+            continue;
         }
+        if (partitions[row_index]->num_buckets <= 0) [[unlikely]] {
+            std::stringstream ss;
+            ss << "num_buckets must be greater than 0, num_buckets="
+               << partitions[row_index]->num_buckets;
+            return Status::InternalError(ss.str());
+        }
+
+        _partition_ids.emplace(partitions[row_index]->id);
+
+        qualified_rows.push_back(row_index);
+    }
+
+    if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) {
+        _vpartition->find_tablets(block, qualified_rows, partitions, 
tablet_index);
     } else {
-        tablet_index = _vpartition->find_tablet(&block_row, **partition);
+        _vpartition->find_tablets(block, qualified_rows, partitions, 
tablet_index,
+                                  &_partition_to_tablet_map);
     }
 
-    return status;
+    return Status::OK();
 }
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h
index 28d71c6a1e7..3426f7cb67d 100644
--- a/be/src/vec/sink/vtablet_finder.h
+++ b/be/src/vec/sink/vtablet_finder.h
@@ -18,10 +18,12 @@
 #pragma once
 
 #include <map>
+#include <unordered_set>
 
 #include "common/status.h"
 #include "exec/tablet_info.h"
 #include "util/bitmap.h"
+#include "vec/common/hash_table/phmap_fwd_decl.h"
 #include "vec/core/block.h"
 
 namespace doris::vectorized {
@@ -39,9 +41,10 @@ public:
     OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode)
             : _vpartition(vpartition), _find_tablet_mode(mode), 
_filter_bitmap(1024) {};
 
-    Status find_tablet(RuntimeState* state, vectorized::Block* block, int 
row_index,
-                       const VOlapTablePartition** partition, uint32_t& 
tablet_index,
-                       bool& filtered, bool& is_continue, bool* 
missing_partition = nullptr);
+    Status find_tablets(RuntimeState* state, vectorized::Block* block, int 
rows,
+                        std::vector<VOlapTablePartition*>& partitions,
+                        std::vector<uint32_t>& tablet_index, bool& filtered,
+                        std::vector<bool>& is_continue, std::vector<int64_t>* 
miss_rows = nullptr);
 
     bool is_find_tablet_every_sink() {
         return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
@@ -55,7 +58,7 @@ public:
 
     bool is_single_tablet() { return _partition_to_tablet_map.size() == 1; }
 
-    const std::set<int64_t>& partition_ids() { return _partition_ids; }
+    const vectorized::flat_hash_set<int64_t>& partition_ids() { return 
_partition_ids; }
 
     int64_t num_filtered_rows() const { return _num_filtered_rows; }
 
@@ -69,7 +72,7 @@ private:
     VOlapTablePartitionParam* _vpartition;
     FindTabletMode _find_tablet_mode;
     std::map<int64_t, int64_t> _partition_to_tablet_map;
-    std::set<int64_t> _partition_ids;
+    vectorized::flat_hash_set<int64_t> _partition_ids;
 
     int64_t _num_filtered_rows = 0;
     int64_t _num_immutable_partition_filtered_rows = 0;
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp 
b/be/src/vec/sink/vtablet_sink_v2.cpp
index 6ca96ef118b..f38785f53f4 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -221,20 +221,32 @@ void VOlapTableSinkV2::_build_tablet_node_mapping() {
     }
 }
 
-void VOlapTableSinkV2::_generate_rows_for_tablet(RowsForTablet& 
rows_for_tablet,
-                                                 const VOlapTablePartition* 
partition,
-                                                 uint32_t tablet_index, int 
row_idx) {
-    // Generate channel payload for sinking data to each tablet
-    for (const auto& index : partition->indexes) {
-        auto tablet_id = index.tablets[tablet_index];
-        if (rows_for_tablet.count(tablet_id) == 0) {
-            Rows rows;
-            rows.partition_id = partition->id;
-            rows.index_id = index.index_id;
-            rows_for_tablet.insert({tablet_id, rows});
+void VOlapTableSinkV2::_generate_rows_for_tablet(
+        RowsForTablet& rows_for_tablet, const 
std::vector<VOlapTablePartition*>& partitions,
+        const std::vector<uint32_t>& tablet_indexes, const std::vector<bool>& 
skip,
+        size_t row_cnt) {
+    for (int row_idx = 0; row_idx < row_cnt; row_idx++) {
+        if (skip[row_idx]) {
+            continue;
+        }
+
+        auto& partition = partitions[row_idx];
+        auto& tablet_index = tablet_indexes[row_idx];
+
+        for (const auto& index : partition->indexes) {
+            auto tablet_id = index.tablets[tablet_index];
+            auto it = rows_for_tablet.find(tablet_id);
+            if (it == rows_for_tablet.end()) {
+                Rows rows;
+                rows.partition_id = partition->id;
+                rows.index_id = index.index_id;
+                rows.row_idxes.reserve(row_cnt);
+                auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
+                it = tmp_it;
+            }
+            it->second.row_idxes.push_back(row_idx);
+            _number_output_rows++;
         }
-        rows_for_tablet[tablet_id].row_idxes.push_back(row_idx);
-        _number_output_rows++;
     }
 }
 
@@ -288,20 +300,22 @@ Status VOlapTableSinkV2::send(RuntimeState* state, 
vectorized::Block* input_bloc
     _row_distribution_watch.start();
     const auto num_rows = input_rows;
     const auto* __restrict filter_map = _block_convertor->filter_map();
-    for (int i = 0; i < num_rows; ++i) {
-        if (UNLIKELY(has_filtered_rows) && filter_map[i]) {
-            continue;
-        }
-        const VOlapTablePartition* partition = nullptr;
-        bool is_continue = false;
-        uint32_t tablet_index = 0;
-        RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block.get(), i, 
&partition, tablet_index,
-                                                    stop_processing, 
is_continue));
-        if (is_continue) {
-            continue;
+
+    //reuse vars
+    _partitions.assign(num_rows, nullptr);
+    _skip.assign(num_rows, false);
+    _tablet_indexes.assign(num_rows, 0);
+
+    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), 
num_rows, _partitions,
+                                                 _tablet_indexes, 
stop_processing, _skip));
+
+    if (has_filtered_rows) {
+        for (int i = 0; i < num_rows; i++) {
+            _skip[i] = _skip[i] || filter_map[i];
         }
-        _generate_rows_for_tablet(rows_for_tablet, partition, tablet_index, i);
     }
+    _generate_rows_for_tablet(rows_for_tablet, _partitions, _tablet_indexes, 
_skip, num_rows);
+
     _row_distribution_watch.stop();
 
     // For each tablet, send its input_rows from block to delta writer
diff --git a/be/src/vec/sink/vtablet_sink_v2.h 
b/be/src/vec/sink/vtablet_sink_v2.h
index f70f74b9da6..42103fa03b1 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -60,6 +60,7 @@
 #include "util/stopwatch.hpp"
 #include "vec/columns/column.h"
 #include "vec/common/allocator.h"
+#include "vec/common/hash_table/phmap_fwd_decl.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr_fwd.h"
@@ -137,8 +138,9 @@ private:
     void _build_tablet_node_mapping();
 
     void _generate_rows_for_tablet(RowsForTablet& rows_for_tablet,
-                                   const VOlapTablePartition* partition, 
uint32_t tablet_index,
-                                   int row_idx);
+                                   const std::vector<VOlapTablePartition*>& 
partitions,
+                                   const std::vector<uint32_t>& tablet_indexes,
+                                   const std::vector<bool>& skip, size_t 
row_cnt);
 
     Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t 
tablet_id,
                            const Rows& rows, const Streams& streams);
@@ -184,6 +186,11 @@ private:
     int64_t _number_input_rows = 0;
     int64_t _number_output_rows = 0;
 
+    // reuse for find_tablet
+    std::vector<VOlapTablePartition*> _partitions;
+    std::vector<bool> _skip;
+    std::vector<uint32_t> _tablet_indexes;
+
     MonotonicStopWatch _row_distribution_watch;
 
     RuntimeProfile::Counter* _input_rows_counter = nullptr;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 22005a9ac1c..d0720255695 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -44,6 +44,7 @@
 #include <string>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "olap/wal_manager.h"
 #include "util/runtime_profile.h"
@@ -421,7 +422,6 @@ Status VNodeChannel::open_wait() {
                 ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
                         open_closure->cntl.remote_side());
             }
-
             _cancelled = true;
             auto error_code = open_closure->cntl.ErrorCode();
             auto error_text = open_closure->cntl.ErrorText();
@@ -1337,50 +1337,80 @@ Status VTabletWriter::_incremental_open_node_channel(
     return Status::OK();
 }
 
+// Generate channel payload for sinking data to different node channels
+// Payload = std::pair<std::unique_ptr<vectorized::IColumn::Selector>, 
std::vector<int64_t>>;
+//   first = row_id, second = vector<tablet_id>
 void VTabletWriter::_generate_row_distribution_payload(
-        ChannelDistributionPayload& channel_to_payload, const 
VOlapTablePartition* partition,
-        uint32_t tablet_index, int row_idx, size_t row_cnt) {
-    // Generate channel payload for sinking data to differenct node channel
-    for (int j = 0; j < partition->indexes.size(); ++j) {
-        auto tid = partition->indexes[j].tablets[tablet_index];
-        auto it = _channels[j]->_channels_by_tablet.find(tid);
-        DCHECK(it != _channels[j]->_channels_by_tablet.end())
-                << "unknown tablet, tablet_id=" << tablet_index;
-        for (const auto& channel : it->second) {
-            if (channel_to_payload[j].count(channel.get()) < 1) {
-                channel_to_payload[j].insert(
-                        {channel.get(), Payload 
{std::unique_ptr<vectorized::IColumn::Selector>(
-                                                         new 
vectorized::IColumn::Selector()),
-                                                 std::vector<int64_t>()}});
+        ChannelDistributionPayload& channel_to_payload,
+        const std::vector<VOlapTablePartition*>& partitions,
+        const std::vector<uint32_t>& tablet_indexes, const std::vector<bool>& 
skip,
+        size_t row_cnt) {
+    for (int row_idx = 0; row_idx < row_cnt; row_idx++) {
+        if (skip[row_idx]) {
+            continue;
+        }
+        const auto& partition = partitions[row_idx];
+        const auto& tablet_index = tablet_indexes[row_idx];
+
+        for (int index_num = 0; index_num < partition->indexes.size();
+             ++index_num) { // partition->indexes = [index, tablets...]
+
+            auto tablet_id = 
partition->indexes[index_num].tablets[tablet_index];
+            auto it = _channels[index_num]->_channels_by_tablet.find(
+                    tablet_id); // (tablet_id, VNodeChannel) where this tablet 
locate
+
+            DCHECK(it != _channels[index_num]->_channels_by_tablet.end())
+                    << "unknown tablet, tablet_id=" << tablet_index;
+
+            std::vector<std::shared_ptr<VNodeChannel>>& tablet_locations = 
it->second;
+            std::unordered_map<VNodeChannel*, Payload>& payloads_this_index =
+                    channel_to_payload[index_num]; // payloads of this index 
in every node
+
+            for (const auto& locate_node : tablet_locations) {
+                auto payload_it =
+                        payloads_this_index.find(locate_node.get()); // 
<VNodeChannel*, Payload>
+                if (payload_it == payloads_this_index.end()) {
+                    auto [tmp_it, _] = payloads_this_index.emplace(
+                            locate_node.get(),
+                            Payload 
{std::make_unique<vectorized::IColumn::Selector>(),
+                                     std::vector<int64_t>()});
+                    payload_it = tmp_it;
+                    payload_it->second.first->reserve(row_cnt);
+                    payload_it->second.second.reserve(row_cnt);
+                }
+                payload_it->second.first->push_back(row_idx);
+                payload_it->second.second.push_back(tablet_id);
             }
-            channel_to_payload[j][channel.get()].first->push_back(row_idx);
-            channel_to_payload[j][channel.get()].second.push_back(tid);
+            _number_output_rows++;
         }
-        _number_output_rows += row_cnt;
     }
 }
 
 Status VTabletWriter::_single_partition_generate(RuntimeState* state, 
vectorized::Block* block,
                                                  ChannelDistributionPayload& 
channel_to_payload,
                                                  size_t num_rows, bool 
has_filtered_rows) {
+    // only need to calculate one value for single partition.
+    std::vector<VOlapTablePartition*> partitions(1, nullptr);
+    std::vector<bool> skip(1, false);
+    std::vector<uint32_t> tablet_indexes(1, 0);
+    bool stop_processing = false;
+
+    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, 1, partitions, 
tablet_indexes,
+                                                 stop_processing, skip));
+
     const VOlapTablePartition* partition = nullptr;
     uint32_t tablet_index = 0;
-    bool stop_processing = false;
-    for (int32_t i = 0; i < num_rows; ++i) {
-        if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) {
-            continue;
-        }
-        bool is_continue = false;
-        RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block, i, 
&partition, tablet_index,
-                                                    stop_processing, 
is_continue));
-        if (is_continue) {
-            continue;
+    for (size_t i = 0; i < num_rows; i++) {
+        if (!skip[i]) {
+            partition = partitions[i];
+            tablet_index = tablet_indexes[i];
+            break;
         }
-        break;
     }
     if (partition == nullptr) {
         return Status::OK();
     }
+
     for (int j = 0; j < partition->indexes.size(); ++j) {
         auto tid = partition->indexes[j].tablets[tablet_index];
         auto it = _channels[j]->_channels_by_tablet.find(tid);
@@ -1388,10 +1418,9 @@ Status 
VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized
                 << "unknown tablet, tablet_id=" << tablet_index;
         int64_t row_cnt = 0;
         for (const auto& channel : it->second) {
-            if (channel_to_payload[j].count(channel.get()) < 1) {
+            if (!channel_to_payload[j].contains(channel.get())) {
                 channel_to_payload[j].insert(
-                        {channel.get(), Payload 
{std::unique_ptr<vectorized::IColumn::Selector>(
-                                                         new 
vectorized::IColumn::Selector()),
+                        {channel.get(), Payload 
{std::make_unique<vectorized::IColumn::Selector>(),
                                                  std::vector<int64_t>()}});
             }
             auto& selector = channel_to_payload[j][channel.get()].first;
@@ -1535,10 +1564,15 @@ Status VTabletWriter::close(Status exec_status) {
         auto status = Status::OK();
         // BE id -> add_batch method counter
         std::unordered_map<int64_t, AddBatchCounter> 
node_add_batch_counter_map;
-        int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0, 
actual_consume_ns = 0,
-                total_add_batch_exec_time_ns = 0, max_add_batch_exec_time_ns = 
0,
-                total_wait_exec_time_ns = 0, max_wait_exec_time_ns = 0, 
total_add_batch_num = 0,
-                num_node_channels = 0;
+        int64_t serialize_batch_ns = 0;
+        int64_t queue_push_lock_ns = 0;
+        int64_t actual_consume_ns = 0;
+        int64_t total_add_batch_exec_time_ns = 0;
+        int64_t max_add_batch_exec_time_ns = 0;
+        int64_t total_wait_exec_time_ns = 0;
+        int64_t max_wait_exec_time_ns = 0;
+        int64_t total_add_batch_num = 0;
+        int64_t num_node_channels = 0;
         VNodeChannelStat channel_stat;
 
         for (const auto& index_channel : _channels) {
@@ -1665,7 +1699,7 @@ Status VTabletWriter::close(Status exec_status) {
                 [](const std::shared_ptr<VNodeChannel>& ch) { 
ch->clear_all_blocks(); });
     }
 
-    if (_wal_writer.get() != nullptr) {
+    if (_wal_writer != nullptr) {
         static_cast<void>(_wal_writer->finalize());
     }
     return _close_status;
@@ -1703,7 +1737,7 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
     SCOPED_RAW_TIMER(&_send_data_ns);
     // This is just for passing compilation.
     bool stop_processing = false;
-    std::vector<std::unordered_map<VNodeChannel*, Payload>> channel_to_payload;
+    ChannelDistributionPayload channel_to_payload;
     channel_to_payload.resize(_channels.size());
     _tablet_finder->clear_for_new_batch();
     _row_distribution_watch.start();
@@ -1737,34 +1771,30 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
             missing_map.reserve(partition_col.column->size());
 
             // try to find tablet and save missing value
-            for (int i = 0; i < num_rows; ++i) {
-                if (UNLIKELY(has_filtered_rows) && 
_block_convertor->filter_map()[i]) {
-                    continue;
-                }
-                const VOlapTablePartition* partition = nullptr;
-                bool is_continue = false;
-                uint32_t tablet_index = 0;
-                bool missing_this = false;
-                RETURN_IF_ERROR(_tablet_finder->find_tablet(_state, 
block.get(), i, &partition,
-                                                            tablet_index, 
stop_processing,
-                                                            is_continue, 
&missing_this));
-                if (missing_this) {
-                    missing_map.push_back(i);
-                } else {
-                    _generate_row_distribution_payload(channel_to_payload, 
partition, tablet_index,
-                                                       i, 1);
+            std::vector<VOlapTablePartition*> partitions(num_rows, nullptr);
+            std::vector<bool> skip(num_rows, false);
+            std::vector<uint32_t> tablet_indexes(num_rows, 0);
+
+            //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time.
+            RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), 
num_rows, partitions,
+                                                         tablet_indexes, 
stop_processing, skip,
+                                                         &missing_map));
+
+            if (missing_map.empty()) {
+                // we don't calculate it distribution when have missing values
+                if (has_filtered_rows) {
+                    for (int i = 0; i < num_rows; i++) {
+                        skip[i] = skip[i] || _block_convertor->filter_map()[i];
+                    }
                 }
-            }
-            missing_map.shrink_to_fit();
-
-            // for missing partition keys, calc the missing partition and save 
in _partitions_need_create
-            auto type = partition_col.type;
-            if (missing_map.size() > 0) {
+                _generate_row_distribution_payload(channel_to_payload, 
partitions, tablet_indexes,
+                                                   skip, num_rows);
+            } else { // for missing partition keys, calc the missing partition and save in _partitions_need_create
                 auto return_type = part_func->data_type();
 
                 // expose the data column
                 vectorized::ColumnPtr range_left_col = 
block->get_by_position(result_idx).column;
-                if (auto* nullable =
+                if (const auto* nullable =
                             
check_and_get_column<vectorized::ColumnNullable>(*range_left_col)) {
                     range_left_col = nullable->get_nested_column_ptr();
                     return_type =
@@ -1786,23 +1816,20 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
                 return Status::NeedSendAgain("");
             }    // creating done
         } else { // not auto partition
-            for (int i = 0; i < num_rows; ++i) {
-                if (UNLIKELY(has_filtered_rows) && 
_block_convertor->filter_map()[i]) {
-                    continue;
-                }
-                const VOlapTablePartition* partition = nullptr;
-                bool is_continue = false;
-                uint32_t tablet_index = 0;
-                RETURN_IF_ERROR(_tablet_finder->find_tablet(_state, 
block.get(), i, &partition,
-                                                            tablet_index, 
stop_processing,
-                                                            is_continue));
-                if (is_continue) {
-                    continue;
+            std::vector<VOlapTablePartition*> partitions(num_rows, nullptr);
+            std::vector<bool> skip(num_rows, false);
+            std::vector<uint32_t> tablet_indexes(num_rows, 0);
+
+            RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), 
num_rows, partitions,
+                                                         tablet_indexes, 
stop_processing, skip));
+
+            if (has_filtered_rows) {
+                for (int i = 0; i < num_rows; i++) {
+                    skip[i] = skip[i] || _block_convertor->filter_map()[i];
                 }
-                // each row
-                _generate_row_distribution_payload(channel_to_payload, 
partition, tablet_index, i,
-                                                   1);
             }
+            _generate_row_distribution_payload(channel_to_payload, partitions, 
tablet_indexes, skip,
+                                               num_rows);
         }
     }
     _row_distribution_watch.stop();
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 4e95b444d79..c8d5d1c2ce9 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -553,10 +553,13 @@ private:
     using ChannelDistributionPayload = 
std::vector<std::unordered_map<VNodeChannel*, Payload>>;
 
     Status _init(RuntimeState* state, RuntimeProfile* profile);
-    // payload for each row
-    void _generate_row_distribution_payload(ChannelDistributionPayload& 
payload,
-                                            const VOlapTablePartition* 
partition,
-                                            uint32_t tablet_index, int 
row_idx, size_t row_cnt);
+
+    // payload for every row
+    void _generate_row_distribution_payload(ChannelDistributionPayload& 
channel_to_payload,
+                                            const 
std::vector<VOlapTablePartition*>& partitions,
+                                            const std::vector<uint32_t>& 
tablet_indexes,
+                                            const std::vector<bool>& skip, 
size_t row_cnt);
+
     Status _single_partition_generate(RuntimeState* state, vectorized::Block* 
block,
                                       ChannelDistributionPayload& 
channel_to_payload,
                                       size_t num_rows, bool has_filtered_rows);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to