This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f7b73509cc [ehmancement](binlog) Add show proc '/binlog' impl (#30770)
0f7b73509cc is described below

commit 0f7b73509cc75e7a1277d006ff628a95ea4c01e1
Author: Jack Drogon <[email protected]>
AuthorDate: Fri Feb 2 21:37:44 2024 +0800

    [ehmancement](binlog) Add show proc '/binlog' impl (#30770)
    
    Signed-off-by: Jack Drogon <[email protected]>
---
 .../org/apache/doris/binlog/BinlogConfigCache.java |  4 +-
 .../org/apache/doris/binlog/BinlogManager.java     | 24 +++++++
 .../java/org/apache/doris/binlog/BinlogUtils.java  |  4 ++
 .../java/org/apache/doris/binlog/DBBinlog.java     | 64 +++++++++++++++++
 .../java/org/apache/doris/binlog/TableBinlog.java  | 82 ++++++++++++++++++++++
 .../apache/doris/common/proc/BinlogProcDir.java    | 45 ++++++++++++
 .../org/apache/doris/common/proc/ProcService.java  |  1 +
 7 files changed, 223 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
index c414b853078..30641bae8c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
@@ -97,7 +97,7 @@ public class BinlogConfigCache {
                 return null;
             }
 
-            Table table = db.getTableOrMetaException(tableId);
+            Table table = db.getTableNullable(tableId);
             if (table == null) {
                 LOG.warn("fail to get table. db: {}, table id: {}", 
db.getFullName(), tableId);
                 return null;
@@ -109,6 +109,8 @@ public class BinlogConfigCache {
 
             OlapTable olapTable = (OlapTable) table;
             tableBinlogConfig = olapTable.getBinlogConfig();
+            // get table binlog config, when table modify binlogConfig
+            // it create a new binlog, not update inplace, so we don't need to 
clone binlogConfig
             dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
             return tableBinlogConfig;
         } catch (Exception e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 8c68f908ce5..8187e966561 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -22,6 +22,8 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.proc.BaseProcResult;
+import org.apache.doris.common.proc.ProcResult;
 import org.apache.doris.persist.AlterDatabasePropertyInfo;
 import org.apache.doris.persist.BarrierLog;
 import org.apache.doris.persist.BatchModifyPartitionsInfo;
@@ -36,6 +38,7 @@ import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
@@ -54,6 +57,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class BinlogManager {
     private static final int BUFFER_SIZE = 16 * 1024;
+    private static final ImmutableList<String> TITLE_NAMES = new 
ImmutableList.Builder<String>().add("Name")
+            
.add("Type").add("Id").add("Dropped").add("BinlogLength").add("FirstBinlogCommittedTime")
+            
.add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime")
+            .add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds")
+            .build();
 
     private static final Logger LOG = 
LogManager.getLogger(BinlogManager.class);
 
@@ -545,6 +553,22 @@ public class BinlogManager {
         return checksum;
     }
 
+    public ProcResult getBinlogInfo() {
+        BaseProcResult result = new BaseProcResult();
+        result.setNames(TITLE_NAMES);
+
+        lock.readLock().lock();
+        try {
+            for (DBBinlog dbBinlog : dbBinlogMap.values()) {
+                dbBinlog.getBinlogInfo(result);
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        return result;
+    }
+
     // remove DB
     // remove Table
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index 4e134104b6d..0f6c2308248 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -90,4 +90,8 @@ public class BinlogUtils {
         long expireSeconds = currentSeconds - ttlSeconds;
         return expireSeconds * 1000;
     }
+
+    public static String convertTimeToReadable(long time) {
+        return new java.text.SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss").format(new java.util.Date(time));
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 4ba1416cd5c..a3133bfb5c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -18,7 +18,10 @@
 package org.apache.doris.binlog;
 
 import org.apache.doris.catalog.BinlogConfig;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
@@ -30,6 +33,7 @@ import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -448,4 +452,64 @@ public class DBBinlog {
             lock.writeLock().unlock();
         }
     }
+
+    public void getBinlogInfo(BaseProcResult result) {
+        BinlogConfig binlogConfig = binlogConfigCache.getDBBinlogConfig(dbId);
+
+        String dbName = "(dropped)";
+        String dropped = "true";
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+        if (db != null) {
+            dbName = db.getFullName();
+            dropped = "false";
+        }
+
+        lock.readLock().lock();
+        try {
+            boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
+            if (dbBinlogEnable) {
+                List<String> info = new ArrayList<>();
+
+                info.add(dbName);
+                String type = "db";
+                info.add(type);
+                String id = String.valueOf(dbId);
+                info.add(id);
+                info.add(dropped);
+                String binlogLength = String.valueOf(allBinlogs.size());
+                info.add(binlogLength);
+                String firstBinlogCommittedTime = null;
+                String readableFirstBinlogCommittedTime = null;
+                if (!timestamps.isEmpty()) {
+                    long timestamp = timestamps.get(0).second;
+                    firstBinlogCommittedTime = String.valueOf(timestamp);
+                    readableFirstBinlogCommittedTime = 
BinlogUtils.convertTimeToReadable(timestamp);
+                }
+                info.add(firstBinlogCommittedTime);
+                info.add(readableFirstBinlogCommittedTime);
+                String lastBinlogCommittedTime = null;
+                String readableLastBinlogCommittedTime = null;
+                if (!timestamps.isEmpty()) {
+                    long timestamp = timestamps.get(timestamps.size() - 
1).second;
+                    lastBinlogCommittedTime = String.valueOf(timestamp);
+                    readableLastBinlogCommittedTime = 
BinlogUtils.convertTimeToReadable(timestamp);
+                }
+                info.add(lastBinlogCommittedTime);
+                info.add(readableLastBinlogCommittedTime);
+                String binlogTtlSeconds = null;
+                if (binlogConfig != null) {
+                    binlogTtlSeconds = 
String.valueOf(binlogConfig.getTtlSeconds());
+                }
+                info.add(binlogTtlSeconds);
+
+                result.addRow(info);
+            } else {
+                for (TableBinlog tableBinlog : tableBinlogMap.values()) {
+                    tableBinlog.getBinlogInfo(db, result);
+                }
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 0857ae7abb1..3dd290a07f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -18,7 +18,11 @@
 package org.apache.doris.binlog;
 
 import org.apache.doris.catalog.BinlogConfig;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
@@ -27,8 +31,10 @@ import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -233,4 +239,80 @@ public class TableBinlog {
             lock.writeLock().unlock();
         }
     }
+
+    public void getBinlogInfo(Database db, BaseProcResult result) {
+        BinlogConfig binlogConfig = 
binlogConfigCache.getTableBinlogConfig(dbId, tableId);
+
+        String tableName = null;
+        String dropped = null;
+        if (db == null) {
+            tableName = "(dropped).(unknown)";
+            dropped = "true";
+        } else {
+            String dbName = db.getFullName();
+            Table table = db.getTableNullable(tableId);
+            if (table == null) {
+                dropped = "true";
+                tableName = dbName + ".(dropped)";
+            }
+
+            dropped = "false";
+            if (table instanceof OlapTable) {
+                OlapTable olapTable = (OlapTable) table;
+                tableName = dbName + "." + olapTable.getName();
+            } else {
+                tableName = dbName + ".(not_olaptable)";
+            }
+        }
+
+        lock.readLock().lock();
+        try {
+            List<String> info = new ArrayList<>();
+
+            info.add(tableName);
+            String type = "table";
+            info.add(type);
+
+            String id = String.valueOf(tableId);
+            info.add(id);
+            info.add(dropped);
+            String binlogLength = String.valueOf(binlogs.size());
+            info.add(binlogLength);
+            String firstBinlogCommittedTime = null;
+            String readableFirstBinlogCommittedTime = null;
+            for (TBinlog binlog : binlogs) {
+                long timestamp = binlog.getTimestamp();
+                if (timestamp != -1) {
+                    firstBinlogCommittedTime = String.valueOf(timestamp);
+                    readableFirstBinlogCommittedTime = 
BinlogUtils.convertTimeToReadable(timestamp);
+                    break;
+                }
+            }
+            info.add(firstBinlogCommittedTime);
+            info.add(readableFirstBinlogCommittedTime);
+            String lastBinlogCommittedTime = null;
+            String readableLastBinlogCommittedTime = null;
+            Iterator<TBinlog> iterator = binlogs.descendingIterator();
+            while (iterator.hasNext()) {
+                TBinlog binlog = iterator.next();
+                long timestamp = binlog.getTimestamp();
+                if (timestamp != -1) {
+                    lastBinlogCommittedTime = String.valueOf(timestamp);
+                    readableLastBinlogCommittedTime = 
BinlogUtils.convertTimeToReadable(timestamp);
+                    break;
+                }
+            }
+            info.add(lastBinlogCommittedTime);
+            info.add(readableLastBinlogCommittedTime);
+            String binlogTtlSeconds = null;
+            if (binlogConfig != null) {
+                binlogTtlSeconds = 
String.valueOf(binlogConfig.getTtlSeconds());
+            }
+            info.add(binlogTtlSeconds);
+
+            result.addRow(info);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java
new file mode 100644
index 00000000000..f72ed30d2b6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import org.apache.doris.binlog.BinlogManager;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+
+
+public class BinlogProcDir implements ProcDirInterface {
+    @Override
+    public boolean register(String name, ProcNodeInterface node) {
+        return false;
+    }
+
+    @Override
+    public ProcNodeInterface lookup(String name) throws AnalysisException {
+        throw new AnalysisException("not implemented");
+    }
+
+    @Override
+    public ProcResult fetchResult() throws AnalysisException {
+        BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager();
+        if (binlogManager == null) {
+            throw new AnalysisException("binlog manager is not initialized");
+        }
+
+        return binlogManager.getBinlogInfo();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
index e54ee4d5d11..42010ccbd20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
@@ -58,6 +58,7 @@ public final class ProcService {
         root.register("colocation_group", new ColocationGroupProcDir());
         root.register("bdbje", new BDBJEProcDir());
         root.register("diagnose", new DiagnoseProcDir());
+        root.register("binlog", new BinlogProcDir());
     }
 
     // 通过指定的路径获得对应的PROC Node


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to