This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c3789e4799 [fix](local shuffle) Fix bucket local shuffle (#44459)
1c3789e4799 is described below

commit 1c3789e47998ea2ebbe445ddbb447f95a2b18c4a
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Mon Nov 25 12:02:25 2024 +0800

    [fix](local shuffle) Fix bucket local shuffle (#44459)
    
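    After a bucket-hash local exchange, data from different buckets should be
    distributed across all tasks. Previously, every bucket was assigned to the
    same task (see "Before"), so the other LOCAL EXCHANGE SOURCE tasks received
    no buckets.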
    
    Before:
    ```
                                                                          ┌─────────────────────────────────────────┐
                                                                          │ ┌─────────┐   ┌───────────────────────┐ │
                                                                          │ │Bucket 0 │   │                       │ │
                                                                          │ └─────────┘   │                       │ │
                                                                          │               │ LOCAL EXCHANGE SOURCE │ │
                                                                   ┌──────► ┌─────────┐   │    (BUCKET HASH)      │ │
                                                                   │      │ │Bucket 1 │   │                       │ │
                                                                   │      │ └─────────┘   └───────────────────────┘ │
                                                                   │      └─────────────────────────────────────────┘
    ┌─────────────────────────────────────────────────────┐        │
    │ ┌─────────┐  ┌───────┐     ┌─────────────────────┐  │        │
    │ │Bucket 0 │  │       │     │                     │  │        │
    │ └─────────┘  │       │     │                     │  │        │
    │              │ SCAN  │     │ LOCAL EXCHANGE SINK │  ├────────┤
    │ ┌─────────┐  │       │     │    (BUCKET HASH)    │  │        │
    │ │Bucket 1 │  │       │     │                     │  │        │
    │ └─────────┘  └───────┘     └─────────────────────┘  │        │
    └─────────────────────────────────────────────────────┘        │      ┌─────────────────────────────────────────┐
                                                                   │      │               ┌───────────────────────┐ │
                                                                   │      │               │                       │ │
                                                                   │      │               │                       │ │
                                                                   │      │               │ LOCAL EXCHANGE SOURCE │ │
                                                                   └──────►               │    (BUCKET HASH)      │ │
                                                                          │               │                       │ │
                                                                          │               └───────────────────────┘ │
                                                                          └─────────────────────────────────────────┘
    ```
    
    After:
    ```
    
    
    
                                                                                 ┌─────────────────────────────────────────┐
                                                                                 │ ┌─────────┐   ┌───────────────────────┐ │
                                                                                 │ │Bucket 0 │   │                       │ │
                                                                                 │ └─────────┘   │                       │ │
                                                                                 │               │ LOCAL EXCHANGE SOURCE │ │
                                                                          ┌──────►               │    (BUCKET HASH)      │ │
                                                                          │      │               │                       │ │
                                                                          │      │               └───────────────────────┘ │
                                                                          │      └─────────────────────────────────────────┘
           ┌─────────────────────────────────────────────────────┐        │
           │ ┌─────────┐  ┌───────┐     ┌─────────────────────┐  │        │
           │ │Bucket 0 │  │       │     │                     │  │        │
           │ └─────────┘  │       │     │                     │  │        │
           │              │ SCAN  │     │ LOCAL EXCHANGE SINK │  ├────────┤
           │ ┌─────────┐  │       │     │    (BUCKET HASH)    │  │        │
           │ │Bucket 1 │  │       │     │                     │  │        │
           │ └─────────┘  └───────┘     └─────────────────────┘  │        │
           └─────────────────────────────────────────────────────┘        │      ┌─────────────────────────────────────────┐
                                                                          │      │               ┌───────────────────────┐ │
                                                                          │      │               │                       │ │
                                                                          │      │  ┌─────────┐  │                       │ │
                                                                          │      │  │Bucket 1 │  │ LOCAL EXCHANGE SOURCE │ │
                                                                          └──────►  └─────────┘  │    (BUCKET HASH)      │ │
                                                                                 │               │                       │ │
                                                                                 │               └───────────────────────┘ │
                                                                                 └─────────────────────────────────────────┘
    
    
    
    ```
---
 be/src/exprs/runtime_filter.cpp                           |  7 ++++---
 .../src/main/java/org/apache/doris/qe/Coordinator.java    | 14 ++++++--------
 .../org/apache/doris/qe/runtime/ThriftPlansBuilder.java   | 15 ++++-----------
 gensrc/thrift/PaloInternalService.thrift                  |  4 ++--
 4 files changed, 16 insertions(+), 24 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 24333360ff6..bac14b616b2 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1273,7 +1273,8 @@ void IRuntimeFilter::update_state() {
     // In pipelineX, runtime filters will be ready or timeout before open 
phase.
     if (expected == RuntimeFilterState::NOT_READY) {
         DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
-        COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
+        COUNTER_SET(_wait_timer,
+                    int64_t((MonotonicMillis() - registration_time_) * 
NANOS_PER_MILLIS));
         _rf_state_atomic = RuntimeFilterState::TIME_OUT;
     }
 }
@@ -1292,7 +1293,7 @@ PrimitiveType IRuntimeFilter::column_type() const {
 
 void IRuntimeFilter::signal() {
     DCHECK(is_consumer());
-    COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
+    COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - registration_time_) 
* NANOS_PER_MILLIS));
     _rf_state_atomic.store(RuntimeFilterState::READY);
     if (!_filter_timer.empty()) {
         for (auto& timer : _filter_timer) {
@@ -1539,7 +1540,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* 
parent_profile) {
 void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t 
local_merge_time) {
     _profile->add_info_string("RealRuntimeFilterType", 
to_string(_wrapper->get_real_type()));
     _profile->add_info_string("LocalMergeTime",
-                              std::to_string(local_merge_time / 1000000000.0) 
+ " s");
+                              std::to_string((double)local_merge_time / 
NANOS_PER_SEC) + " s");
 }
 
 std::string IRuntimeFilter::debug_string() const {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index acd0fbe0dae..e508efde42d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1935,7 +1935,6 @@ public class Coordinator implements CoordInterface {
 
                             FInstanceExecParam instanceParam = new 
FInstanceExecParam(null, key, 0, params);
                             instanceParam.perNodeScanRanges.put(planNodeId, 
scanRangeParams);
-                            instanceParam.perNodeSharedScans.put(planNodeId, 
sharedScan);
                             params.instanceExecParams.add(instanceParam);
                         }
                         params.ignoreDataDistribution = sharedScan;
@@ -2757,13 +2756,11 @@ public class Coordinator implements CoordInterface {
                         null, addressScanRange.getKey(), 0, params);
 
                 for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> 
nodeScanRangeMap : scanRange) {
-                    instanceParam.addBucketSeq(nodeScanRangeMap.first);
                     for (Map.Entry<Integer, List<TScanRangeParams>> 
nodeScanRange
                             : nodeScanRangeMap.second.entrySet()) {
                         if 
(!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
                             range.put(nodeScanRange.getKey(), 
Lists.newArrayList());
                             
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), 
Lists.newArrayList());
-                            
instanceParam.perNodeSharedScans.put(nodeScanRange.getKey(), true);
                         }
                         
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
                         
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey())
@@ -2775,6 +2772,12 @@ public class Coordinator implements CoordInterface {
                     params.instanceExecParams.add(new FInstanceExecParam(
                             null, addressScanRange.getKey(), 0, params));
                 }
+                int index = 0;
+                for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> 
nodeScanRangeMap : scanRange) {
+                    params.instanceExecParams.get(index % 
params.instanceExecParams.size())
+                            .addBucketSeq(nodeScanRangeMap.first);
+                    index++;
+                }
             } else {
                 int expectedInstanceNum = 1;
                 if (parallelExecInstanceNum > 1) {
@@ -3131,10 +3134,8 @@ public class Coordinator implements CoordInterface {
             for (int i = 0; i < instanceExecParams.size(); ++i) {
                 final FInstanceExecParam instanceExecParam = 
instanceExecParams.get(i);
                 Map<Integer, List<TScanRangeParams>> scanRanges = 
instanceExecParam.perNodeScanRanges;
-                Map<Integer, Boolean> perNodeSharedScans = 
instanceExecParam.perNodeSharedScans;
                 if (scanRanges == null) {
                     scanRanges = Maps.newHashMap();
-                    perNodeSharedScans = Maps.newHashMap();
                 }
                 if (!res.containsKey(instanceExecParam.host)) {
                     TPipelineFragmentParams params = new 
TPipelineFragmentParams();
@@ -3162,7 +3163,6 @@ public class Coordinator implements CoordInterface {
 
                     params.setFileScanParams(fileScanRangeParamsMap);
                     params.setNumBuckets(fragment.getBucketNum());
-                    params.setPerNodeSharedScans(perNodeSharedScans);
                     params.setTotalInstances(instanceExecParams.size());
                     if (ignoreDataDistribution) {
                         params.setParallelInstances(parallelTasksNum);
@@ -3187,7 +3187,6 @@ public class Coordinator implements CoordInterface {
 
                 
localParams.setFragmentInstanceId(instanceExecParam.instanceId);
                 localParams.setPerNodeScanRanges(scanRanges);
-                localParams.setPerNodeSharedScans(perNodeSharedScans);
                 localParams.setSenderId(i);
                 localParams.setBackendNum(backendNum++);
                 localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
@@ -3335,7 +3334,6 @@ public class Coordinator implements CoordInterface {
         TUniqueId instanceId;
         TNetworkAddress host;
         Map<Integer, List<TScanRangeParams>> perNodeScanRanges = 
Maps.newHashMap();
-        Map<Integer, Boolean> perNodeSharedScans = Maps.newHashMap();
 
         int perFragmentInstanceIdx;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index f0e3febe192..a02ee90e901 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -423,7 +423,7 @@ public class ThriftPlansBuilder {
 
         boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
         if (isLocalShuffle && ((LocalShuffleAssignedJob) 
instance).receiveDataFromLocal) {
-            // save thrift rpc message size, don't need perNodeScanRanges and 
perNodeSharedScans,
+            // save thrift rpc message size, don't need perNodeScanRanges,
             // but the perNodeScanRanges is required rpc field
             instanceParams.setPerNodeScanRanges(Maps.newLinkedHashMap());
             return;
@@ -459,19 +459,16 @@ public class ThriftPlansBuilder {
 
     private static PerNodeScanParams 
computeDefaultScanSourceParam(DefaultScanSource defaultScanSource) {
         Map<Integer, List<TScanRangeParams>> perNodeScanRanges = 
Maps.newLinkedHashMap();
-        Map<Integer, Boolean> perNodeSharedScans = Maps.newLinkedHashMap();
         for (Entry<ScanNode, ScanRanges> kv : 
defaultScanSource.scanNodeToScanRanges.entrySet()) {
             int scanNodeId = kv.getKey().getId().asInt();
             perNodeScanRanges.put(scanNodeId, kv.getValue().params);
-            perNodeSharedScans.put(scanNodeId, true);
         }
 
-        return new PerNodeScanParams(perNodeScanRanges, perNodeSharedScans);
+        return new PerNodeScanParams(perNodeScanRanges);
     }
 
     private static PerNodeScanParams 
computeBucketScanSourceParam(BucketScanSource bucketScanSource) {
         Map<Integer, List<TScanRangeParams>> perNodeScanRanges = 
Maps.newLinkedHashMap();
-        Map<Integer, Boolean> perNodeSharedScans = Maps.newLinkedHashMap();
         for (Entry<Integer, Map<ScanNode, ScanRanges>> kv :
                 bucketScanSource.bucketIndexToScanNodeToTablets.entrySet()) {
             Map<ScanNode, ScanRanges> scanNodeToRanges = kv.getValue();
@@ -479,10 +476,9 @@ public class ThriftPlansBuilder {
                 int scanNodeId = kv2.getKey().getId().asInt();
                 List<TScanRangeParams> scanRanges = 
perNodeScanRanges.computeIfAbsent(scanNodeId, ArrayList::new);
                 scanRanges.addAll(kv2.getValue().params);
-                perNodeSharedScans.put(scanNodeId, true);
             }
         }
-        return new PerNodeScanParams(perNodeScanRanges, perNodeSharedScans);
+        return new PerNodeScanParams(perNodeScanRanges);
     }
 
     private static Map<Integer, Integer> computeBucketIdToInstanceId(
@@ -562,12 +558,9 @@ public class ThriftPlansBuilder {
 
     private static class PerNodeScanParams {
         Map<Integer, List<TScanRangeParams>> perNodeScanRanges;
-        Map<Integer, Boolean> perNodeSharedScans;
 
-        public PerNodeScanParams(Map<Integer, List<TScanRangeParams>> 
perNodeScanRanges,
-                Map<Integer, Boolean> perNodeSharedScans) {
+        public PerNodeScanParams(Map<Integer, List<TScanRangeParams>> 
perNodeScanRanges) {
             this.perNodeScanRanges = perNodeScanRanges;
-            this.perNodeSharedScans = perNodeSharedScans;
         }
     }
 }
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 392aa8658df..9a0fd910d94 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -776,7 +776,7 @@ struct TPipelineInstanceParams {
   4: optional i32 sender_id
   5: optional TRuntimeFilterParams runtime_filter_params
   6: optional i32 backend_num
-  7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
+  7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans // deprecated
   8: optional list<i32> topn_filter_source_node_ids // deprecated after we set 
topn_filter_descs
   9: optional list<PlanNodes.TTopnFilterDesc> topn_filter_descs
 }
@@ -820,7 +820,7 @@ struct TPipelineFragmentParams {
   33: optional i32 num_local_sink
   34: optional i32 num_buckets
   35: optional map<i32, i32> bucket_seq_to_instance_idx
-  36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
+  36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans // deprecated
   37: optional i32 parallel_instances
   38: optional i32 total_instances
   39: optional map<i32, i32> shuffle_idx_to_instance_idx


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to