This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d88d6f0075c branch-3.0: [feat](binlog) Add lock binlog method #46887 (#48045)
d88d6f0075c is described below

commit d88d6f0075c73bd1f35c178d39afb81a460ac3d0
Author: walter <maoch...@selectdb.com>
AuthorDate: Wed Feb 19 19:22:22 2025 +0800

    branch-3.0: [feat](binlog) Add lock binlog method #46887 (#48045)
    
    cherry pick from #46887
---
 .../org/apache/doris/binlog/BinlogManager.java     |  25 ++++-
 .../java/org/apache/doris/binlog/DBBinlog.java     |  54 +++++++++++
 .../java/org/apache/doris/binlog/TableBinlog.java  |  47 ++++++++++
 .../apache/doris/service/FrontendServiceImpl.java  | 104 ++++++++++++++++++++-
 gensrc/thrift/FrontendService.thrift               |  24 +++++
 5 files changed, 247 insertions(+), 7 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 723262ff31b..b15e511e75c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -130,7 +130,7 @@ public class BinlogManager {
     }
 
     private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, 
long timestamp, TBinlogType type,
-                           String data, boolean removeEnableCache, Object raw) 
{
+            String data, boolean removeEnableCache, Object raw) {
         if (!Config.enable_feature_binlog) {
             return;
         }
@@ -447,7 +447,6 @@ public class BinlogManager {
         addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, 
info);
     }
 
-
     private boolean supportedRecoverInfo(RecoverInfo info) {
         //table name and partitionName added together.
         // recover table case, tablename must exist in newer version
@@ -518,6 +517,26 @@ public class BinlogManager {
         }
     }
 
+    // Lock binlogs of a db (tableId < 0) or of a single table on behalf of a syncer job.
+    //
+    // Returns the status together with the commit seq that is actually locked; when the db
+    // has no binlog, returns BINLOG_NOT_FOUND_DB with -1.
+    public Pair<TStatus, Long> lockBinlog(long dbId, long tableId,
+            String jobUniqueId, long lockCommitSeq) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("lock binlog. dbId: {}, tableId: {}, jobUniqueId: {}, lockCommitSeq: {}",
+                    dbId, tableId, jobUniqueId, lockCommitSeq);
+        }
+
+        // Only look the db binlog up under the manager lock; the actual locking is done by
+        // DBBinlog with its own lock, so the manager lock is not held across it.
+        DBBinlog dbBinlog;
+        lock.readLock().lock();
+        try {
+            dbBinlog = dbBinlogMap.get(dbId);
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        if (dbBinlog == null) {
+            LOG.warn("db binlog not found. dbId: {}", dbId);
+            return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_DB), -1L);
+        }
+        return dbBinlog.lockBinlog(tableId, jobUniqueId, lockCommitSeq);
+    }
+
     // get the dropped partitions of the db.
     public List<Long> getDroppedPartitions(long dbId) {
         lock.readLock().lock();
@@ -632,7 +651,6 @@ public class BinlogManager {
         }
     }
 
