This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 360933b885d branch-2.1: [feat](binlog) Speed binlog gc by locked binlogs #47547 (#48128) 360933b885d is described below commit 360933b885d69d33723d56f44b73e846919a7a96 Author: walter <maoch...@selectdb.com> AuthorDate: Thu Feb 20 17:08:26 2025 +0800 branch-2.1: [feat](binlog) Speed binlog gc by locked binlogs #47547 (#48128) cherry pick from #47547 --- .../main/java/org/apache/doris/common/Config.java | 2 +- .../org/apache/doris/binlog/BinlogManager.java | 5 ++- .../java/org/apache/doris/binlog/DBBinlog.java | 45 +++++++++++++++++++--- .../java/org/apache/doris/binlog/TableBinlog.java | 20 +++++++++- 4 files changed, 63 insertions(+), 9 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index bbfa2f30e3a..5deaf8b51d5 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2417,7 +2417,7 @@ public class Config extends ConfigBase { public static int max_binlog_messsage_size = 1024 * 1024 * 1024; @ConfField(mutable = true, masterOnly = true, description = { - "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。", + "是否禁止使用 WITH RESOURCE 语句创建 Catalog。", "Whether to disable creating catalog with WITH RESOURCE statement."}) public static boolean disallow_create_catalog_with_resource = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 2403ede6fb3..d4bf72a1e9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -112,6 +112,10 @@ public class BinlogManager { return; } + LOG.debug("add binlog, db {}, table {}, commitSeq {}, timestamp {}, type {}, data {}", + binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), binlog.getTimestamp(), binlog.getType(), + binlog.getData()); + DBBinlog dbBinlog; lock.writeLock().lock(); try { @@ -589,7 +593,6 @@ public class BinlogManager { return tombstones; } - public void replayGc(BinlogGcInfo binlogGcInfo) { lock.writeLock().lock(); Map<Long, DBBinlog> gcDbBinlogMap; diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 675c9dc78a8..bd7fd184426 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -43,6 +43,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -288,7 +289,6 @@ public class DBBinlog { public Pair<TStatus, Long> lockBinlog(long tableId, String jobUniqueId, long lockCommitSeq) { TableBinlog tableBinlog = null; - lock.writeLock().lock(); try { if (tableId < 0) { @@ -457,20 +457,43 @@ public class DBBinlog { } if (lastExpiredBinlog != null) { - dummy.setCommitSeq(lastExpiredBinlog.getCommitSeq()); + final long expiredCommitSeq = lastExpiredBinlog.getCommitSeq(); + dummy.setCommitSeq(expiredCommitSeq); // release expired timestamps by commit seq. Iterator<Pair<Long, Long>> timeIter = timestamps.iterator(); - while (timeIter.hasNext() && timeIter.next().first <= lastExpiredBinlog.getCommitSeq()) { + while (timeIter.hasNext() && timeIter.next().first <= expiredCommitSeq) { timeIter.remove(); } - gcDroppedResources(lastExpiredBinlog.getCommitSeq()); + lockedBinlogs.entrySet().removeIf(ent -> ent.getValue() <= expiredCommitSeq); + gcDroppedResources(expiredCommitSeq); } return lastExpiredBinlog; } + private Optional<Long> getMinLockedCommitSeq() { + lock.readLock().lock(); + try { + Optional<Long> minLockedCommitSeq = lockedBinlogs.values().stream().min(Long::compareTo); + for (TableBinlog tableBinlog : tableBinlogMap.values()) { + Optional<Long> tableMinLockedCommitSeq = tableBinlog.getMinLockedCommitSeq(); + if (!tableMinLockedCommitSeq.isPresent()) { + continue; + } + if (minLockedCommitSeq.isPresent()) { + minLockedCommitSeq = Optional.of(Math.min(minLockedCommitSeq.get(), tableMinLockedCommitSeq.get())); + } else { + minLockedCommitSeq = tableMinLockedCommitSeq; + } + } + return minLockedCommitSeq; + } finally { + lock.readLock().unlock(); + } + } + private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) { long ttlSeconds = dbBinlogConfig.getTtlSeconds(); long maxBytes = dbBinlogConfig.getMaxBytes(); @@ -481,10 +504,12 @@ public class DBBinlog { dbId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums); // step 1: get current tableBinlog info and expiredCommitSeq + Optional<Long> minLockedCommitSeq = getMinLockedCommitSeq(); TBinlog lastExpiredBinlog = null; + List<TableBinlog> tableBinlogs = Lists.newArrayList(); lock.writeLock().lock(); try { - long expiredCommitSeq = -1; + long expiredCommitSeq = -1L; Iterator<Pair<Long, Long>> timeIter = timestamps.iterator(); while (timeIter.hasNext()) { Pair<Long, Long> pair = timeIter.next(); @@ -494,6 +519,13 @@ public class DBBinlog { expiredCommitSeq = pair.first; } + // Speed up gc by recycling binlogs that are not locked by syncer. + // To keep compatible with the old version, if no binlog is locked here, fallthrough to the + // previous behavior (keep the entire binlogs until it is expired). + if (minLockedCommitSeq.isPresent() && expiredCommitSeq + 1L < minLockedCommitSeq.get()) { + expiredCommitSeq = minLockedCommitSeq.get() - 1L; + } + final long lastExpiredCommitSeq = expiredCommitSeq; BinlogComparator checker = (binlog) -> { // NOTE: TreeSet read size during iterator remove is valid. @@ -507,6 +539,7 @@ public class DBBinlog { || maxHistoryNums < allBinlogs.size(); }; lastExpiredBinlog = getLastExpiredBinlog(checker); + tableBinlogs.addAll(tableBinlogMap.values()); } finally { lock.writeLock().unlock(); } @@ -518,7 +551,7 @@ public class DBBinlog { // step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db // tombstone List<BinlogTombstone> tableTombstones = Lists.newArrayList(); - for (TableBinlog tableBinlog : tableBinlogMap.values()) { + for (TableBinlog tableBinlog : tableBinlogs) { // step 2.1: gc tableBinlog,and get table tombstone BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(lastExpiredBinlog.getCommitSeq()); if (tableTombstone != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index b6cf328eccc..cef60c85ac4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -39,6 +39,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -171,6 +172,15 @@ public class TableBinlog { return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq); } + public Optional<Long> getMinLockedCommitSeq() { + lock.readLock().lock(); + try { + return lockedBinlogs.values().stream().min(Long::compareTo); + } finally { + lock.readLock().unlock(); + } + } + private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(BinlogComparator checker) { if (binlogs.size() <= 1) { return null; @@ -199,7 +209,7 @@ public class TableBinlog { return null; } - long expiredCommitSeq = lastExpiredBinlog.getCommitSeq(); + final long expiredCommitSeq = lastExpiredBinlog.getCommitSeq(); dummyBinlog.setCommitSeq(expiredCommitSeq); Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator(); @@ -207,6 +217,7 @@ public class TableBinlog { timeIterator.remove(); } + lockedBinlogs.entrySet().removeIf(ent -> ent.getValue() <= expiredCommitSeq); return Pair.of(tombstoneUpsert, expiredCommitSeq); } @@ -279,6 +290,13 @@ public class TableBinlog { } expiredCommitSeq = entry.first; } + + // find the min locked binlog commit seq, if not exists, use the last binlog commit seq. + Optional<Long> minLockedCommitSeq = lockedBinlogs.values().stream().min(Long::compareTo); + if (minLockedCommitSeq.isPresent() && expiredCommitSeq + 1L < minLockedCommitSeq.get()) { + // Speed up the gc progress by the min locked commit seq. + expiredCommitSeq = minLockedCommitSeq.get() - 1L; + } } final long lastExpiredCommitSeq = expiredCommitSeq; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org