This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new e8531621c00 branch-2.1: [feat](binlog) Support getting binlogs in 
batch #47557 (#47640)
e8531621c00 is described below

commit e8531621c0080e42990c7d1474a9493060dc7a45
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 12 14:38:55 2025 +0800

    branch-2.1: [feat](binlog) Support getting binlogs in batch #47557 (#47640)
    
    Cherry-picked from #47557
    
    Co-authored-by: walter <maoch...@selectdb.com>
---
 .../java/org/apache/doris/binlog/BinlogManager.java     | 11 ++++++++++-
 .../main/java/org/apache/doris/binlog/BinlogUtils.java  | 17 ++++++++++++++---
 .../src/main/java/org/apache/doris/binlog/DBBinlog.java |  6 +++---
 .../main/java/org/apache/doris/binlog/TableBinlog.java  |  4 ++--
 .../org/apache/doris/service/FrontendServiceImpl.java   | 10 +++++-----
 gensrc/thrift/FrontendService.thrift                    |  1 +
 6 files changed, 35 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 3ec914abe63..0d5d5c249d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -459,6 +459,15 @@ public class BinlogManager {
 
     // get binlog by dbId, return first binlog.version > version
     public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long 
prevCommitSeq) {
+        Pair<TStatus, List<TBinlog>> result = getBinlog(dbId, tableId, 
prevCommitSeq, 1);
+        if (result.second != null && result.second.size() > 0) {
+            return Pair.of(result.first, result.second.get(0));
+        }
+        return Pair.of(result.first, null);
+    }
+
+    // get binlogs by dbId: return up to numAcquired binlogs, starting from the 
first binlog whose commitSeq > prevCommitSeq
+    public Pair<TStatus, List<TBinlog>> getBinlog(long dbId, long tableId, 
long prevCommitSeq, long numAcquired) {
         TStatus status = new TStatus(TStatusCode.OK);
         lock.readLock().lock();
         try {
@@ -469,7 +478,7 @@ public class BinlogManager {
                 return Pair.of(status, null);
             }
 
-            return dbBinlog.getBinlog(tableId, prevCommitSeq);
+            return dbBinlog.getBinlog(tableId, prevCommitSeq, numAcquired);
         } finally {
             lock.readLock().unlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index 6b79fab143b..e51bc931759 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -23,17 +23,23 @@ import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 public class BinlogUtils {
-    public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, 
long prevCommitSeq) {
+    public static Pair<TStatus, List<TBinlog>> getBinlog(
+            TreeSet<TBinlog> binlogs, long prevCommitSeq, long numAcquired) {
         TStatus status = new TStatus(TStatusCode.OK);
         TBinlog firstBinlog = binlogs.first();
 
         // all commitSeq > commitSeq
         if (firstBinlog.getCommitSeq() > prevCommitSeq) {
             status.setStatusCode(TStatusCode.BINLOG_TOO_OLD_COMMIT_SEQ);
-            return Pair.of(status, firstBinlog);
+            List<TBinlog> array = new ArrayList<>();
+            array.add(firstBinlog);
+            return Pair.of(status, array);
         }
 
         // find first binlog whose commitSeq > commitSeq
@@ -46,7 +52,12 @@ public class BinlogUtils {
             status.setStatusCode(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ);
             return Pair.of(status, null);
         } else {
-            return Pair.of(status, binlog);
+            numAcquired = Math.min(Math.max(numAcquired, 1), 255);
+            List<TBinlog> obtain = binlogs.tailSet(binlog)
+                    .stream()
+                    .limit(numAcquired)
+                    .collect(Collectors.toList());
+            return Pair.of(status, obtain);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 9ffc20412fe..6c45189fbbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -205,7 +205,7 @@ public class DBBinlog {
         return dbId;
     }
 
-    public Pair<TStatus, TBinlog> getBinlog(long tableId, long prevCommitSeq) {
+    public Pair<TStatus, List<TBinlog>> getBinlog(long tableId, long 
prevCommitSeq, long numAcquired) {
         TStatus status = new TStatus(TStatusCode.OK);
         lock.readLock().lock();
         try {
@@ -216,10 +216,10 @@ public class DBBinlog {
                     status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE);
                     return Pair.of(status, null);
                 }
-                return tableBinlog.getBinlog(prevCommitSeq);
+                return tableBinlog.getBinlog(prevCommitSeq, numAcquired);
             }
 
-            return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq);
+            return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq, 
numAcquired);
         } finally {
             lock.readLock().unlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 162adc2603b..33aa17b1f7a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -106,10 +106,10 @@ public class TableBinlog {
         }
     }
 
-    public Pair<TStatus, TBinlog> getBinlog(long prevCommitSeq) {
+    public Pair<TStatus, List<TBinlog>> getBinlog(long prevCommitSeq, long 
numAcquired) {
         lock.readLock().lock();
         try {
-            return BinlogUtils.getBinlog(binlogs, prevCommitSeq);
+            return BinlogUtils.getBinlog(binlogs, prevCommitSeq, numAcquired);
         } finally {
             lock.readLock().unlock();
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 408c1d1bf48..313125ce933 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2946,7 +2946,9 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         TGetBinlogResult result = new TGetBinlogResult();
         result.setStatus(new TStatus(TStatusCode.OK));
         long prevCommitSeq = request.getPrevCommitSeq();
-        Pair<TStatus, TBinlog> statusBinlogPair = 
env.getBinlogManager().getBinlog(dbId, tableId, prevCommitSeq);
+        long numAcquired = request.getNumAcquired();
+        Pair<TStatus, List<TBinlog>> statusBinlogPair = env.getBinlogManager()
+                .getBinlog(dbId, tableId, prevCommitSeq, numAcquired);
         TStatus status = statusBinlogPair.first;
         if (status != null && status.getStatusCode() != TStatusCode.OK) {
             result.setStatus(status);
@@ -2955,10 +2957,8 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                 return result;
             }
         }
-        TBinlog binlog = statusBinlogPair.second;
-        if (binlog != null) {
-            List<TBinlog> binlogs = Lists.newArrayList();
-            binlogs.add(binlog);
+        List<TBinlog> binlogs = statusBinlogPair.second;
+        if (binlogs != null) {
             result.setBinlogs(binlogs);
         }
         return result;
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 8b92c7da483..e73e6c2408f 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1125,6 +1125,7 @@ struct TGetBinlogRequest {
     7: optional string user_ip
     8: optional string token
     9: optional i64 prev_commit_seq
+    10: optional i64 num_acquired // the max num of binlogs in a batch
 }
 
 enum TBinlogType {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to