This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new acf741fa80a [feature](binlog) Support gc binlogs by history nums and size (#35250) acf741fa80a is described below commit acf741fa80a3d03dc3e4bd39f3d9ed122ffdcebc Author: walter <w41te...@gmail.com> AuthorDate: Thu May 23 14:39:57 2024 +0800 [feature](binlog) Support gc binlogs by history nums and size (#35250) * [chore](binlog) Add logs about binlog gc (#34359) * [feature](binlog) Support gc binlogs by history nums and size (#34888) --- .../org/apache/doris/binlog/BinlogComparator.java | 2 +- .../org/apache/doris/binlog/BinlogConfigCache.java | 5 +- .../java/org/apache/doris/binlog/BinlogGcer.java | 2 +- .../org/apache/doris/binlog/BinlogManager.java | 5 +- .../java/org/apache/doris/binlog/BinlogUtils.java | 8 ++ .../java/org/apache/doris/binlog/DBBinlog.java | 142 +++++++++++++-------- .../java/org/apache/doris/binlog/TableBinlog.java | 103 ++++++++++----- .../org/apache/doris/binlog/BinlogManagerTest.java | 11 +- .../org/apache/doris/binlog/TableBinlogTest.java | 2 +- 9 files changed, 180 insertions(+), 100 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java index 9e35cc3bd61..edc01782f31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java @@ -20,5 +20,5 @@ package org.apache.doris.binlog; import org.apache.doris.thrift.TBinlog; public interface BinlogComparator { - boolean isExpired(TBinlog binlog, long expired); + boolean isExpired(TBinlog binlog); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java index 30641bae8c6..b07f5e5d87c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java @@ -41,6 +41,8 @@ public class BinlogConfigCache { lock = new ReentrantReadWriteLock(); } + // Get the binlog config of the specified db, return null if no such database + // exists. public BinlogConfig getDBBinlogConfig(long dbId) { lock.readLock().lock(); BinlogConfig binlogConfig = dbTableBinlogEnableMap.get(dbId); @@ -110,7 +112,8 @@ public class BinlogConfigCache { OlapTable olapTable = (OlapTable) table; tableBinlogConfig = olapTable.getBinlogConfig(); // get table binlog config, when table modify binlogConfig - // it create a new binlog, not update inplace, so we don't need to clone binlogConfig + // it create a new binlog, not update inplace, so we don't need to clone + // binlogConfig dbTableBinlogEnableMap.put(tableId, tableBinlogConfig); return tableBinlogConfig; } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java index 70118076114..c3e14e4955b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java @@ -58,7 +58,7 @@ public class BinlogGcer extends MasterDaemon { try { List<BinlogTombstone> tombstones = Env.getCurrentEnv().getBinlogManager().gc(); if (tombstones != null && !tombstones.isEmpty()) { - LOG.info("tomebstones size: {}", tombstones.size()); + LOG.info("tombstones size: {}", tombstones.size()); } else { LOG.info("no gc binlog"); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 96d0f7f4e13..454f678e2e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -58,9 +58,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class BinlogManager { private static final int BUFFER_SIZE = 16 * 1024; private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("Name") - .add("Type").add("Id").add("Dropped").add("BinlogLength").add("FirstBinlogCommittedTime") + .add("Type").add("Id").add("Dropped").add("BinlogLength").add("BinlogSize").add("FirstBinlogCommittedTime") .add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime") - .add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds") + .add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds").add("BinlogMaxBytes") + .add("BinlogMaxHistoryNums") .build(); private static final Logger LOG = LogManager.getLogger(BinlogManager.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java index 0f6c2308248..6b79fab143b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java @@ -81,6 +81,7 @@ public class BinlogUtils { return dummy; } + // Compute the expired timestamp in milliseconds. public static long getExpiredMs(long ttlSeconds) { long currentSeconds = System.currentTimeMillis() / 1000; if (currentSeconds < ttlSeconds) { @@ -94,4 +95,11 @@ public class BinlogUtils { public static String convertTimeToReadable(long time) { return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(time)); } + + public static long getApproximateMemoryUsage(TBinlog binlog) { + /* object layout: header + body + padding */ + final long objSize = 80; // 9 fields and 1 header + String data = binlog.getData(); + return objSize + binlog.getTableIdsSize() * 8 + (data == null ? 0 : data.length()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index a3133bfb5c7..79e1adf20c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -45,6 +45,8 @@ public class DBBinlog { private static final Logger LOG = LogManager.getLogger(BinlogManager.class); private long dbId; + // The size of all binlogs. + private long binlogSize; // guard for allBinlogs && tableBinlogMap private ReentrantReadWriteLock lock; // all binlogs contain table binlogs && create table binlog etc ... @@ -64,6 +66,7 @@ public class DBBinlog { lock = new ReentrantReadWriteLock(); this.dbId = binlog.getDbId(); this.binlogConfigCache = binlogConfigCache; + this.binlogSize = 0; // allBinlogs treeset order by commitSeq allBinlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq)); @@ -81,7 +84,7 @@ public class DBBinlog { } public static DBBinlog recoverDbBinlog(BinlogConfigCache binlogConfigCache, TBinlog dbDummy, - List<TBinlog> tableDummies, boolean dbBinlogEnable) { + List<TBinlog> tableDummies, boolean dbBinlogEnable) { DBBinlog dbBinlog = new DBBinlog(binlogConfigCache, dbDummy); long dbId = dbDummy.getDbId(); for (TBinlog tableDummy : tableDummies) { @@ -105,6 +108,7 @@ public class DBBinlog { } allBinlogs.add(binlog); + binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); if (tableIds == null) { return; @@ -119,12 +123,13 @@ public class DBBinlog { } } - // TODO(Drogon): remove TableBinlog after DropTable, think table drop && recovery + // TODO(Drogon): remove TableBinlog after DropTable, think table drop && + // recovery private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean dbBinlogEnable) { TableBinlog tableBinlog = tableBinlogMap.get(tableId); if (tableBinlog == null) { if (dbBinlogEnable || binlogConfigCache.isEnableTable(dbId, tableId)) { - tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, tableId); + tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, tableId); tableBinlogMap.put(tableId, tableBinlog); tableDummyBinlogs.add(tableBinlog.getDummyBinlog()); } @@ -132,7 +137,8 @@ public class DBBinlog { return tableBinlog; } - // guard by BinlogManager, if addBinlog called, more than one(db/tables) enable binlog + // guard by BinlogManager, if addBinlog called, more than one(db/tables) enable + // binlog public void addBinlog(TBinlog binlog) { boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId); List<Long> tableIds = binlog.getTableIds(); @@ -140,6 +146,7 @@ public class DBBinlog { lock.writeLock().lock(); try { allBinlogs.add(binlog); + binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); if (binlog.getTimestamp() > 0 && dbBinlogEnable) { timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); @@ -226,14 +233,10 @@ public class DBBinlog { return null; } - boolean dbBinlogEnable = dbBinlogConfig.isEnable(); BinlogTombstone tombstone; - if (dbBinlogEnable) { + if (dbBinlogConfig.isEnable()) { // db binlog is enabled, only one binlogTombstones - long ttlSeconds = dbBinlogConfig.getTtlSeconds(); - long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds); - - tombstone = dbBinlogEnableGc(expiredMs); + tombstone = dbBinlogEnableGc(dbBinlogConfig); } else { tombstone = dbBinlogDisableGc(); } @@ -277,7 +280,7 @@ public class DBBinlog { } for (TableBinlog tableBinlog : tableBinlogs) { - BinlogTombstone tombstone = tableBinlog.ttlGc(); + BinlogTombstone tombstone = tableBinlog.gc(); if (tombstone != null) { tombstones.add(tombstone); } @@ -297,6 +300,7 @@ public class DBBinlog { TBinlog dummy = binlogIter.next(); boolean foundFirstUsingBinlog = false; long lastCommitSeq = -1; + long removed = 0; while (binlogIter.hasNext()) { TBinlog binlog = binlogIter.next(); @@ -304,6 +308,8 @@ public class DBBinlog { if (commitSeq <= largestExpiredCommitSeq) { if (binlog.table_ref <= 0) { binlogIter.remove(); + binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog); + ++removed; if (!foundFirstUsingBinlog) { lastCommitSeq = commitSeq; } @@ -318,52 +324,92 @@ public class DBBinlog { if (lastCommitSeq != -1) { dummy.setCommitSeq(lastCommitSeq); } + + LOG.info("remove {} expired binlogs, dbId: {}, left: {}", removed, dbId, allBinlogs.size()); } finally { lock.writeLock().unlock(); } } - private BinlogTombstone dbBinlogEnableGc(long expiredMs) { + private TBinlog getLastExpiredBinlog(BinlogComparator checker) { + TBinlog lastExpiredBinlog = null; + + Iterator<TBinlog> binlogIter = allBinlogs.iterator(); + TBinlog dummy = binlogIter.next(); + while (binlogIter.hasNext()) { + TBinlog binlog = binlogIter.next(); + if (checker.isExpired(binlog)) { + binlogIter.remove(); + binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog); + lastExpiredBinlog = binlog; + } else { + break; + } + } + + if (lastExpiredBinlog != null) { + dummy.setCommitSeq(lastExpiredBinlog.getCommitSeq()); + + // release expired timestamps by commit seq. + Iterator<Pair<Long, Long>> timeIter = timestamps.iterator(); + while (timeIter.hasNext() && timeIter.next().first <= lastExpiredBinlog.getCommitSeq()) { + timeIter.remove(); + } + } + + return lastExpiredBinlog; + } + + private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) { + long ttlSeconds = dbBinlogConfig.getTtlSeconds(); + long maxBytes = dbBinlogConfig.getMaxBytes(); + long maxHistoryNums = dbBinlogConfig.getMaxHistoryNums(); + long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds); + + LOG.info("gc db binlog. dbId: {}, expiredMs: {}, ttlSecond: {}, maxBytes: {}, maxHistoryNums: {}", + dbId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums); + // step 1: get current tableBinlog info and expiredCommitSeq - long expiredCommitSeq = -1; + TBinlog lastExpiredBinlog = null; lock.writeLock().lock(); try { + long expiredCommitSeq = -1; Iterator<Pair<Long, Long>> timeIter = timestamps.iterator(); while (timeIter.hasNext()) { Pair<Long, Long> pair = timeIter.next(); - if (pair.second <= expiredMs) { - expiredCommitSeq = pair.first; - timeIter.remove(); - } else { + if (pair.second > expiredMs) { break; } + expiredCommitSeq = pair.first; } - Iterator<TBinlog> binlogIter = allBinlogs.iterator(); - TBinlog dummy = binlogIter.next(); - dummy.setCommitSeq(expiredCommitSeq); - - while (binlogIter.hasNext()) { - TBinlog binlog = binlogIter.next(); - if (binlog.getCommitSeq() <= expiredCommitSeq) { - binlogIter.remove(); - } else { - break; - } - } + final long lastExpiredCommitSeq = expiredCommitSeq; + BinlogComparator checker = (binlog) -> { + // NOTE: TreeSet read size during iterator remove is valid. + // + // The expired conditions in order: + // 1. expired time + // 2. the max bytes + // 3. the max history num + return binlog.getCommitSeq() <= lastExpiredCommitSeq + || maxBytes < binlogSize + || maxHistoryNums < allBinlogs.size(); + }; + lastExpiredBinlog = getLastExpiredBinlog(checker); } finally { lock.writeLock().unlock(); } - if (expiredCommitSeq == -1) { + if (lastExpiredBinlog == null) { return null; } - // step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db tombstone + // step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db + // tombstone List<BinlogTombstone> tableTombstones = Lists.newArrayList(); for (TableBinlog tableBinlog : tableBinlogMap.values()) { // step 2.1: gc tableBinlogļ¼and get table tombstone - BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(expiredCommitSeq); + BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(lastExpiredBinlog.getCommitSeq()); if (tableTombstone != null) { tableTombstones.add(tableTombstone); } @@ -386,28 +432,8 @@ public class DBBinlog { lock.writeLock().lock(); try { - Iterator<Pair<Long, Long>> timeIter = timestamps.iterator(); - while (timeIter.hasNext()) { - long commitSeq = timeIter.next().first; - if (commitSeq <= largestExpiredCommitSeq) { - timeIter.remove(); - } else { - break; - } - } - - Iterator<TBinlog> binlogIter = allBinlogs.iterator(); - TBinlog dummy = binlogIter.next(); - dummy.setCommitSeq(largestExpiredCommitSeq); - - while (binlogIter.hasNext()) { - TBinlog binlog = binlogIter.next(); - if (binlog.getCommitSeq() <= largestExpiredCommitSeq) { - binlogIter.remove(); - } else { - break; - } - } + BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <= largestExpiredCommitSeq; + getLastExpiredBinlog(checker); } finally { lock.writeLock().unlock(); } @@ -478,6 +504,8 @@ public class DBBinlog { info.add(dropped); String binlogLength = String.valueOf(allBinlogs.size()); info.add(binlogLength); + String binlogSize = String.valueOf(this.binlogSize); + info.add(binlogSize); String firstBinlogCommittedTime = null; String readableFirstBinlogCommittedTime = null; if (!timestamps.isEmpty()) { @@ -497,10 +525,16 @@ public class DBBinlog { info.add(lastBinlogCommittedTime); info.add(readableLastBinlogCommittedTime); String binlogTtlSeconds = null; + String binlogMaxBytes = null; + String binlogMaxHistoryNums = null; if (binlogConfig != null) { binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds()); + binlogMaxBytes = String.valueOf(binlogConfig.getMaxBytes()); + binlogMaxHistoryNums = String.valueOf(binlogConfig.getMaxHistoryNums()); } info.add(binlogTtlSeconds); + info.add(binlogMaxBytes); + info.add(binlogMaxHistoryNums); result.addRow(info); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 3dd290a07f8..36ec4f733ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -27,6 +27,7 @@ import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -43,15 +44,23 @@ public class TableBinlog { private long dbId; private long tableId; + private long binlogSize; private ReentrantReadWriteLock lock; private TreeSet<TBinlog> binlogs; + + // Pair(commitSeq, timestamp), used for gc + // need UpsertRecord to add timestamps for gc + private List<Pair<Long, Long>> timestamps; + private BinlogConfigCache binlogConfigCache; public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, long dbId, long tableId) { this.dbId = dbId; this.tableId = tableId; + this.binlogSize = 0; lock = new ReentrantReadWriteLock(); binlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq)); + timestamps = Lists.newArrayList(); TBinlog dummy; if (binlog.getType() == TBinlogType.DUMMY) { @@ -77,6 +86,10 @@ public class TableBinlog { if (binlog.getCommitSeq() > dummy.getCommitSeq()) { binlogs.add(binlog); ++binlog.table_ref; + binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); + if (binlog.getTimestamp() > 0) { + timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); + } } } @@ -85,6 +98,10 @@ public class TableBinlog { try { binlogs.add(binlog); ++binlog.table_ref; + binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); + if (binlog.getTimestamp() > 0) { + timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); + } } finally { lock.writeLock().unlock(); } @@ -108,7 +125,7 @@ public class TableBinlog { } } - private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(long expired, BinlogComparator checker) { + private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(BinlogComparator checker) { if (binlogs.size() <= 1) { return null; } @@ -119,9 +136,10 @@ public class TableBinlog { TBinlog lastExpiredBinlog = null; while (iter.hasNext()) { TBinlog binlog = iter.next(); - if (checker.isExpired(binlog, expired)) { + if (checker.isExpired(binlog)) { lastExpiredBinlog = binlog; --binlog.table_ref; + binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog); if (binlog.getType() == TBinlogType.UPSERT) { tombstoneUpsert = binlog; } @@ -135,9 +153,15 @@ public class TableBinlog { return null; } - dummyBinlog.setCommitSeq(lastExpiredBinlog.getCommitSeq()); + long expiredCommitSeq = lastExpiredBinlog.getCommitSeq(); + dummyBinlog.setCommitSeq(expiredCommitSeq); - return Pair.of(tombstoneUpsert, lastExpiredBinlog.getCommitSeq()); + Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator(); + while (timeIterator.hasNext() && timeIterator.next().first <= expiredCommitSeq) { + timeIterator.remove(); + } + + return Pair.of(tombstoneUpsert, expiredCommitSeq); } // this method call when db binlog enable @@ -147,8 +171,8 @@ public class TableBinlog { // step 1: get tombstoneUpsertBinlog and dummyBinlog lock.writeLock().lock(); try { - BinlogComparator check = (binlog, expire) -> binlog.getCommitSeq() <= expire; - tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredCommitSeq, check); + BinlogComparator check = (binlog) -> binlog.getCommitSeq() <= expiredCommitSeq; + tombstoneInfo = getLastUpsertAndLargestCommitSeq(check); } finally { lock.writeLock().unlock(); } @@ -171,7 +195,7 @@ public class TableBinlog { } // this method call when db binlog disable - public BinlogTombstone ttlGc() { + public BinlogTombstone gc() { // step 1: get expire time BinlogConfig tableBinlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId); if (tableBinlogConfig == null) { @@ -179,19 +203,43 @@ public class TableBinlog { } long ttlSeconds = tableBinlogConfig.getTtlSeconds(); + long maxBytes = tableBinlogConfig.getMaxBytes(); + long maxHistoryNums = tableBinlogConfig.getMaxHistoryNums(); long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds); - if (expiredMs < 0) { - return null; - } - LOG.info("ttl gc. dbId: {}, tableId: {}, expiredMs: {}", dbId, tableId, expiredMs); + LOG.info( + "gc table binlog. dbId: {}, tableId: {}, expiredMs: {}, ttlSecond: {}, maxBytes: {}, " + + "maxHistoryNums: {}, now: {}", + dbId, tableId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums, System.currentTimeMillis()); // step 2: get tombstoneUpsertBinlog and dummyBinlog Pair<TBinlog, Long> tombstoneInfo; lock.writeLock().lock(); try { - BinlogComparator check = (binlog, expire) -> binlog.getTimestamp() <= expire; - tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredMs, check); + // find the last expired commit seq. + long expiredCommitSeq = -1; + Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator(); + while (timeIterator.hasNext()) { + Pair<Long, Long> entry = timeIterator.next(); + if (expiredMs < entry.second) { + break; + } + expiredCommitSeq = entry.first; + } + + final long lastExpiredCommitSeq = expiredCommitSeq; + BinlogComparator check = (binlog) -> { + // NOTE: TreeSet read size during iterator remove is valid. + // + // The expired conditions in order: + // 1. expired time + // 2. the max bytes + // 3. the max history num + return binlog.getCommitSeq() <= lastExpiredCommitSeq + || maxBytes < binlogSize + || maxHistoryNums < binlogs.size(); + }; + tombstoneInfo = getLastUpsertAndLargestCommitSeq(check); } finally { lock.writeLock().unlock(); } @@ -216,25 +264,8 @@ public class TableBinlog { public void replayGc(long largestExpiredCommitSeq) { lock.writeLock().lock(); try { - long lastSeq = -1; - Iterator<TBinlog> iter = binlogs.iterator(); - TBinlog dummyBinlog = iter.next(); - - while (iter.hasNext()) { - TBinlog binlog = iter.next(); - long commitSeq = binlog.getCommitSeq(); - if (commitSeq <= largestExpiredCommitSeq) { - lastSeq = commitSeq; - --binlog.table_ref; - iter.remove(); - } else { - break; - } - } - - if (lastSeq != -1) { - dummyBinlog.setCommitSeq(lastSeq); - } + BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <= largestExpiredCommitSeq; + getLastUpsertAndLargestCommitSeq(checker); } finally { lock.writeLock().unlock(); } @@ -278,6 +309,8 @@ public class TableBinlog { info.add(dropped); String binlogLength = String.valueOf(binlogs.size()); info.add(binlogLength); + String binlogSize = String.valueOf(this.binlogSize); + info.add(binlogSize); String firstBinlogCommittedTime = null; String readableFirstBinlogCommittedTime = null; for (TBinlog binlog : binlogs) { @@ -305,10 +338,16 @@ public class TableBinlog { info.add(lastBinlogCommittedTime); info.add(readableLastBinlogCommittedTime); String binlogTtlSeconds = null; + String binlogMaxBytes = null; + String binlogMaxHistoryNums = null; if (binlogConfig != null) { binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds()); + binlogMaxBytes = String.valueOf(binlogConfig.getMaxBytes()); + binlogMaxHistoryNums = String.valueOf(binlogConfig.getMaxHistoryNums()); } info.add(binlogTtlSeconds); + info.add(binlogMaxBytes); + info.add(binlogMaxHistoryNums); result.addRow(info); } finally { diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java index 03f8d325d77..9542972359a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java @@ -277,14 +277,9 @@ public class BinlogManagerTest { for (Map.Entry<Long, List<Long>> dbEntry : frameWork.entrySet()) { long dbId = dbEntry.getKey(); for (long tableId : dbEntry.getValue()) { - if ((tableId / tableBaseId) % 2 != 0) { - addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow)); - addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow)); - ++commitSeq; - } else { - addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, 0, 0)); - addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, 0, 0)); - } + addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow)); + addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow)); + ++commitSeq; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java index b4ecd8a90c5..cd86c5935e1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java @@ -75,7 +75,7 @@ public class TableBinlogTest { } // trigger ttlGc - BinlogTombstone tombstone = tableBinlog.ttlGc(); + BinlogTombstone tombstone = tableBinlog.gc(); // check binlog status for (TBinlog binlog : testBinlogs) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org