This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a3279a26044 [refine](exchange) Use is_merge from FE for judgment instead of relying on the operator in BE. (#45592)
a3279a26044 is described below

commit a3279a260441692cd0494c0eb8a93e5f97aa2945
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Thu Dec 19 17:04:03 2024 +0800

    [refine](exchange) Use is_merge from FE for judgment instead of relying on the operator in BE. (#45592)
    
    ### What problem does this PR solve?
    
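    Previously, the BE decided whether the receiver is a merge exchange by
    checking the type of the exchange sink's child operator (a sort source
    or a local exchange source).
    This approach is incorrect because there are several kinds of sort
    operators (regular sort, partitioned sort, and spill sort), so an
    operator-type check can miss merge-exchange cases. The BE now relies on
    the is_merge flag that the FE sets on the data stream sink instead.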
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 10 ++--------
 be/src/pipeline/exec/exchange_sink_operator.h   |  4 +++-
 2 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e7fed76be8f..cc789f6e25b 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -32,7 +32,6 @@
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/sort_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
-#include "pipeline/local_exchange/local_exchange_source_operator.h"
 #include "pipeline/pipeline_fragment_context.h"
 #include "util/runtime_profile.h"
 #include "util/uid_util.h"
@@ -279,6 +278,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
           _tablet_sink_txn_id(sink.tablet_sink_txn_id),
           _t_tablet_sink_exprs(&sink.tablet_sink_exprs),
           _enable_local_merge_sort(state->enable_local_merge_sort()),
+          _dest_is_merge(sink.__isset.is_merge && sink.is_merge),
           _fragment_instance_ids(fragment_instance_ids) {
     DCHECK_GT(destinations.size(), 0);
     DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
@@ -571,19 +571,13 @@ std::shared_ptr<ExchangeSinkBuffer> 
ExchangeSinkOperatorX::_create_buffer(
 // Therefore, a shared sink buffer is used here to limit the number of 
concurrent RPCs.
 // (Note: This does not reduce the total number of RPCs.)
 // In a merge sort scenario, there are only n RPCs, so a shared sink buffer is 
not needed.
-/// TODO: Modify this to let FE handle the judgment instead of BE.
 std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer(
         InstanceLoId sender_ins_id) {
-    if (!_child) {
-        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
-                               "ExchangeSinkOperatorX did not correctly set 
the child.");
-    }
     // When the child is SortSourceOperatorX or LocalExchangeSourceOperatorX,
     // it is an order-by scenario.
     // In this case, there is only one target instance, and no n * n RPC 
concurrency will occur.
     // Therefore, sharing a sink buffer is not necessary.
-    if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child) ||
-        std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {
+    if (_dest_is_merge) {
         return _create_buffer({sender_ins_id});
     }
     if (_state->enable_shared_exchange_sink_buffer()) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 85575beb9f7..3d6eeb4b39e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -205,7 +205,6 @@ public:
     // Therefore, a shared sink buffer is used here to limit the number of 
concurrent RPCs.
     // (Note: This does not reduce the total number of RPCs.)
     // In a merge sort scenario, there are only n RPCs, so a shared sink 
buffer is not needed.
-    /// TODO: Modify this to let FE handle the judgment instead of BE.
     std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId 
sender_ins_id);
     vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return 
_tablet_sink_expr_ctxs; }
 
@@ -260,6 +259,9 @@ private:
     size_t _data_processed = 0;
     int _writer_count = 1;
     const bool _enable_local_merge_sort;
+    // If dest_is_merge is true, it indicates that the corresponding receiver 
is a VMERGING-EXCHANGE.
+    // The receiver will sort the collected data, so the sender must ensure 
that the data sent is ordered.
+    const bool _dest_is_merge;
     const std::vector<TUniqueId>& _fragment_instance_ids;
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to