This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d6d938ea75 [pipelineX](fix) Fix correctness problem due to local hash shuffle (#29881)
4d6d938ea75 is described below

commit 4d6d938ea75eaed8591f5b682843cea252e7912f
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Jan 11 22:28:00 2024 +0800

    [pipelineX](fix) Fix correctness problem due to local hash shuffle (#29881)
---
 .../local_exchange/local_exchange_sink_operator.h  |  7 ++--
 .../pipeline_x/local_exchange/local_exchanger.cpp  |  8 +++--
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 38 ++++++++++++++--------
 .../pipeline_x/pipeline_x_fragment_context.h       |  8 ++++-
 .../main/java/org/apache/doris/qe/Coordinator.java | 14 +++++++-
 gensrc/thrift/PaloInternalService.thrift           |  2 ++
 6 files changed, 56 insertions(+), 21 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 87afb2b098d..daf75c966af 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -86,11 +86,13 @@ public:
     using Base = DataSinkOperatorX<LocalExchangeSinkLocalState>;
     LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions,
                                const std::vector<TExpr>& texprs,
-                               const std::map<int, int>& bucket_seq_to_instance_idx)
+                               const std::map<int, int>& bucket_seq_to_instance_idx,
+                               const std::map<int, int>& shuffle_idx_to_instance_idx)
             : Base(sink_id, dest_id, dest_id),
               _num_partitions(num_partitions),
               _texprs(texprs),
-              _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {}
+              _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx),
+              _shuffle_idx_to_instance_idx(shuffle_idx_to_instance_idx) {}
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override {
        return Status::InternalError("{} should not init with TPlanNode", Base::_name);
@@ -143,6 +145,7 @@ private:
     const std::vector<TExpr>& _texprs;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
     const std::map<int, int> _bucket_seq_to_instance_idx;
+    const std::map<int, int> _shuffle_idx_to_instance_idx;
 };
 
 } // namespace doris::pipeline
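
[Note on the hunks above] The sink operator now carries a second routing map, _shuffle_idx_to_instance_idx, next to the existing _bucket_seq_to_instance_idx, stored as a const member fixed at construction time. A minimal, self-contained C++ sketch (hypothetical stand-in types, not Doris code) of the access pattern the exchanger relies on, where a local state reaches back into its parent operator for the map:

    // Sketch only: stand-ins for LocalExchangeSinkOperatorX and its local
    // state; the real classes live in local_exchange_sink_operator.h.
    #include <cstdio>
    #include <map>
    #include <utility>

    struct SinkOperator {
        explicit SinkOperator(std::map<int, int> shuffle_idx_to_instance_idx)
                : _shuffle_idx_to_instance_idx(std::move(shuffle_idx_to_instance_idx)) {}
        // const member: the routing map cannot drift after the fragment is planned.
        const std::map<int, int> _shuffle_idx_to_instance_idx;
    };

    struct SinkLocalState {
        const SinkOperator* _parent; // mirrors local_state._parent->cast<...>()
    };

    int main() {
        SinkOperator op({{0, 1}, {1, 0}});
        SinkLocalState local_state{&op};
        for (const auto& [shuffle_idx, instance_idx] :
             local_state._parent->_shuffle_idx_to_instance_idx) {
            std::printf("shuffle idx %d -> instance %d\n", shuffle_idx, instance_idx);
        }
        return 0;
    }
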
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index b2f1bb00ebe..602020c4882 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -118,14 +118,16 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
     local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
     new_block_wrapper->ref(_num_partitions);
     if (get_type() == ExchangeType::HASH_SHUFFLE) {
+        auto map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
+                           ._shuffle_idx_to_instance_idx;
         for (size_t i = 0; i < _num_partitions; i++) {
             size_t start = local_state._partition_rows_histogram[i];
             size_t size = local_state._partition_rows_histogram[i + 1] - start;
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
-                        i, new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[i].enqueue({new_block_wrapper, {row_idx, start, size}});
-                local_state._shared_state->set_ready_to_read(i);
+                        map[i], new_block_wrapper->data_block.allocated_bytes(), false);
+                data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}});
+                local_state._shared_state->set_ready_to_read(map[i]);
+                local_state._shared_state->set_ready_to_read(map[i]);
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
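
[Note on the hunk above] This is the heart of the fix. Under HASH_SHUFFLE the partition index i is computed against the global shuffle layout, so enqueueing into data_queue[i] is only correct when the global partition order happens to match the local instance order; the patch routes through map[i] instead. A minimal sketch with hypothetical numbers (the map name follows the patch; everything else is a stand-in):

    #include <cstdio>
    #include <map>
    #include <vector>

    int main() {
        // Hypothetical: 4 global shuffle partitions whose owners are laid out
        // in a different order than this backend's local instance list.
        const int num_partitions = 4;
        std::map<int, int> shuffle_idx_to_instance_idx = {{0, 2}, {1, 0}, {2, 3}, {3, 1}};

        std::vector<std::vector<int>> data_queue(num_partitions);
        for (int row = 0; row < 8; ++row) {
            int i = row % num_partitions; // stand-in for the hash partitioner
            // Before the fix: data_queue[i].push_back(row); -- rows landed in
            // queue i even when instance i expected a different partition,
            // which is the correctness problem the commit title names.
            data_queue[shuffle_idx_to_instance_idx[i]].push_back(row);
        }
        for (int q = 0; q < num_partitions; ++q) {
            std::printf("local queue %d holds %zu rows\n", q, data_queue[q].size());
        }
        return 0;
    }
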
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 4cc00312697..ffaccebe898 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -163,6 +163,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
         return Status::InternalError("Already prepared");
     }
     _num_instances = request.local_params.size();
+    _total_instances = request.__isset.total_instances ? request.total_instances : _num_instances;
     _runtime_profile.reset(new RuntimeProfile("PipelineContext"));
     _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
     SCOPED_TIMER(_prepare_timer);
@@ -235,8 +236,9 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
         static_cast<void>(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
     }
     if (_enable_local_shuffle()) {
-        RETURN_IF_ERROR(
-                _plan_local_exchange(request.num_buckets, request.bucket_seq_to_instance_idx));
+        RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
+                                             request.bucket_seq_to_instance_idx,
+                                             request.shuffle_idx_to_instance_idx));
     }
     // 4. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
@@ -254,7 +256,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
 }
 
 Status PipelineXFragmentContext::_plan_local_exchange(
-        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx) {
+        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
     for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) {
         _pipelines[pip_idx]->init_data_distribution();
         // Set property if child pipeline is not join operator's child.
@@ -274,6 +277,7 @@ Status PipelineXFragmentContext::_plan_local_exchange(
                         ? _num_instances
                         : num_buckets,
                 pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx,
+                shuffle_idx_to_instance_idx,
                _pipelines[pip_idx]->operator_xs().front()->ignore_data_hash_distribution()));
     }
     return Status::OK();
@@ -282,6 +286,7 @@ Status PipelineXFragmentContext::_plan_local_exchange(
 Status PipelineXFragmentContext::_plan_local_exchange(
         int num_buckets, int pip_idx, PipelinePtr pip,
         const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx,
         const bool ignore_data_hash_distribution) {
     int idx = 1;
     bool do_local_exchange = false;
@@ -294,7 +299,8 @@ Status PipelineXFragmentContext::_plan_local_exchange(
                 RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(), &do_local_exchange, num_buckets,
-                        bucket_seq_to_instance_idx, ignore_data_hash_distribution));
+                        bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
+                        ignore_data_hash_distribution));
             }
             if (do_local_exchange) {
                // If local exchange is needed for current operator, we will split this pipeline to
@@ -311,7 +317,8 @@ Status PipelineXFragmentContext::_plan_local_exchange(
         RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink_x()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink_x()->required_data_distribution(), &do_local_exchange, num_buckets,
-                bucket_seq_to_instance_idx, ignore_data_hash_distribution));
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
+                ignore_data_hash_distribution));
     }
     return Status::OK();
 }
@@ -713,6 +720,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
         int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
         const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx,
         const bool ignore_data_hash_distribution) {
     auto& operator_xs = cur_pipe->operator_xs();
     const auto downstream_pipeline_id = cur_pipe->id();
@@ -720,9 +728,9 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
     // 1. Create a new pipeline with local exchange sink.
     DataSinkOperatorXPtr sink;
     auto sink_id = next_sink_operator_id();
-    sink.reset(new LocalExchangeSinkOperatorX(sink_id, local_exchange_id, _num_instances,
-                                              data_distribution.partition_exprs,
-                                              bucket_seq_to_instance_idx));
+    sink.reset(new LocalExchangeSinkOperatorX(
+            sink_id, local_exchange_id, _total_instances, data_distribution.partition_exprs,
+            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets));
 
@@ -731,7 +739,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
     switch (data_distribution.distribution_type) {
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(
-                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances);
+                std::max(cur_pipe->num_tasks(), _num_instances), _total_instances);
         break;
     case ExchangeType::BUCKET_HASH_SHUFFLE:
         shared_state->exchanger = BucketShuffleExchanger::create_unique(
@@ -826,7 +834,9 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
 Status PipelineXFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
-        const std::map<int, int>& bucket_seq_to_instance_idx, const bool ignore_data_distribution) {
+        const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx,
+        const bool ignore_data_distribution) {
     DCHECK(_enable_local_shuffle());
     if (_num_instances <= 1) {
         return Status::OK();
@@ -840,9 +850,9 @@ Status PipelineXFragmentContext::_add_local_exchange(
     auto& operator_xs = cur_pipe->operator_xs();
     auto total_op_num = operator_xs.size();
     auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
-    RETURN_IF_ERROR(_add_local_exchange_impl(idx, pool, cur_pipe, new_pip, data_distribution,
-                                             do_local_exchange, num_buckets,
-                                             bucket_seq_to_instance_idx, ignore_data_distribution));
+    RETURN_IF_ERROR(_add_local_exchange_impl(
+            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
+            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));
 
    CHECK(total_op_num + 1 == cur_pipe->operator_xs().size() + new_pip->operator_xs().size())
             << "total_op_num: " << total_op_num
@@ -855,7 +865,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
         RETURN_IF_ERROR(_add_local_exchange_impl(
                new_pip->operator_xs().size(), pool, new_pip, add_pipeline(new_pip, pip_idx + 2),
                DataDistribution(ExchangeType::PASSTHROUGH), do_local_exchange, num_buckets,
-                bucket_seq_to_instance_idx, ignore_data_distribution));
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));
     }
     return Status::OK();
 }
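
[Note on this file] Two threads run through it: _total_instances is taken from the new thrift field when set (falling back to the per-backend _num_instances otherwise), and it replaces _num_instances both as the sink's partition count and as the ShuffleExchanger's second constructor argument. The intent, as far as the diff shows, is that hash partitioning must span the global instance count so every backend assigns a row to the same global partition; the per-backend map then picks the local queue. A sketch with hypothetical numbers:

    #include <cstdio>
    #include <map>

    int main() {
        // Assumed layout: 6 instances across the cluster, 2 of them local.
        const int total_instances = 6;
        const std::map<int, int> local_map = {{2, 0}, {5, 1}}; // global idx -> local queue

        for (int hash = 0; hash < 12; ++hash) {
            // Partitioning over total_instances keeps the bucket id identical
            // on every backend; partitioning over the local count would not.
            int global_idx = hash % total_instances;
            auto it = local_map.find(global_idx);
            if (it != local_map.end()) {
                std::printf("row(hash=%2d) -> global partition %d -> local queue %d\n",
                            hash, global_idx, it->second);
            }
        }
        return 0;
    }
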
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 34320d64f38..92178d359d9 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -126,6 +126,7 @@ private:
                               PipelinePtr cur_pipe, DataDistribution data_distribution,
                               bool* do_local_exchange, int num_buckets,
                               const std::map<int, int>& bucket_seq_to_instance_idx,
+                               const std::map<int, int>& shuffle_idx_to_instance_idx,
                               const bool ignore_data_distribution);
    void _inherit_pipeline_properties(const DataDistribution& data_distribution,
                                      PipelinePtr pipe_with_source, PipelinePtr pipe_with_sink);
@@ -133,6 +134,7 @@ private:
                                    PipelinePtr new_pipe, DataDistribution data_distribution,
                                    bool* do_local_exchange, int num_buckets,
                                    const std::map<int, int>& bucket_seq_to_instance_idx,
+                                    const std::map<int, int>& shuffle_idx_to_instance_idx,
                                     const bool ignore_data_distribution);
 
     [[nodiscard]] Status _build_pipelines(ObjectPool* pool,
@@ -160,9 +162,11 @@ private:
                              RuntimeState* state, DescriptorTbl& desc_tbl,
                              PipelineId cur_pipeline_id);
     Status _plan_local_exchange(int num_buckets,
-                                const std::map<int, int>& bucket_seq_to_instance_idx);
+                                const std::map<int, int>& bucket_seq_to_instance_idx,
+                                const std::map<int, int>& shuffle_idx_to_instance_idx);
     Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
                                const std::map<int, int>& bucket_seq_to_instance_idx,
+                                const std::map<int, int>& shuffle_idx_to_instance_idx,
                                 const bool ignore_data_distribution);
 
     bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
@@ -239,6 +243,8 @@ private:
     std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
 
    std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;
+
+    int _total_instances = -1;
 };
 
 } // namespace pipeline
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 12be2b2d8ec..5e2d0ef85c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1611,6 +1611,7 @@ public class Coordinator implements CoordInterface {
                            dest.fragment_instance_id = instanceExecParams.instanceId;
                            dest.server = toRpcHost(instanceExecParams.host);
                            dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+                            instanceExecParams.recvrId = params.destinations.size();
                             break;
                         }
                     }
@@ -1630,6 +1631,7 @@ public class Coordinator implements CoordInterface {
                             destHosts.put(param.host, param);
                             param.buildHashTableForBroadcastJoin = true;
                            TPlanFragmentDestination dest = new TPlanFragmentDestination();
+                            param.recvrId = params.destinations.size();
                             dest.fragment_instance_id = param.instanceId;
                             try {
                                 dest.server = toRpcHost(param.host);
@@ -1653,6 +1655,7 @@ public class Coordinator implements CoordInterface {
                        dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
                        dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
                        dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
+                        destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
                         params.destinations.add(dest);
                     }
                 }
@@ -1732,6 +1735,7 @@ public class Coordinator implements CoordInterface {
                                dest.fragment_instance_id = instanceExecParams.instanceId;
                                dest.server = toRpcHost(instanceExecParams.host);
                                dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+                                instanceExecParams.recvrId = params.destinations.size();
                                 break;
                             }
                         }
@@ -1752,6 +1756,7 @@ public class Coordinator implements CoordInterface {
                             param.buildHashTableForBroadcastJoin = true;
                            TPlanFragmentDestination dest = new TPlanFragmentDestination();
                             dest.fragment_instance_id = param.instanceId;
+                            param.recvrId = params.destinations.size();
                             try {
                                 dest.server = toRpcHost(param.host);
                                 dest.setBrpcServer(toBrpcHost(param.host));
@@ -1773,6 +1778,7 @@ public class Coordinator implements CoordInterface {
                        dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
                        dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
                        dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host);
+                        destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
                         destinations.add(dest);
                     }
                 }
@@ -3755,22 +3761,26 @@ public class Coordinator implements CoordInterface {
                     params.setFileScanParams(fileScanRangeParamsMap);
                     params.setNumBuckets(fragment.getBucketNum());
                     params.setPerNodeSharedScans(perNodeSharedScans);
+                    params.setTotalInstances(instanceExecParams.size());
                     if (ignoreDataDistribution) {
                         params.setParallelInstances(parallelTasksNum);
                     }
                    res.put(instanceExecParam.host, params);
                    res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
+                    res.get(instanceExecParam.host).setShuffleIdxToInstanceIdx(new HashMap<Integer, Integer>());
                     instanceIdx.put(instanceExecParam.host, 0);
                 }
                 // Set each bucket belongs to which instance on this BE.
                 // This is used for LocalExchange(BUCKET_HASH_SHUFFLE).
                 int instanceId = instanceIdx.get(instanceExecParam.host);
+
                for (int bucket : instanceExecParam.bucketSeqSet) {
                    res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, instanceId);
-
                 }
                 instanceIdx.replace(instanceExecParam.host, ++instanceId);
                TPipelineFragmentParams params = res.get(instanceExecParam.host);
+                res.get(instanceExecParam.host).getShuffleIdxToInstanceIdx().put(instanceExecParam.recvrId,
+                        params.getLocalParams().size());
                TPipelineInstanceParams localParams = new TPipelineInstanceParams();

                localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
@@ -3919,6 +3929,8 @@ public class Coordinator implements CoordInterface {
 
         boolean buildHashTableForBroadcastJoin = false;
 
+        int recvrId = -1;
+
         List<TUniqueId> instancesSharingHashTable = Lists.newArrayList();
 
         public void addBucketSeq(int bucketSeq) {
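
[Note on this file] On the FE side, the new recvrId field records each instance's position in the sender's destination list, and the per-host ShuffleIdxToInstanceIdx map inverts that into "global shuffle index -> local instance index". A sketch of that construction, written in C++ for consistency with the earlier examples (the authoritative logic is the Java in this hunk; hosts and ordering here are hypothetical):

    #include <cstdio>
    #include <map>
    #include <string>
    #include <vector>

    struct Instance {
        std::string host;
        int recvr_id; // position in params.destinations, assigned while wiring senders
    };

    int main() {
        // Hypothetical plan: four instances, two per backend, destinations
        // interleaved across hosts.
        const std::vector<Instance> instances = {
                {"be1", 0}, {"be2", 1}, {"be1", 2}, {"be2", 3}};

        // Per host: global shuffle index -> local instance index, assigned in
        // the order instances are appended to that host's fragment params.
        std::map<std::string, std::map<int, int>> shuffle_idx_to_instance_idx;
        std::map<std::string, int> local_count;
        for (const auto& inst : instances) {
            shuffle_idx_to_instance_idx[inst.host][inst.recvr_id] = local_count[inst.host]++;
        }
        for (const auto& [host, m] : shuffle_idx_to_instance_idx) {
            for (const auto& [global_idx, local_idx] : m) {
                std::printf("%s: global %d -> local %d\n", host.c_str(), global_idx, local_idx);
            }
        }
        return 0;
    }
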
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 818643f6ba6..7559451e373 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -715,6 +715,8 @@ struct TPipelineFragmentParams {
   35: optional map<i32, i32> bucket_seq_to_instance_idx
   36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
   37: optional i32 parallel_instances
+  38: optional i32 total_instances
+  39: optional map<i32, i32> shuffle_idx_to_instance_idx
 
   // For cloud
   1000: optional bool is_mow_table;
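
[Note on the thrift change] Both new fields are optional, so a BE that receives a plan from an older FE keeps working: prepare() falls back to the per-backend instance count when total_instances is unset (the request.__isset.total_instances check earlier in this commit). A sketch of that defaulting, with a plain struct standing in for the thrift-generated class:

    #include <cstdio>

    struct Isset {
        bool total_instances = false;
    };

    struct TPipelineFragmentParamsSketch { // stand-in, not the generated code
        Isset __isset;
        int total_instances = 0;
        int num_local_params = 2; // stand-in for request.local_params.size()
    };

    int main() {
        TPipelineFragmentParamsSketch request; // older FE: field never set
        int total = request.__isset.total_instances ? request.total_instances
                                                    : request.num_local_params;
        std::printf("resolved total_instances = %d\n", total);
        return 0;
    }
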

