This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1c3789e4799 [fix](local shuffle) Fix bucket local shuffle (#44459) 1c3789e4799 is described below commit 1c3789e47998ea2ebbe445ddbb447f95a2b18c4a Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Mon Nov 25 12:02:25 2024 +0800 [fix](local shuffle) Fix bucket local shuffle (#44459) Data in different buckets should be distributed into all tasks after bucket-hash local exchange. Before: ``` ┌─────────────────────────────────────────┐ │ ┌─────────┐ ┌───────────────────────┐ │ │ │Bucket 0 │ │ │ │ │ └─────────┘ │ │ │ │ │ LOCAL EXCHANGE SOURCE │ │ ┌──────► ┌─────────┐ │ (BUCKET HASH) │ │ │ │ │Bucket 1 │ │ │ │ │ │ └─────────┘ └───────────────────────┘ │ │ └─────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────┐ │ │ ┌─────────┐ ┌───────┐ ┌─────────────────────┐ │ │ │ │Bucket 0 │ │ │ │ │ │ │ │ └─────────┘ │ │ │ │ │ │ │ │ SCAN │ │ LOCAL EXCHANGE SINK │ ├────────┤ │ ┌─────────┐ │ │ │ (BUCKET HASH) │ │ │ │ │Bucket 1 │ │ │ │ │ │ │ │ └─────────┘ └───────┘ └─────────────────────┘ │ │ └─────────────────────────────────────────────────────┘ │ ┌─────────────────────────────────────────┐ │ │ ┌───────────────────────┐ │ │ │ │ │ │ │ │ │ │ │ │ │ │ LOCAL EXCHANGE SOURCE │ │ └──────► │ (BUCKET HASH) │ │ │ │ │ │ │ └───────────────────────┘ │ └─────────────────────────────────────────┘ ``` After ``` ┌─────────────────────────────────────────┐ │ ┌─────────┐ ┌───────────────────────┐ │ │ │Bucket 0 │ │ │ │ │ └─────────┘ │ │ │ │ │ LOCAL EXCHANGE SOURCE │ │ ┌──────► │ (BUCKET HASH) │ │ │ │ │ │ │ │ │ └───────────────────────┘ │ │ └─────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────┐ │ │ ┌─────────┐ ┌───────┐ ┌─────────────────────┐ │ │ │ │Bucket 0 │ │ │ │ │ │ │ │ └─────────┘ │ │ │ │ │ │ │ │ SCAN │ │ LOCAL EXCHANGE SINK │ ├────────┤ │ ┌─────────┐ │ │ │ (BUCKET HASH) │ │ │ │ │Bucket 1 │ │ │ │ │ │ │ │ └─────────┘ └───────┘ 
└─────────────────────┘ │ │ └─────────────────────────────────────────────────────┘ │ ┌─────────────────────────────────────────┐ │ │ ┌───────────────────────┐ │ │ │ │ │ │ │ │ ┌─────────┐ │ │ │ │ │ │Bucket 1 │ │ LOCAL EXCHANGE SOURCE │ │ └──────► └─────────┘ │ (BUCKET HASH) │ │ │ │ │ │ │ └───────────────────────┘ │ └─────────────────────────────────────────┘ ``` --- be/src/exprs/runtime_filter.cpp | 7 ++++--- .../src/main/java/org/apache/doris/qe/Coordinator.java | 14 ++++++-------- .../org/apache/doris/qe/runtime/ThriftPlansBuilder.java | 15 ++++----------- gensrc/thrift/PaloInternalService.thrift | 4 ++-- 4 files changed, 16 insertions(+), 24 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 24333360ff6..bac14b616b2 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1273,7 +1273,8 @@ void IRuntimeFilter::update_state() { // In pipelineX, runtime filters will be ready or timeout before open phase. if (expected == RuntimeFilterState::NOT_READY) { DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms); - COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_); + COUNTER_SET(_wait_timer, + int64_t((MonotonicMillis() - registration_time_) * NANOS_PER_MILLIS)); _rf_state_atomic = RuntimeFilterState::TIME_OUT; } } @@ -1292,7 +1293,7 @@ PrimitiveType IRuntimeFilter::column_type() const { void IRuntimeFilter::signal() { DCHECK(is_consumer()); - COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_); + COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - registration_time_) * NANOS_PER_MILLIS)); _rf_state_atomic.store(RuntimeFilterState::READY); if (!_filter_timer.empty()) { for (auto& timer : _filter_timer) { @@ -1539,7 +1540,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_time) { _profile->add_info_string("RealRuntimeFilterType", 
to_string(_wrapper->get_real_type())); _profile->add_info_string("LocalMergeTime", - std::to_string(local_merge_time / 1000000000.0) + " s"); + std::to_string((double)local_merge_time / NANOS_PER_SEC) + " s"); } std::string IRuntimeFilter::debug_string() const { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index acd0fbe0dae..e508efde42d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1935,7 +1935,6 @@ public class Coordinator implements CoordInterface { FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params); instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams); - instanceParam.perNodeSharedScans.put(planNodeId, sharedScan); params.instanceExecParams.add(instanceParam); } params.ignoreDataDistribution = sharedScan; @@ -2757,13 +2756,11 @@ public class Coordinator implements CoordInterface { null, addressScanRange.getKey(), 0, params); for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : scanRange) { - instanceParam.addBucketSeq(nodeScanRangeMap.first); for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.second.entrySet()) { if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { range.put(nodeScanRange.getKey(), Lists.newArrayList()); instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList()); - instanceParam.perNodeSharedScans.put(nodeScanRange.getKey(), true); } range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()) @@ -2775,6 +2772,12 @@ public class Coordinator implements CoordInterface { params.instanceExecParams.add(new FInstanceExecParam( null, addressScanRange.getKey(), 0, params)); } + int index = 0; + for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> 
nodeScanRangeMap : scanRange) { + params.instanceExecParams.get(index % params.instanceExecParams.size()) + .addBucketSeq(nodeScanRangeMap.first); + index++; + } } else { int expectedInstanceNum = 1; if (parallelExecInstanceNum > 1) { @@ -3131,10 +3134,8 @@ public class Coordinator implements CoordInterface { for (int i = 0; i < instanceExecParams.size(); ++i) { final FInstanceExecParam instanceExecParam = instanceExecParams.get(i); Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges; - Map<Integer, Boolean> perNodeSharedScans = instanceExecParam.perNodeSharedScans; if (scanRanges == null) { scanRanges = Maps.newHashMap(); - perNodeSharedScans = Maps.newHashMap(); } if (!res.containsKey(instanceExecParam.host)) { TPipelineFragmentParams params = new TPipelineFragmentParams(); @@ -3162,7 +3163,6 @@ public class Coordinator implements CoordInterface { params.setFileScanParams(fileScanRangeParamsMap); params.setNumBuckets(fragment.getBucketNum()); - params.setPerNodeSharedScans(perNodeSharedScans); params.setTotalInstances(instanceExecParams.size()); if (ignoreDataDistribution) { params.setParallelInstances(parallelTasksNum); @@ -3187,7 +3187,6 @@ public class Coordinator implements CoordInterface { localParams.setFragmentInstanceId(instanceExecParam.instanceId); localParams.setPerNodeScanRanges(scanRanges); - localParams.setPerNodeSharedScans(perNodeSharedScans); localParams.setSenderId(i); localParams.setBackendNum(backendNum++); localParams.setRuntimeFilterParams(new TRuntimeFilterParams()); @@ -3335,7 +3334,6 @@ public class Coordinator implements CoordInterface { TUniqueId instanceId; TNetworkAddress host; Map<Integer, List<TScanRangeParams>> perNodeScanRanges = Maps.newHashMap(); - Map<Integer, Boolean> perNodeSharedScans = Maps.newHashMap(); int perFragmentInstanceIdx; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index f0e3febe192..a02ee90e901 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -423,7 +423,7 @@ public class ThriftPlansBuilder { boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob; if (isLocalShuffle && ((LocalShuffleAssignedJob) instance).receiveDataFromLocal) { - // save thrift rpc message size, don't need perNodeScanRanges and perNodeSharedScans, + // save thrift rpc message size, don't need perNodeScanRanges, // but the perNodeScanRanges is required rpc field instanceParams.setPerNodeScanRanges(Maps.newLinkedHashMap()); return; @@ -459,19 +459,16 @@ public class ThriftPlansBuilder { private static PerNodeScanParams computeDefaultScanSourceParam(DefaultScanSource defaultScanSource) { Map<Integer, List<TScanRangeParams>> perNodeScanRanges = Maps.newLinkedHashMap(); - Map<Integer, Boolean> perNodeSharedScans = Maps.newLinkedHashMap(); for (Entry<ScanNode, ScanRanges> kv : defaultScanSource.scanNodeToScanRanges.entrySet()) { int scanNodeId = kv.getKey().getId().asInt(); perNodeScanRanges.put(scanNodeId, kv.getValue().params); - perNodeSharedScans.put(scanNodeId, true); } - return new PerNodeScanParams(perNodeScanRanges, perNodeSharedScans); + return new PerNodeScanParams(perNodeScanRanges); } private static PerNodeScanParams computeBucketScanSourceParam(BucketScanSource bucketScanSource) { Map<Integer, List<TScanRangeParams>> perNodeScanRanges = Maps.newLinkedHashMap(); - Map<Integer, Boolean> perNodeSharedScans = Maps.newLinkedHashMap(); for (Entry<Integer, Map<ScanNode, ScanRanges>> kv : bucketScanSource.bucketIndexToScanNodeToTablets.entrySet()) { Map<ScanNode, ScanRanges> scanNodeToRanges = kv.getValue(); @@ -479,10 +476,9 @@ public class ThriftPlansBuilder { int scanNodeId = kv2.getKey().getId().asInt(); List<TScanRangeParams> scanRanges 
= perNodeScanRanges.computeIfAbsent(scanNodeId, ArrayList::new); scanRanges.addAll(kv2.getValue().params); - perNodeSharedScans.put(scanNodeId, true); } } - return new PerNodeScanParams(perNodeScanRanges, perNodeSharedScans); + return new PerNodeScanParams(perNodeScanRanges); } private static Map<Integer, Integer> computeBucketIdToInstanceId( @@ -562,12 +558,9 @@ public class ThriftPlansBuilder { private static class PerNodeScanParams { Map<Integer, List<TScanRangeParams>> perNodeScanRanges; - Map<Integer, Boolean> perNodeSharedScans; - public PerNodeScanParams(Map<Integer, List<TScanRangeParams>> perNodeScanRanges, - Map<Integer, Boolean> perNodeSharedScans) { + public PerNodeScanParams(Map<Integer, List<TScanRangeParams>> perNodeScanRanges) { this.perNodeScanRanges = perNodeScanRanges; - this.perNodeSharedScans = perNodeSharedScans; } } } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 392aa8658df..9a0fd910d94 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -776,7 +776,7 @@ struct TPipelineInstanceParams { 4: optional i32 sender_id 5: optional TRuntimeFilterParams runtime_filter_params 6: optional i32 backend_num - 7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans + 7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans // deprecated 8: optional list<i32> topn_filter_source_node_ids // deprecated after we set topn_filter_descs 9: optional list<PlanNodes.TTopnFilterDesc> topn_filter_descs } @@ -820,7 +820,7 @@ struct TPipelineFragmentParams { 33: optional i32 num_local_sink 34: optional i32 num_buckets 35: optional map<i32, i32> bucket_seq_to_instance_idx - 36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans + 36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans // deprecated 37: optional i32 parallel_instances 38: optional i32 total_instances 39: optional map<i32, i32> shuffle_idx_to_instance_idx 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org