This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c5116c5d31b [fix](schema change) reduce memory usage of alter multi-column statement (#33073)
c5116c5d31b is described below

commit c5116c5d31be440bba343084f3599f0b029bb293
Author: Luwei <814383...@qq.com>
AuthorDate: Sat Apr 27 13:59:10 2024 +0800

    [fix](schema change) reduce memory usage of alter multi-column statement (#33073)
---
 .../java/org/apache/doris/alter/RollupJobV2.java   |  9 ++--
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  7 +--
 .../org/apache/doris/task/AlterReplicaTask.java    | 52 +++++++++++++++-------
 3 files changed, 44 insertions(+), 24 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index ec0868637e7..93aceb0936b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -62,7 +62,6 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.AlterReplicaTask;
 import org.apache.doris.task.CreateReplicaTask;
-import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
@@ -86,6 +85,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -388,7 +388,7 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
         }
 
         tbl.readLock();
-        Map<Object, List<TColumn>> tcloumnsPool  = Maps.newHashMap();
+        Map<Object, Object> objectPool = new ConcurrentHashMap<Object, 
Object>();
         String vaultId = tbl.getStorageVaultId();
         try {
             long expiration = (createTimeMs + timeoutMs) / 1000;
@@ -401,14 +401,13 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
                 // the rollup task will transform the data before visible 
version(included).
                 long visibleVersion = partition.getVisibleVersion();
 
+                Map<String, Expr> defineExprs = Maps.newHashMap();
                 MaterializedIndex rollupIndex = entry.getValue();
                 Map<Long, Long> tabletIdMap = 
this.partitionIdToBaseRollupTabletIdMap.get(partitionId);
                 for (Tablet rollupTablet : rollupIndex.getTablets()) {
                     long rollupTabletId = rollupTablet.getId();
                     long baseTabletId = tabletIdMap.get(rollupTabletId);
 
-                    Map<String, Expr> defineExprs = Maps.newHashMap();
-
                     DescriptorTable descTable = new DescriptorTable();
                     TupleDescriptor destTupleDesc = 
descTable.createTupleDescriptor();
                     Map<String, SlotDescriptor> descMap = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
@@ -470,7 +469,7 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
                                 partitionId, rollupIndexId, baseIndexId, 
rollupTabletId, baseTabletId,
                                 rollupReplica.getId(), rollupSchemaHash, 
baseSchemaHash, visibleVersion, jobId,
                                 JobType.ROLLUP, defineExprs, descTable, 
tbl.getSchemaByIndexId(baseIndexId, true),
-                                tcloumnsPool, whereClause, expiration, 
vaultId);
+                                objectPool, whereClause, expiration, vaultId);
                         rollupBatchTask.addTask(rollupTask);
                     }
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index f00177ec537..277a3411541 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -54,7 +54,6 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.AlterReplicaTask;
 import org.apache.doris.task.CreateReplicaTask;
-import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
@@ -76,6 +75,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -418,7 +418,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         }
 
         tbl.readLock();
-        Map<Object, List<TColumn>> tcloumnsPool  = Maps.newHashMap();
+        Map<Object, Object> objectPool = new ConcurrentHashMap<Object, 
Object>();
         String vaultId = tbl.getStorageVaultId();
         try {
             long expiration = (createTimeMs + timeoutMs) / 1000;
@@ -478,12 +478,13 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                             AlterReplicaTask rollupTask = new 
AlterReplicaTask(shadowReplica.getBackendId(), dbId,
                                     tableId, partitionId, shadowIdxId, 
originIdxId, shadowTabletId, originTabletId,
                                     shadowReplica.getId(), shadowSchemaHash, 
originSchemaHash, visibleVersion, jobId,
-                                    JobType.SCHEMA_CHANGE, defineExprs, 
descTable, originSchemaColumns, tcloumnsPool,
+                                    JobType.SCHEMA_CHANGE, defineExprs, 
descTable, originSchemaColumns, objectPool,
                                     null, expiration, vaultId);
                             schemaChangeBatchTask.addTask(rollupTask);
                         }
                     }
                 }
