This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 319ac5baccfd3679047d9c4a1d0d9b5755561099
Author: Jack Drogon <jack.xsuper...@gmail.com>
AuthorDate: Sat Jul 8 07:41:45 2023 +0800

    [Feature] (binlog) Add getBinlogLag (#21637)
    
    Signed-off-by: Jack Drogon <jack.xsuper...@gmail.com>
---
 .../org/apache/doris/binlog/BinlogManager.java     | 22 ++++-
 .../java/org/apache/doris/binlog/BinlogUtils.java  | 29 ++++++-
 .../java/org/apache/doris/binlog/DBBinlog.java     | 25 +++++-
 .../java/org/apache/doris/binlog/TableBinlog.java  | 13 ++-
 .../apache/doris/service/FrontendServiceImpl.java  | 96 ++++++++++++++++++++++
 gensrc/thrift/FrontendService.thrift               |  9 ++
 6 files changed, 183 insertions(+), 11 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index bccc5dfc48..822f045da8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -141,7 +141,7 @@ public class BinlogManager {
     }
 
     // get binlog by dbId, return first binlog.version > version
-    public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long 
commitSeq) {
+    public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long 
prevCommitSeq) {
         TStatus status = new TStatus(TStatusCode.OK);
         lock.readLock().lock();
         try {
@@ -152,7 +152,25 @@ public class BinlogManager {
                 return Pair.of(status, null);
             }
 
-            return dbBinlog.getBinlog(tableId, commitSeq);
+            return dbBinlog.getBinlog(tableId, prevCommitSeq);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    // get binlog by dbId, return first binlog.version > version
+    public Pair<TStatus, Long> getBinlogLag(long dbId, long tableId, long 
prevCommitSeq) {
+        TStatus status = new TStatus(TStatusCode.OK);
+        lock.readLock().lock();
+        try {
+            DBBinlog dbBinlog = dbBinlogMap.get(dbId);
+            if (dbBinlog == null) {
+                status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_DB);
+                LOG.warn("dbBinlog not found. dbId: {}", dbId);
+                return Pair.of(status, null);
+            }
+
+            return dbBinlog.getBinlogLag(tableId, prevCommitSeq);
         } finally {
             lock.readLock().unlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index 226fac2666..9742bed23d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -25,22 +25,22 @@ import org.apache.doris.thrift.TStatusCode;
 import java.util.TreeSet;
 
 public class BinlogUtils {
-    public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, 
long commitSeq) {
+    public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, 
long prevCommitSeq) {
         TStatus status = new TStatus(TStatusCode.OK);
         TBinlog firstBinlog = binlogs.first();
 
         // all commitSeq > commitSeq
-        if (firstBinlog.getCommitSeq() > commitSeq) {
+        if (firstBinlog.getCommitSeq() > prevCommitSeq) {
             status.setStatusCode(TStatusCode.BINLOG_TOO_OLD_COMMIT_SEQ);
             return Pair.of(status, firstBinlog);
         }
 
         // find first binlog whose commitSeq > commitSeq
         TBinlog guard = new TBinlog();
-        guard.setCommitSeq(commitSeq);
+        guard.setCommitSeq(prevCommitSeq);
         TBinlog binlog = binlogs.higher(guard);
 
-        // all commitSeq <= commitSeq
+        // all commitSeq <= prevCommitSeq
         if (binlog == null) {
             status.setStatusCode(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ);
             return Pair.of(status, null);
@@ -48,4 +48,25 @@ public class BinlogUtils {
             return Pair.of(status, binlog);
         }
     }
+
+    public static Pair<TStatus, Long> getBinlogLag(TreeSet<TBinlog> binlogs, 
long prevCommitSeq) {
+        TStatus status = new TStatus(TStatusCode.OK);
+        TBinlog firstBinlog = binlogs.first();
+
+        if (firstBinlog.getCommitSeq() > prevCommitSeq) {
+            return Pair.of(status, Long.valueOf(binlogs.size()));
+        }
+
+        // find first binlog whose commitSeq > commitSeq
+        TBinlog guard = new TBinlog();
+        guard.setCommitSeq(prevCommitSeq);
+        TBinlog binlog = binlogs.higher(guard);
+
+        // all prevCommitSeq <= commitSeq
+        if (binlog == null) {
+            return Pair.of(status, 0L);
+        } else {
+            return Pair.of(status, 
Long.valueOf(binlogs.tailSet(binlog).size()));
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index d6408b3076..48c20becaf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -100,7 +100,7 @@ public class DBBinlog {
         return dbId;
     }
 
-    public Pair<TStatus, TBinlog> getBinlog(long tableId, long commitSeq) {
+    public Pair<TStatus, TBinlog> getBinlog(long tableId, long prevCommitSeq) {
         TStatus status = new TStatus(TStatusCode.OK);
         lock.readLock().lock();
         try {
@@ -110,10 +110,29 @@ public class DBBinlog {
                     status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE);
                     return Pair.of(status, null);
                 }
-                return tableBinlog.getBinlog(commitSeq);
+                return tableBinlog.getBinlog(prevCommitSeq);
             }
 
-            return BinlogUtils.getBinlog(allBinlogs, commitSeq);
+            return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
+        TStatus status = new TStatus(TStatusCode.OK);
+        lock.readLock().lock();
+        try {
+            if (tableId >= 0) {
+                TableBinlog tableBinlog = tableBinlogMap.get(tableId);
+                if (tableBinlog == null) {
+                    status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE);
+                    return Pair.of(status, null);
+                }
+                return tableBinlog.getBinlogLag(prevCommitSeq);
+            }
+
+            return BinlogUtils.getBinlogLag(allBinlogs, prevCommitSeq);
         } finally {
             lock.readLock().unlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 47c91f2a76..44545b6fb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -67,10 +67,19 @@ public class TableBinlog {
         }
     }
 
-    public Pair<TStatus, TBinlog> getBinlog(long commitSeq) {
+    public Pair<TStatus, TBinlog> getBinlog(long prevCommitSeq) {
         lock.readLock().lock();
         try {
-            return BinlogUtils.getBinlog(binlogs, commitSeq);
+            return BinlogUtils.getBinlog(binlogs, prevCommitSeq);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    public Pair<TStatus, Long> getBinlogLag(long prevCommitSeq) {
+        lock.readLock().lock();
+        try {
+            return BinlogUtils.getBinlogLag(binlogs, prevCommitSeq);
         } finally {
             lock.readLock().unlock();
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index a9f81c8ee8..0ea0f7d839 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -106,6 +106,7 @@ import org.apache.doris.thrift.TFinishTaskRequest;
 import org.apache.doris.thrift.TFrontendPingFrontendRequest;
 import org.apache.doris.thrift.TFrontendPingFrontendResult;
 import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
+import org.apache.doris.thrift.TGetBinlogLagResult;
 import org.apache.doris.thrift.TGetBinlogRequest;
 import org.apache.doris.thrift.TGetBinlogResult;
 import org.apache.doris.thrift.TGetDbsParams;
@@ -2572,4 +2573,99 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
 
         return result;
     }
+
+    // getBinlogLag
+    public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws 
TException {
+        String clientAddr = getClientAddrAsString();
+        LOG.debug("receive get binlog request: {}", request);
+
+        TGetBinlogLagResult result = new TGetBinlogLagResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            result = getBinlogLagImpl(request, clientAddr);
+        } catch (UserException e) {
+            LOG.warn("failed to get binlog: {}", e.getMessage());
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(e.getMessage());
+        } catch (Throwable e) {
+            LOG.warn("catch unknown result.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+            return result;
+        }
+
+        return result;
+    }
+
+    private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request, 
String clientIp) throws UserException {
+        /// Check all required arg: user, passwd, db, prev_commit_seq
+        if (!request.isSetUser()) {
+            throw new UserException("user is not set");
+        }
+        if (!request.isSetPasswd()) {
+            throw new UserException("passwd is not set");
+        }
+        if (!request.isSetDb()) {
+            throw new UserException("db is not set");
+        }
+        if (!request.isSetPrevCommitSeq()) {
+            throw new UserException("prev_commit_seq is not set");
+        }
+
+
+        // step 1: check auth
+        String cluster = request.getCluster();
+        if (Strings.isNullOrEmpty(cluster)) {
+            cluster = SystemInfoService.DEFAULT_CLUSTER;
+        }
+        if (Strings.isNullOrEmpty(request.getToken())) {
+            checkPasswordAndPrivs(cluster, request.getUser(), 
request.getPasswd(), request.getDb(), request.getTable(),
+                    request.getUserIp(), PrivPredicate.SELECT);
+        }
+
+        // step 3: check database
+        Env env = Env.getCurrentEnv();
+        String fullDbName = ClusterNamespace.getFullName(cluster, 
request.getDb());
+        Database db = env.getInternalCatalog().getDbNullable(fullDbName);
+        if (db == null) {
+            String dbName = fullDbName;
+            if (Strings.isNullOrEmpty(request.getCluster())) {
+                dbName = request.getDb();
+            }
+            throw new UserException("unknown database, database=" + dbName);
+        }
+
+        // step 4: fetch all tableIds
+        // lookup tables && convert into tableIdList
+        long tableId = -1;
+        if (request.isSetTableId()) {
+            tableId = request.getTableId();
+        } else if (request.isSetTable()) {
+            String tableName = request.getTable();
+            Table table = db.getTableOrMetaException(tableName, 
TableType.OLAP);
+            if (table == null) {
+                throw new UserException("unknown table, table=" + tableName);
+            }
+            tableId = table.getId();
+        }
+
+        // step 6: get binlog
+        long dbId = db.getId();
+        TGetBinlogLagResult result = new TGetBinlogLagResult();
+        result.setStatus(new TStatus(TStatusCode.OK));
+        long prevCommitSeq = request.getPrevCommitSeq();
+
+        Pair<TStatus, Long> statusLagPair = 
env.getBinlogManager().getBinlogLag(dbId, tableId, prevCommitSeq);
+        TStatus status = statusLagPair.first;
+        if (status != null && status.getStatusCode() != TStatusCode.OK) {
+            result.setStatus(status);
+        }
+        Long binlogLag = statusLagPair.second;
+        if (binlogLag != null) {
+            result.setLag(binlogLag);
+        }
+
+        return result;
+    }
 }
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 6cbf075739..9c84ba6906 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1046,6 +1046,13 @@ struct TGetMasterTokenResult {
     2: optional string token
 }
 
+typedef TGetBinlogRequest TGetBinlogLagRequest
+
+struct TGetBinlogLagResult {
+    1: optional Status.TStatus status
+    2: optional i64 lag
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1108,4 +1115,6 @@ service FrontendService {
     TGetTabletReplicaInfosResult getTabletReplicaInfos(1: 
TGetTabletReplicaInfosRequest request)
 
     TGetMasterTokenResult getMasterToken(1: TGetMasterTokenRequest request)
+
+    TGetBinlogLagResult getBinlogLag(1: TGetBinlogLagRequest request)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to