This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 319ac5baccfd3679047d9c4a1d0d9b5755561099 Author: Jack Drogon <jack.xsuper...@gmail.com> AuthorDate: Sat Jul 8 07:41:45 2023 +0800 [Feature] (binlog) Add getBinlogLag (#21637) Signed-off-by: Jack Drogon <jack.xsuper...@gmail.com> --- .../org/apache/doris/binlog/BinlogManager.java | 22 ++++- .../java/org/apache/doris/binlog/BinlogUtils.java | 29 ++++++- .../java/org/apache/doris/binlog/DBBinlog.java | 25 +++++- .../java/org/apache/doris/binlog/TableBinlog.java | 13 ++- .../apache/doris/service/FrontendServiceImpl.java | 96 ++++++++++++++++++++++ gensrc/thrift/FrontendService.thrift | 9 ++ 6 files changed, 183 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index bccc5dfc48..822f045da8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -141,7 +141,7 @@ public class BinlogManager { } // get binlog by dbId, return first binlog.version > version - public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long commitSeq) { + public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); lock.readLock().lock(); try { @@ -152,7 +152,25 @@ public class BinlogManager { return Pair.of(status, null); } - return dbBinlog.getBinlog(tableId, commitSeq); + return dbBinlog.getBinlog(tableId, prevCommitSeq); + } finally { + lock.readLock().unlock(); + } + } + + // get binlog by dbId, return first binlog.version > version + public Pair<TStatus, Long> getBinlogLag(long dbId, long tableId, long prevCommitSeq) { + TStatus status = new TStatus(TStatusCode.OK); + lock.readLock().lock(); + try { + DBBinlog dbBinlog = dbBinlogMap.get(dbId); + if (dbBinlog == null) { + status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_DB); + LOG.warn("dbBinlog not found. 
dbId: {}", dbId); + return Pair.of(status, null); + } + + return dbBinlog.getBinlogLag(tableId, prevCommitSeq); } finally { lock.readLock().unlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java index 226fac2666..9742bed23d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java @@ -25,22 +25,22 @@ import org.apache.doris.thrift.TStatusCode; import java.util.TreeSet; public class BinlogUtils { - public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, long commitSeq) { + public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); TBinlog firstBinlog = binlogs.first(); // all commitSeq > commitSeq - if (firstBinlog.getCommitSeq() > commitSeq) { + if (firstBinlog.getCommitSeq() > prevCommitSeq) { status.setStatusCode(TStatusCode.BINLOG_TOO_OLD_COMMIT_SEQ); return Pair.of(status, firstBinlog); } // find first binlog whose commitSeq > commitSeq TBinlog guard = new TBinlog(); - guard.setCommitSeq(commitSeq); + guard.setCommitSeq(prevCommitSeq); TBinlog binlog = binlogs.higher(guard); - // all commitSeq <= commitSeq + // all commitSeq <= prevCommitSeq if (binlog == null) { status.setStatusCode(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ); return Pair.of(status, null); @@ -48,4 +48,25 @@ public class BinlogUtils { return Pair.of(status, binlog); } } + + public static Pair<TStatus, Long> getBinlogLag(TreeSet<TBinlog> binlogs, long prevCommitSeq) { + TStatus status = new TStatus(TStatusCode.OK); + TBinlog firstBinlog = binlogs.first(); + + if (firstBinlog.getCommitSeq() > prevCommitSeq) { + return Pair.of(status, Long.valueOf(binlogs.size())); + } + + // find first binlog whose commitSeq > commitSeq + TBinlog guard = new TBinlog(); + guard.setCommitSeq(prevCommitSeq); + TBinlog binlog = 
binlogs.higher(guard); + + // all commitSeq <= prevCommitSeq + if (binlog == null) { + return Pair.of(status, 0L); + } else { + return Pair.of(status, Long.valueOf(binlogs.tailSet(binlog).size())); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index d6408b3076..48c20becaf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -100,7 +100,7 @@ public class DBBinlog { return dbId; } - public Pair<TStatus, TBinlog> getBinlog(long tableId, long commitSeq) { + public Pair<TStatus, TBinlog> getBinlog(long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); lock.readLock().lock(); try { @@ -110,10 +110,29 @@ public class DBBinlog { status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE); return Pair.of(status, null); } - return tableBinlog.getBinlog(commitSeq); + return tableBinlog.getBinlog(prevCommitSeq); } - return BinlogUtils.getBinlog(allBinlogs, commitSeq); + return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq); + } finally { + lock.readLock().unlock(); + } + } + + public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) { + TStatus status = new TStatus(TStatusCode.OK); + lock.readLock().lock(); + try { + if (tableId >= 0) { + TableBinlog tableBinlog = tableBinlogMap.get(tableId); + if (tableBinlog == null) { + status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE); + return Pair.of(status, null); + } + return tableBinlog.getBinlogLag(prevCommitSeq); + } + + return BinlogUtils.getBinlogLag(allBinlogs, prevCommitSeq); } finally { lock.readLock().unlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 47c91f2a76..44545b6fb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -67,10 +67,19 @@ public class TableBinlog { } } - public Pair<TStatus, TBinlog> getBinlog(long commitSeq) { + public Pair<TStatus, TBinlog> getBinlog(long prevCommitSeq) { lock.readLock().lock(); try { - return BinlogUtils.getBinlog(binlogs, commitSeq); + return BinlogUtils.getBinlog(binlogs, prevCommitSeq); + } finally { + lock.readLock().unlock(); + } + } + + public Pair<TStatus, Long> getBinlogLag(long prevCommitSeq) { + lock.readLock().lock(); + try { + return BinlogUtils.getBinlogLag(binlogs, prevCommitSeq); } finally { lock.readLock().unlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index a9f81c8ee8..0ea0f7d839 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -106,6 +106,7 @@ import org.apache.doris.thrift.TFinishTaskRequest; import org.apache.doris.thrift.TFrontendPingFrontendRequest; import org.apache.doris.thrift.TFrontendPingFrontendResult; import org.apache.doris.thrift.TFrontendPingFrontendStatusCode; +import org.apache.doris.thrift.TGetBinlogLagResult; import org.apache.doris.thrift.TGetBinlogRequest; import org.apache.doris.thrift.TGetBinlogResult; import org.apache.doris.thrift.TGetDbsParams; @@ -2572,4 +2573,99 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + + // getBinlogLag + public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("receive get binlog request: {}", request); + + TGetBinlogLagResult result = new TGetBinlogLagResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + try { + result = getBinlogLagImpl(request, clientAddr); + } catch (UserException e) { + 
LOG.warn("failed to get binlog: {}", e.getMessage()); + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + return result; + } + + return result; + } + + private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request, String clientIp) throws UserException { + /// Check all required arg: user, passwd, db, prev_commit_seq + if (!request.isSetUser()) { + throw new UserException("user is not set"); + } + if (!request.isSetPasswd()) { + throw new UserException("passwd is not set"); + } + if (!request.isSetDb()) { + throw new UserException("db is not set"); + } + if (!request.isSetPrevCommitSeq()) { + throw new UserException("prev_commit_seq is not set"); + } + + + // step 1: check auth + String cluster = request.getCluster(); + if (Strings.isNullOrEmpty(cluster)) { + cluster = SystemInfoService.DEFAULT_CLUSTER; + } + if (Strings.isNullOrEmpty(request.getToken())) { + checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTable(), + request.getUserIp(), PrivPredicate.SELECT); + } + + // step 3: check database + Env env = Env.getCurrentEnv(); + String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb()); + Database db = env.getInternalCatalog().getDbNullable(fullDbName); + if (db == null) { + String dbName = fullDbName; + if (Strings.isNullOrEmpty(request.getCluster())) { + dbName = request.getDb(); + } + throw new UserException("unknown database, database=" + dbName); + } + + // step 4: fetch all tableIds + // lookup tables && convert into tableIdList + long tableId = -1; + if (request.isSetTableId()) { + tableId = request.getTableId(); + } else if (request.isSetTable()) { + String tableName = request.getTable(); + Table table = db.getTableOrMetaException(tableName, TableType.OLAP); + if 
(table == null) { + throw new UserException("unknown table, table=" + tableName); + } + tableId = table.getId(); + } + + // step 6: get binlog + long dbId = db.getId(); + TGetBinlogLagResult result = new TGetBinlogLagResult(); + result.setStatus(new TStatus(TStatusCode.OK)); + long prevCommitSeq = request.getPrevCommitSeq(); + + Pair<TStatus, Long> statusLagPair = env.getBinlogManager().getBinlogLag(dbId, tableId, prevCommitSeq); + TStatus status = statusLagPair.first; + if (status != null && status.getStatusCode() != TStatusCode.OK) { + result.setStatus(status); + } + Long binlogLag = statusLagPair.second; + if (binlogLag != null) { + result.setLag(binlogLag); + } + + return result; + } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 6cbf075739..9c84ba6906 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1046,6 +1046,13 @@ struct TGetMasterTokenResult { 2: optional string token } +typedef TGetBinlogRequest TGetBinlogLagRequest + +struct TGetBinlogLagResult { + 1: optional Status.TStatus status + 2: optional i64 lag +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -1108,4 +1115,6 @@ service FrontendService { TGetTabletReplicaInfosResult getTabletReplicaInfos(1: TGetTabletReplicaInfosRequest request) TGetMasterTokenResult getMasterToken(1: TGetMasterTokenRequest request) + + TGetBinlogLagResult getBinlogLag(1: TGetBinlogLagRequest request) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org