This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0f7b73509cc [ehmancement](binlog) Add show proc '/binlog' impl (#30770)
0f7b73509cc is described below
commit 0f7b73509cc75e7a1277d006ff628a95ea4c01e1
Author: Jack Drogon <[email protected]>
AuthorDate: Fri Feb 2 21:37:44 2024 +0800
[ehmancement](binlog) Add show proc '/binlog' impl (#30770)
Signed-off-by: Jack Drogon <[email protected]>
---
.../org/apache/doris/binlog/BinlogConfigCache.java | 4 +-
.../org/apache/doris/binlog/BinlogManager.java | 24 +++++++
.../java/org/apache/doris/binlog/BinlogUtils.java | 4 ++
.../java/org/apache/doris/binlog/DBBinlog.java | 64 +++++++++++++++++
.../java/org/apache/doris/binlog/TableBinlog.java | 82 ++++++++++++++++++++++
.../apache/doris/common/proc/BinlogProcDir.java | 45 ++++++++++++
.../org/apache/doris/common/proc/ProcService.java | 1 +
7 files changed, 223 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
index c414b853078..30641bae8c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
@@ -97,7 +97,7 @@ public class BinlogConfigCache {
return null;
}
- Table table = db.getTableOrMetaException(tableId);
+ Table table = db.getTableNullable(tableId);
if (table == null) {
LOG.warn("fail to get table. db: {}, table id: {}",
db.getFullName(), tableId);
return null;
@@ -109,6 +109,8 @@ public class BinlogConfigCache {
OlapTable olapTable = (OlapTable) table;
tableBinlogConfig = olapTable.getBinlogConfig();
+ // get table binlog config, when table modify binlogConfig
+ // it create a new binlog, not update inplace, so we don't need to
clone binlogConfig
dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
return tableBinlogConfig;
} catch (Exception e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 8c68f908ce5..8187e966561 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -22,6 +22,8 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.proc.BaseProcResult;
+import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
@@ -36,6 +38,7 @@ import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
@@ -54,6 +57,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class BinlogManager {
private static final int BUFFER_SIZE = 16 * 1024;
+ private static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>().add("Name")
+
.add("Type").add("Id").add("Dropped").add("BinlogLength").add("FirstBinlogCommittedTime")
+
.add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime")
+ .add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds")
+ .build();
private static final Logger LOG =
LogManager.getLogger(BinlogManager.class);
@@ -545,6 +553,22 @@ public class BinlogManager {
return checksum;
}
+ public ProcResult getBinlogInfo() {
+ BaseProcResult result = new BaseProcResult();
+ result.setNames(TITLE_NAMES);
+
+ lock.readLock().lock();
+ try {
+ for (DBBinlog dbBinlog : dbBinlogMap.values()) {
+ dbBinlog.getBinlogInfo(result);
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+
+ return result;
+ }
+
// remove DB
// remove Table
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index 4e134104b6d..0f6c2308248 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -90,4 +90,8 @@ public class BinlogUtils {
long expireSeconds = currentSeconds - ttlSeconds;
return expireSeconds * 1000;
}
+
+ public static String convertTimeToReadable(long time) {
+ return new java.text.SimpleDateFormat("yyyy-MM-dd
HH:mm:ss").format(new java.util.Date(time));
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 4ba1416cd5c..a3133bfb5c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -18,7 +18,10 @@
package org.apache.doris.binlog;
import org.apache.doris.catalog.BinlogConfig;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
@@ -30,6 +33,7 @@ import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -448,4 +452,64 @@ public class DBBinlog {
lock.writeLock().unlock();
}
}
+
+ public void getBinlogInfo(BaseProcResult result) {
+ BinlogConfig binlogConfig = binlogConfigCache.getDBBinlogConfig(dbId);
+
+ String dbName = "(dropped)";
+ String dropped = "true";
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+ if (db != null) {
+ dbName = db.getFullName();
+ dropped = "false";
+ }
+
+ lock.readLock().lock();
+ try {
+ boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
+ if (dbBinlogEnable) {
+ List<String> info = new ArrayList<>();
+
+ info.add(dbName);
+ String type = "db";
+ info.add(type);
+ String id = String.valueOf(dbId);
+ info.add(id);
+ info.add(dropped);
+ String binlogLength = String.valueOf(allBinlogs.size());
+ info.add(binlogLength);
+ String firstBinlogCommittedTime = null;
+ String readableFirstBinlogCommittedTime = null;
+ if (!timestamps.isEmpty()) {
+ long timestamp = timestamps.get(0).second;
+ firstBinlogCommittedTime = String.valueOf(timestamp);
+ readableFirstBinlogCommittedTime =
BinlogUtils.convertTimeToReadable(timestamp);
+ }
+ info.add(firstBinlogCommittedTime);
+ info.add(readableFirstBinlogCommittedTime);
+ String lastBinlogCommittedTime = null;
+ String readableLastBinlogCommittedTime = null;
+ if (!timestamps.isEmpty()) {
+ long timestamp = timestamps.get(timestamps.size() -
1).second;
+ lastBinlogCommittedTime = String.valueOf(timestamp);
+ readableLastBinlogCommittedTime =
BinlogUtils.convertTimeToReadable(timestamp);
+ }
+ info.add(lastBinlogCommittedTime);
+ info.add(readableLastBinlogCommittedTime);
+ String binlogTtlSeconds = null;
+ if (binlogConfig != null) {
+ binlogTtlSeconds =
String.valueOf(binlogConfig.getTtlSeconds());
+ }
+ info.add(binlogTtlSeconds);
+
+ result.addRow(info);
+ } else {
+ for (TableBinlog tableBinlog : tableBinlogMap.values()) {
+ tableBinlog.getBinlogInfo(db, result);
+ }
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 0857ae7abb1..3dd290a07f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -18,7 +18,11 @@
package org.apache.doris.binlog;
import org.apache.doris.catalog.BinlogConfig;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
@@ -27,8 +31,10 @@ import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -233,4 +239,80 @@ public class TableBinlog {
lock.writeLock().unlock();
}
}
+
+ public void getBinlogInfo(Database db, BaseProcResult result) {
+ BinlogConfig binlogConfig =
binlogConfigCache.getTableBinlogConfig(dbId, tableId);
+
+ String tableName = null;
+ String dropped = null;
+ if (db == null) {
+ tableName = "(dropped).(unknown)";
+ dropped = "true";
+ } else {
+ String dbName = db.getFullName();
+ Table table = db.getTableNullable(tableId);
+ if (table == null) {
+ dropped = "true";
+ tableName = dbName + ".(dropped)";
+ }
+
+ dropped = "false";
+ if (table instanceof OlapTable) {
+ OlapTable olapTable = (OlapTable) table;
+ tableName = dbName + "." + olapTable.getName();
+ } else {
+ tableName = dbName + ".(not_olaptable)";
+ }
+ }
+
+ lock.readLock().lock();
+ try {
+ List<String> info = new ArrayList<>();
+
+ info.add(tableName);
+ String type = "table";
+ info.add(type);
+
+ String id = String.valueOf(tableId);
+ info.add(id);
+ info.add(dropped);
+ String binlogLength = String.valueOf(binlogs.size());
+ info.add(binlogLength);
+ String firstBinlogCommittedTime = null;
+ String readableFirstBinlogCommittedTime = null;
+ for (TBinlog binlog : binlogs) {
+ long timestamp = binlog.getTimestamp();
+ if (timestamp != -1) {
+ firstBinlogCommittedTime = String.valueOf(timestamp);
+ readableFirstBinlogCommittedTime =
BinlogUtils.convertTimeToReadable(timestamp);
+ break;
+ }
+ }
+ info.add(firstBinlogCommittedTime);
+ info.add(readableFirstBinlogCommittedTime);
+ String lastBinlogCommittedTime = null;
+ String readableLastBinlogCommittedTime = null;
+ Iterator<TBinlog> iterator = binlogs.descendingIterator();
+ while (iterator.hasNext()) {
+ TBinlog binlog = iterator.next();
+ long timestamp = binlog.getTimestamp();
+ if (timestamp != -1) {
+ lastBinlogCommittedTime = String.valueOf(timestamp);
+ readableLastBinlogCommittedTime =
BinlogUtils.convertTimeToReadable(timestamp);
+ break;
+ }
+ }
+ info.add(lastBinlogCommittedTime);
+ info.add(readableLastBinlogCommittedTime);
+ String binlogTtlSeconds = null;
+ if (binlogConfig != null) {
+ binlogTtlSeconds =
String.valueOf(binlogConfig.getTtlSeconds());
+ }
+ info.add(binlogTtlSeconds);
+
+ result.addRow(info);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java
new file mode 100644
index 00000000000..f72ed30d2b6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import org.apache.doris.binlog.BinlogManager;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+
+
+public class BinlogProcDir implements ProcDirInterface {
+ @Override
+ public boolean register(String name, ProcNodeInterface node) {
+ return false;
+ }
+
+ @Override
+ public ProcNodeInterface lookup(String name) throws AnalysisException {
+ throw new AnalysisException("not implemented");
+ }
+
+ @Override
+ public ProcResult fetchResult() throws AnalysisException {
+ BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager();
+ if (binlogManager == null) {
+ throw new AnalysisException("binlog manager is not initialized");
+ }
+
+ return binlogManager.getBinlogInfo();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
index e54ee4d5d11..42010ccbd20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java
@@ -58,6 +58,7 @@ public final class ProcService {
root.register("colocation_group", new ColocationGroupProcDir());
root.register("bdbje", new BDBJEProcDir());
root.register("diagnose", new DiagnoseProcDir());
+ root.register("binlog", new BinlogProcDir());
}
// 通过指定的路径获得对应的PROC Node
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]