github-actions[bot] commented on code in PR #63588:
URL: https://github.com/apache/doris/pull/63588#discussion_r3296465078


##########
be/test/exec/pipeline/local_exchanger_test.cpp:
##########
@@ -1162,6 +1162,139 @@ TEST_F(LocalExchangerTest, 
AdaptivePassthroughExchanger) {
     }
 }
 
+TEST_F(LocalExchangerTest, 
AdaptivePassthroughExchangerReturnsCopiedSliceWithSharedOutputColumn) {
+    int num_sink = 1;
+    int num_sources = 4;
+    int free_block_limit = 0;
+    const auto num_rows_per_block = num_sources * 3;
+    const auto expected_rows_per_source = num_rows_per_block / num_sources;
+    config::local_exchange_buffer_mem_limit = 1024 * 1024;
+
+    std::vector<std::unique_ptr<LocalExchangeSinkLocalState>> 
sink_local_states;
+    std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> 
source_local_states;
+    sink_local_states.resize(num_sink);
+    source_local_states.resize(num_sources);
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto shared_state = LocalExchangeSharedState::create_shared(num_sources);
+    shared_state->exchanger =
+            AdaptivePassthroughExchanger::create_unique(num_sink, num_sources, 
free_block_limit);
+    auto sink_dep = std::make_shared<Dependency>(0, 0, 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+    sink_dep->set_shared_state(shared_state.get());
+    shared_state->sink_deps.push_back(sink_dep);
+    shared_state->create_source_dependencies(num_sources, 0, 0, "TEST");
+
+    auto* exchanger = 
(AdaptivePassthroughExchanger*)shared_state->exchanger.get();
+    for (size_t i = 0; i < num_sink; i++) {
+        sink_local_states[i] = 
std::make_unique<LocalExchangeSinkLocalState>(nullptr, nullptr);
+        sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+        sink_local_states[i]->_compute_hash_value_timer =
+                ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+        sink_local_states[i]->_distribute_timer =
+                ADD_TIMER(profile, "distribute_timer" + std::to_string(i));
+        sink_local_states[i]->_channel_id = i;
+        sink_local_states[i]->_ins_idx = i;
+        sink_local_states[i]->_shared_state = shared_state.get();
+        sink_local_states[i]->_dependency = sink_dep.get();
+        sink_local_states[i]->_memory_used_counter = 
profile->AddHighWaterMarkCounter(
+                "SinkMemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        source_local_states[i] =
+                
std::make_unique<LocalExchangeSourceLocalState>(_runtime_state.get(), nullptr);
+        source_local_states[i]->_exchanger = shared_state->exchanger.get();
+        source_local_states[i]->_get_block_failed_counter =
+                ADD_TIMER(profile, "_get_block_failed_counter" + 
std::to_string(i));
+        source_local_states[i]->_copy_data_timer =
+                ADD_TIMER(profile, "_copy_data_timer" + std::to_string(i));
+        source_local_states[i]->_channel_id = i;
+        source_local_states[i]->_shared_state = shared_state.get();
+        source_local_states[i]->_dependency = 
shared_state->get_dep_by_channel_id(i).front().get();
+        source_local_states[i]->_memory_used_counter = 
profile->AddHighWaterMarkCounter(
+                "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+        shared_state->mem_counters[i] = 
source_local_states[i]->_memory_used_counter;
+    }
+
+    Block in_block;
+    DataTypePtr int_type = std::make_shared<DataTypeInt32>();
+    auto int_col0 = ColumnInt32::create();
+    for (int i = 0; i < num_rows_per_block; ++i) {
+        int_col0->insert_value(i);
+    }
+    in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+    bool in_eos = false;
+    SinkInfo sink_info = {.channel_id = &sink_local_states[0]->_channel_id,
+                          .partitioner = 
sink_local_states[0]->_partitioner.get(),
+                          .local_state = sink_local_states[0].get(),
+                          .shuffle_idx_to_instance_idx = nullptr,
+                          .ins_idx = 0};
+    ASSERT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                              {sink_local_states[0]->_compute_hash_value_timer,
+                               sink_local_states[0]->_distribute_timer, 
nullptr},
+                              sink_info),
+              Status::OK());
+
+    Block output_block;
+    output_block.insert({ColumnInt32::create(), int_type, "test_int_col0"});
+    ColumnPtr shared_empty_column = output_block.get_by_position(0).column;
+    ASSERT_EQ(shared_empty_column->size(), 0);
+
+    bool eos = false;
+    ASSERT_EQ(exchanger->get_block(_runtime_state.get(), &output_block, &eos,
+                                   {nullptr, nullptr, 
source_local_states[0]->_copy_data_timer},
+                                   
{cast_set<int>(source_local_states[0]->_channel_id),
+                                    source_local_states[0].get()}),
+              Status::OK());
+    EXPECT_FALSE(eos);
+    ASSERT_EQ(output_block.rows(), expected_rows_per_source);
+
+    const auto& result_column =
+            assert_cast<const 
ColumnInt32&>(*output_block.get_by_position(0).column);
+    EXPECT_EQ(result_column.get_element(0), 0);
+    EXPECT_EQ(result_column.get_element(1), 4);
+    EXPECT_EQ(result_column.get_element(2), 8);
+}
+
+TEST_F(LocalExchangerTest, CheckNoDataLeftDetectsQueueResidual) {
+    auto shared_state = LocalExchangeSharedState::create_shared(1);
+    shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(1, 
1, 0);
+    auto* exchanger = 
(AdaptivePassthroughExchanger*)shared_state->exchanger.get();
+
+    Block in_block;
+    DataTypePtr int_type = std::make_shared<DataTypeInt32>();
+    auto int_col0 = ColumnInt32::create();
+    int_col0->insert_value(1);
+    in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+
+    auto wrapper = 
ExchangerBase::BlockWrapper::create_shared(std::move(in_block), nullptr, -1);
+    auto row_idx = std::make_shared<PODArray<uint32_t>>(1);
+    (*row_idx)[0] = 0;
+    ASSERT_TRUE(exchanger->_data_queue[0].enqueue(

Review Comment:
   This test target will not compile because it accesses 
`Exchanger<PartitionedBlock>::_data_queue`, which is a protected member of the 
base class. The next test has the same compile-time problem: it accesses 
`AdaptivePassthroughExchanger::_tmp_block` and `_tmp_eos`, which are private. 
Please set up these residual states through the public `sink`/`get_block` 
interface (or add a deliberate test-only access mechanism) instead of 
reaching directly into class internals.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to