This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1738a56dc9d [UT](exchanger) Add UT case to test local merge exchanger (#47787)
1738a56dc9d is described below

commit 1738a56dc9d316431b63b8d7ab0134c722128533
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Wed Feb 12 10:15:32 2025 +0800

    [UT](exchanger) Add UT case to test local merge exchanger (#47787)
---
 be/src/pipeline/exec/operator.h                    |   6 +
 .../local_exchange/local_exchange_sink_operator.h  |   9 +
 .../local_exchange_source_operator.h               |   3 +
 be/src/pipeline/local_exchange/local_exchanger.cpp |   1 -
 be/src/pipeline/local_exchange/local_exchanger.h   |   5 +-
 be/src/vec/core/sort_cursor.h                      |  21 +-
 be/test/pipeline/local_exchanger_test.cpp          | 284 +++++++++++++++++++++
 7 files changed, 313 insertions(+), 16 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index e0f11049415..241fd42b5c7 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -435,6 +435,9 @@ public:
 
     DataSinkOperatorXBase(const int operator_id, const int node_id, 
std::vector<int>& sources)
             : OperatorBase(), _operator_id(operator_id), _node_id(node_id), 
_dests_id(sources) {}
+#ifdef BE_TEST
+    DataSinkOperatorXBase() : _operator_id(-1), _node_id(0), _dests_id({-1}) 
{};
+#endif
 
     ~DataSinkOperatorXBase() override = default;
 
@@ -537,6 +540,9 @@ public:
 
     DataSinkOperatorX(const int id, const int node_id, std::vector<int> 
sources)
             : DataSinkOperatorXBase(id, node_id, sources) {}
+#ifdef BE_TEST
+    DataSinkOperatorX() = default;
+#endif
     ~DataSinkOperatorX() override = default;
 
     Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) 
override;
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 7ef053af6f7..1620a68d33e 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -92,6 +92,15 @@ public:
               _texprs(texprs),
               _partitioned_exprs_num(texprs.size()),
               _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
+#ifdef BE_TEST
+    LocalExchangeSinkOperatorX(const std::vector<TExpr>& texprs,
+                               const std::map<int, int>& 
bucket_seq_to_instance_idx)
+            : Base(),
+              _num_partitions(0),
+              _texprs(texprs),
+              _partitioned_exprs_num(texprs.size()),
+              _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
+#endif
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override {
         return Status::InternalError("{} should not init with TPlanNode", 
Base::_name);
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index 3c706d50182..e0d1715afb1 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -65,6 +65,9 @@ class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceL
 public:
     using Base = OperatorX<LocalExchangeSourceLocalState>;
     LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, id, 
id) {}
+#ifdef BE_TEST
+    LocalExchangeSourceOperatorX() = default;
+#endif
     Status init(ExchangeType type) override {
         _op_name = "LOCAL_EXCHANGE_OPERATOR (" + get_exchange_type_name(type) 
+ ")";
         _exchange_type = type;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 0fb4db625a5..1fc47b2f55d 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -19,7 +19,6 @@
 
 #include "common/cast_set.h"
 #include "common/status.h"
-#include "pipeline/exec/sort_sink_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
 #include "pipeline/local_exchange/local_exchange_source_operator.h"
 #include "vec/runtime/partitioner.h"
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index 7f87289e413..25f090e4208 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -87,7 +87,10 @@ public:
         ~BlockWrapper() {
             if (_shared_state != nullptr) {
                 DCHECK_GT(_allocated_bytes, 0);
-                _shared_state->sub_total_mem_usage(_allocated_bytes, 
_channel_ids.front());
+                // `_channel_ids` may be empty if exchanger is shuffled 
exchanger and channel id is
+                // not used by `sub_total_mem_usage`. So we just pass -1 here.
+                _shared_state->sub_total_mem_usage(
+                        _allocated_bytes, _channel_ids.empty() ? -1 : 
_channel_ids.front());
                 if (_shared_state->exchanger->_free_block_limit == 0 ||
                     _shared_state->exchanger->_free_blocks.size_approx() <
                             _shared_state->exchanger->_free_block_limit *
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index a37b6feb21e..ee4ce22f42d 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -205,23 +205,16 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
             return false;
         }
         block->clear();
-        Status status;
         do {
-            status = _block_supplier(block.get(), &_is_eof);
-        } while (block->empty() && !_is_eof && status.ok());
-        // If status not ok, upper callers could not detect whether it is eof 
or error.
-        // So that fatal here, and should throw exception in the future.
-        if (status.ok() && !block->empty()) {
-            if (!_ordering_expr.empty()) {
-                for (int i = 0; status.ok() && i < desc.size(); ++i) {
-                    // TODO yiguolei: throw exception if status not ok in the 
future
-                    status = _ordering_expr[i]->execute(block.get(), 
&desc[i].column_number);
-                }
+            THROW_IF_ERROR(_block_supplier(block.get(), &_is_eof));
+        } while (block->empty() && !_is_eof);
+        if (!block->empty()) {
+            DCHECK_EQ(_ordering_expr.size(), desc.size());
+            for (int i = 0; i < desc.size(); ++i) {
+                THROW_IF_ERROR(_ordering_expr[i]->execute(block.get(), 
&desc[i].column_number));
             }
             MergeSortCursorImpl::reset();
-            return status.ok();
-        } else if (!status.ok()) {
-            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, 
status.msg());
+            return true;
         }
         return false;
     }
diff --git a/be/test/pipeline/local_exchanger_test.cpp b/be/test/pipeline/local_exchanger_test.cpp
index c9a10eb32b5..b0e91f0018f 100644
--- a/be/test/pipeline/local_exchanger_test.cpp
+++ b/be/test/pipeline/local_exchanger_test.cpp
@@ -1098,4 +1098,288 @@ TEST_F(LocalExchangerTest, AdaptivePassthroughExchanger) {
     }
 }
 
+TEST_F(LocalExchangerTest, TestShuffleExchangerWrongMap) {
+    int num_sink = 1;
+    int num_sources = 4;
+    int num_partitions = 4;
+    int free_block_limit = 0;
+    std::map<int, int> shuffle_idx_to_instance_idx;
+    for (int i = 0; i < num_partitions; i++) {
+        shuffle_idx_to_instance_idx[i] = i;
+    }
+    // Wrong map lost (0 -> 0) mapping
+    std::map<int, int> wrong_shuffle_idx_to_instance_idx;
+    for (int i = 1; i < num_partitions; i++) {
+        wrong_shuffle_idx_to_instance_idx[i] = i;
+    }
+
+    std::vector<std::pair<std::vector<uint32_t>, int>> hash_vals_and_value;
+    std::vector<std::unique_ptr<LocalExchangeSinkLocalState>> 
_sink_local_states;
+    std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states;
+    _sink_local_states.resize(num_sink);
+    _local_states.resize(num_sources);
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto shared_state = 
LocalExchangeSharedState::create_shared(num_partitions);
+    shared_state->exchanger = ShuffleExchanger::create_unique(num_sink, 
num_sources, num_partitions,
+                                                              
free_block_limit);
+    auto sink_dep = std::make_shared<Dependency>(0, 0, 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+    sink_dep->set_shared_state(shared_state.get());
+    shared_state->sink_deps.push_back(sink_dep);
+    shared_state->create_dependencies(0);
+
+    auto* exchanger = (ShuffleExchanger*)shared_state->exchanger.get();
+    auto texpr = TExprNodeBuilder(TExprNodeType::SLOT_REF,
+                                  TTypeDescBuilder()
+                                          .set_types(TTypeNodeBuilder()
+                                                             
.set_type(TTypeNodeType::SCALAR)
+                                                             
.set_scalar_type(TPrimitiveType::INT)
+                                                             .build())
+                                          .build(),
+                                  0)
+                         .set_slot_ref(TSlotRefBuilder(0, 0).build())
+                         .build();
+    std::vector<TExpr> texprs;
+    texprs.push_back(TExpr {});
+    for (size_t i = 0; i < num_sink; i++) {
+        auto compute_hash_value_timer =
+                ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+        auto distribute_timer = ADD_TIMER(profile, "distribute_timer" + 
std::to_string(i));
+        _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr, 
nullptr));
+        _sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+        _sink_local_states[i]->_compute_hash_value_timer = 
compute_hash_value_timer;
+        _sink_local_states[i]->_distribute_timer = distribute_timer;
+        _sink_local_states[i]->_partitioner.reset(
+                new 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
+                        num_partitions));
+        auto slot = doris::vectorized::VSlotRef::create_shared(texpr);
+        slot->_column_id = 0;
+        
((vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>*)_sink_local_states[i]
+                 ->_partitioner.get())
+                ->_partition_expr_ctxs.push_back(
+                        
std::make_shared<doris::vectorized::VExprContext>(slot));
+        _sink_local_states[i]->_channel_id = i;
+        _sink_local_states[i]->_shared_state = shared_state.get();
+        _sink_local_states[i]->_dependency = sink_dep.get();
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        auto get_block_failed_counter =
+                ADD_TIMER(profile, "_get_block_failed_counter" + 
std::to_string(i));
+        auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" + 
std::to_string(i));
+        _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr, 
nullptr));
+        _local_states[i]->_exchanger = shared_state->exchanger.get();
+        _local_states[i]->_get_block_failed_counter = get_block_failed_counter;
+        _local_states[i]->_copy_data_timer = copy_data_timer;
+        _local_states[i]->_channel_id = i;
+        _local_states[i]->_shared_state = shared_state.get();
+        _local_states[i]->_dependency = 
shared_state->get_dep_by_channel_id(i).front().get();
+        _local_states[i]->_memory_used_counter = 
profile->AddHighWaterMarkCounter(
+                "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+        shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
+    }
+    const auto num_blocks = 1;
+    {
+        for (size_t i = 0; i < num_partitions; i++) {
+            hash_vals_and_value.push_back({std::vector<uint32_t> {}, i});
+            for (size_t j = 0; j < num_blocks; j++) {
+                vectorized::Block in_block;
+                vectorized::DataTypePtr int_type = 
std::make_shared<vectorized::DataTypeInt32>();
+                auto int_col0 = vectorized::ColumnInt32::create();
+                int_col0->insert_many_vals(hash_vals_and_value.back().second, 
10);
+
+                auto pre_size = hash_vals_and_value.back().first.size();
+                hash_vals_and_value.back().first.resize(pre_size + 10);
+                std::fill(hash_vals_and_value.back().first.begin() + pre_size,
+                          hash_vals_and_value.back().first.end(), 0);
+                
int_col0->update_crcs_with_value(hash_vals_and_value.back().first.data() + 
pre_size,
+                                                 PrimitiveType::TYPE_INT,
+                                                 
cast_set<uint32_t>(int_col0->size()), 0, nullptr);
+                in_block.insert({std::move(int_col0), int_type, 
"test_int_col0"});
+            }
+        }
+    }
+    {
+        // Enqueue 2 blocks with 10 rows for each data queue.
+        for (size_t i = 0; i < num_partitions; i++) {
+            hash_vals_and_value.push_back({std::vector<uint32_t> {}, i});
+            for (size_t j = 0; j < num_blocks; j++) {
+                vectorized::Block in_block;
+                vectorized::DataTypePtr int_type = 
std::make_shared<vectorized::DataTypeInt32>();
+                auto int_col0 = vectorized::ColumnInt32::create();
+                int_col0->insert_many_vals(hash_vals_and_value[i].second, 10);
+                in_block.insert({std::move(int_col0), int_type, 
"test_int_col0"});
+                bool in_eos = false;
+                EXPECT_EQ(exchanger->sink(
+                                  _runtime_state.get(), &in_block, in_eos,
+                                  
{_sink_local_states[0]->_compute_hash_value_timer,
+                                   _sink_local_states[0]->_distribute_timer, 
nullptr},
+                                  {&_sink_local_states[0]->_channel_id,
+                                   _sink_local_states[0]->_partitioner.get(),
+                                   _sink_local_states[0].get(), 
&shuffle_idx_to_instance_idx}),
+                          Status::OK());
+            }
+        }
+    }
+    {
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 1);
+        }
+    }
+
+    {
+        LocalExchangeSinkOperatorX op(texprs, 
wrong_shuffle_idx_to_instance_idx);
+        _sink_local_states[0]->_parent = &op;
+        EXPECT_EQ(hash_vals_and_value[0].first.front() % num_partitions, 0);
+        vectorized::Block in_block;
+        vectorized::DataTypePtr int_type = 
std::make_shared<vectorized::DataTypeInt32>();
+        auto int_col0 = vectorized::ColumnInt32::create();
+        int_col0->insert_many_vals(hash_vals_and_value[0].second, 10);
+        in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+        bool in_eos = false;
+        EXPECT_TRUE(
+                exchanger
+                        ->sink(_runtime_state.get(), &in_block, in_eos,
+                               
{_sink_local_states[0]->_compute_hash_value_timer,
+                                _sink_local_states[0]->_distribute_timer, 
nullptr},
+                               {&_sink_local_states[0]->_channel_id,
+                                _sink_local_states[0]->_partitioner.get(),
+                                _sink_local_states[0].get(), 
&wrong_shuffle_idx_to_instance_idx})
+                        .is<ErrorCode::INTERNAL_ERROR>());
+    }
+}
+
+TEST_F(LocalExchangerTest, LocalMergeSortExchanger) {
+    int num_sink = 4;
+    int num_sources = 4;
+    int num_partitions = 4;
+    int free_block_limit = 0;
+    const auto expect_block_bytes = 128;
+    const auto num_blocks = 2;
+    std::vector<bool> is_asc_order;
+    std::vector<bool> nulls_first;
+    vectorized::VExprContextSPtrs ordering_expr_ctxs;
+    auto texpr = TExprNodeBuilder(TExprNodeType::SLOT_REF,
+                                  TTypeDescBuilder()
+                                          .set_types(TTypeNodeBuilder()
+                                                             
.set_type(TTypeNodeType::SCALAR)
+                                                             
.set_scalar_type(TPrimitiveType::INT)
+                                                             .build())
+                                          .build(),
+                                  0)
+                         .set_slot_ref(TSlotRefBuilder(0, 0).build())
+                         .build();
+    auto slot = doris::vectorized::VSlotRef::create_shared(texpr);
+    slot->_column_id = 0;
+    slot->_prepare_finished = true;
+    
ordering_expr_ctxs.push_back(std::make_shared<doris::vectorized::VExprContext>(slot));
+    is_asc_order.push_back(true);
+    nulls_first.push_back(true);
+    ordering_expr_ctxs[0]->_prepared = true;
+    ordering_expr_ctxs[0]->_opened = true;
+
+    std::vector<std::unique_ptr<LocalExchangeSinkLocalState>> 
_sink_local_states;
+    std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states;
+    _sink_local_states.resize(num_sink);
+    _local_states.resize(num_sources);
+    config::local_exchange_buffer_mem_limit = expect_block_bytes * 
num_partitions;
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto shared_state = 
LocalMergeExchangeSharedState::create_shared(num_partitions);
+    shared_state->exchanger = LocalMergeSortExchanger::create_unique(
+            LocalMergeSortExchanger::MergeInfo {is_asc_order, nulls_first, -1, 
0,
+                                                ordering_expr_ctxs},
+            num_sink, num_partitions, free_block_limit);
+    auto sink_dep = std::make_shared<Dependency>(0, 0, 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+    sink_dep->set_shared_state(shared_state.get());
+    shared_state->sink_deps.push_back(sink_dep);
+    shared_state->create_dependencies(0);
+    auto& sink_deps = shared_state->sink_deps;
+    EXPECT_EQ(sink_deps.size(), num_sink);
+
+    auto* exchanger = (LocalMergeSortExchanger*)shared_state->exchanger.get();
+    for (size_t i = 0; i < num_sink; i++) {
+        auto compute_hash_value_timer =
+                ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+        auto distribute_timer = ADD_TIMER(profile, "distribute_timer" + 
std::to_string(i));
+        _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr, 
nullptr));
+        _sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+        _sink_local_states[i]->_compute_hash_value_timer = 
compute_hash_value_timer;
+        _sink_local_states[i]->_distribute_timer = distribute_timer;
+        _sink_local_states[i]->_channel_id = i;
+        _sink_local_states[i]->_shared_state = shared_state.get();
+        _sink_local_states[i]->_dependency = sink_deps[i].get();
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        auto get_block_failed_counter =
+                ADD_TIMER(profile, "_get_block_failed_counter" + 
std::to_string(i));
+        auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" + 
std::to_string(i));
+        _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr, 
nullptr));
+        _local_states[i]->_runtime_profile =
+                std::make_unique<RuntimeProfile>("source_profile " + 
std::to_string(i));
+        _local_states[i]->_exchanger = shared_state->exchanger.get();
+        _local_states[i]->_get_block_failed_counter = get_block_failed_counter;
+        _local_states[i]->_copy_data_timer = copy_data_timer;
+        _local_states[i]->_channel_id = i;
+        _local_states[i]->_shared_state = shared_state.get();
+        _local_states[i]->_dependency = 
shared_state->get_dep_by_channel_id(i).front().get();
+        _local_states[i]->_memory_used_counter = 
profile->AddHighWaterMarkCounter(
+                "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+        shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
+    }
+    {
+        // Enqueue 2 blocks with 10 rows for each data queue.
+        for (size_t i = 0; i < num_partitions; i++) {
+            for (size_t j = 0; j < num_blocks; j++) {
+                vectorized::Block in_block;
+                vectorized::DataTypePtr int_type = 
std::make_shared<vectorized::DataTypeInt32>();
+                auto int_col0 = vectorized::ColumnInt32::create();
+                int_col0->insert_many_vals(i, 10);
+
+                in_block.insert({std::move(int_col0), int_type, 
"test_int_col0"});
+                EXPECT_EQ(expect_block_bytes, in_block.allocated_bytes());
+                bool in_eos = j == num_blocks - 1;
+                EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, 
in_eos,
+                                          
{_sink_local_states[i]->_compute_hash_value_timer,
+                                           
_sink_local_states[i]->_distribute_timer, nullptr},
+                                          {&_sink_local_states[i]->_channel_id,
+                                           
_sink_local_states[i]->_partitioner.get(),
+                                           _sink_local_states[i].get(), 
nullptr}),
+                          Status::OK());
+                EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), j == 0);
+                EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+            }
+        }
+    }
+    for (size_t i = 0; i < num_sink; i++) {
+        shared_state->sub_running_sink_operators();
+    }
+
+    {
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_EQ(shared_state->mem_counters[i]->value(), 
shared_state->_queues_mem_usage[i]);
+            EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+        }
+        // Dequeue from data queue and accumulate rows if rows is smaller than 
batch_size.
+        EXPECT_EQ(exchanger->_merger, nullptr);
+        for (size_t i = 0; i < num_sources; i++) {
+            for (size_t j = 0; j < num_sink * num_blocks + 1; j++) {
+                bool eos = false;
+                vectorized::Block block;
+                EXPECT_EQ(
+                        exchanger->get_block(_runtime_state.get(), &block, 
&eos,
+                                             {nullptr, nullptr, 
_local_states[i]->_copy_data_timer},
+                                             
{cast_set<int>(_local_states[i]->_channel_id),
+                                              _local_states[i].get()}),
+                        Status::OK());
+                EXPECT_TRUE(exchanger->_merger != nullptr);
+                EXPECT_EQ(block.rows(), i > 0 || j == num_sink * num_blocks ? 
0 : 10);
+                EXPECT_EQ(eos, i > 0 || j == num_sink * num_blocks);
+                EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+            }
+        }
+    }
+
+    for (size_t i = 0; i < num_sink; i++) {
+        EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), true);
+    }
+}
+
 } // namespace doris::pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to