This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_join
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8b3a79cb5fced02a1dccf6242f95558583b95bd5
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Thu Oct 26 13:40:52 2023 +0800

    update
    
    update
    
    update
    
    fix
---
 be/src/exprs/runtime_filter_slots.h                |  5 ++--
 .../local_exchange_sink_operator.cpp               |  4 +--
 be/src/vec/columns/column_vector.cpp               | 13 ++-------
 be/src/vec/common/hash_table/hash_map.h            |  7 ++++-
 be/src/vec/common/hash_table/hash_map_context.h    | 32 ++++++++++++++--------
 .../vec/exec/join/process_hash_table_probe_impl.h  |  2 +-
 be/src/vec/exec/join/vhash_join_node.h             |  6 ++--
 7 files changed, 36 insertions(+), 33 deletions(-)

diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index 63c5665d271..0f841e5a60f 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -174,6 +174,7 @@ public:
                 auto column = it->get_by_position(result_column_id).column;
 
                 std::vector<int> indexs;
+                // indexs starts at 1 because row 0 is a mocked row for the join hash map
                 if (const auto* nullable =
                             
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) {
                     column = nullable->get_nested_column_ptr();
@@ -181,14 +182,14 @@ public:
                                                       
nullable->get_null_map_column_ptr().get())
                                                       ->get_data()
                                                       .data();
