This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c5116c5d31b [fix](schema change) reduce memory usage of alter multi-column statement (#33073) c5116c5d31b is described below commit c5116c5d31be440bba343084f3599f0b029bb293 Author: Luwei <814383...@qq.com> AuthorDate: Sat Apr 27 13:59:10 2024 +0800 [fix](schema change) reduce memory usage of alter multi-column statement (#33073) --- .../java/org/apache/doris/alter/RollupJobV2.java | 9 ++-- .../org/apache/doris/alter/SchemaChangeJobV2.java | 7 +-- .../org/apache/doris/task/AlterReplicaTask.java | 52 +++++++++++++++------- 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index ec0868637e7..93aceb0936b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -62,7 +62,6 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; -import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -86,6 +85,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -388,7 +388,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { } tbl.readLock(); - Map<Object, List<TColumn>> tcloumnsPool = Maps.newHashMap(); + Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>(); String vaultId = tbl.getStorageVaultId(); try { long expiration = (createTimeMs + timeoutMs) / 1000; @@ -401,14 +401,13 @@ public class RollupJobV2 
extends AlterJobV2 implements GsonPostProcessable { // the rollup task will transform the data before visible version(included). long visibleVersion = partition.getVisibleVersion(); + Map<String, Expr> defineExprs = Maps.newHashMap(); MaterializedIndex rollupIndex = entry.getValue(); Map<Long, Long> tabletIdMap = this.partitionIdToBaseRollupTabletIdMap.get(partitionId); for (Tablet rollupTablet : rollupIndex.getTablets()) { long rollupTabletId = rollupTablet.getId(); long baseTabletId = tabletIdMap.get(rollupTabletId); - Map<String, Expr> defineExprs = Maps.newHashMap(); - DescriptorTable descTable = new DescriptorTable(); TupleDescriptor destTupleDesc = descTable.createTupleDescriptor(); Map<String, SlotDescriptor> descMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); @@ -470,7 +469,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId, rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId, JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true), - tcloumnsPool, whereClause, expiration, vaultId); + objectPool, whereClause, expiration, vaultId); rollupBatchTask.addTask(rollupTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index f00177ec537..277a3411541 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -54,7 +54,6 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; -import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -76,6 
+75,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -418,7 +418,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } tbl.readLock(); - Map<Object, List<TColumn>> tcloumnsPool = Maps.newHashMap(); + Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>(); String vaultId = tbl.getStorageVaultId(); try { long expiration = (createTimeMs + timeoutMs) / 1000; @@ -478,12 +478,13 @@ public class SchemaChangeJobV2 extends AlterJobV2 { AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId, tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId, shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId, - JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, tcloumnsPool, + JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, objectPool, null, expiration, vaultId); schemaChangeBatchTask.addTask(rollupTask); } } } + } // end for partitions } finally { tbl.readUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java index ebf505e454a..c95cc267076 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java @@ -42,6 +42,7 @@ import java.util.Map; * The new replica can be a rollup replica, or a shadow replica of schema change. 
*/ public class AlterReplicaTask extends AgentTask { + private long baseTabletId; private long newReplicaId; private int baseSchemaHash; @@ -53,7 +54,7 @@ public class AlterReplicaTask extends AgentTask { private Map<String, Expr> defineExprs; private Expr whereClause; private DescriptorTable descTable; - private Map<Object, List<TColumn>> tcloumnsPool; + private Map<Object, Object> objectPool; private List<Column> baseSchemaColumns; private long expiration; @@ -67,7 +68,7 @@ public class AlterReplicaTask extends AgentTask { public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs, - DescriptorTable descTable, List<Column> baseSchemaColumns, Map<Object, List<TColumn>> tcloumnsPool, + DescriptorTable descTable, List<Column> baseSchemaColumns, Map<Object, Object> objectPool, Expr whereClause, long expiration, String vaultId) { super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId); @@ -85,7 +86,7 @@ public class AlterReplicaTask extends AgentTask { this.whereClause = whereClause; this.descTable = descTable; this.baseSchemaColumns = baseSchemaColumns; - this.tcloumnsPool = tcloumnsPool; + this.objectPool = objectPool; this.expiration = expiration; this.vaultId = vaultId; } @@ -134,32 +135,51 @@ public class AlterReplicaTask extends AgentTask { default: break; } + if (defineExprs != null) { for (Map.Entry<String, Expr> entry : defineExprs.entrySet()) { - List<SlotRef> slots = Lists.newArrayList(); - entry.getValue().collect(SlotRef.class, slots); - TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey()); - mvParam.setMvExpr(entry.getValue().treeToThrift()); - req.addToMaterializedViewParams(mvParam); + Object value = objectPool.get(entry.getKey()); + if 
(value == null) { + List<SlotRef> slots = Lists.newArrayList(); + entry.getValue().collect(SlotRef.class, slots); + TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey()); + mvParam.setMvExpr(entry.getValue().treeToThrift()); + req.addToMaterializedViewParams(mvParam); + objectPool.put(entry.getKey(), mvParam); + } else { + TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value; + req.addToMaterializedViewParams(mvParam); + } } } + if (whereClause != null) { - TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN); - mvParam.setMvExpr(whereClause.treeToThrift()); - req.addToMaterializedViewParams(mvParam); + Object value = objectPool.get(Column.WHERE_SIGN); + if (value == null) { + TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN); + mvParam.setMvExpr(whereClause.treeToThrift()); + req.addToMaterializedViewParams(mvParam); + objectPool.put(Column.WHERE_SIGN, mvParam); + } else { + TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value; + req.addToMaterializedViewParams(mvParam); + } } req.setDescTbl(descTable.toThrift()); if (baseSchemaColumns != null) { - List<TColumn> columns = tcloumnsPool.get(baseSchemaColumns); - if (columns == null) { - columns = new ArrayList<TColumn>(); + Object value = objectPool.get(baseSchemaColumns); + if (value == null) { + List<TColumn> columns = new ArrayList<TColumn>(); for (Column column : baseSchemaColumns) { columns.add(column.toThrift()); } - tcloumnsPool.put(baseSchemaColumns, columns); + objectPool.put(baseSchemaColumns, columns); + req.setColumns(columns); + } else { + List<TColumn> columns = (List<TColumn>) value; + req.setColumns(columns); } - req.setColumns(columns); } req.setStorageVaultId(this.vaultId); return req; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: 
commits-help@doris.apache.org