This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f0cde35ea6 [performance improvement] Spark Load, SparkDpp processRDDAggregate performance improvement (#12186)
f0cde35ea6 is described below

commit f0cde35ea66220d6d618dc06caf566b8e7a6ab89
Author: HouRong <[email protected]>
AuthorDate: Wed Aug 31 09:14:13 2022 +0800

    [performance improvement] Spark Load, SparkDpp processRDDAggregate performance improvement (#12186)
    
    Co-authored-by: hourong <[email protected]>
---
 .../src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java  | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git 
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java 
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 5d951ad70b..53ae81686f 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -138,12 +138,8 @@ public final class SparkDpp implements 
java.io.Serializable {
             RollupTreeNode curNode, SparkRDDAggregator[] sparkRDDAggregators) 
throws SparkDppException {
         final boolean isDuplicateTable = 
!StringUtils.equalsIgnoreCase(curNode.indexMeta.indexType, "AGGREGATE")
                 && !StringUtils.equalsIgnoreCase(curNode.indexMeta.indexType, 
"UNIQUE");
-
         // Aggregate/UNIQUE table
         if (!isDuplicateTable) {
-            // TODO(wb) set the reduce concurrency by statistic instead of 
hard code 200
-            int aggregateConcurrency = 200;
-
             int idx = 0;
             for (int i = 0; i < curNode.indexMeta.columns.size(); i++) {
                 if (!curNode.indexMeta.columns.get(i).isKey) {
@@ -155,14 +151,14 @@ public final class SparkDpp implements 
java.io.Serializable {
             if (curNode.indexMeta.isBaseIndex) {
                 JavaPairRDD<List<Object>, Object[]> result = 
currentPairRDD.mapToPair(
                         new 
EncodeBaseAggregateTableFunction(sparkRDDAggregators))
-                        .reduceByKey(new 
AggregateReduceFunction(sparkRDDAggregators), aggregateConcurrency);
+                        .reduceByKey(new 
AggregateReduceFunction(sparkRDDAggregators));
                 return result;
             } else {
                 JavaPairRDD<List<Object>, Object[]> result = currentPairRDD
                         .mapToPair(new EncodeRollupAggregateTableFunction(
                                 
getColumnIndexInParentRollup(curNode.keyColumnNames, curNode.valueColumnNames,
                                         curNode.parent.keyColumnNames, 
curNode.parent.valueColumnNames)))
-                        .reduceByKey(new 
AggregateReduceFunction(sparkRDDAggregators), aggregateConcurrency);
+                        .reduceByKey(new 
AggregateReduceFunction(sparkRDDAggregators));
                 return result;
             }
         // Duplicate Table


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to