This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e8531621c00 branch-2.1: [feat](binlog) Support getting binlogs in batch #47557 (#47640) e8531621c00 is described below commit e8531621c0080e42990c7d1474a9493060dc7a45 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Wed Feb 12 14:38:55 2025 +0800 branch-2.1: [feat](binlog) Support getting binlogs in batch #47557 (#47640) Cherry-picked from #47557 Co-authored-by: walter <maoch...@selectdb.com> --- .../java/org/apache/doris/binlog/BinlogManager.java | 11 ++++++++++- .../main/java/org/apache/doris/binlog/BinlogUtils.java | 17 ++++++++++++++--- .../src/main/java/org/apache/doris/binlog/DBBinlog.java | 6 +++--- .../main/java/org/apache/doris/binlog/TableBinlog.java | 4 ++-- .../org/apache/doris/service/FrontendServiceImpl.java | 10 +++++----- gensrc/thrift/FrontendService.thrift | 1 + 6 files changed, 35 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 3ec914abe63..0d5d5c249d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -459,6 +459,15 @@ public class BinlogManager { // get binlog by dbId, return first binlog.version > version public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) { + Pair<TStatus, List<TBinlog>> result = getBinlog(dbId, tableId, prevCommitSeq, 1); + if (result.second != null && result.second.size() > 0) { + return Pair.of(result.first, result.second.get(0)); + } + return Pair.of(result.first, null); + } + + // get binlogs by dbId, return the first N binlogs, which first binlog.version > prevCommitSeq + public Pair<TStatus, List<TBinlog>> getBinlog(long dbId, long tableId, long prevCommitSeq, long numAcquired) { TStatus status = new TStatus(TStatusCode.OK); lock.readLock().lock(); try { @@ -469,7 +478,7 @@ public class BinlogManager { return Pair.of(status, null); } - return dbBinlog.getBinlog(tableId, prevCommitSeq); + return dbBinlog.getBinlog(tableId, prevCommitSeq, numAcquired); } finally { lock.readLock().unlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java index 6b79fab143b..e51bc931759 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java @@ -23,17 +23,23 @@ import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import java.util.ArrayList; +import java.util.List; import java.util.TreeSet; +import java.util.stream.Collectors; public class BinlogUtils { - public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, long prevCommitSeq) { + public static Pair<TStatus, List<TBinlog>> getBinlog( + TreeSet<TBinlog> binlogs, long prevCommitSeq, long numAcquired) { TStatus status = new TStatus(TStatusCode.OK); TBinlog firstBinlog = binlogs.first(); // all commitSeq > commitSeq if (firstBinlog.getCommitSeq() > prevCommitSeq) { status.setStatusCode(TStatusCode.BINLOG_TOO_OLD_COMMIT_SEQ); - return Pair.of(status, firstBinlog); + List<TBinlog> array = new ArrayList<>(); + array.add(firstBinlog); + return Pair.of(status, array); } // find first binlog whose commitSeq > commitSeq @@ -46,7 +52,12 @@ public class BinlogUtils { status.setStatusCode(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ); return Pair.of(status, null); } else { - return Pair.of(status, binlog); + numAcquired = Math.min(Math.max(numAcquired, 1), 255); + List<TBinlog> obtain = binlogs.tailSet(binlog) + .stream() + .limit(numAcquired) + .collect(Collectors.toList()); + return Pair.of(status, obtain); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 9ffc20412fe..6c45189fbbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -205,7 +205,7 @@ public class DBBinlog { return dbId; } - public Pair<TStatus, TBinlog> getBinlog(long tableId, long prevCommitSeq) { + public Pair<TStatus, List<TBinlog>> getBinlog(long tableId, long prevCommitSeq, long numAcquired) { TStatus status = new TStatus(TStatusCode.OK); lock.readLock().lock(); try { @@ -216,10 +216,10 @@ public class DBBinlog { status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE); return Pair.of(status, null); } - return tableBinlog.getBinlog(prevCommitSeq); + return tableBinlog.getBinlog(prevCommitSeq, numAcquired); } - return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq); + return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq, numAcquired); } finally { lock.readLock().unlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 162adc2603b..33aa17b1f7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -106,10 +106,10 @@ public class TableBinlog { } } - public Pair<TStatus, TBinlog> getBinlog(long prevCommitSeq) { + public Pair<TStatus, List<TBinlog>> getBinlog(long prevCommitSeq, long numAcquired) { lock.readLock().lock(); try { - return BinlogUtils.getBinlog(binlogs, prevCommitSeq); + return BinlogUtils.getBinlog(binlogs, prevCommitSeq, numAcquired); } finally { lock.readLock().unlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 408c1d1bf48..313125ce933 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2946,7 +2946,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { TGetBinlogResult result = new TGetBinlogResult(); result.setStatus(new TStatus(TStatusCode.OK)); long prevCommitSeq = request.getPrevCommitSeq(); - Pair<TStatus, TBinlog> statusBinlogPair = env.getBinlogManager().getBinlog(dbId, tableId, prevCommitSeq); + long numAcquired = request.getNumAcquired(); + Pair<TStatus, List<TBinlog>> statusBinlogPair = env.getBinlogManager() + .getBinlog(dbId, tableId, prevCommitSeq, numAcquired); TStatus status = statusBinlogPair.first; if (status != null && status.getStatusCode() != TStatusCode.OK) { result.setStatus(status); @@ -2955,10 +2957,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } } - TBinlog binlog = statusBinlogPair.second; - if (binlog != null) { - List<TBinlog> binlogs = Lists.newArrayList(); - binlogs.add(binlog); + List<TBinlog> binlogs = statusBinlogPair.second; + if (binlogs != null) { result.setBinlogs(binlogs); } return result; diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 8b92c7da483..e73e6c2408f 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1125,6 +1125,7 @@ struct TGetBinlogRequest { 7: optional string user_ip 8: optional string token 9: optional i64 prev_commit_seq + 10: optional i64 num_acquired // the max num of binlogs in a batch } enum TBinlogType { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org