This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7296a21dbec [pipelineX](fix) Fix incorrect partition number (#29963)
7296a21dbec is described below

commit 7296a21dbecb67e92d0f07ff81a11e6aa7b6835e
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Jan 15 11:49:45 2024 +0800

    [pipelineX](fix) Fix incorrect partition number (#29963)
---
 .../pipeline/pipeline_x/pipeline_x_fragment_context.cpp   | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index b479e1d9334..e2f1d9742b4 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -728,21 +728,24 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
     // 1. Create a new pipeline with local exchange sink.
     DataSinkOperatorXPtr sink;
     auto sink_id = next_sink_operator_id();
+    const bool is_shuffled_hash_join = operator_xs.size() > idx
+                                               ? operator_xs[idx]->is_shuffled_hash_join()
+                                               : cur_pipe->sink_x()->is_shuffled_hash_join();
     sink.reset(new LocalExchangeSinkOperatorX(
-            sink_id, local_exchange_id, _total_instances, data_distribution.partition_exprs,
-            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
+            sink_id, local_exchange_id, is_shuffled_hash_join ? _total_instances : _num_instances,
+            data_distribution.partition_exprs, bucket_seq_to_instance_idx,
+            shuffle_idx_to_instance_idx));
     RETURN_IF_ERROR(new_pip->set_sink(sink));
     RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets,
-                                            operator_xs.size() > idx
-                                                    ? operator_xs[idx]->is_shuffled_hash_join()
-                                                    : cur_pipe->sink_x()->is_shuffled_hash_join()));
+                                            is_shuffled_hash_join));
 
     // 2. Create and initialize LocalExchangeSharedState.
     auto shared_state = LocalExchangeSharedState::create_shared();
     switch (data_distribution.distribution_type) {
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(
-                std::max(cur_pipe->num_tasks(), _num_instances), _total_instances);
+                std::max(cur_pipe->num_tasks(), _num_instances),
+                is_shuffled_hash_join ? _total_instances : _num_instances);
         break;
     case ExchangeType::BUCKET_HASH_SHUFFLE:
         shared_state->exchanger = BucketShuffleExchanger::create_unique(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to