This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7296a21dbec [pipelineX](fix) Fix incorrect partition number (#29963) 7296a21dbec is described below commit 7296a21dbecb67e92d0f07ff81a11e6aa7b6835e Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Mon Jan 15 11:49:45 2024 +0800 [pipelineX](fix) Fix incorrect partition number (#29963) --- .../pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index b479e1d9334..e2f1d9742b4 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -728,21 +728,24 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( // 1. Create a new pipeline with local exchange sink. DataSinkOperatorXPtr sink; auto sink_id = next_sink_operator_id(); + const bool is_shuffled_hash_join = operator_xs.size() > idx + ? operator_xs[idx]->is_shuffled_hash_join() + : cur_pipe->sink_x()->is_shuffled_hash_join(); sink.reset(new LocalExchangeSinkOperatorX( - sink_id, local_exchange_id, _total_instances, data_distribution.partition_exprs, - bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx)); + sink_id, local_exchange_id, is_shuffled_hash_join ? _total_instances : _num_instances, + data_distribution.partition_exprs, bucket_seq_to_instance_idx, + shuffle_idx_to_instance_idx)); RETURN_IF_ERROR(new_pip->set_sink(sink)); RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets, - operator_xs.size() > idx - ? operator_xs[idx]->is_shuffled_hash_join() - : cur_pipe->sink_x()->is_shuffled_hash_join())); + is_shuffled_hash_join)); // 2. Create and initialize LocalExchangeSharedState. auto shared_state = LocalExchangeSharedState::create_shared(); switch (data_distribution.distribution_type) { case ExchangeType::HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique( - std::max(cur_pipe->num_tasks(), _num_instances), _total_instances); + std::max(cur_pipe->num_tasks(), _num_instances), + is_shuffled_hash_join ? _total_instances : _num_instances); break; case ExchangeType::BUCKET_HASH_SHUFFLE: shared_state->exchanger = BucketShuffleExchanger::create_unique( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org