This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new d88d6f0075c branch-3.0: [feat](binlog) Add lock binlog method #46887 (#48045) d88d6f0075c is described below commit d88d6f0075c73bd1f35c178d39afb81a460ac3d0 Author: walter <maoch...@selectdb.com> AuthorDate: Wed Feb 19 19:22:22 2025 +0800 branch-3.0: [feat](binlog) Add lock binlog method #46887 (#48045) cherry pick from #46887 --- .../org/apache/doris/binlog/BinlogManager.java | 25 ++++- .../java/org/apache/doris/binlog/DBBinlog.java | 54 +++++++++++ .../java/org/apache/doris/binlog/TableBinlog.java | 47 ++++++++++ .../apache/doris/service/FrontendServiceImpl.java | 104 ++++++++++++++++++++- gensrc/thrift/FrontendService.thrift | 24 +++++ 5 files changed, 247 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 723262ff31b..b15e511e75c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -130,7 +130,7 @@ public class BinlogManager { } private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long timestamp, TBinlogType type, - String data, boolean removeEnableCache, Object raw) { + String data, boolean removeEnableCache, Object raw) { if (!Config.enable_feature_binlog) { return; } @@ -447,7 +447,6 @@ public class BinlogManager { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); } - private boolean supportedRecoverInfo(RecoverInfo info) { //table name and partitionName added together. // recover table case, tablename must exist in newer version @@ -518,6 +517,26 @@ public class BinlogManager { } } + public Pair<TStatus, Long> lockBinlog(long dbId, long tableId, + String jobUniqueId, long lockCommitSeq) { + LOG.debug("lock binlog. 
dbId: {}, tableId: {}, jobUniqueId: {}, lockCommitSeq: {}", + dbId, tableId, jobUniqueId, lockCommitSeq); + + DBBinlog dbBinlog = null; + lock.readLock().lock(); + try { + dbBinlog = dbBinlogMap.get(dbId); + } finally { + lock.readLock().unlock(); + } + + if (dbBinlog == null) { + LOG.warn("db binlog not found. dbId: {}", dbId); + return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_DB), -1L); + } + return dbBinlog.lockBinlog(tableId, jobUniqueId, lockCommitSeq); + } + // get the dropped partitions of the db. public List<Long> getDroppedPartitions(long dbId) { lock.readLock().lock(); @@ -632,7 +651,6 @@ public class BinlogManager { } } - private static void writeTBinlogToStream(DataOutputStream dos, TBinlog binlog) throws TException, IOException { TMemoryBuffer buffer = new TMemoryBuffer(BUFFER_SIZE); TBinaryProtocol protocol = new TBinaryProtocol(buffer); @@ -642,7 +660,6 @@ public class BinlogManager { dos.write(data); } - // not thread safety, do this without lock public long write(DataOutputStream dos, long checksum) throws IOException { if (!Config.enable_feature_binlog) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index b5e8a48df84..2fbee550c91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -75,6 +75,10 @@ public class DBBinlog { private BinlogConfigCache binlogConfigCache; + // The binlogs that are locked by the syncer. 
+ // syncer id => commit seq + private Map<String, Long> lockedBinlogs; + public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) { lock = new ReentrantReadWriteLock(); this.dbId = binlog.getDbId(); @@ -89,6 +93,7 @@ public class DBBinlog { droppedPartitions = Lists.newArrayList(); droppedTables = Lists.newArrayList(); droppedIndexes = Lists.newArrayList(); + lockedBinlogs = Maps.newHashMap(); TBinlog dummy; if (binlog.getType() == TBinlogType.DUMMY) { @@ -281,6 +286,55 @@ public class DBBinlog { } } + public Pair<TStatus, Long> lockBinlog(long tableId, String jobUniqueId, long lockCommitSeq) { + TableBinlog tableBinlog = null; + + lock.writeLock().lock(); + try { + if (tableId < 0) { + return lockDbBinlog(jobUniqueId, lockCommitSeq); + } + + tableBinlog = tableBinlogMap.get(tableId); + } finally { + lock.writeLock().unlock(); + } + + if (tableBinlog == null) { + LOG.warn("table binlog not found. dbId: {}, tableId: {}", dbId, tableId); + return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_TABLE), -1L); + } + return tableBinlog.lockBinlog(jobUniqueId, lockCommitSeq); + } + + // Require: the write lock is held by the caller. 
+ private Pair<TStatus, Long> lockDbBinlog(String jobUniqueId, long lockCommitSeq) { + TBinlog firstBinlog = allBinlogs.first(); + TBinlog lastBinlog = allBinlogs.last(); + + if (lockCommitSeq < 0) { + // lock the latest binlog + lockCommitSeq = lastBinlog.getCommitSeq(); + } else if (lockCommitSeq < firstBinlog.getCommitSeq()) { + // lock the first binlog + lockCommitSeq = firstBinlog.getCommitSeq(); + } else if (lastBinlog.getCommitSeq() < lockCommitSeq) { + LOG.warn("try lock future binlogs, dbId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}", + dbId, lockCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId); + return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L); + } + + // keep idempotent + Long commitSeq = lockedBinlogs.get(jobUniqueId); + if (commitSeq != null && lockCommitSeq <= commitSeq) { + LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}", commitSeq, jobUniqueId, dbId); + return Pair.of(new TStatus(TStatusCode.OK), commitSeq); + } + + lockedBinlogs.put(jobUniqueId, lockCommitSeq); + return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq); + } + public BinlogTombstone gc() { // check db BinlogConfig dbBinlogConfig = binlogConfigCache.getDBBinlogConfig(dbId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 755a0bfd171..718fce49ee1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -26,8 +26,10 @@ import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; +import org.apache.doris.thrift.TStatusCode; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import 
org.apache.logging.log4j.Logger; @@ -36,6 +38,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -54,6 +57,10 @@ public class TableBinlog { private BinlogConfigCache binlogConfigCache; + // The binlogs that are locked by the syncer. + // syncer id => commit seq + private Map<String, Long> lockedBinlogs; + public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, long dbId, long tableId) { this.dbId = dbId; this.tableId = tableId; @@ -61,6 +68,7 @@ public class TableBinlog { lock = new ReentrantReadWriteLock(); binlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq)); timestamps = Lists.newArrayList(); + lockedBinlogs = Maps.newHashMap(); TBinlog dummy; if (binlog.getType() == TBinlogType.DUMMY) { @@ -124,6 +132,45 @@ public class TableBinlog { } } + public Pair<TStatus, Long> lockBinlog(String jobUniqueId, long lockCommitSeq) { + lock.writeLock().lock(); + try { + return lockTableBinlog(jobUniqueId, lockCommitSeq); + } finally { + lock.writeLock().unlock(); + } + } + + // Require: the lock is held by the caller. 
+ private Pair<TStatus, Long> lockTableBinlog(String jobUniqueId, long lockCommitSeq) { + TBinlog firstBinlog = binlogs.first(); + TBinlog lastBinlog = binlogs.last(); + + if (lockCommitSeq < 0) { + // lock the latest binlog + lockCommitSeq = lastBinlog.getCommitSeq(); + } else if (lockCommitSeq < firstBinlog.getCommitSeq()) { + // lock the first binlog + lockCommitSeq = firstBinlog.getCommitSeq(); + } else if (lastBinlog.getCommitSeq() < lockCommitSeq) { + LOG.warn( + "try lock future binlogs, dbId: {}, tableId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}", + dbId, tableId, lockCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId); + return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L); + } + + // keep idempotent + Long commitSeq = lockedBinlogs.get(jobUniqueId); + if (commitSeq != null && lockCommitSeq <= commitSeq) { + LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}, tableId: {}", + commitSeq, jobUniqueId, dbId, tableId); + return Pair.of(new TStatus(TStatusCode.OK), commitSeq); + } + + lockedBinlogs.put(jobUniqueId, lockCommitSeq); + return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq); + } + private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(BinlogComparator checker) { if (binlogs.size() <= 1) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 6e744ca5621..5b288792adc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -195,6 +195,8 @@ import org.apache.doris.thrift.TLoadTxnCommitRequest; import org.apache.doris.thrift.TLoadTxnCommitResult; import org.apache.doris.thrift.TLoadTxnRollbackRequest; import org.apache.doris.thrift.TLoadTxnRollbackResult; +import org.apache.doris.thrift.TLockBinlogRequest; +import org.apache.doris.thrift.TLockBinlogResult; 
import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TMasterResult; @@ -3283,7 +3285,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws TException { String clientAddr = getClientAddrAsString(); if (LOG.isDebugEnabled()) { - LOG.debug("receive get binlog request: {}", request); + LOG.debug("receive get binlog lag request: {}", request); } TGetBinlogLagResult result = new TGetBinlogLagResult(); @@ -3294,14 +3296,14 @@ public class FrontendServiceImpl implements FrontendService.Iface { status.setStatusCode(TStatusCode.NOT_MASTER); status.addToErrorMsgs(NOT_MASTER_ERR_MSG); result.setMasterAddress(getMasterAddress()); - LOG.error("failed to get beginTxn: {}", NOT_MASTER_ERR_MSG); + LOG.error("failed to get binlog lag: {}", NOT_MASTER_ERR_MSG); return result; } try { result = getBinlogLagImpl(request, clientAddr); } catch (UserException e) { - LOG.warn("failed to get binlog: {}", e.getMessage()); + LOG.warn("failed to get binlog lag: {}", e.getMessage()); status.setStatusCode(TStatusCode.ANALYSIS_ERROR); status.addToErrorMsgs(e.getMessage()); } catch (Throwable e) { @@ -3381,6 +3383,102 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + public TLockBinlogResult lockBinlog(TLockBinlogRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + if (LOG.isDebugEnabled()) { + LOG.debug("receive lock binlog request: {}", request); + } + + TLockBinlogResult result = new TLockBinlogResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + + if (!Env.getCurrentEnv().isMaster()) { + status.setStatusCode(TStatusCode.NOT_MASTER); + status.addToErrorMsgs(NOT_MASTER_ERR_MSG); + result.setMasterAddress(getMasterAddress()); + LOG.error("failed to lock binlog: {}", NOT_MASTER_ERR_MSG); + return result; + } + + try { + result 
= lockBinlogImpl(request, clientAddr); + } catch (UserException e) { + LOG.warn("failed to lock binlog: {}", e.getMessage()); + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + return result; + } + + return result; + } + + private TLockBinlogResult lockBinlogImpl(TLockBinlogRequest request, String clientIp) throws UserException { + // Check all required args: user, passwd, db, job_unique_id + if (!request.isSetUser()) { + throw new UserException("user is not set"); + } + if (!request.isSetPasswd()) { + throw new UserException("passwd is not set"); + } + if (!request.isSetDb()) { + throw new UserException("db is not set"); + } + if (!request.isSetJobUniqueId()) { + throw new UserException("job_unique_id is not set"); + } + + // step 1: check auth + if (Strings.isNullOrEmpty(request.getToken())) { + checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), + request.getTable(), clientIp, PrivPredicate.SELECT); + } + + // step 3: check database + Env env = Env.getCurrentEnv(); + String fullDbName = request.getDb(); + Database db = env.getInternalCatalog().getDbNullable(fullDbName); + if (db == null) { + String dbName = fullDbName; + if (Strings.isNullOrEmpty(request.getCluster())) { + dbName = request.getDb(); + } + throw new UserException("unknown database, database=" + dbName); + } + + // step 4: resolve the target table id + // -1 (neither table_id nor table is set) means the binlogs are locked at database level + long tableId = -1; + if (request.isSetTableId()) { + tableId = request.getTableId(); + } else if (request.isSetTable()) { + String tableName = request.getTable(); + Table table = db.getTableOrMetaException(tableName, TableType.OLAP); + if (table == null) { + throw new UserException("unknown table, table=" + tableName); + } + tableId = table.getId(); + }
+ + // step 6: lock binlog + long dbId = db.getId(); + String jobUniqueId = request.getJobUniqueId(); + long lockCommitSeq = -1L; + if (request.isSetLockCommitSeq()) { + lockCommitSeq = request.getLockCommitSeq(); + } + Pair<TStatus, Long> statusSeqPair = env.getBinlogManager().lockBinlog( + dbId, tableId, jobUniqueId, lockCommitSeq); + TLockBinlogResult result = new TLockBinlogResult(); + result.setStatus(statusSeqPair.first); + result.setLockedCommitSeq(statusSeqPair.second); + return result; + } + @Override public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws TException { StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 878eb4104e0..8f79b2b98ac 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1162,6 +1162,29 @@ struct TQueryStatsResult { 5: optional map<i64, i64> tablet_stats } +// Lock the binlogs to prevent them from being garbage-collected during sync. +// +// The caller should lock the binlog before backup, and bump the lock commit seq periodically. +// +// The locked binlogs are kept until one of the binlog retention properties (ttl_seconds, max_bytes, ...) is reached.
+struct TLockBinlogRequest { + 1: optional string cluster + 2: optional string user + 3: optional string passwd + 4: optional string db + 5: optional string table + 6: optional i64 table_id + 7: optional string token + 8: optional string job_unique_id + 9: optional i64 lock_commit_seq // if not set, lock the latest binlog +} + +struct TLockBinlogResult { + 1: optional Status.TStatus status + 2: optional i64 locked_commit_seq + 3: optional Types.TNetworkAddress master_address +} + struct TGetBinlogRequest { 1: optional string cluster 2: optional string user @@ -1750,6 +1773,7 @@ service FrontendService { TGetBinlogResult getBinlog(1: TGetBinlogRequest request) TGetSnapshotResult getSnapshot(1: TGetSnapshotRequest request) TRestoreSnapshotResult restoreSnapshot(1: TRestoreSnapshotRequest request) + TLockBinlogResult lockBinlog(1: TLockBinlogRequest request) TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest request) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org