This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e083dc26a00 [cherry-pick](branch-2.1) Pick "[Fix](group commit) Fix multiple cluster group commit BE select strategy (#38644)" (#39010) e083dc26a00 is described below commit e083dc26a00c3a109de8eb51ee3af31b242edbfa Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Wed Aug 7 22:07:30 2024 +0800 [cherry-pick](branch-2.1) Pick "[Fix](group commit) Fix multiple cluster group commit BE select strategy (#38644)" (#39010) ## Proposed changes Pick #38644 <!--Describe your changes.--> --- .../org/apache/doris/load/GroupCommitManager.java | 30 ++++++++++++++-------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index 1ec6a06179e..b6cf6cbb0a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -55,8 +55,8 @@ public class GroupCommitManager { // Table id to BE id map. Only for group commit. private Map<Long, Long> tableToBeMap = new ConcurrentHashMap<>(); - // BE id to pressure map. Only for group commit. - private Map<Long, SlidingWindowCounter> tablePressureMap = new ConcurrentHashMap<>(); + // Table id to pressure map. Only for group commit. + private Map<Long, SlidingWindowCounter> tableToPressureMap = new ConcurrentHashMap<>(); public boolean isBlock(long tableId) { return blockedTableIds.contains(tableId); @@ -236,8 +236,8 @@ public class GroupCommitManager { } private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException { - LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), - tablePressureMap.toString()); + LOG.debug("group commit select be info, tableToBeMap {}, tableToPressureMap {}", tableToBeMap.toString(), + tableToPressureMap.toString()); Long cachedBackendId = getCachedBackend(tableId); if (cachedBackendId != null) { return cachedBackendId; @@ -264,8 +264,18 @@ public class GroupCommitManager { private Long getCachedBackend(long tableId) { OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); if (tableToBeMap.containsKey(tableId)) { - if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { - Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId)); + if (tableToPressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { + // There are multiple threads getting cached backends for the same table. + // Maybe one thread removes the tableId from the tableToBeMap. + // Another thread gets the same tableId but can not find this tableId. + // So another thread needs to get the random backend. + Long backendId = tableToBeMap.get(tableId); + Backend backend; + if (backendId != null) { + backend = Env.getCurrentSystemInfo().getBackend(backendId); + } else { + return null; + } if (backend.isAlive() && !backend.isDecommissioned()) { return backend.getId(); } else { @@ -285,7 +295,7 @@ public class GroupCommitManager { for (Backend backend : backends) { if (backend.isAlive() && !backend.isDecommissioned()) { tableToBeMap.put(tableId, backend.getId()); - tablePressureMap.put(tableId, + tableToPressureMap.put(tableId, new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); return backend.getId(); } @@ -315,10 +325,10 @@ public class GroupCommitManager { } public void updateLoadDataInternal(long tableId, long receiveData) { - if (tablePressureMap.containsKey(tableId)) { - tablePressureMap.get(tableId).add(receiveData); + if (tableToPressureMap.containsKey(tableId)) { + tableToPressureMap.get(tableId).add(receiveData); LOG.info("Update load data for table{}, receiveData {}, tablePressureMap {}", tableId, receiveData, - tablePressureMap.toString()); + tableToPressureMap.toString()); } else { LOG.warn("can not find backend id: {}", tableId); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org