-                    for (int i = 0; i < column->size(); i++) {
+                    for (int i = 1; i < column->size(); i++) {
                         if (null_map[i]) {
                             continue;
                         }
                         indexs.push_back(i);
                     }
                 } else {
-                    for (int i = 0; i < column->size(); i++) {
+                    for (int i = 1; i < column->size(); i++) {
                         indexs.push_back(i);
                     }
                 }
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index ec959b20cea..c2fec25e995 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -37,7 +37,7 @@ Status 
LocalExchangeSinkLocalState::channel_add_rows(RuntimeState* state,
                                                      vectorized::Block* block,
                                                      SourceState source_state) 
{
     auto& data_queue = _shared_state->data_queue;
-    std::vector<int> channel2rows[data_queue.size()];
+    std::vector<uint32_t> channel2rows[data_queue.size()];
 
     auto rows = block->rows();
     for (int i = 0; i < rows; i++) {
@@ -48,7 +48,7 @@ Status 
LocalExchangeSinkLocalState::channel_add_rows(RuntimeState* state,
             _mutable_block[i] = 
vectorized::MutableBlock::create_unique(block->clone_empty());
         }
 
-        const int* begin = channel2rows[i].data();
+        const auto* begin = channel2rows[i].data();
         _mutable_block[i]->add_rows(block, begin, begin + 
channel2rows[i].size());
         if (_mutable_block[i]->rows() > state->batch_size() ||
             source_state == SourceState::FINISHED) {
diff --git a/be/src/vec/columns/column_vector.cpp 
b/be/src/vec/columns/column_vector.cpp
index c4ca97df7a2..69c09dcde80 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -375,17 +375,8 @@ void ColumnVector<T>::insert_indices_from(const IColumn& 
src,
 
     const T* src_data = reinterpret_cast<const T*>(src.get_raw_data().data);
 
-    if constexpr (std::is_same_v<T, UInt8>) {
-        // nullmap : indices_begin[i] == 0 means is null at the here, set true 
here
-        for (int i = 0; i < new_size; ++i) {
-            data[origin_size + i] =
-                    (indices_begin[i] == 0) + (indices_begin[i] != 0) * 
src_data[indices_begin[i]];
-        }
-    } else {
-        // real data : indices_begin[i] == 0 what at is meaningless
-        for (int i = 0; i < new_size; ++i) {
-            data[origin_size + i] = src_data[indices_begin[i]];
-        }
+    for (int i = 0; i < new_size; ++i) {
+        data[origin_size + i] = src_data[indices_begin[i]];
     }
 }
 
diff --git a/be/src/vec/common/hash_table/hash_map.h 
b/be/src/vec/common/hash_table/hash_map.h
index 1c132fc99ed..cedcb0515a6 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -336,6 +336,10 @@ private:
         uint32_t build_idx = 0;
         const auto batch_size = max_batch_size;
 
+        if (!build_keys) {
+            probe_idx = probe_rows;
+        }
+
         auto do_the_probe = [&]() {
             while (build_idx && LIKELY(matched_cnt < batch_size)) {
                 if (keys[probe_idx] == build_keys[build_idx]) {
@@ -381,7 +385,8 @@ private:
         return std::pair {probe_idx, matched_cnt};
     }
 
-    const Key* __restrict build_keys;
+    const Key* __restrict build_keys = nullptr;
+
     std::vector<uint8_t> visited;
 
     int max_batch_size = 0;
diff --git a/be/src/vec/common/hash_table/hash_map_context.h 
b/be/src/vec/common/hash_table/hash_map_context.h
index 549301ae477..01bdd88fc74 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -153,6 +153,10 @@ struct MethodSerialized : public MethodBase<TData> {
     using State = ColumnsHashing::HashMethodSerialized<typename Base::Value, 
typename Base::Mapped>;
     using Base::try_presis_key;
 
+    // These must be kept alive until the hash probe ends.
+    std::vector<StringRef> build_stored_keys;
+    Arena build_arena;
+    // Refreshed on each probe.
     std::vector<StringRef> stored_keys;
 
     StringRef serialize_keys_to_pool_contiguous(size_t i, size_t keys_size,
@@ -167,10 +171,10 @@ struct MethodSerialized : public MethodBase<TData> {
         return {begin, sum_size};
     }
 
-    void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
-                              const uint8_t* null_map = nullptr, bool is_build 
= false) override {
-        Base::arena.clear();
-        stored_keys.resize(num_rows);
+    void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t 
num_rows,
+                                   std::vector<StringRef>& keys, Arena& arena) 
{
+        arena.clear();
+        keys.resize(num_rows);
 
         size_t max_one_row_byte_size = 0;
         for (const auto& column : key_columns) {
@@ -182,24 +186,28 @@ struct MethodSerialized : public MethodBase<TData> {
             // reach mem limit, don't serialize in batch
             size_t keys_size = key_columns.size();
             for (size_t i = 0; i < num_rows; ++i) {
-                stored_keys[i] =
-                        serialize_keys_to_pool_contiguous(i, keys_size, 
key_columns, Base::arena);
+                keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, 
key_columns, arena);
             }
         } else {
-            auto* serialized_key_buffer =
-                    reinterpret_cast<uint8_t*>(Base::arena.alloc(total_bytes));
+            auto* serialized_key_buffer = 
reinterpret_cast<uint8_t*>(arena.alloc(total_bytes));
 
             for (size_t i = 0; i < num_rows; ++i) {
-                stored_keys[i].data =
+                keys[i].data =
                         reinterpret_cast<char*>(serialized_key_buffer + i * 
max_one_row_byte_size);
-                stored_keys[i].size = 0;
+                keys[i].size = 0;
             }
 
             for (const auto& column : key_columns) {
-                column->serialize_vec(stored_keys, num_rows, 
max_one_row_byte_size);
+                column->serialize_vec(keys, num_rows, max_one_row_byte_size);
             }
         }
-        Base::keys = stored_keys.data();
+        Base::keys = keys.data();
+    }
+
+    void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
+                              const uint8_t* null_map = nullptr, bool is_build 
= false) override {
+        init_serialized_keys_impl(key_columns, num_rows, is_build ? 
build_stored_keys : stored_keys,
+                                  is_build ? build_arena : Base::arena);
         Base::init_hash_values(num_rows, null_map);
     }
 
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index a267a96d55f..8827d51783d 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -68,7 +68,7 @@ void ProcessHashTableProbe<JoinOpType, 
Parent>::build_side_output_column(
     constexpr auto probe_all =
             JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == 
TJoinOp::FULL_OUTER_JOIN;
 
-    if (!is_semi_anti_join || have_other_join_conjunct) {
+    if ((!is_semi_anti_join || have_other_join_conjunct) && size) {
         for (int i = 0; i < _right_col_len; i++) {
             const auto& column = *_build_block->safe_get_by_position(i).column;
             if (output_slot_flags[i]) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index 6fb1703120e..a514f454214 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -87,10 +87,8 @@ struct ProcessRuntimeFilterBuild {
                 state, hash_table_ctx.hash_table->size(), 
parent->_build_rf_cardinality));
 
         if (!parent->_runtime_filter_slots->empty() && 
!parent->_inserted_blocks.empty()) {
-            {
-                SCOPED_TIMER(parent->_push_compute_timer);
-                
parent->_runtime_filter_slots->insert(parent->_inserted_blocks);
-            }
+            SCOPED_TIMER(parent->_push_compute_timer);
+            parent->_runtime_filter_slots->insert(parent->_inserted_blocks);
         }
         {
             SCOPED_TIMER(parent->_push_down_timer);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to