This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 360933b885d branch-2.1: [feat](binlog) Speed binlog gc by locked binlogs #47547 (#48128)
360933b885d is described below

commit 360933b885d69d33723d56f44b73e846919a7a96
Author: walter <maoch...@selectdb.com>
AuthorDate: Thu Feb 20 17:08:26 2025 +0800

    branch-2.1: [feat](binlog) Speed binlog gc by locked binlogs #47547 (#48128)
    
    cherry pick from #47547
---
 .../main/java/org/apache/doris/common/Config.java  |  2 +-
 .../org/apache/doris/binlog/BinlogManager.java     |  5 ++-
 .../java/org/apache/doris/binlog/DBBinlog.java     | 45 +++++++++++++++++++---
 .../java/org/apache/doris/binlog/TableBinlog.java  | 20 +++++++++-
 4 files changed, 63 insertions(+), 9 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index bbfa2f30e3a..5deaf8b51d5 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2417,7 +2417,7 @@ public class Config extends ConfigBase {
     public static int max_binlog_messsage_size = 1024 * 1024 * 1024;
 
     @ConfField(mutable = true, masterOnly = true, description = {
-            "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
+            "是否禁止使用 WITH RESOURCE 语句创建 Catalog。",
             "Whether to disable creating catalog with WITH RESOURCE 
statement."})
     public static boolean disallow_create_catalog_with_resource = true;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 2403ede6fb3..d4bf72a1e9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -112,6 +112,10 @@ public class BinlogManager {
             return;
         }
 
+        LOG.debug("add binlog, db {}, table {}, commitSeq {}, timestamp {}, 
type {}, data {}",
+                binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), 
binlog.getTimestamp(), binlog.getType(),
+                binlog.getData());
+
         DBBinlog dbBinlog;
         lock.writeLock().lock();
         try {
@@ -589,7 +593,6 @@ public class BinlogManager {
         return tombstones;
     }
 
-
     public void replayGc(BinlogGcInfo binlogGcInfo) {
         lock.writeLock().lock();
         Map<Long, DBBinlog> gcDbBinlogMap;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 675c9dc78a8..bd7fd184426 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -43,6 +43,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
@@ -288,7 +289,6 @@ public class DBBinlog {
 
     public Pair<TStatus, Long> lockBinlog(long tableId, String jobUniqueId, 
long lockCommitSeq) {
         TableBinlog tableBinlog = null;
-
         lock.writeLock().lock();
         try {
             if (tableId < 0) {
@@ -457,20 +457,43 @@ public class DBBinlog {
         }
 
         if (lastExpiredBinlog != null) {
-            dummy.setCommitSeq(lastExpiredBinlog.getCommitSeq());
+            final long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
+            dummy.setCommitSeq(expiredCommitSeq);
 
             // release expired timestamps by commit seq.
             Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
-            while (timeIter.hasNext() && timeIter.next().first <= 
lastExpiredBinlog.getCommitSeq()) {
+            while (timeIter.hasNext() && timeIter.next().first <= 
expiredCommitSeq) {
                 timeIter.remove();
             }
 
-            gcDroppedResources(lastExpiredBinlog.getCommitSeq());
+            lockedBinlogs.entrySet().removeIf(ent -> ent.getValue() <= 
expiredCommitSeq);
+            gcDroppedResources(expiredCommitSeq);
         }
 
         return lastExpiredBinlog;
     }
 
+    private Optional<Long> getMinLockedCommitSeq() { // smallest locked commit seq over this db and all of its tables
+        lock.readLock().lock(); // lockedBinlogs and tableBinlogMap are read while holding the read lock
+        try {
+            Optional<Long> minLockedCommitSeq = 
lockedBinlogs.values().stream().min(Long::compareTo); // start from the db-level locks; empty if there are none
+            for (TableBinlog tableBinlog : tableBinlogMap.values()) {
+                Optional<Long> tableMinLockedCommitSeq = 
tableBinlog.getMinLockedCommitSeq();
+                if (!tableMinLockedCommitSeq.isPresent()) { // this table reports no locked commit seq
+                    continue;
+                }
+                if (minLockedCommitSeq.isPresent()) {
+                    minLockedCommitSeq = 
Optional.of(Math.min(minLockedCommitSeq.get(), tableMinLockedCommitSeq.get())); // keep the smaller of the two
+                } else {
+                    minLockedCommitSeq = tableMinLockedCommitSeq; // first locked commit seq seen so far
+                }
+            }
+            return minLockedCommitSeq; // empty when neither the db nor any table has a locked commit seq
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) {
         long ttlSeconds = dbBinlogConfig.getTtlSeconds();
         long maxBytes = dbBinlogConfig.getMaxBytes();
@@ -481,10 +504,12 @@ public class DBBinlog {
                 dbId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums);
 
         // step 1: get current tableBinlog info and expiredCommitSeq
+        Optional<Long> minLockedCommitSeq = getMinLockedCommitSeq();
         TBinlog lastExpiredBinlog = null;
+        List<TableBinlog> tableBinlogs = Lists.newArrayList();
         lock.writeLock().lock();
         try {
-            long expiredCommitSeq = -1;
+            long expiredCommitSeq = -1L;
             Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
             while (timeIter.hasNext()) {
                 Pair<Long, Long> pair = timeIter.next();
@@ -494,6 +519,13 @@ public class DBBinlog {
                 expiredCommitSeq = pair.first;
             }
 
+            // Speed up gc by recycling binlogs that are not locked by syncer.
+            // To stay compatible with the old version, if no binlog is locked
+            // here, fall through to the previous behavior (keep all binlogs until they expire).
+            if (minLockedCommitSeq.isPresent() && expiredCommitSeq + 1L < 
minLockedCommitSeq.get()) {
+                expiredCommitSeq = minLockedCommitSeq.get() - 1L;
+            }
+
             final long lastExpiredCommitSeq = expiredCommitSeq;
             BinlogComparator checker = (binlog) -> {
                 // NOTE: TreeSet read size during iterator remove is valid.
@@ -507,6 +539,7 @@ public class DBBinlog {
                         || maxHistoryNums < allBinlogs.size();
             };
             lastExpiredBinlog = getLastExpiredBinlog(checker);
+            tableBinlogs.addAll(tableBinlogMap.values());
         } finally {
             lock.writeLock().unlock();
         }
@@ -518,7 +551,7 @@ public class DBBinlog {
         // step 2: gc every tableBinlog in dbBinlog, get table tombstone to 
complete db
         // tombstone
         List<BinlogTombstone> tableTombstones = Lists.newArrayList();
-        for (TableBinlog tableBinlog : tableBinlogMap.values()) {
+        for (TableBinlog tableBinlog : tableBinlogs) {
             // step 2.1: gc tableBinlog,and get table tombstone
             BinlogTombstone tableTombstone = 
tableBinlog.commitSeqGc(lastExpiredBinlog.getCommitSeq());
             if (tableTombstone != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index b6cf328eccc..cef60c85ac4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -39,6 +39,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -171,6 +172,15 @@ public class TableBinlog {
         return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq);
     }
 
+    public Optional<Long> getMinLockedCommitSeq() { // smallest commit seq stored in lockedBinlogs; empty if it has no entries
+        lock.readLock().lock(); // lockedBinlogs is read while holding the read lock
+        try {
+            return lockedBinlogs.values().stream().min(Long::compareTo);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     private Pair<TBinlog, Long> 
getLastUpsertAndLargestCommitSeq(BinlogComparator checker) {
         if (binlogs.size() <= 1) {
             return null;
@@ -199,7 +209,7 @@ public class TableBinlog {
             return null;
         }
 
-        long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
+        final long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
         dummyBinlog.setCommitSeq(expiredCommitSeq);
 
         Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
@@ -207,6 +217,7 @@ public class TableBinlog {
             timeIterator.remove();
         }
 
+        lockedBinlogs.entrySet().removeIf(ent -> ent.getValue() <= 
expiredCommitSeq);
         return Pair.of(tombstoneUpsert, expiredCommitSeq);
     }
 
@@ -279,6 +290,13 @@ public class TableBinlog {
                     }
                     expiredCommitSeq = entry.first;
                 }
+
+                // find the min locked binlog commit seq, if not exists, use 
the last binlog commit seq.
+                Optional<Long> minLockedCommitSeq = 
lockedBinlogs.values().stream().min(Long::compareTo);
+                if (minLockedCommitSeq.isPresent() && expiredCommitSeq + 1L < 
minLockedCommitSeq.get()) {
+                    // Speed up the gc progress by the min locked commit seq.
+                    expiredCommitSeq = minLockedCommitSeq.get() - 1L;
+                }
             }
 
             final long lastExpiredCommitSeq = expiredCommitSeq;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to