This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 60e20a3afea [fix](pipeline_x) Crc32HashPartitioner should use ShuffleChannelIds (#34147) 60e20a3afea is described below commit 60e20a3afea42082f707aecf9d81ef2653f2b203 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Fri Apr 26 14:33:03 2024 +0800 [fix](pipeline_x) Crc32HashPartitioner should use ShuffleChannelIds (#34147) --- be/src/pipeline/exec/exchange_sink_operator.cpp | 8 ++++---- be/src/pipeline/exec/partitioned_hash_join_probe_operator.h | 4 ++-- be/src/pipeline/exec/partitioned_hash_join_sink_operator.h | 4 ++-- .../pipeline_x/local_exchange/local_exchange_sink_operator.h | 4 ++-- be/src/vec/runtime/partitioner.cpp | 1 - 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 79a6ee0e748..84381c6a8af 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -221,8 +221,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { } if (_part_type == TPartitionType::HASH_PARTITIONED) { _partition_count = channels.size(); - _partitioner.reset( - new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(channels.size())); + _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>( + channels.size())); RETURN_IF_ERROR(_partitioner->init(p._texprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); _profile->add_info_string("Partitioner", @@ -269,8 +269,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { _partition_count = channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer; - _partitioner.reset( - new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_partition_count)); + _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>( + _partition_count)); _partition_function.reset(new HashPartitionFunction(_partitioner.get())); scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger< diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 5bdc5278ffc..3702c2e1a6b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -24,16 +24,16 @@ #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/join_build_sink_operator.h" -#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" // LocalExchangeChannelIds #include "pipeline/pipeline_x/operator.h" #include "vec/runtime/partitioner.h" +#include "vec/sink/vdata_stream_sender.h" // ShuffleChannelIds namespace doris { class RuntimeState; namespace pipeline { -using PartitionerType = vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>; +using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>; class PartitionedHashJoinProbeOperatorX; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 3f29e3093b6..68c6b970163 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -24,9 +24,9 @@ #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/join_build_sink_operator.h" -#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" // LocalExchangeChannelIds #include "pipeline/pipeline_x/operator.h" #include "vec/runtime/partitioner.h" +#include "vec/sink/vdata_stream_sender.h" // ShuffleChannelIds namespace doris { class ExecNode; @@ -34,7 +34,7 @@ class RuntimeState; namespace pipeline { -using PartitionerType = vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>; +using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>; class PartitionedHashJoinSinkOperatorX; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index b3ecf29736f..db6662a221a 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -114,8 +114,8 @@ public: _shuffle_idx_to_instance_idx[i] = {i, i}; } } - _partitioner.reset( - new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions)); + _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>( + _num_partitions)); RETURN_IF_ERROR(_partitioner->init(_texprs)); } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) { _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>( diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp index db40610723c..fadf6d73b95 100644 --- a/be/src/vec/runtime/partitioner.cpp +++ b/be/src/vec/runtime/partitioner.cpp @@ -103,6 +103,5 @@ template class Partitioner<size_t, ShuffleChannelIds>; template class XXHashPartitioner<ShuffleChannelIds>; template class Partitioner<uint32_t, ShuffleChannelIds>; template class Crc32HashPartitioner<ShuffleChannelIds>; -template class Crc32HashPartitioner<pipeline::LocalExchangeChannelIds>; } // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org