This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit dca623a58fe226f9fcc6e19e5f7ee0cea1b48806
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Sat Apr 27 10:43:02 2024 +0800

    [fix](spill) use different algorithm to avoid partition data skew (#34162)
---
 be/src/pipeline/exec/partitioned_hash_join_probe_operator.h | 3 +--
 be/src/pipeline/exec/partitioned_hash_join_sink_operator.h | 3 +--
 be/src/vec/runtime/partitioner.cpp | 1 +
 be/src/vec/runtime/partitioner.h | 7 +++++++
 4 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 3702c2e1a6b..d650dd1590d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -26,14 +26,13 @@
 #include "pipeline/exec/join_build_sink_operator.h"
 #include "pipeline/pipeline_x/operator.h"
 #include "vec/runtime/partitioner.h"
-#include "vec/sink/vdata_stream_sender.h" // ShuffleChannelIds
 
 namespace doris {
 class RuntimeState;
 
 namespace pipeline {
 
-using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>;
+using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
 
 class PartitionedHashJoinProbeOperatorX;
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 68c6b970163..60cb2e8f60c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -26,7 +26,6 @@
 #include "pipeline/exec/join_build_sink_operator.h"
 #include "pipeline/pipeline_x/operator.h"
 #include "vec/runtime/partitioner.h"
-#include "vec/sink/vdata_stream_sender.h" // ShuffleChannelIds
 
 namespace doris {
 class ExecNode;
@@ -34,7 +33,7 @@ class RuntimeState;
 
 namespace pipeline {
 
-using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>;
+using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
 
 class PartitionedHashJoinSinkOperatorX;
 
diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp
index fadf6d73b95..bbb6ebfc1a8 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -103,5 +103,6 @@ template class Partitioner<size_t, ShuffleChannelIds>;
 template class XXHashPartitioner<ShuffleChannelIds>;
 template class Partitioner<uint32_t, ShuffleChannelIds>;
 template class Crc32HashPartitioner<ShuffleChannelIds>;
+template class Crc32HashPartitioner<SpillPartitionChannelIds>;
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h
index 66ed8809d7c..66e00122ee7 100644
--- a/be/src/vec/runtime/partitioner.h
+++ b/be/src/vec/runtime/partitioner.h
@@ -112,5 +112,12 @@ private:
     void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override;
 };
 
+struct SpillPartitionChannelIds {
+    template <typename HashValueType>
+    HashValueType operator()(HashValueType l, size_t r) {
+        return ((l >> 16) | (l << 16)) % r;
+    }
+};
+
 } // namespace vectorized
 } // namespace doris

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org