This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ff56605c2eb [pipelineX](bug) Fix hash partition shuffle (#28071)
ff56605c2eb is described below

commit ff56605c2ebc26c535933573d5b675b484d799b0
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Dec 6 19:26:46 2023 +0800

    [pipelineX](bug) Fix hash partition shuffle (#28071)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index f8c3e392184..c4319abf03c 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -213,11 +213,11 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
     if (p._part_type == TPartitionType::HASH_PARTITIONED) {
         _partition_count = channels.size();
         _partitioner.reset(
-                new 
vectorized::XXHashPartitioner<LocalExchangeChannelIds>(channels.size()));
+                new 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(channels.size()));
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
         _profile->add_info_string("Partitioner",
-                                  fmt::format("XXHashPartitioner({})", 
_partition_count));
+                                  fmt::format("Crc32HashPartitioner({})", 
_partition_count));
     } else if (p._part_type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
         _partition_count = channel_shared_ptrs.size();
         _partitioner.reset(new 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
@@ -410,7 +410,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         if (_part_type == TPartitionType::HASH_PARTITIONED) {
             RETURN_IF_ERROR(channel_add_rows(state, local_state.channels,
                                              local_state._partition_count,
-                                             
(uint64_t*)local_state._partitioner->get_channel_ids(),
+                                             
(uint32_t*)local_state._partitioner->get_channel_ids(),
                                              rows, block, source_state == 
SourceState::FINISHED));
         } else {
             RETURN_IF_ERROR(channel_add_rows(state, 
local_state.channel_shared_ptrs,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to