This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new c3a64aa0b06 [UT](exchanger) Add UT case for shuffle exchanger (#47598)
c3a64aa0b06 is described below

commit c3a64aa0b06e8d1c32fcb483ab9bc1c127e18112
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Sat Feb 8 10:00:53 2025 +0800

    [UT](exchanger) Add UT case for shuffle exchanger (#47598)
---
 .../local_exchange_sink_operator.cpp               |  13 +-
 .../local_exchange/local_exchange_sink_operator.h  |   5 +-
 be/src/pipeline/local_exchange/local_exchanger.cpp |  22 +-
 be/src/pipeline/local_exchange/local_exchanger.h   |   4 +-
 be/test/pipeline/local_exchanger_test.cpp          | 286 +++++++++++++++++++++
 5 files changed, 305 insertions(+), 25 deletions(-)
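
The gist of the change, as a minimal sketch (setup names such as exchanger, state, in_block, the
timers, partitioner, local_state and num_partitions are assumptions for illustration; the SinkInfo
fields come from the diff below): callers now hand the shuffle-idx-to-instance-idx map to the
exchanger through SinkInfo instead of the exchanger reading it back from the sink operator.

    // Hypothetical call site, mirroring the updated LocalExchangeSinkOperatorX::sink().
    std::map<int, int> shuffle_idx_to_instance_idx;
    for (int i = 0; i < num_partitions; i++) {
        shuffle_idx_to_instance_idx[i] = i; // identity mapping when no global shuffle is used
    }
    RETURN_IF_ERROR(exchanger->sink(
            state, in_block, eos,
            {compute_hash_value_timer, distribute_timer, nullptr},
            {&channel_id, partitioner, local_state, &shuffle_idx_to_instance_idx}));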

diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index b22ee9fd77e..76e92428766 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -41,21 +41,17 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets
     _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
     _type = type;
     if (_type == ExchangeType::HASH_SHUFFLE) {
+        _shuffle_idx_to_instance_idx.clear();
         _use_global_shuffle = use_global_hash_shuffle;
         // For shuffle join, if data distribution has been broken by previous operator, we
         // should use a HASH_SHUFFLE local exchanger to shuffle data again. Note that we
         // should use a map from shuffle idx to instance idx because all instances will be
         // distributed to all BEs. Otherwise, we should use shuffle idx directly.
         if (use_global_hash_shuffle) {
-            std::for_each(shuffle_idx_to_instance_idx.begin(), shuffle_idx_to_instance_idx.end(),
-                          [&](const auto& item) {
-                              DCHECK(item.first != -1);
-                              _shuffle_idx_to_instance_idx.push_back({item.first, item.second});
-                          });
+            _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
         } else {
-            _shuffle_idx_to_instance_idx.resize(_num_partitions);
             for (int i = 0; i < _num_partitions; i++) {
-                _shuffle_idx_to_instance_idx[i] = {i, i};
+                _shuffle_idx_to_instance_idx[i] = i;
             }
         }
         _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
@@ -147,7 +143,8 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     RETURN_IF_ERROR(local_state._exchanger->sink(
             state, in_block, eos,
             {local_state._compute_hash_value_timer, local_state._distribute_timer, nullptr},
-            {&local_state._channel_id, local_state._partitioner.get(), &local_state}));
+            {&local_state._channel_id, local_state._partitioner.get(), &local_state,
+             &_shuffle_idx_to_instance_idx}));
 
     // If all exchange sources ended due to limit reached, current task should also finish
     if (local_state._exchanger->_running_source_operators == 0) {
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index c067f023c8d..7ef053af6f7 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -91,7 +91,7 @@ public:
               _num_partitions(num_partitions),
               _texprs(texprs),
               _partitioned_exprs_num(texprs.size()),
-              _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {}
+              _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override {
         return Status::InternalError("{} should not init with TPlanNode", Base::_name);
@@ -116,8 +116,7 @@ private:
     const std::vector<TExpr>& _texprs;
     const size_t _partitioned_exprs_num;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
-    const std::map<int, int> _bucket_seq_to_instance_idx;
-    std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
+    std::map<int, int> _shuffle_idx_to_instance_idx;
     bool _use_global_shuffle = false;
 };
 
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 06843e18d86..8b55bd6b440 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -72,11 +72,7 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState* local_st
                                          BlockType& block, bool* eos, vectorized::Block* data_block,
                                          int channel_id) {
     if (local_state == nullptr) {
-        if (!_dequeue_data(block, eos, data_block, channel_id)) {
-            throw Exception(ErrorCode::INTERNAL_ERROR, "Exchanger has no data: {}",
-                            data_queue_debug_string(channel_id));
-        }
-        return true;
+        return _dequeue_data(block, eos, data_block, channel_id);
     }
     bool all_finished = _running_sink_operators == 0;
     if (_data_queue[channel_id].try_dequeue(block)) {
@@ -160,7 +156,8 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
     {
         SCOPED_TIMER(profile.distribute_timer);
         RETURN_IF_ERROR(_split_rows(state, sink_info.partitioner->get_channel_ids().get<uint32_t>(),
-                                    in_block, *sink_info.channel_id, sink_info.local_state));
+                                    in_block, *sink_info.channel_id, sink_info.local_state,
+                                    sink_info.shuffle_idx_to_instance_idx));
     }
 
     return Status::OK();
@@ -214,7 +211,8 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
 
 Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
                                      vectorized::Block* block, int channel_id,
-                                     LocalExchangeSinkLocalState* local_state) {
+                                     LocalExchangeSinkLocalState* local_state,
+                                     std::map<int, int>* shuffle_idx_to_instance_idx) {
     if (local_state == nullptr) {
         return _split_rows(state, channel_ids, block, channel_id);
     }
@@ -249,8 +247,6 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
     }
     local_state->_shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(),
                                                     channel_id);
-    auto bucket_seq_to_instance_idx =
-            local_state->_parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
     if (get_type() == ExchangeType::HASH_SHUFFLE) {
         /**
          * If type is `HASH_SHUFFLE`, data are hash-shuffled and distributed to all instances of
@@ -258,8 +254,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
          * For example, row 1 gets a hash value 1 which means we should distribute to instance 1 on
          * BE 1 and row 2 gets a hash value 2 which means we should distribute to instance 1 on BE 3.
          */
-        const auto& map = local_state->_parent->cast<LocalExchangeSinkOperatorX>()
-                                  ._shuffle_idx_to_instance_idx;
+        DCHECK(shuffle_idx_to_instance_idx && shuffle_idx_to_instance_idx->size() > 0);
+        const auto& map = *shuffle_idx_to_instance_idx;
         new_block_wrapper->ref(cast_set<int>(map.size()));
         for (const auto& it : map) {
             DCHECK(it.second >= 0 && it.second < _num_partitions)
@@ -274,13 +270,13 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
             }
         }
     } else {
-        DCHECK(!bucket_seq_to_instance_idx.empty());
+        DCHECK(shuffle_idx_to_instance_idx && shuffle_idx_to_instance_idx->size() > 0);
         new_block_wrapper->ref(_num_partitions);
         for (int i = 0; i < _num_partitions; i++) {
             uint32_t start = partition_rows_histogram[i];
             uint32_t size = partition_rows_histogram[i + 1] - start;
             if (size > 0) {
-                _enqueue_data_and_set_ready(bucket_seq_to_instance_idx[i], local_state,
+                _enqueue_data_and_set_ready((*shuffle_idx_to_instance_idx)[i], local_state,
                                             {new_block_wrapper, {row_idx, start, size}});
             } else {
                 new_block_wrapper->unref(local_state->_shared_state, channel_id);
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index d5d53e04170..4aace54e6e3 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -41,6 +41,7 @@ struct SinkInfo {
     int* channel_id;
     vectorized::PartitionerBase* partitioner;
     LocalExchangeSinkLocalState* local_state;
+    std::map<int, int>* shuffle_idx_to_instance_idx;
 };
 
 struct SourceInfo {
@@ -262,7 +263,8 @@ public:
 protected:
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
                        vectorized::Block* block, int channel_id,
-                       LocalExchangeSinkLocalState* local_state);
+                       LocalExchangeSinkLocalState* local_state,
+                       std::map<int, int>* shuffle_idx_to_instance_idx);
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
                        vectorized::Block* block, int channel_id);
     std::vector<std::vector<uint32_t>> _partition_rows_histogram;
diff --git a/be/test/pipeline/local_exchanger_test.cpp b/be/test/pipeline/local_exchanger_test.cpp
new file mode 100644
index 00000000000..3db2375866c
--- /dev/null
+++ b/be/test/pipeline/local_exchanger_test.cpp
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "common/status.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/exchange_source_operator.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/local_exchange/local_exchange_sink_operator.h"
+#include "pipeline/local_exchange/local_exchange_source_operator.h"
+#include "thrift_builder.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_vector.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/exprs/vslot_ref.h"
+
+namespace doris::pipeline {
+
+class LocalExchangerTest : public testing::Test {
+public:
+    LocalExchangerTest() = default;
+    ~LocalExchangerTest() override = default;
+    void SetUp() override {
+        _query_options = TQueryOptionsBuilder()
+                                 .set_enable_local_exchange(true)
+                                 .set_enable_local_shuffle(true)
+                                 .set_runtime_filter_max_in_num(15)
+                                 .build();
+        auto fe_address = TNetworkAddress();
+        fe_address.hostname = LOCALHOST;
+        fe_address.port = DUMMY_PORT;
+        _query_ctx =
+                QueryContext::create(_query_id, ExecEnv::GetInstance(), _query_options, fe_address,
+                                     true, fe_address, QuerySource::INTERNAL_FRONTEND);
+        _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+                TRuntimeFilterParamsBuilder().build());
+        _runtime_state = RuntimeState::create_unique(_query_id, _fragment_id, _query_options,
+                                                     _query_ctx->query_globals,
+                                                     ExecEnv::GetInstance(), _query_ctx.get());
+    }
+    void TearDown() override {}
+
+private:
+    std::unique_ptr<RuntimeState> _runtime_state;
+    TUniqueId _query_id = TUniqueId();
+    int _fragment_id = 0;
+    TQueryOptions _query_options;
+    std::shared_ptr<QueryContext> _query_ctx;
+
+    const std::string LOCALHOST = BackendOptions::get_localhost();
+    const int DUMMY_PORT = config::brpc_port;
+};
+
+TEST_F(LocalExchangerTest, ShuffleExchanger) {
+    int num_sink = 4;
+    int num_sources = 4;
+    int num_partitions = 4;
+    int free_block_limit = 0;
+    std::map<int, int> shuffle_idx_to_instance_idx;
+    for (int i = 0; i < num_partitions; i++) {
+        shuffle_idx_to_instance_idx[i] = i;
+    }
+
+    std::vector<std::pair<std::vector<uint32_t>, int>> hash_vals_and_value;
+    std::vector<std::unique_ptr<LocalExchangeSinkLocalState>> _sink_local_states;
+    std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states;
+    _sink_local_states.resize(num_sink);
+    _local_states.resize(num_sources);
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto shared_state = LocalExchangeSharedState::create_shared(num_partitions);
+    shared_state->exchanger = ShuffleExchanger::create_unique(num_sink, num_sources, num_partitions,
+                                                              free_block_limit);
+    auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+    sink_dep->set_shared_state(shared_state.get());
+    shared_state->sink_deps.push_back(sink_dep);
+    shared_state->create_dependencies(0);
+
+    auto* exchanger = (ShuffleExchanger*)shared_state->exchanger.get();
+    for (size_t i = 0; i < num_sink; i++) {
+        auto compute_hash_value_timer =
+                ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+        auto distribute_timer = ADD_TIMER(profile, "distribute_timer" + std::to_string(i));
+        _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr, nullptr));
+        _sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+        _sink_local_states[i]->_compute_hash_value_timer = compute_hash_value_timer;
+        _sink_local_states[i]->_distribute_timer = distribute_timer;
+        _sink_local_states[i]->_partitioner.reset(
+                new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
+                        num_partitions));
+        _sink_local_states[i]->_channel_id = i;
+        _sink_local_states[i]->_shared_state = shared_state.get();
+        _sink_local_states[i]->_dependency = sink_dep.get();
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        auto get_block_failed_counter =
+                ADD_TIMER(profile, "_get_block_failed_counter" + std::to_string(i));
+        auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" + std::to_string(i));
+        _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr, nullptr));
+        _local_states[i]->_exchanger = shared_state->exchanger.get();
+        _local_states[i]->_get_block_failed_counter = get_block_failed_counter;
+        _local_states[i]->_copy_data_timer = copy_data_timer;
+        _local_states[i]->_channel_id = i;
+        _local_states[i]->_shared_state = shared_state.get();
+        _local_states[i]->_dependency = shared_state->get_dep_by_channel_id(i).front().get();
+        _local_states[i]->_memory_used_counter = profile->AddHighWaterMarkCounter(
+                "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+        shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
+    }
+
+    {
+        // Enqueue 2 blocks with 10 rows for each data queue.
+        for (size_t i = 0; i < num_partitions; i++) {
+            hash_vals_and_value.push_back({std::vector<uint32_t> {}, i});
+            for (size_t j = 0; j < 2; j++) {
+                vectorized::Block in_block;
+                vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+                auto int_col0 = vectorized::ColumnInt32::create();
+                int_col0->insert_many_vals(hash_vals_and_value.back().second, 10);
+
+                auto pre_size = hash_vals_and_value.back().first.size();
+                hash_vals_and_value.back().first.resize(pre_size + 10);
+                std::fill(hash_vals_and_value.back().first.begin() + pre_size,
+                          hash_vals_and_value.back().first.end(), 0);
+                int_col0->update_crcs_with_value(hash_vals_and_value.back().first.data() + pre_size,
+                                                 PrimitiveType::TYPE_INT,
+                                                 cast_set<uint32_t>(int_col0->size()), 0, nullptr);
+                in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+                bool in_eos = false;
+                auto texpr =
+                        TExprNodeBuilder(
+                                TExprNodeType::SLOT_REF,
+                                TTypeDescBuilder()
+                                        .set_types(TTypeNodeBuilder()
+                                                           .set_type(TTypeNodeType::SCALAR)
+                                                           .set_scalar_type(TPrimitiveType::INT)
+                                                           .build())
+                                        .build(),
+                                0)
+                                .set_slot_ref(TSlotRefBuilder(0, 0).build())
+                                .build();
+                auto slot = doris::vectorized::VSlotRef::create_shared(texpr);
+                slot->_column_id = 0;
+                ((vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>*)
+                         _sink_local_states[i]
+                                 ->_partitioner.get())
+                        ->_partition_expr_ctxs.push_back(
+                                std::make_shared<doris::vectorized::VExprContext>(slot));
+                EXPECT_EQ(exchanger->sink(
+                                  _runtime_state.get(), &in_block, in_eos,
+                                  {_sink_local_states[i]->_compute_hash_value_timer,
+                                   _sink_local_states[i]->_distribute_timer, nullptr},
+                                  {&_sink_local_states[i]->_channel_id,
+                                   _sink_local_states[i]->_partitioner.get(),
+                                   _sink_local_states[i].get(), &shuffle_idx_to_instance_idx}),
+                          Status::OK());
+                EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+            }
+        }
+    }
+
+    {
+        int64_t mem_usage = 0;
+        for (const auto& it : hash_vals_and_value) {
+            auto channel_id = it.first.back() % num_partitions;
+            EXPECT_GT(shared_state->mem_counters[channel_id]->value(), 0);
+            mem_usage += shared_state->mem_counters[channel_id]->value();
+            EXPECT_EQ(_local_states[channel_id]->_dependency->ready(), true);
+        }
+        EXPECT_EQ(shared_state->mem_usage, mem_usage);
+        // Dequeue from the data queue and accumulate rows if the count is smaller than batch_size.
+        for (const auto& it : hash_vals_and_value) {
+            bool eos = false;
+            auto channel_id = it.first.back() % num_partitions;
+            vectorized::Block block;
+            EXPECT_EQ(exchanger->get_block(
+                              _runtime_state.get(), &block, &eos,
+                              {nullptr, nullptr, _local_states[channel_id]->_copy_data_timer},
+                              {cast_set<int>(_local_states[channel_id]->_channel_id),
+                               _local_states[channel_id].get()}),
+                      Status::OK());
+            EXPECT_EQ(block.rows(), 20);
+            EXPECT_EQ(eos, false);
+            EXPECT_EQ(_local_states[channel_id]->_dependency->ready(), false);
+        }
+        EXPECT_EQ(shared_state->mem_usage, 0);
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        EXPECT_EQ(exchanger->_data_queue[i].eos, false);
+        EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+    }
+    for (size_t i = 0; i < num_sink; i++) {
+        shared_state->sub_running_sink_operators();
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        bool eos = false;
+        vectorized::Block block;
+        EXPECT_EQ(exchanger->get_block(
+                          _runtime_state.get(), &block, &eos,
+                          {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                          {cast_set<int>(_local_states[i]->_channel_id), _local_states[i].get()}),
+                  Status::OK());
+        EXPECT_EQ(block.rows(), 0);
+        EXPECT_EQ(eos, true);
+        EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        exchanger->close({cast_set<int>(i), nullptr});
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        shared_state->sub_running_source_operators();
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        EXPECT_EQ(exchanger->_data_queue[i].eos, true);
+        EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+    }
+
+    {
+        // After the exchanger is closed, data will never be pushed into the data queue again.
+        hash_vals_and_value.clear();
+        for (size_t i = 0; i < num_partitions; i++) {
+            hash_vals_and_value.push_back({std::vector<uint32_t> {}, i});
+            vectorized::Block in_block;
+            vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+            auto int_col0 = vectorized::ColumnInt32::create();
+            int_col0->insert_many_vals(hash_vals_and_value.back().second, 10);
+
+            auto pre_size = hash_vals_and_value.back().first.size();
+            hash_vals_and_value.back().first.resize(pre_size + 10);
+            std::fill(hash_vals_and_value.back().first.begin() + pre_size,
+                      hash_vals_and_value.back().first.end(), 0);
+            int_col0->update_crcs_with_value(hash_vals_and_value.back().first.data() + pre_size,
+                                             PrimitiveType::TYPE_INT,
+                                             cast_set<uint32_t>(int_col0->size()), 0, nullptr);
+            in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+            bool in_eos = false;
+            auto texpr = TExprNodeBuilder(
+                                 TExprNodeType::SLOT_REF,
+                                 TTypeDescBuilder()
+                                         .set_types(TTypeNodeBuilder()
+                                                            .set_type(TTypeNodeType::SCALAR)
+                                                            .set_scalar_type(TPrimitiveType::INT)
+                                                            .build())
+                                         .build(),
+                                 0)
+                                 .set_slot_ref(TSlotRefBuilder(0, 0).build())
+                                 .build();
+            auto slot = doris::vectorized::VSlotRef::create_shared(texpr);
+            slot->_column_id = 0;
+            ((vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>*)_sink_local_states[i]
+                     ->_partitioner.get())
+                    ->_partition_expr_ctxs.push_back(
+                            std::make_shared<doris::vectorized::VExprContext>(slot));
+            EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                      {_sink_local_states[i]->_compute_hash_value_timer,
+                                       _sink_local_states[i]->_distribute_timer, nullptr},
+                                      {&_sink_local_states[i]->_channel_id,
+                                       _sink_local_states[i]->_partitioner.get(),
+                                       _sink_local_states[i].get(), &shuffle_idx_to_instance_idx}),
+                      Status::OK());
+            EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+        }
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_EQ(exchanger->_data_queue[i].eos, true);
+            EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+        }
+    }
+}
+
+} // namespace doris::pipeline

