This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 4d6d938ea75 [pipelineX](fix) Fix correctness problem due to local hash shuffle (#29881)
4d6d938ea75 is described below

commit 4d6d938ea75eaed8591f5b682843cea252e7912f
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Jan 11 22:28:00 2024 +0800

    [pipelineX](fix) Fix correctness problem due to local hash shuffle (#29881)
---
 .../local_exchange/local_exchange_sink_operator.h  |  7 ++--
 .../pipeline_x/local_exchange/local_exchanger.cpp  |  8 +++--
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 38 ++++++++++++++--------
 .../pipeline_x/pipeline_x_fragment_context.h       |  8 ++++-
 .../main/java/org/apache/doris/qe/Coordinator.java | 14 +++++++-
 gensrc/thrift/PaloInternalService.thrift           |  2 ++
 6 files changed, 56 insertions(+), 21 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 87afb2b098d..daf75c966af 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -86,11 +86,13 @@ public:
     using Base = DataSinkOperatorX<LocalExchangeSinkLocalState>;
     LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions,
                                const std::vector<TExpr>& texprs,
-                               const std::map<int, int>& bucket_seq_to_instance_idx)
+                               const std::map<int, int>& bucket_seq_to_instance_idx,
+                               const std::map<int, int>& shuffle_idx_to_instance_idx)
             : Base(sink_id, dest_id, dest_id),
               _num_partitions(num_partitions),
               _texprs(texprs),
-              _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {}
+              _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx),
+              _shuffle_idx_to_instance_idx(shuffle_idx_to_instance_idx) {}
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override {
         return Status::InternalError("{} should not init with TPlanNode", Base::_name);
@@ -143,6 +145,7 @@ private:
     const std::vector<TExpr>& _texprs;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
     const std::map<int, int> _bucket_seq_to_instance_idx;
+    const std::map<int, int> _shuffle_idx_to_instance_idx;
 };
 
 } // namespace doris::pipeline
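Editor's note on the hunk above: the local exchange sink now carries a
shuffle-index-to-instance-index map next to the existing bucket map. The toy
program below is a hedged, standalone illustration (hypothetical names, not
the Doris classes) of why that map is needed: the hash partition id is computed
over the plan-wide number of instances, so it cannot be used directly as an
index into this backend's local queues.

    // Toy illustration: a global shuffle/partition index must be remapped
    // before it is used as a local queue index. Hypothetical names only.
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <vector>

    int main() {
        const int total_instances = 4; // hash space is sized by the whole plan
        // This backend runs only two of the four instances; the frontend tells
        // us which global index corresponds to which local queue.
        const std::map<int, int> shuffle_idx_to_instance_idx = {{1, 0}, {3, 1}};
        std::vector<int> local_queue_rows(2, 0); // one counter per local queue

        std::vector<uint64_t> rows = {1, 3, 5, 7, 9, 11}; // hash to partitions 1 or 3
        for (uint64_t row : rows) {
            int shuffle_idx = static_cast<int>(row % total_instances);
            // Using shuffle_idx directly here would address queues 1 and 3,
            // even though only local queues 0 and 1 exist on this backend.
            int queue = shuffle_idx_to_instance_idx.at(shuffle_idx);
            ++local_queue_rows[queue];
        }
        std::cout << "queue 0: " << local_queue_rows[0]
                  << ", queue 1: " << local_queue_rows[1] << "\n";
        return 0;
    }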
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index b2f1bb00ebe..602020c4882 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -118,14 +118,16 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
     local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
     new_block_wrapper->ref(_num_partitions);
     if (get_type() == ExchangeType::HASH_SHUFFLE) {
+        auto map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
+                           ._shuffle_idx_to_instance_idx;
         for (size_t i = 0; i < _num_partitions; i++) {
             size_t start = local_state._partition_rows_histogram[i];
             size_t size = local_state._partition_rows_histogram[i + 1] - start;
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
-                        i, new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[i].enqueue({new_block_wrapper, {row_idx, start, size}});
-                local_state._shared_state->set_ready_to_read(i);
+                        map[i], new_block_wrapper->data_block.allocated_bytes(), false);
+                data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}});
+                local_state._shared_state->set_ready_to_read(map[i]);
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
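The hunk above is the heart of the fix: per-partition row slices were
previously enqueued, accounted, and marked ready under the raw partition index
i, while the queues are indexed by local instance. The standalone sketch below
(hypothetical names and simplified data structures, not the Doris code) mirrors
that loop shape — a prefix-sum histogram over partition ids, then every
non-empty slice routed through the mapping so accounting, queue, and readiness
all agree.

    // Simplified sketch of hash-shuffle slice routing via an index map.
    // Hypothetical names; the real operator works on blocks, not integers.
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <vector>

    int main() {
        const int num_partitions = 4; // hash space size (total instances in the plan)
        const std::map<int, int> shuffle_idx_to_instance_idx = {{0, 1}, {1, 0}, {2, 1}, {3, 0}};
        const int num_local_instances = 2;

        std::vector<uint64_t> rows = {5, 8, 13, 2, 7, 21, 34, 55};
        std::vector<int> partition_of(rows.size());
        std::vector<size_t> histogram(num_partitions + 1, 0);

        // 1. Count rows per partition, then turn the counts into start offsets.
        for (size_t r = 0; r < rows.size(); ++r) {
            partition_of[r] = static_cast<int>(rows[r] % num_partitions);
            ++histogram[partition_of[r] + 1];
        }
        for (int p = 0; p < num_partitions; ++p) {
            histogram[p + 1] += histogram[p];
        }

        // 2. Scatter row ids so each partition occupies a contiguous slice.
        std::vector<size_t> row_idx(rows.size());
        std::vector<size_t> cursor(histogram.begin(), histogram.end() - 1);
        for (size_t r = 0; r < rows.size(); ++r) {
            row_idx[cursor[partition_of[r]]++] = r;
        }

        // 3. Enqueue every non-empty slice under the *mapped* instance index.
        std::vector<std::vector<size_t>> data_queue(num_local_instances);
        for (int i = 0; i < num_partitions; ++i) {
            size_t start = histogram[i];
            size_t size = histogram[i + 1] - start;
            if (size > 0) {
                int target = shuffle_idx_to_instance_idx.at(i); // was plain `i` before the fix
                data_queue[target].insert(data_queue[target].end(), row_idx.begin() + start,
                                          row_idx.begin() + start + size);
            }
        }

        for (int q = 0; q < num_local_instances; ++q) {
            std::cout << "instance " << q << " receives " << data_queue[q].size() << " row ids\n";
        }
        return 0;
    }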
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 4cc00312697..ffaccebe898 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -163,6 +163,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
         return Status::InternalError("Already prepared");
     }
     _num_instances = request.local_params.size();
+    _total_instances = request.__isset.total_instances ? request.total_instances : _num_instances;
     _runtime_profile.reset(new RuntimeProfile("PipelineContext"));
     _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
     SCOPED_TIMER(_prepare_timer);
@@ -235,8 +236,9 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
         static_cast<void>(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
     }
     if (_enable_local_shuffle()) {
-        RETURN_IF_ERROR(
-                _plan_local_exchange(request.num_buckets, request.bucket_seq_to_instance_idx));
+        RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
+                                             request.bucket_seq_to_instance_idx,
+                                             request.shuffle_idx_to_instance_idx));
     }
     // 4. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
@@ -254,7 +256,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
 }
 
 Status PipelineXFragmentContext::_plan_local_exchange(
-        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx) {
+        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
     for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) {
         _pipelines[pip_idx]->init_data_distribution();
         // Set property if child pipeline is not join operator's child.
@@ -274,6 +277,7 @@ Status PipelineXFragmentContext::_plan_local_exchange(
                         ? _num_instances
                         : num_buckets,
                 pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx,
+                shuffle_idx_to_instance_idx,
                 _pipelines[pip_idx]->operator_xs().front()->ignore_data_hash_distribution()));
     }
     return Status::OK();
@@ -282,6 +286,7 @@ Status PipelineXFragmentContext::_plan_local_exchange(
         int num_buckets, int pip_idx, PipelinePtr pip,
         const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx,
         const bool ignore_data_hash_distribution) {
     int idx = 1;
     bool do_local_exchange = false;
@@ -294,7 +299,8 @@ Status PipelineXFragmentContext::_plan_local_exchange(
             RETURN_IF_ERROR(_add_local_exchange(
                     pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                     ops[idx]->required_data_distribution(), &do_local_exchange, num_buckets,
-                    bucket_seq_to_instance_idx, ignore_data_hash_distribution));
+                    bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
+                    ignore_data_hash_distribution));
         }
         if (do_local_exchange) {
             // If local exchange is needed for current operator, we will split this pipeline to
@@ -311,7 +317,8 @@ Status PipelineXFragmentContext::_plan_local_exchange(
         RETURN_IF_ERROR(_add_local_exchange(
                 pip_idx, idx, pip->sink_x()->node_id(), _runtime_state->obj_pool(), pip,
                 pip->sink_x()->required_data_distribution(), &do_local_exchange, num_buckets,
-                bucket_seq_to_instance_idx, ignore_data_hash_distribution));
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
+                ignore_data_hash_distribution));
     }
     return Status::OK();
 }
@@ -713,6 +720,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
         int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
         DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
         const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx,
         const bool ignore_data_hash_distribution) {
     auto& operator_xs = cur_pipe->operator_xs();
     const auto downstream_pipeline_id = cur_pipe->id();
@@ -720,9 +728,9 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
     // 1. Create a new pipeline with local exchange sink.
     DataSinkOperatorXPtr sink;
     auto sink_id = next_sink_operator_id();
-    sink.reset(new LocalExchangeSinkOperatorX(sink_id, local_exchange_id, _num_instances,
-                                              data_distribution.partition_exprs,
-                                              bucket_seq_to_instance_idx));
+    sink.reset(new LocalExchangeSinkOperatorX(
+            sink_id, local_exchange_id, _total_instances, data_distribution.partition_exprs,
+            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
     RETURN_IF_ERROR(new_pip->set_sink(sink));
     RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets));
 
@@ -731,7 +739,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
     switch (data_distribution.distribution_type) {
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(
-                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances);
+                std::max(cur_pipe->num_tasks(), _num_instances), _total_instances);
         break;
     case ExchangeType::BUCKET_HASH_SHUFFLE:
         shared_state->exchanger = BucketShuffleExchanger::create_unique(
@@ -826,7 +834,9 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
 Status PipelineXFragmentContext::_add_local_exchange(
         int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
         DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
-        const std::map<int, int>& bucket_seq_to_instance_idx, const bool ignore_data_distribution) {
+        const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx,
+        const bool ignore_data_distribution) {
     DCHECK(_enable_local_shuffle());
     if (_num_instances <= 1) {
         return Status::OK();
     }
@@ -840,9 +850,9 @@ Status PipelineXFragmentContext::_add_local_exchange(
     auto& operator_xs = cur_pipe->operator_xs();
     auto total_op_num = operator_xs.size();
     auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
-    RETURN_IF_ERROR(_add_local_exchange_impl(idx, pool, cur_pipe, new_pip, data_distribution,
-                                             do_local_exchange, num_buckets,
-                                             bucket_seq_to_instance_idx, ignore_data_distribution));
+    RETURN_IF_ERROR(_add_local_exchange_impl(
+            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
+            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));
 
     CHECK(total_op_num + 1 == cur_pipe->operator_xs().size() + new_pip->operator_xs().size())
             << "total_op_num: " << total_op_num
@@ -855,7 +865,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
         RETURN_IF_ERROR(_add_local_exchange_impl(
                 new_pip->operator_xs().size(), pool, new_pip, add_pipeline(new_pip, pip_idx + 2),
                 DataDistribution(ExchangeType::PASSTHROUGH), do_local_exchange, num_buckets,
-                bucket_seq_to_instance_idx, ignore_data_distribution));
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));
     }
     return Status::OK();
 }
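Two details in the .cpp changes above are worth calling out: the hash-shuffle
exchanger and the local exchange sink are now sized by _total_instances (the
plan-wide count) rather than by the per-backend _num_instances, and
_total_instances falls back to _num_instances when the frontend did not set
the new Thrift field, so an older FE paired with a newer BE keeps the old
behaviour. A minimal hedged sketch of that defaulting pattern follows; the
request struct is hypothetical, not the real Thrift type.

    // Sketch of the "optional field with fallback" pattern used for
    // total_instances; hypothetical struct, not the Doris Thrift type.
    #include <iostream>
    #include <optional>

    struct FragmentRequest {
        int local_instances = 0;            // instances on this backend
        std::optional<int> total_instances; // plan-wide count; absent from old frontends
    };

    int resolve_total_instances(const FragmentRequest& req) {
        // Fall back to the local count so an old coordinator keeps the old behaviour.
        return req.total_instances.value_or(req.local_instances);
    }

    int main() {
        FragmentRequest old_fe{3, std::nullopt};
        FragmentRequest new_fe{3, 12};
        std::cout << resolve_total_instances(old_fe) << "\n"; // 3
        std::cout << resolve_total_instances(new_fe) << "\n"; // 12
        return 0;
    }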
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 34320d64f38..92178d359d9 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -126,6 +126,7 @@ private:
                               PipelinePtr cur_pipe, DataDistribution data_distribution,
                               bool* do_local_exchange, int num_buckets,
                               const std::map<int, int>& bucket_seq_to_instance_idx,
+                              const std::map<int, int>& shuffle_idx_to_instance_idx,
                               const bool ignore_data_distribution);
     void _inherit_pipeline_properties(const DataDistribution& data_distribution,
                                       PipelinePtr pipe_with_source, PipelinePtr pipe_with_sink);
@@ -133,6 +134,7 @@ private:
                                     PipelinePtr new_pipe, DataDistribution data_distribution,
                                     bool* do_local_exchange, int num_buckets,
                                     const std::map<int, int>& bucket_seq_to_instance_idx,
+                                    const std::map<int, int>& shuffle_idx_to_instance_idx,
                                     const bool ignore_data_distribution);
 
     [[nodiscard]] Status _build_pipelines(ObjectPool* pool,
@@ -160,9 +162,11 @@ private:
                                           RuntimeState* state, DescriptorTbl& desc_tbl,
                                           PipelineId cur_pipeline_id);
     Status _plan_local_exchange(int num_buckets,
-                                const std::map<int, int>& bucket_seq_to_instance_idx);
+                                const std::map<int, int>& bucket_seq_to_instance_idx,
+                                const std::map<int, int>& shuffle_idx_to_instance_idx);
     Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
                                 const std::map<int, int>& bucket_seq_to_instance_idx,
+                                const std::map<int, int>& shuffle_idx_to_instance_idx,
                                 const bool ignore_data_distribution);
 
     bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
@@ -239,6 +243,8 @@ private:
     std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
 
     std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;
+
+    int _total_instances = -1;
 };
 
 } // namespace pipeline
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 12be2b2d8ec..5e2d0ef85c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1611,6 +1611,7 @@ public class Coordinator implements CoordInterface {
                         dest.fragment_instance_id = instanceExecParams.instanceId;
                         dest.server = toRpcHost(instanceExecParams.host);
                         dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+                        instanceExecParams.recvrId = params.destinations.size();
                         break;
                     }
                 }
@@ -1630,6 +1631,7 @@ public class Coordinator implements CoordInterface {
                         destHosts.put(param.host, param);
                         param.buildHashTableForBroadcastJoin = true;
                         TPlanFragmentDestination dest = new TPlanFragmentDestination();
+                        param.recvrId = params.destinations.size();
                         dest.fragment_instance_id = param.instanceId;
                         try {
                             dest.server = toRpcHost(param.host);
@@ -1653,6 +1655,7 @@ public class Coordinator implements CoordInterface {
                     dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
                     dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
                     dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
+                    destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
                     params.destinations.add(dest);
                 }
             }
@@ -1732,6 +1735,7 @@ public class Coordinator implements CoordInterface {
                         dest.fragment_instance_id = instanceExecParams.instanceId;
                         dest.server = toRpcHost(instanceExecParams.host);
                         dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
+                        instanceExecParams.recvrId = params.destinations.size();
                         break;
                     }
                 }
@@ -1752,6 +1756,7 @@ public class Coordinator implements CoordInterface {
                         param.buildHashTableForBroadcastJoin = true;
                         TPlanFragmentDestination dest = new TPlanFragmentDestination();
                         dest.fragment_instance_id = param.instanceId;
+                        param.recvrId = params.destinations.size();
                         try {
                             dest.server = toRpcHost(param.host);
                             dest.setBrpcServer(toBrpcHost(param.host));
@@ -1773,6 +1778,7 @@ public class Coordinator implements CoordInterface {
                     dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
                     dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
                     dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host);
+                    destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
                     destinations.add(dest);
                 }
             }
@@ -3755,22 +3761,26 @@ public class Coordinator implements CoordInterface {
                     params.setFileScanParams(fileScanRangeParamsMap);
                     params.setNumBuckets(fragment.getBucketNum());
                     params.setPerNodeSharedScans(perNodeSharedScans);
+                    params.setTotalInstances(instanceExecParams.size());
                     if (ignoreDataDistribution) {
                         params.setParallelInstances(parallelTasksNum);
                     }
                     res.put(instanceExecParam.host, params);
                     res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
+                    res.get(instanceExecParam.host).setShuffleIdxToInstanceIdx(new HashMap<Integer, Integer>());
                     instanceIdx.put(instanceExecParam.host, 0);
                 }
                 // Set each bucket belongs to which instance on this BE.
                 // This is used for LocalExchange(BUCKET_HASH_SHUFFLE).
                 int instanceId = instanceIdx.get(instanceExecParam.host);
+
                 for (int bucket : instanceExecParam.bucketSeqSet) {
                     res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, instanceId);
-
                 }
                 instanceIdx.replace(instanceExecParam.host, ++instanceId);
                 TPipelineFragmentParams params = res.get(instanceExecParam.host);
+                res.get(instanceExecParam.host).getShuffleIdxToInstanceIdx().put(instanceExecParam.recvrId,
+                        params.getLocalParams().size());
                 TPipelineInstanceParams localParams = new TPipelineInstanceParams();
                 localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
@@ -3919,6 +3929,8 @@ public class Coordinator implements CoordInterface {
 
         boolean buildHashTableForBroadcastJoin = false;
 
+        int recvrId = -1;
+
         List<TUniqueId> instancesSharingHashTable = Lists.newArrayList();
 
         public void addBucketSeq(int bucketSeq) {
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 818643f6ba6..7559451e373 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -715,6 +715,8 @@ struct TPipelineFragmentParams {
   35: optional map<i32, i32> bucket_seq_to_instance_idx
   36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
   37: optional i32 parallel_instances
+  38: optional i32 total_instances
+  39: optional map<i32, i32> shuffle_idx_to_instance_idx
 
   // For cloud
   1000: optional bool is_mow_table;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
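Closing note on the frontend half of the change shown above: the Coordinator
tags every instance with a receiver index (recvrId, its position in the
destination list) and, per backend, records a shuffle_idx_to_instance_idx
entry mapping that global index to the instance's position in the backend's
local params; the two new Thrift fields carry the total instance count and the
map to the BE. The sketch below is a hedged illustration of building such a
per-host map, written in C++ with hypothetical stand-in types rather than the
actual Coordinator or Thrift classes.

    // Illustrative construction of per-backend shuffle_idx -> local index maps.
    // Types and field names are hypothetical stand-ins for the FE structures.
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    struct InstanceExecParam {
        std::string host;
        int recvr_id = -1; // position of this instance in the global destination list
    };

    int main() {
        std::vector<InstanceExecParam> instances = {
                {"be-1", 0}, {"be-2", 1}, {"be-1", 2}, {"be-2", 3}};

        // host -> (global shuffle idx -> index into that host's local params list)
        std::map<std::string, std::map<int, int>> shuffle_idx_to_instance_idx;
        std::map<std::string, int> local_count;

        for (const auto& inst : instances) {
            int local_idx = local_count[inst.host]++; // order of local params on this host
            shuffle_idx_to_instance_idx[inst.host][inst.recvr_id] = local_idx;
        }

        for (const auto& [host, mapping] : shuffle_idx_to_instance_idx) {
            std::cout << host << ":";
            for (const auto& [shuffle_idx, local_idx] : mapping) {
                std::cout << " " << shuffle_idx << "->" << local_idx;
            }
            std::cout << "\n";
        }
        return 0;
    }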