This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 303e27597e3 [Fix](TPartitionVersionInfo) Fix duplicate `TPartitionVersionInfo` in `PublishVersionTask.partitionVersionInfos` (#44846) 303e27597e3 is described below commit 303e27597e3f6c2ee82033ab90593ae8d2599628 Author: bobhan1 <bao...@selectdb.com> AuthorDate: Mon Dec 2 21:42:18 2024 +0800 [Fix](TPartitionVersionInfo) Fix duplicate `TPartitionVersionInfo` in `PublishVersionTask.partitionVersionInfos` (#44846) ### What problem does this PR solve? Problem Summary: When FE handles a BE's tablet report and finds that some expired txns exist on that BE, it generates publish version tasks for them. A `TPartitionVersionInfo` with identical values may be added to `transactionsToPublish` under the same txn id many times when the partitions involved in the failed txn contain many tablets on this BE. Because `transactionsToPublish` used an `ArrayListMultimap`, these duplicate values could occupy a lot of memory when the number of tablets was large. This commit changes the per-database map to a `SetMultimap` backed by `LinkedHashMultimap`, so an identical `TPartitionVersionInfo` is stored only once per txn id, while insertion order is preserved. ### Release note Fixed an issue where FE memory usage grew too fast in cases of persistent load and clone failures on merge-on-write tables. 
修复了在merge-on-write表上有持续的导入失败和clone失败的情况下,FE使用内存增长过快的问题。 --- .../java/org/apache/doris/catalog/TabletInvertedIndex.java | 13 +++++++------ .../main/java/org/apache/doris/master/ReportHandler.java | 9 +++++---- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 4a421dc7b2b..a51d1f55014 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -37,13 +37,14 @@ import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Preconditions; -import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; import com.google.common.collect.Table; import com.google.common.collect.TreeMultimap; @@ -135,7 +136,7 @@ public class TabletInvertedIndex { Set<Long> tabletFoundInMeta, ListMultimap<TStorageMedium, Long> tabletMigrationMap, Map<Long, Long> partitionVersionSyncMap, - Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish, + Map<Long, SetMultimap<Long, TPartitionVersionInfo>> transactionsToPublish, ListMultimap<Long, Long> transactionsToClear, ListMultimap<Long, Long> tabletRecoveryMap, List<TTabletMetaInfo> tabletToUpdate, @@ -314,7 +315,7 @@ public class TabletInvertedIndex { } private void handleBackendTransactions(long backendId, List<Long> transactionIds, long tabletId, - TabletMeta tabletMeta, Map<Long, 
ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish, + TabletMeta tabletMeta, Map<Long, SetMultimap<Long, TPartitionVersionInfo>> transactionsToPublish, ListMultimap<Long, Long> transactionsToClear) { GlobalTransactionMgrIface transactionMgr = Env.getCurrentGlobalTransactionMgr(); long partitionId = tabletMeta.getPartitionId(); @@ -376,15 +377,15 @@ public class TabletInvertedIndex { } private void publishPartition(TransactionState transactionState, long transactionId, TabletMeta tabletMeta, - long partitionId, Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish) { + long partitionId, Map<Long, SetMultimap<Long, TPartitionVersionInfo>> transactionsToPublish) { TPartitionVersionInfo versionInfo = generatePartitionVersionInfoWhenReport(transactionState, transactionId, tabletMeta, partitionId); if (versionInfo != null) { synchronized (transactionsToPublish) { - ListMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get( + SetMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get( transactionState.getDbId()); if (map == null) { - map = ArrayListMultimap.create(); + map = LinkedHashMultimap.create(); transactionsToPublish.put(transactionState.getDbId(), map); } map.put(transactionId, versionInfo); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index c5c72eae3c5..06047e2cf16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -95,6 +95,7 @@ import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; +import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -503,7 
+504,7 @@ public class ReportHandler extends Daemon { Map<Long, Long> partitionVersionSyncMap = Maps.newConcurrentMap(); // dbid -> txn id -> [partition info] - Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish = Maps.newHashMap(); + Map<Long, SetMultimap<Long, TPartitionVersionInfo>> transactionsToPublish = Maps.newHashMap(); ListMultimap<Long, Long> transactionsToClear = LinkedListMultimap.create(); // db id -> tablet id @@ -1148,14 +1149,14 @@ public class ReportHandler extends Daemon { } private static void handleRepublishVersionInfo( - Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish, long backendId) { + Map<Long, SetMultimap<Long, TPartitionVersionInfo>> transactionsToPublish, long backendId) { AgentBatchTask batchTask = new AgentBatchTask(); long createPublishVersionTaskTime = System.currentTimeMillis(); for (Long dbId : transactionsToPublish.keySet()) { - ListMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(dbId); + SetMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(dbId); for (long txnId : map.keySet()) { PublishVersionTask task = new PublishVersionTask(backendId, txnId, dbId, - map.get(txnId), createPublishVersionTaskTime); + Lists.newArrayList(map.get(txnId)), createPublishVersionTaskTime); batchTask.addTask(task); // add to AgentTaskQueue for handling finish report. AgentTaskQueue.addTask(task); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org