-
     private static void writeTBinlogToStream(DataOutputStream dos, TBinlog 
binlog) throws TException, IOException {
         TMemoryBuffer buffer = new TMemoryBuffer(BUFFER_SIZE);
         TBinaryProtocol protocol = new TBinaryProtocol(buffer);
@@ -642,7 +660,6 @@ public class BinlogManager {
         dos.write(data);
     }
 
-
     // not thread safety, do this without lock
     public long write(DataOutputStream dos, long checksum) throws IOException {
         if (!Config.enable_feature_binlog) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index b5e8a48df84..2fbee550c91 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -75,6 +75,10 @@ public class DBBinlog {
 
     private BinlogConfigCache binlogConfigCache;
 
+    // The db-level binlogs that are locked by syncer jobs.
+    // job unique id => locked commit seq. A job's locked seq never moves backwards (see lockDbBinlog).
+    // NOTE(review): entries are only ever added by lockDbBinlog; confirm where they are released
+    // and whether gc() is expected to honor them.
+    private Map<String, Long> lockedBinlogs;
+
     public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
         lock = new ReentrantReadWriteLock();
         this.dbId = binlog.getDbId();
@@ -89,6 +93,7 @@ public class DBBinlog {
         droppedPartitions = Lists.newArrayList();
         droppedTables = Lists.newArrayList();
         droppedIndexes = Lists.newArrayList();
+        lockedBinlogs = Maps.newHashMap();
 
         TBinlog dummy;
         if (binlog.getType() == TBinlogType.DUMMY) {
@@ -281,6 +286,55 @@ public class DBBinlog {
         }
     }
 
+    // Lock binlogs on behalf of a syncer job.
+    //
+    // A negative tableId locks the db-level binlogs; otherwise the request is forwarded to the
+    // table's TableBinlog. Returns the status and the commit seq actually locked, or
+    // BINLOG_NOT_FOUND_TABLE with -1 when the table has no binlog.
+    public Pair<TStatus, Long> lockBinlog(long tableId, String jobUniqueId, long lockCommitSeq) {
+        if (tableId < 0) {
+            // lockDbBinlog mutates lockedBinlogs, so it must run under the write lock.
+            lock.writeLock().lock();
+            try {
+                return lockDbBinlog(jobUniqueId, lockCommitSeq);
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        // The table path only needs a map lookup, so the read lock is enough; the table's own
+        // lock is taken by TableBinlog.lockBinlog after this lock has been released.
+        TableBinlog tableBinlog;
+        lock.readLock().lock();
+        try {
+            tableBinlog = tableBinlogMap.get(tableId);
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        if (tableBinlog == null) {
+            LOG.warn("table binlog not found. dbId: {}, tableId: {}", dbId, tableId);
+            return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_TABLE), -1L);
+        }
+        return tableBinlog.lockBinlog(jobUniqueId, lockCommitSeq);
+    }
+
+    // Lock the db-level binlogs for a job and return the commit seq actually locked.
+    //
+    // A negative lockCommitSeq locks the latest binlog; a seq older than the first binlog is
+    // clamped to the first one; a seq newer than the last binlog is rejected with
+    // BINLOG_TOO_NEW_COMMIT_SEQ. A job's lock never moves backwards, so re-locking at the same
+    // or an older seq is idempotent and reports the seq already locked.
+    //
+    // Require: the write lock is held by the caller.
+    private Pair<TStatus, Long> lockDbBinlog(String jobUniqueId, long lockCommitSeq) {
+        TBinlog firstBinlog = allBinlogs.first();
+        TBinlog lastBinlog = allBinlogs.last();
+
+        long targetCommitSeq = lockCommitSeq;
+        if (targetCommitSeq < 0) {
+            // lock the latest binlog
+            targetCommitSeq = lastBinlog.getCommitSeq();
+        } else if (targetCommitSeq < firstBinlog.getCommitSeq()) {
+            // older than any retained binlog, lock the first binlog instead
+            targetCommitSeq = firstBinlog.getCommitSeq();
+        } else if (lastBinlog.getCommitSeq() < targetCommitSeq) {
+            LOG.warn("try lock future binlogs, dbId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}",
+                    dbId, targetCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId);
+            return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L);
+        }
+
+        // keep idempotent
+        Long lockedCommitSeq = lockedBinlogs.get(jobUniqueId);
+        if (lockedCommitSeq != null && targetCommitSeq <= lockedCommitSeq) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}",
+                        lockedCommitSeq, jobUniqueId, dbId);
+            }
+            return Pair.of(new TStatus(TStatusCode.OK), lockedCommitSeq);
+        }
+
+        lockedBinlogs.put(jobUniqueId, targetCommitSeq);
+        return Pair.of(new TStatus(TStatusCode.OK), targetCommitSeq);
+    }
+
     public BinlogTombstone gc() {
         // check db
         BinlogConfig dbBinlogConfig = 
binlogConfigCache.getDBBinlogConfig(dbId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 755a0bfd171..718fce49ee1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -26,8 +26,10 @@ import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -36,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -54,6 +57,10 @@ public class TableBinlog {
 
     private BinlogConfigCache binlogConfigCache;
 
+    // The table binlogs that are locked by syncer jobs.
+    // job unique id => locked commit seq. A job's locked seq never moves backwards (see lockTableBinlog).
+    // NOTE(review): entries are only ever added by lockTableBinlog; confirm where they are released
+    // and whether gc is expected to honor them.
+    private Map<String, Long> lockedBinlogs;
+
     public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, 
long dbId, long tableId) {
         this.dbId = dbId;
         this.tableId = tableId;
@@ -61,6 +68,7 @@ public class TableBinlog {
         lock = new ReentrantReadWriteLock();
         binlogs = 
Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
         timestamps = Lists.newArrayList();
+        lockedBinlogs = Maps.newHashMap();
 
         TBinlog dummy;
         if (binlog.getType() == TBinlogType.DUMMY) {
@@ -124,6 +132,45 @@ public class TableBinlog {
         }
     }
 
+    // Lock this table's binlogs on behalf of a syncer job; see lockTableBinlog for the rules.
+    public Pair<TStatus, Long> lockBinlog(String jobUniqueId, long lockCommitSeq) {
+        // lockTableBinlog mutates lockedBinlogs, so the write lock is required.
+        lock.writeLock().lock();
+        try {
+            return lockTableBinlog(jobUniqueId, lockCommitSeq);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // Lock the table binlogs for a job and return the commit seq actually locked.
+    //
+    // A negative lockCommitSeq locks the latest binlog; a seq older than the first binlog is
+    // clamped to the first one; a seq newer than the last binlog is rejected with
+    // BINLOG_TOO_NEW_COMMIT_SEQ. A job's lock never moves backwards, so re-locking at the same
+    // or an older seq is idempotent and reports the seq already locked.
+    //
+    // Require: the write lock is held by the caller.
+    private Pair<TStatus, Long> lockTableBinlog(String jobUniqueId, long lockCommitSeq) {
+        TBinlog firstBinlog = binlogs.first();
+        TBinlog lastBinlog = binlogs.last();
+
+        long targetCommitSeq = lockCommitSeq;
+        if (targetCommitSeq < 0) {
+            // lock the latest binlog
+            targetCommitSeq = lastBinlog.getCommitSeq();
+        } else if (targetCommitSeq < firstBinlog.getCommitSeq()) {
+            // older than any retained binlog, lock the first binlog instead
+            targetCommitSeq = firstBinlog.getCommitSeq();
+        } else if (lastBinlog.getCommitSeq() < targetCommitSeq) {
+            LOG.warn(
+                    "try lock future binlogs, dbId: {}, tableId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}",
+                    dbId, tableId, targetCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId);
+            return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L);
+        }
+
+        // keep idempotent
+        Long lockedCommitSeq = lockedBinlogs.get(jobUniqueId);
+        if (lockedCommitSeq != null && targetCommitSeq <= lockedCommitSeq) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}, tableId: {}",
+                        lockedCommitSeq, jobUniqueId, dbId, tableId);
+            }
+            return Pair.of(new TStatus(TStatusCode.OK), lockedCommitSeq);
+        }
+
+        lockedBinlogs.put(jobUniqueId, targetCommitSeq);
+        return Pair.of(new TStatus(TStatusCode.OK), targetCommitSeq);
+    }
+
     private Pair<TBinlog, Long> 
getLastUpsertAndLargestCommitSeq(BinlogComparator checker) {
         if (binlogs.size() <= 1) {
             return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 6e744ca5621..5b288792adc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -195,6 +195,8 @@ import org.apache.doris.thrift.TLoadTxnCommitRequest;
 import org.apache.doris.thrift.TLoadTxnCommitResult;
 import org.apache.doris.thrift.TLoadTxnRollbackRequest;
 import org.apache.doris.thrift.TLoadTxnRollbackResult;
+import org.apache.doris.thrift.TLockBinlogRequest;
+import org.apache.doris.thrift.TLockBinlogResult;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TMasterResult;
@@ -3283,7 +3285,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
     public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws 
TException {
         String clientAddr = getClientAddrAsString();
         if (LOG.isDebugEnabled()) {
-            LOG.debug("receive get binlog request: {}", request);
+            LOG.debug("receive get binlog lag request: {}", request);
         }
 
         TGetBinlogLagResult result = new TGetBinlogLagResult();
@@ -3294,14 +3296,14 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             status.setStatusCode(TStatusCode.NOT_MASTER);
             status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
             result.setMasterAddress(getMasterAddress());
-            LOG.error("failed to get beginTxn: {}", NOT_MASTER_ERR_MSG);
+            LOG.error("failed to get binlog lag: {}", NOT_MASTER_ERR_MSG);
             return result;
         }
 
         try {
             result = getBinlogLagImpl(request, clientAddr);
         } catch (UserException e) {
-            LOG.warn("failed to get binlog: {}", e.getMessage());
+            LOG.warn("failed to get binlog lag: {}", e.getMessage());
             status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
             status.addToErrorMsgs(e.getMessage());
         } catch (Throwable e) {
@@ -3381,6 +3383,102 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return result;
     }
 
+    // Thrift entry point: lock binlogs of a db or table on behalf of a syncer job.
+    //
+    // Only the master serves this; a follower answers NOT_MASTER with master_address set.
+    // UserException maps to ANALYSIS_ERROR and any other failure to INTERNAL_ERROR.
+    @Override
+    public TLockBinlogResult lockBinlog(TLockBinlogRequest request) throws TException {
+        String clientAddr = getClientAddrAsString();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("receive lock binlog request: {}", request);
+        }
+
+        TLockBinlogResult result = new TLockBinlogResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+
+        if (!Env.getCurrentEnv().isMaster()) {
+            status.setStatusCode(TStatusCode.NOT_MASTER);
+            status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
+            result.setMasterAddress(getMasterAddress());
+            LOG.error("failed to lock binlog: {}", NOT_MASTER_ERR_MSG);
+            return result;
+        }
+
+        try {
+            result = lockBinlogImpl(request, clientAddr);
+        } catch (UserException e) {
+            LOG.warn("failed to lock binlog: {}", e.getMessage());
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            // never put a null element into the thrift list<string>
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+        } catch (Throwable e) {
+            LOG.warn("catch unknown result.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+        }
+        // on failure `result` is still the pre-built result carrying `status`
+        return result;
+    }
+
+    // Validate the request, check privileges, resolve db/table and lock the binlogs.
+    //
+    // A request without table_id and table locks the db-level binlogs (tableId = -1);
+    // table_id takes precedence over table when both are set.
+    // Throws UserException on a missing required arg or an unknown db/table.
+    private TLockBinlogResult lockBinlogImpl(TLockBinlogRequest request, String clientIp) throws UserException {
+        // step 1: check all required args: user, passwd, db, job_unique_id
+        if (!request.isSetUser()) {
+            throw new UserException("user is not set");
+        }
+        if (!request.isSetPasswd()) {
+            throw new UserException("passwd is not set");
+        }
+        if (!request.isSetDb()) {
+            throw new UserException("db is not set");
+        }
+        if (!request.isSetJobUniqueId()) {
+            throw new UserException("job_unique_id is not set");
+        }
+
+        // step 2: check auth
+        // NOTE(review): no check happens here when a token is supplied, and the privilege check
+        // uses `table` while step 4 prefers `table_id` when both are set; confirm the token is
+        // validated elsewhere and that `table`/`table_id` cannot disagree.
+        if (Strings.isNullOrEmpty(request.getToken())) {
+            checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
+                    request.getTable(), clientIp, PrivPredicate.SELECT);
+        }
+
+        // step 3: check database
+        Env env = Env.getCurrentEnv();
+        String fullDbName = request.getDb();
+        Database db = env.getInternalCatalog().getDbNullable(fullDbName);
+        if (db == null) {
+            throw new UserException("unknown database, database=" + fullDbName);
+        }
+
+        // step 4: resolve the table id; -1 means lock the db-level binlogs
+        long tableId = -1;
+        if (request.isSetTableId()) {
+            tableId = request.getTableId();
+        } else if (request.isSetTable()) {
+            String tableName = request.getTable();
+            Table table = db.getTableOrMetaException(tableName, TableType.OLAP);
+            if (table == null) {
+                throw new UserException("unknown table, table=" + tableName);
+            }
+            tableId = table.getId();
+        }
+
+        // step 5: lock binlog; an unset lock_commit_seq (-1) locks the latest binlog
+        long lockCommitSeq = request.isSetLockCommitSeq() ? request.getLockCommitSeq() : -1L;
+        Pair<TStatus, Long> statusSeqPair = env.getBinlogManager().lockBinlog(
+                db.getId(), tableId, request.getJobUniqueId(), lockCommitSeq);
+        TLockBinlogResult result = new TLockBinlogResult();
+        result.setStatus(statusSeqPair.first);
+        result.setLockedCommitSeq(statusSeqPair.second);
+        return result;
+    }
+
     @Override
     public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) 
throws TException {
         StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, 
StatisticsCacheKey.class);
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 878eb4104e0..8f79b2b98ac 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1162,6 +1162,29 @@ struct TQueryStatsResult {
     5: optional map<i64, i64> tablet_stats
 }
 
+// Lock the binlogs to prevent them from being GCed during sync.
+//
+// The caller should lock the binlog before backup, and periodically bump the lock commit seq.
+//
+// The locked binlogs will be kept until the binlog properties ttl_seconds, max_bytes ... are reached.
+//
+// If neither table nor table_id is set, the db-level binlogs are locked.
+struct TLockBinlogRequest {
+    1: optional string cluster
+    2: optional string user // required
+    3: optional string passwd // required
+    4: optional string db // required
+    5: optional string table // used for the privilege check and, if table_id is unset, to find the table
+    6: optional i64 table_id // takes precedence over table when choosing the table to lock
+    7: optional string token // when set, the user/passwd privilege check is skipped
+    8: optional string job_unique_id // required, identifies the job that owns the lock
+    9: optional i64 lock_commit_seq // if not set, lock the latest binlog
+}
+
+struct TLockBinlogResult {
+    1: optional Status.TStatus status
+    2: optional i64 locked_commit_seq // the seq actually locked; may differ from the requested one
+    3: optional Types.TNetworkAddress master_address // set when the FE is not the master
+}
+
 struct TGetBinlogRequest {
     1: optional string cluster
     2: optional string user
@@ -1750,6 +1773,7 @@ service FrontendService {
     TGetBinlogResult getBinlog(1: TGetBinlogRequest request)
     TGetSnapshotResult getSnapshot(1: TGetSnapshotRequest request)
     TRestoreSnapshotResult restoreSnapshot(1: TRestoreSnapshotRequest request)
+    TLockBinlogResult lockBinlog(1: TLockBinlogRequest request)
 
     TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest 
request)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to