This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 60e20a3afea [fix](pipeline_x) Crc32HashPartitioner should use ShuffleChannelIds (#34147)
60e20a3afea is described below

commit 60e20a3afea42082f707aecf9d81ef2653f2b203
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Fri Apr 26 14:33:03 2024 +0800

    [fix](pipeline_x) Crc32HashPartitioner should use ShuffleChannelIds (#34147)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp                   | 8 ++++----
 be/src/pipeline/exec/partitioned_hash_join_probe_operator.h       | 4 ++--
 be/src/pipeline/exec/partitioned_hash_join_sink_operator.h        | 4 ++--
 .../pipeline_x/local_exchange/local_exchange_sink_operator.h      | 4 ++--
 be/src/vec/runtime/partitioner.cpp                                | 1 -
 5 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 79a6ee0e748..84381c6a8af 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -221,8 +221,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     }
     if (_part_type == TPartitionType::HASH_PARTITIONED) {
         _partition_count = channels.size();
-        _partitioner.reset(
-                new 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(channels.size()));
+        _partitioner.reset(new 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
+                channels.size()));
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
         _profile->add_info_string("Partitioner",
@@ -269,8 +269,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
         _partition_count =
                 channels.size() * 
config::table_sink_partition_write_max_partition_nums_per_writer;
-        _partitioner.reset(
-                new 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_partition_count));
+        _partitioner.reset(new 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
+                _partition_count));
         _partition_function.reset(new 
HashPartitionFunction(_partitioner.get()));
 
         scale_writer_partitioning_exchanger.reset(new 
vectorized::ScaleWriterPartitioningExchanger<
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 5bdc5278ffc..3702c2e1a6b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -24,16 +24,16 @@
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/join_build_sink_operator.h"
-#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" 
// LocalExchangeChannelIds
 #include "pipeline/pipeline_x/operator.h"
 #include "vec/runtime/partitioner.h"
+#include "vec/sink/vdata_stream_sender.h" // ShuffleChannelIds
 
 namespace doris {
 class RuntimeState;
 
 namespace pipeline {
 
-using PartitionerType = 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>;
+using PartitionerType = 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>;
 
 class PartitionedHashJoinProbeOperatorX;
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 3f29e3093b6..68c6b970163 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -24,9 +24,9 @@
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/join_build_sink_operator.h"
-#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" 
// LocalExchangeChannelIds
 #include "pipeline/pipeline_x/operator.h"
 #include "vec/runtime/partitioner.h"
+#include "vec/sink/vdata_stream_sender.h" // ShuffleChannelIds
 
 namespace doris {
 class ExecNode;
@@ -34,7 +34,7 @@ class RuntimeState;
 
 namespace pipeline {
 
-using PartitionerType = 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>;
+using PartitionerType = 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>;
 
 class PartitionedHashJoinSinkOperatorX;
 
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index b3ecf29736f..db6662a221a 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -114,8 +114,8 @@ public:
                     _shuffle_idx_to_instance_idx[i] = {i, i};
                 }
             }
-            _partitioner.reset(
-                    new 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
+            _partitioner.reset(new 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
+                    _num_partitions));
             RETURN_IF_ERROR(_partitioner->init(_texprs));
         } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
             _partitioner.reset(new 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp
index db40610723c..fadf6d73b95 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -103,6 +103,5 @@ template class Partitioner<size_t, ShuffleChannelIds>;
 template class XXHashPartitioner<ShuffleChannelIds>;
 template class Partitioner<uint32_t, ShuffleChannelIds>;
 template class Crc32HashPartitioner<ShuffleChannelIds>;
-template class Crc32HashPartitioner<pipeline::LocalExchangeChannelIds>;
 
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to