+
             } // end for partitions
         } finally {
             tbl.readUnlock();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
index ebf505e454a..c95cc267076 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
@@ -42,6 +42,7 @@ import java.util.Map;
  * The new replica can be a rollup replica, or a shadow replica of schema 
change.
  */
 public class AlterReplicaTask extends AgentTask {
+
     private long baseTabletId;
     private long newReplicaId;
     private int baseSchemaHash;
@@ -53,7 +54,7 @@ public class AlterReplicaTask extends AgentTask {
     private Map<String, Expr> defineExprs;
     private Expr whereClause;
     private DescriptorTable descTable;
-    private Map<Object, List<TColumn>> tcloumnsPool;
+    private Map<Object, Object> objectPool;
     private List<Column> baseSchemaColumns;
 
     private long expiration;
@@ -67,7 +68,7 @@ public class AlterReplicaTask extends AgentTask {
     public AlterReplicaTask(long backendId, long dbId, long tableId, long 
partitionId, long rollupIndexId,
             long baseIndexId, long rollupTabletId, long baseTabletId, long 
newReplicaId, int newSchemaHash,
             int baseSchemaHash, long version, long jobId, AlterJobV2.JobType 
jobType, Map<String, Expr> defineExprs,
-            DescriptorTable descTable, List<Column> baseSchemaColumns, 
Map<Object, List<TColumn>> tcloumnsPool,
+            DescriptorTable descTable, List<Column> baseSchemaColumns, 
Map<Object, Object> objectPool,
             Expr whereClause, long expiration, String vaultId) {
         super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, 
rollupIndexId, rollupTabletId);
 
@@ -85,7 +86,7 @@ public class AlterReplicaTask extends AgentTask {
         this.whereClause = whereClause;
         this.descTable = descTable;
         this.baseSchemaColumns = baseSchemaColumns;
-        this.tcloumnsPool = tcloumnsPool;
+        this.objectPool = objectPool;
         this.expiration = expiration;
         this.vaultId = vaultId;
     }
@@ -134,32 +135,51 @@ public class AlterReplicaTask extends AgentTask {
             default:
                 break;
         }
+
         if (defineExprs != null) {
             for (Map.Entry<String, Expr> entry : defineExprs.entrySet()) {
-                List<SlotRef> slots = Lists.newArrayList();
-                entry.getValue().collect(SlotRef.class, slots);
-                TAlterMaterializedViewParam mvParam = new 
TAlterMaterializedViewParam(entry.getKey());
-                mvParam.setMvExpr(entry.getValue().treeToThrift());
-                req.addToMaterializedViewParams(mvParam);
+                Object value = objectPool.get(entry.getKey());
+                if (value == null) {
+                    List<SlotRef> slots = Lists.newArrayList();
+                    entry.getValue().collect(SlotRef.class, slots);
+                    TAlterMaterializedViewParam mvParam = new 
TAlterMaterializedViewParam(entry.getKey());
+                    mvParam.setMvExpr(entry.getValue().treeToThrift());
+                    req.addToMaterializedViewParams(mvParam);
+                    objectPool.put(entry.getKey(), mvParam);
+                } else {
+                    TAlterMaterializedViewParam mvParam = 
(TAlterMaterializedViewParam) value;
+                    req.addToMaterializedViewParams(mvParam);
+                }
             }
         }
+
         if (whereClause != null) {
-            TAlterMaterializedViewParam mvParam = new 
TAlterMaterializedViewParam(Column.WHERE_SIGN);
-            mvParam.setMvExpr(whereClause.treeToThrift());
-            req.addToMaterializedViewParams(mvParam);
+            Object value = objectPool.get(Column.WHERE_SIGN);
+            if (value == null) {
+                TAlterMaterializedViewParam mvParam = new 
TAlterMaterializedViewParam(Column.WHERE_SIGN);
+                mvParam.setMvExpr(whereClause.treeToThrift());
+                req.addToMaterializedViewParams(mvParam);
+                objectPool.put(Column.WHERE_SIGN, mvParam);
+            } else {
+                TAlterMaterializedViewParam mvParam = 
(TAlterMaterializedViewParam) value;
+                req.addToMaterializedViewParams(mvParam);
+            }
         }
         req.setDescTbl(descTable.toThrift());
 
         if (baseSchemaColumns != null) {
-            List<TColumn> columns = tcloumnsPool.get(baseSchemaColumns);
-            if (columns == null) {
-                columns = new ArrayList<TColumn>();
+            Object value = objectPool.get(baseSchemaColumns);
+            if (value == null) {
+                List<TColumn> columns = new ArrayList<TColumn>();
                 for (Column column : baseSchemaColumns) {
                     columns.add(column.toThrift());
                 }
-                tcloumnsPool.put(baseSchemaColumns, columns);
+                objectPool.put(baseSchemaColumns, columns);
+                req.setColumns(columns);
+            } else {
+                List<TColumn> columns = (List<TColumn>) value;
+                req.setColumns(columns);
             }
-            req.setColumns(columns);
         }
         req.setStorageVaultId(this.vaultId);
         return req;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to