This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new acf741fa80a [feature](binlog) Support gc binlogs by history nums and size (#35250)
acf741fa80a is described below

commit acf741fa80a3d03dc3e4bd39f3d9ed122ffdcebc
Author: walter <w41te...@gmail.com>
AuthorDate: Thu May 23 14:39:57 2024 +0800

    [feature](binlog) Support gc binlogs by history nums and size (#35250)
    
    * [chore](binlog) Add logs about binlog gc (#34359)
    
    * [feature](binlog) Support gc binlogs by history nums and size (#34888)
---
 .../org/apache/doris/binlog/BinlogComparator.java  |   2 +-
 .../org/apache/doris/binlog/BinlogConfigCache.java |   5 +-
 .../java/org/apache/doris/binlog/BinlogGcer.java   |   2 +-
 .../org/apache/doris/binlog/BinlogManager.java     |   5 +-
 .../java/org/apache/doris/binlog/BinlogUtils.java  |   8 ++
 .../java/org/apache/doris/binlog/DBBinlog.java     | 142 +++++++++++++--------
 .../java/org/apache/doris/binlog/TableBinlog.java  | 103 ++++++++++-----
 .../org/apache/doris/binlog/BinlogManagerTest.java |  11 +-
 .../org/apache/doris/binlog/TableBinlogTest.java   |   2 +-
 9 files changed, 180 insertions(+), 100 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java
index 9e35cc3bd61..edc01782f31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java
@@ -20,5 +20,5 @@ package org.apache.doris.binlog;
 import org.apache.doris.thrift.TBinlog;
 
 public interface BinlogComparator {
-    boolean isExpired(TBinlog binlog, long expired);
+    boolean isExpired(TBinlog binlog);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
index 30641bae8c6..b07f5e5d87c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
@@ -41,6 +41,8 @@ public class BinlogConfigCache {
         lock = new ReentrantReadWriteLock();
     }
 
+    // Get the binlog config of the specified db, return null if no such 
database
+    // exists.
     public BinlogConfig getDBBinlogConfig(long dbId) {
         lock.readLock().lock();
         BinlogConfig binlogConfig = dbTableBinlogEnableMap.get(dbId);
@@ -110,7 +112,8 @@ public class BinlogConfigCache {
             OlapTable olapTable = (OlapTable) table;
             tableBinlogConfig = olapTable.getBinlogConfig();
             // get table binlog config, when table modify binlogConfig
-            // it create a new binlog, not update inplace, so we don't need to 
clone binlogConfig
+            // it create a new binlog, not update inplace, so we don't need to 
clone
+            // binlogConfig
             dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
             return tableBinlogConfig;
         } catch (Exception e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
index 70118076114..c3e14e4955b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
@@ -58,7 +58,7 @@ public class BinlogGcer extends MasterDaemon {
         try {
             List<BinlogTombstone> tombstones = 
Env.getCurrentEnv().getBinlogManager().gc();
             if (tombstones != null && !tombstones.isEmpty()) {
-                LOG.info("tomebstones size: {}", tombstones.size());
+                LOG.info("tombstones size: {}", tombstones.size());
             } else {
                 LOG.info("no gc binlog");
                 return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 96d0f7f4e13..454f678e2e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -58,9 +58,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class BinlogManager {
     private static final int BUFFER_SIZE = 16 * 1024;
     private static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>().add("Name")
-            
.add("Type").add("Id").add("Dropped").add("BinlogLength").add("FirstBinlogCommittedTime")
+            
.add("Type").add("Id").add("Dropped").add("BinlogLength").add("BinlogSize").add("FirstBinlogCommittedTime")
             
.add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime")
-            .add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds")
+            
.add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds").add("BinlogMaxBytes")
+            .add("BinlogMaxHistoryNums")
             .build();
 
     private static final Logger LOG = 
LogManager.getLogger(BinlogManager.class);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index 0f6c2308248..6b79fab143b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -81,6 +81,7 @@ public class BinlogUtils {
         return dummy;
     }
 
+    // Compute the expired timestamp in milliseconds.
     public static long getExpiredMs(long ttlSeconds) {
         long currentSeconds = System.currentTimeMillis() / 1000;
         if (currentSeconds < ttlSeconds) {
@@ -94,4 +95,11 @@ public class BinlogUtils {
     public static String convertTimeToReadable(long time) {
         return new java.text.SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss").format(new java.util.Date(time));
     }
+
+    public static long getApproximateMemoryUsage(TBinlog binlog) {
+        /* object layout: header + body + padding */
+        final long objSize = 80; // 9 fields and 1 header
+        String data = binlog.getData();
+        return objSize + binlog.getTableIdsSize() * 8 + (data == null ? 0 : 
data.length());
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index a3133bfb5c7..79e1adf20c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -45,6 +45,8 @@ public class DBBinlog {
     private static final Logger LOG = 
LogManager.getLogger(BinlogManager.class);
 
     private long dbId;
+    // The size of all binlogs.
+    private long binlogSize;
     // guard for allBinlogs && tableBinlogMap
     private ReentrantReadWriteLock lock;
     // all binlogs contain table binlogs && create table binlog etc ...
@@ -64,6 +66,7 @@ public class DBBinlog {
         lock = new ReentrantReadWriteLock();
         this.dbId = binlog.getDbId();
         this.binlogConfigCache = binlogConfigCache;
+        this.binlogSize = 0;
 
         // allBinlogs treeset order by commitSeq
         allBinlogs = 
Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
@@ -81,7 +84,7 @@ public class DBBinlog {
     }
 
     public static DBBinlog recoverDbBinlog(BinlogConfigCache 
binlogConfigCache, TBinlog dbDummy,
-                                           List<TBinlog> tableDummies, boolean 
dbBinlogEnable) {
+            List<TBinlog> tableDummies, boolean dbBinlogEnable) {
         DBBinlog dbBinlog = new DBBinlog(binlogConfigCache, dbDummy);
         long dbId = dbDummy.getDbId();
         for (TBinlog tableDummy : tableDummies) {
@@ -105,6 +108,7 @@ public class DBBinlog {
         }
 
         allBinlogs.add(binlog);
+        binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
 
         if (tableIds == null) {
             return;
@@ -119,12 +123,13 @@ public class DBBinlog {
         }
     }
 
-    // TODO(Drogon): remove TableBinlog after DropTable, think table drop && 
recovery
+    // TODO(Drogon): remove TableBinlog after DropTable, think table drop &&
+    // recovery
     private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean 
dbBinlogEnable) {
         TableBinlog tableBinlog = tableBinlogMap.get(tableId);
         if (tableBinlog == null) {
             if (dbBinlogEnable || binlogConfigCache.isEnableTable(dbId, 
tableId)) {
-                tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, 
 tableId);
+                tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, 
tableId);
                 tableBinlogMap.put(tableId, tableBinlog);
                 tableDummyBinlogs.add(tableBinlog.getDummyBinlog());
             }
@@ -132,7 +137,8 @@ public class DBBinlog {
         return tableBinlog;
     }
 
-    // guard by BinlogManager, if addBinlog called, more than one(db/tables) 
enable binlog
+    // guard by BinlogManager, if addBinlog called, more than one(db/tables) 
enable
+    // binlog
     public void addBinlog(TBinlog binlog) {
         boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
         List<Long> tableIds = binlog.getTableIds();
@@ -140,6 +146,7 @@ public class DBBinlog {
         lock.writeLock().lock();
         try {
             allBinlogs.add(binlog);
+            binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
 
             if (binlog.getTimestamp() > 0 && dbBinlogEnable) {
                 timestamps.add(Pair.of(binlog.getCommitSeq(), 
binlog.getTimestamp()));
@@ -226,14 +233,10 @@ public class DBBinlog {
             return null;
         }
 
-        boolean dbBinlogEnable = dbBinlogConfig.isEnable();
         BinlogTombstone tombstone;
-        if (dbBinlogEnable) {
+        if (dbBinlogConfig.isEnable()) {
             // db binlog is enabled, only one binlogTombstones
-            long ttlSeconds = dbBinlogConfig.getTtlSeconds();
-            long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
-
-            tombstone = dbBinlogEnableGc(expiredMs);
+            tombstone = dbBinlogEnableGc(dbBinlogConfig);
         } else {
             tombstone = dbBinlogDisableGc();
         }
@@ -277,7 +280,7 @@ public class DBBinlog {
         }
 
         for (TableBinlog tableBinlog : tableBinlogs) {
-            BinlogTombstone tombstone = tableBinlog.ttlGc();
+            BinlogTombstone tombstone = tableBinlog.gc();
             if (tombstone != null) {
                 tombstones.add(tombstone);
             }
@@ -297,6 +300,7 @@ public class DBBinlog {
             TBinlog dummy = binlogIter.next();
             boolean foundFirstUsingBinlog = false;
             long lastCommitSeq = -1;
+            long removed = 0;
 
             while (binlogIter.hasNext()) {
                 TBinlog binlog = binlogIter.next();
@@ -304,6 +308,8 @@ public class DBBinlog {
                 if (commitSeq <= largestExpiredCommitSeq) {
                     if (binlog.table_ref <= 0) {
                         binlogIter.remove();
+                        binlogSize -= 
BinlogUtils.getApproximateMemoryUsage(binlog);
+                        ++removed;
                         if (!foundFirstUsingBinlog) {
                             lastCommitSeq = commitSeq;
                         }
@@ -318,52 +324,92 @@ public class DBBinlog {
             if (lastCommitSeq != -1) {
                 dummy.setCommitSeq(lastCommitSeq);
             }
+
+            LOG.info("remove {} expired binlogs, dbId: {}, left: {}", removed, 
dbId, allBinlogs.size());
         } finally {
             lock.writeLock().unlock();
         }
     }
 
-    private BinlogTombstone dbBinlogEnableGc(long expiredMs) {
+    private TBinlog getLastExpiredBinlog(BinlogComparator checker) {
+        TBinlog lastExpiredBinlog = null;
+
+        Iterator<TBinlog> binlogIter = allBinlogs.iterator();
+        TBinlog dummy = binlogIter.next();
+        while (binlogIter.hasNext()) {
+            TBinlog binlog = binlogIter.next();
+            if (checker.isExpired(binlog)) {
+                binlogIter.remove();
+                binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog);
+                lastExpiredBinlog = binlog;
+            } else {
+                break;
+            }
+        }
+
+        if (lastExpiredBinlog != null) {
+            dummy.setCommitSeq(lastExpiredBinlog.getCommitSeq());
+
+            // release expired timestamps by commit seq.
+            Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
+            while (timeIter.hasNext() && timeIter.next().first <= 
lastExpiredBinlog.getCommitSeq()) {
+                timeIter.remove();
+            }
+        }
+
+        return lastExpiredBinlog;
+    }
+
+    private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) {
+        long ttlSeconds = dbBinlogConfig.getTtlSeconds();
+        long maxBytes = dbBinlogConfig.getMaxBytes();
+        long maxHistoryNums = dbBinlogConfig.getMaxHistoryNums();
+        long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
+
+        LOG.info("gc db binlog. dbId: {}, expiredMs: {}, ttlSecond: {}, 
maxBytes: {}, maxHistoryNums: {}",
+                dbId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums);
+
         // step 1: get current tableBinlog info and expiredCommitSeq
-        long expiredCommitSeq = -1;
+        TBinlog lastExpiredBinlog = null;
         lock.writeLock().lock();
         try {
+            long expiredCommitSeq = -1;
             Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
             while (timeIter.hasNext()) {
                 Pair<Long, Long> pair = timeIter.next();
-                if (pair.second <= expiredMs) {
-                    expiredCommitSeq = pair.first;
-                    timeIter.remove();
-                } else {
+                if (pair.second > expiredMs) {
                     break;
                 }
+                expiredCommitSeq = pair.first;
             }
 
-            Iterator<TBinlog> binlogIter = allBinlogs.iterator();
-            TBinlog dummy = binlogIter.next();
-            dummy.setCommitSeq(expiredCommitSeq);
-
-            while (binlogIter.hasNext()) {
-                TBinlog binlog = binlogIter.next();
-                if (binlog.getCommitSeq() <= expiredCommitSeq) {
-                    binlogIter.remove();
-                } else {
-                    break;
-                }
-            }
+            final long lastExpiredCommitSeq = expiredCommitSeq;
+            BinlogComparator checker = (binlog) -> {
+                // NOTE: TreeSet read size during iterator remove is valid.
+                //
+                // The expired conditions in order:
+                // 1. expired time
+                // 2. the max bytes
+                // 3. the max history num
+                return binlog.getCommitSeq() <= lastExpiredCommitSeq
+                        || maxBytes < binlogSize
+                        || maxHistoryNums < allBinlogs.size();
+            };
+            lastExpiredBinlog = getLastExpiredBinlog(checker);
         } finally {
             lock.writeLock().unlock();
         }
 
-        if (expiredCommitSeq == -1) {
+        if (lastExpiredBinlog == null) {
             return null;
         }
 
-        // step 2: gc every tableBinlog in dbBinlog, get table tombstone to 
complete db tombstone
+        // step 2: gc every tableBinlog in dbBinlog, get table tombstone to 
complete db
+        // tombstone
         List<BinlogTombstone> tableTombstones = Lists.newArrayList();
         for (TableBinlog tableBinlog : tableBinlogMap.values()) {
             // step 2.1: gc tableBinlog,and get table tombstone
-            BinlogTombstone tableTombstone = 
tableBinlog.commitSeqGc(expiredCommitSeq);
+            BinlogTombstone tableTombstone = 
tableBinlog.commitSeqGc(lastExpiredBinlog.getCommitSeq());
             if (tableTombstone != null) {
                 tableTombstones.add(tableTombstone);
             }
@@ -386,28 +432,8 @@ public class DBBinlog {
 
         lock.writeLock().lock();
         try {
-            Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
-            while (timeIter.hasNext()) {
-                long commitSeq = timeIter.next().first;
-                if (commitSeq <= largestExpiredCommitSeq) {
-                    timeIter.remove();
-                } else {
-                    break;
-                }
-            }
-
-            Iterator<TBinlog> binlogIter = allBinlogs.iterator();
-            TBinlog dummy = binlogIter.next();
-            dummy.setCommitSeq(largestExpiredCommitSeq);
-
-            while (binlogIter.hasNext()) {
-                TBinlog binlog = binlogIter.next();
-                if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
-                    binlogIter.remove();
-                } else {
-                    break;
-                }
-            }
+            BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <= 
largestExpiredCommitSeq;
+            getLastExpiredBinlog(checker);
         } finally {
             lock.writeLock().unlock();
         }
@@ -478,6 +504,8 @@ public class DBBinlog {
                 info.add(dropped);
                 String binlogLength = String.valueOf(allBinlogs.size());
                 info.add(binlogLength);
+                String binlogSize = String.valueOf(this.binlogSize);
+                info.add(binlogSize);
                 String firstBinlogCommittedTime = null;
                 String readableFirstBinlogCommittedTime = null;
                 if (!timestamps.isEmpty()) {
@@ -497,10 +525,16 @@ public class DBBinlog {
                 info.add(lastBinlogCommittedTime);
                 info.add(readableLastBinlogCommittedTime);
                 String binlogTtlSeconds = null;
+                String binlogMaxBytes = null;
+                String binlogMaxHistoryNums = null;
                 if (binlogConfig != null) {
                     binlogTtlSeconds = 
String.valueOf(binlogConfig.getTtlSeconds());
+                    binlogMaxBytes = 
String.valueOf(binlogConfig.getMaxBytes());
+                    binlogMaxHistoryNums = 
String.valueOf(binlogConfig.getMaxHistoryNums());
                 }
                 info.add(binlogTtlSeconds);
+                info.add(binlogMaxBytes);
+                info.add(binlogMaxHistoryNums);
 
                 result.addRow(info);
             } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 3dd290a07f8..36ec4f733ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -27,6 +27,7 @@ import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -43,15 +44,23 @@ public class TableBinlog {
 
     private long dbId;
     private long tableId;
+    private long binlogSize;
     private ReentrantReadWriteLock lock;
     private TreeSet<TBinlog> binlogs;
+
+    // Pair(commitSeq, timestamp), used for gc
+    // need UpsertRecord to add timestamps for gc
+    private List<Pair<Long, Long>> timestamps;
+
     private BinlogConfigCache binlogConfigCache;
 
     public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, 
long dbId, long tableId) {
         this.dbId = dbId;
         this.tableId = tableId;
+        this.binlogSize = 0;
         lock = new ReentrantReadWriteLock();
         binlogs = 
Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
+        timestamps = Lists.newArrayList();
 
         TBinlog dummy;
         if (binlog.getType() == TBinlogType.DUMMY) {
@@ -77,6 +86,10 @@ public class TableBinlog {
         if (binlog.getCommitSeq() > dummy.getCommitSeq()) {
             binlogs.add(binlog);
             ++binlog.table_ref;
+            binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
+            if (binlog.getTimestamp() > 0) {
+                timestamps.add(Pair.of(binlog.getCommitSeq(), 
binlog.getTimestamp()));
+            }
         }
     }
 
@@ -85,6 +98,10 @@ public class TableBinlog {
         try {
             binlogs.add(binlog);
             ++binlog.table_ref;
+            binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
+            if (binlog.getTimestamp() > 0) {
+                timestamps.add(Pair.of(binlog.getCommitSeq(), 
binlog.getTimestamp()));
+            }
         } finally {
             lock.writeLock().unlock();
         }
@@ -108,7 +125,7 @@ public class TableBinlog {
         }
     }
 
-    private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(long expired, 
BinlogComparator checker) {
+    private Pair<TBinlog, Long> 
getLastUpsertAndLargestCommitSeq(BinlogComparator checker) {
         if (binlogs.size() <= 1) {
             return null;
         }
@@ -119,9 +136,10 @@ public class TableBinlog {
         TBinlog lastExpiredBinlog = null;
         while (iter.hasNext()) {
             TBinlog binlog = iter.next();
-            if (checker.isExpired(binlog, expired)) {
+            if (checker.isExpired(binlog)) {
                 lastExpiredBinlog = binlog;
                 --binlog.table_ref;
+                binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog);
                 if (binlog.getType() == TBinlogType.UPSERT) {
                     tombstoneUpsert = binlog;
                 }
@@ -135,9 +153,15 @@ public class TableBinlog {
             return null;
         }
 
-        dummyBinlog.setCommitSeq(lastExpiredBinlog.getCommitSeq());
+        long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
+        dummyBinlog.setCommitSeq(expiredCommitSeq);
 
-        return Pair.of(tombstoneUpsert, lastExpiredBinlog.getCommitSeq());
+        Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
+        while (timeIterator.hasNext() && timeIterator.next().first <= 
expiredCommitSeq) {
+            timeIterator.remove();
+        }
+
+        return Pair.of(tombstoneUpsert, expiredCommitSeq);
     }
 
     // this method call when db binlog enable
@@ -147,8 +171,8 @@ public class TableBinlog {
         // step 1: get tombstoneUpsertBinlog and dummyBinlog
         lock.writeLock().lock();
         try {
-            BinlogComparator check = (binlog, expire) -> binlog.getCommitSeq() 
<= expire;
-            tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredCommitSeq, 
check);
+            BinlogComparator check = (binlog) -> binlog.getCommitSeq() <= 
expiredCommitSeq;
+            tombstoneInfo = getLastUpsertAndLargestCommitSeq(check);
         } finally {
             lock.writeLock().unlock();
         }
@@ -171,7 +195,7 @@ public class TableBinlog {
     }
 
     // this method call when db binlog disable
-    public BinlogTombstone ttlGc() {
+    public BinlogTombstone gc() {
         // step 1: get expire time
         BinlogConfig tableBinlogConfig = 
binlogConfigCache.getTableBinlogConfig(dbId, tableId);
         if (tableBinlogConfig == null) {
@@ -179,19 +203,43 @@ public class TableBinlog {
         }
 
         long ttlSeconds = tableBinlogConfig.getTtlSeconds();
+        long maxBytes = tableBinlogConfig.getMaxBytes();
+        long maxHistoryNums = tableBinlogConfig.getMaxHistoryNums();
         long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
 
-        if (expiredMs < 0) {
-            return null;
-        }
-        LOG.info("ttl gc. dbId: {}, tableId: {}, expiredMs: {}", dbId, 
tableId, expiredMs);
+        LOG.info(
+                "gc table binlog. dbId: {}, tableId: {}, expiredMs: {}, 
ttlSecond: {}, maxBytes: {}, "
+                        + "maxHistoryNums: {}, now: {}",
+                dbId, tableId, expiredMs, ttlSeconds, maxBytes, 
maxHistoryNums, System.currentTimeMillis());
 
         // step 2: get tombstoneUpsertBinlog and dummyBinlog
         Pair<TBinlog, Long> tombstoneInfo;
         lock.writeLock().lock();
         try {
-            BinlogComparator check = (binlog, expire) -> binlog.getTimestamp() 
<= expire;
-            tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredMs, check);
+            // find the last expired commit seq.
+            long expiredCommitSeq = -1;
+            Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
+            while (timeIterator.hasNext()) {
+                Pair<Long, Long> entry = timeIterator.next();
+                if (expiredMs < entry.second) {
+                    break;
+                }
+                expiredCommitSeq = entry.first;
+            }
+
+            final long lastExpiredCommitSeq = expiredCommitSeq;
+            BinlogComparator check = (binlog) -> {
+                // NOTE: TreeSet read size during iterator remove is valid.
+                //
+                // The expired conditions in order:
+                // 1. expired time
+                // 2. the max bytes
+                // 3. the max history num
+                return binlog.getCommitSeq() <= lastExpiredCommitSeq
+                        || maxBytes < binlogSize
+                        || maxHistoryNums < binlogs.size();
+            };
+            tombstoneInfo = getLastUpsertAndLargestCommitSeq(check);
         } finally {
             lock.writeLock().unlock();
         }
@@ -216,25 +264,8 @@ public class TableBinlog {
     public void replayGc(long largestExpiredCommitSeq) {
         lock.writeLock().lock();
         try {
-            long lastSeq = -1;
-            Iterator<TBinlog> iter = binlogs.iterator();
-            TBinlog dummyBinlog = iter.next();
-
-            while (iter.hasNext()) {
-                TBinlog binlog = iter.next();
-                long commitSeq = binlog.getCommitSeq();
-                if (commitSeq <= largestExpiredCommitSeq) {
-                    lastSeq = commitSeq;
-                    --binlog.table_ref;
-                    iter.remove();
-                } else {
-                    break;
-                }
-            }
-
-            if (lastSeq != -1) {
-                dummyBinlog.setCommitSeq(lastSeq);
-            }
+            BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <= 
largestExpiredCommitSeq;
+            getLastUpsertAndLargestCommitSeq(checker);
         } finally {
             lock.writeLock().unlock();
         }
@@ -278,6 +309,8 @@ public class TableBinlog {
             info.add(dropped);
             String binlogLength = String.valueOf(binlogs.size());
             info.add(binlogLength);
+            String binlogSize = String.valueOf(this.binlogSize);
+            info.add(binlogSize);
             String firstBinlogCommittedTime = null;
             String readableFirstBinlogCommittedTime = null;
             for (TBinlog binlog : binlogs) {
@@ -305,10 +338,16 @@ public class TableBinlog {
             info.add(lastBinlogCommittedTime);
             info.add(readableLastBinlogCommittedTime);
             String binlogTtlSeconds = null;
+            String binlogMaxBytes = null;
+            String binlogMaxHistoryNums = null;
             if (binlogConfig != null) {
                 binlogTtlSeconds = 
String.valueOf(binlogConfig.getTtlSeconds());
+                binlogMaxBytes = String.valueOf(binlogConfig.getMaxBytes());
+                binlogMaxHistoryNums = 
String.valueOf(binlogConfig.getMaxHistoryNums());
             }
             info.add(binlogTtlSeconds);
+            info.add(binlogMaxBytes);
+            info.add(binlogMaxHistoryNums);
 
             result.addRow(info);
         } finally {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java
index 03f8d325d77..9542972359a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java
@@ -277,14 +277,9 @@ public class BinlogManagerTest {
         for (Map.Entry<Long, List<Long>> dbEntry : frameWork.entrySet()) {
             long dbId = dbEntry.getKey();
             for (long tableId : dbEntry.getValue()) {
-                if ((tableId / tableBaseId) % 2 != 0) {
-                    addBinlog.invoke(originManager, 
BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow));
-                    addBinlog.invoke(newManager, 
BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow));
-                    ++commitSeq;
-                } else {
-                    addBinlog.invoke(originManager, 
BinlogTestUtils.newBinlog(dbId, tableId, 0, 0));
-                    addBinlog.invoke(newManager, 
BinlogTestUtils.newBinlog(dbId, tableId, 0, 0));
-                }
+                addBinlog.invoke(originManager, 
BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow));
+                addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, 
tableId, commitSeq, timeNow));
+                ++commitSeq;
             }
         }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java
index b4ecd8a90c5..cd86c5935e1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java
@@ -75,7 +75,7 @@ public class TableBinlogTest {
         }
 
         // trigger ttlGc
-        BinlogTombstone tombstone = tableBinlog.ttlGc();
+        BinlogTombstone tombstone = tableBinlog.gc();
 
         // check binlog status
         for (TBinlog binlog : testBinlogs) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to