This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 816fd50d1d [Enhancement](binlog) Add binlog enable disable check in 
BinlogManager (#22173)
816fd50d1d is described below

commit 816fd50d1d0eb7f1af5ea02e9290388505b5b06f
Author: Jack Drogon <jack.xsuper...@gmail.com>
AuthorDate: Thu Jul 27 20:16:21 2023 +0800

    [Enhancement](binlog) Add binlog enable disable check in BinlogManager 
(#22173)
    
    Signed-off-by: Jack Drogon <jack.xsuper...@gmail.com>
---
 .../org/apache/doris/binlog/BinlogConfigCache.java | 146 +++++++++++++++++++++
 .../java/org/apache/doris/binlog/BinlogGcer.java   |   2 +-
 .../org/apache/doris/binlog/BinlogManager.java     | 102 +++++++++++---
 .../org/apache/doris/binlog/BinlogTombstone.java   |   4 +-
 .../java/org/apache/doris/binlog/BinlogUtils.java  |  36 -----
 .../java/org/apache/doris/binlog/DBBinlog.java     |  49 ++++---
 .../java/org/apache/doris/binlog/TableBinlog.java  |  37 ++----
 .../main/java/org/apache/doris/catalog/Env.java    |  21 +--
 .../apache/doris/datasource/InternalCatalog.java   |   3 +-
 .../doris/persist/AlterDatabasePropertyInfo.java   |  10 +-
 .../java/org/apache/doris/persist/EditLog.java     |  26 ++--
 .../persist/ModifyTablePropertyOperationLog.java   |  20 ++-
 .../persist/ModifyDynamicPartitionInfoTest.java    |   2 +-
 gensrc/thrift/FrontendService.thrift               |   3 +
 14 files changed, 339 insertions(+), 122 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
new file mode 100644
index 0000000000..c414b85307
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.binlog;
+
+import org.apache.doris.catalog.BinlogConfig;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class BinlogConfigCache {
+    private static final Logger LOG = 
LogManager.getLogger(BinlogConfigCache.class);
+
+    private Map<Long, BinlogConfig> dbTableBinlogEnableMap; // keyed by db id 
or table id
+    private ReentrantReadWriteLock lock;
+
+    public BinlogConfigCache() {
+        dbTableBinlogEnableMap = new HashMap<Long, BinlogConfig>();
+        lock = new ReentrantReadWriteLock();
+    }
+
+    public BinlogConfig getDBBinlogConfig(long dbId) {
+        lock.readLock().lock();
+        BinlogConfig binlogConfig = dbTableBinlogEnableMap.get(dbId);
+        lock.readLock().unlock();
+        if (binlogConfig != null) {
+            return binlogConfig;
+        }
+
+        lock.writeLock().lock();
+        try {
+            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                LOG.warn("db not found. dbId: {}", dbId);
+                return null;
+            }
+
+            binlogConfig = db.getBinlogConfig();
+            dbTableBinlogEnableMap.put(dbId, binlogConfig);
+        } finally {
+            lock.writeLock().unlock();
+        }
+        return binlogConfig;
+    }
+
+    public boolean isEnableDB(long dbId) {
+        BinlogConfig dBinlogConfig = getDBBinlogConfig(dbId);
+        if (dBinlogConfig == null) {
+            return false;
+        }
+        return dBinlogConfig.isEnable();
+    }
+
+    public long getDBTtlSeconds(long dbId) {
+        BinlogConfig dBinlogConfig = getDBBinlogConfig(dbId);
+        if (dBinlogConfig == null) {
+            return BinlogConfig.TTL_SECONDS;
+        }
+        return dBinlogConfig.getTtlSeconds();
+    }
+
+    public BinlogConfig getTableBinlogConfig(long dbId, long tableId) {
+        lock.readLock().lock();
+        BinlogConfig tableBinlogConfig = dbTableBinlogEnableMap.get(tableId);
+        lock.readLock().unlock();
+        if (tableBinlogConfig != null) {
+            return tableBinlogConfig;
+        }
+
+        lock.writeLock().lock();
+        try {
+            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                LOG.warn("db not found. dbId: {}", dbId);
+                return null;
+            }
+
+            Table table = db.getTableOrMetaException(tableId);
+            if (table == null) {
+                LOG.warn("fail to get table. db: {}, table id: {}", 
db.getFullName(), tableId);
+                return null;
+            }
+            if (!(table instanceof OlapTable)) {
+                LOG.warn("table is not olap table. db: {}, table id: {}", 
db.getFullName(), tableId);
+                return null;
+            }
+
+            OlapTable olapTable = (OlapTable) table;
+            tableBinlogConfig = olapTable.getBinlogConfig();
+            dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
+            return tableBinlogConfig;
+        } catch (Exception e) {
+            LOG.warn("fail to get table. db: {}, table id: {}", dbId, tableId);
+            return null;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public boolean isEnableTable(long dbId, long tableId) {
+        BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
+        if (tableBinlogConfig == null) {
+            return false;
+        }
+        return tableBinlogConfig.isEnable();
+    }
+
+    public long getTableTtlSeconds(long dbId, long tableId) {
+        BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
+        if (tableBinlogConfig == null) {
+            return BinlogConfig.TTL_SECONDS;
+        }
+        return tableBinlogConfig.getTtlSeconds();
+    }
+
+    public void remove(long id) {
+        lock.writeLock().lock();
+        try {
+            dbTableBinlogEnableMap.remove(id);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
index 96d41946ff..6dbe47ea22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
@@ -41,7 +41,7 @@ import java.util.Map;
 
 public class BinlogGcer extends MasterDaemon {
     private static final Logger LOG = LogManager.getLogger(BinlogGcer.class);
-    private static final long GC_DURATION_MS = 313 * 1000L; // 313s
+    private static final long GC_DURATION_MS = 15 * 1000L; // 15s
 
     // TODO(Drogon): use this to control gc frequency by real gc time waste 
sample
     private long lastGcTime = 0L;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 11075c4fc4..43a95ed28e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -22,8 +22,10 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
+import org.apache.doris.persist.AlterDatabasePropertyInfo;
 import org.apache.doris.persist.BinlogGcInfo;
 import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.ModifyTablePropertyOperationLog;
 import org.apache.doris.persist.TableAddOrDropColumnsInfo;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
@@ -53,43 +55,63 @@ public class BinlogManager {
 
     private ReentrantReadWriteLock lock;
     private Map<Long, DBBinlog> dbBinlogMap;
+    private BinlogConfigCache binlogConfigCache;
 
     public BinlogManager() {
         lock = new ReentrantReadWriteLock();
         dbBinlogMap = Maps.newHashMap();
+        binlogConfigCache = new BinlogConfigCache();
     }
 
-    private void addBinlog(TBinlog binlog) {
-        if (!Config.enable_feature_binlog) {
+    private void afterAddBinlog(TBinlog binlog) {
+        if (!binlog.isSetRemoveEnableCache()) {
+            return;
+        }
+        if (!binlog.isRemoveEnableCache()) {
             return;
         }
 
-        // find db BinlogConfig
         long dbId = binlog.getDbId();
-        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
-        if (db == null) {
-            LOG.warn("db not found. dbId: {}", dbId);
+        boolean onlyDb = true;
+        if (binlog.isSetTableIds()) {
+            for (long tableId : binlog.getTableIds()) {
+                binlogConfigCache.remove(tableId);
+                onlyDb = false;
+            }
+        }
+        if (onlyDb) {
+            binlogConfigCache.remove(dbId);
+        }
+    }
+
+    private void addBinlog(TBinlog binlog) {
+        if (!Config.enable_feature_binlog) {
             return;
         }
-        boolean dbBinlogEnable = db.getBinlogConfig().isEnable();
 
         DBBinlog dbBinlog;
         lock.writeLock().lock();
         try {
+            long dbId = binlog.getDbId();
             dbBinlog = dbBinlogMap.get(dbId);
+
             if (dbBinlog == null) {
-                dbBinlog = new DBBinlog(binlog);
+                dbBinlog = new DBBinlog(binlogConfigCache, binlog);
                 dbBinlogMap.put(dbId, dbBinlog);
             }
         } finally {
             lock.writeLock().unlock();
         }
 
-        dbBinlog.addBinlog(binlog, dbBinlogEnable);
+        dbBinlog.addBinlog(binlog);
     }
 
     private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, 
long timestamp, TBinlogType type,
-                           String data) {
+                           String data, boolean removeEnableCache) {
+        if (!Config.enable_feature_binlog) {
+            return;
+        }
+
         TBinlog binlog = new TBinlog();
         // set commitSeq, timestamp, type, dbId, data
         binlog.setCommitSeq(commitSeq);
@@ -101,7 +123,26 @@ public class BinlogManager {
             binlog.setTableIds(tableIds);
         }
         binlog.setTableRef(0);
-        addBinlog(binlog);
+        binlog.setRemoveEnableCache(removeEnableCache);
+
+        // Only add the binlog when the db or at least one of its tables has binlog enabled
+        boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
+        boolean anyEnable = dbBinlogEnable;
+        if (tableIds != null) {
+            for (long tableId : tableIds) {
+                boolean tableBinlogEnable = 
binlogConfigCache.isEnableTable(dbId, tableId);
+                anyEnable = anyEnable || tableBinlogEnable;
+                if (anyEnable) {
+                    break;
+                }
+            }
+        }
+
+        if (anyEnable) {
+            addBinlog(binlog);
+        }
+
+        afterAddBinlog(binlog);
     }
 
     public void addUpsertRecord(UpsertRecord upsertRecord) {
@@ -112,7 +153,7 @@ public class BinlogManager {
         TBinlogType type = TBinlogType.UPSERT;
         String data = upsertRecord.toJson();
 
-        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
     }
 
     public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) {
@@ -124,7 +165,7 @@ public class BinlogManager {
         TBinlogType type = TBinlogType.ADD_PARTITION;
         String data = addPartitionRecord.toJson();
 
-        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
     }
 
     public void addCreateTableRecord(CreateTableRecord createTableRecord) {
@@ -136,7 +177,7 @@ public class BinlogManager {
         TBinlogType type = TBinlogType.CREATE_TABLE;
         String data = createTableRecord.toJson();
 
-        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
     }
 
     public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, 
long commitSeq) {
@@ -147,7 +188,7 @@ public class BinlogManager {
         TBinlogType type = TBinlogType.DROP_PARTITION;
         String data = dropPartitionInfo.toJson();
 
-        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
     }
 
     public void addDropTableRecord(DropTableRecord record) {
@@ -159,7 +200,7 @@ public class BinlogManager {
         TBinlogType type = TBinlogType.DROP_TABLE;
         String data = record.toJson();
 
-        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
     }
 
     public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) {
@@ -170,7 +211,7 @@ public class BinlogManager {
         TBinlogType type = TBinlogType.ALTER_JOB;
         String data = alterJob.toJson();
 
-        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
     }
 
     public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, 
long commitSeq) {
@@ -181,7 +222,29 @@ public class BinlogManager {
         TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_COLUMNS;
         String data = info.toJson();
 
-        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
+    }
+
+    public void addAlterDatabaseProperty(AlterDatabasePropertyInfo info, long 
commitSeq) {
+        long dbId = info.getDbId();
+        List<Long> tableIds = Lists.newArrayList();
+
+        long timestamp = -1;
+        TBinlogType type = TBinlogType.ALTER_DATABASE_PROPERTY;
+        String data = info.toJson();
+
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true);
+    }
+
+    public void addModifyTableProperty(ModifyTablePropertyOperationLog info, 
long commitSeq) {
+        long dbId = info.getDbId();
+        List<Long> tableIds = Lists.newArrayList();
+        tableIds.add(info.getTableId());
+        long timestamp = -1;
+        TBinlogType type = TBinlogType.MODIFY_TABLE_PROPERTY;
+        String data = info.toJson();
+
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true);
     }
 
     // get binlog by dbId, return first binlog.version > version
@@ -383,7 +446,8 @@ public class BinlogManager {
                 if (binlog.getType() == TBinlogType.DUMMY) {
                     // collect tableDummyBinlogs and dbDummyBinlog to recover 
DBBinlog and TableBinlog
                     if (binlog.getBelong() == -1) {
-                        DBBinlog dbBinlog = DBBinlog.recoverDbBinlog(binlog, 
tableDummies, currentDbBinlogEnable);
+                        DBBinlog dbBinlog = 
DBBinlog.recoverDbBinlog(binlogConfigCache, binlog, tableDummies,
+                                currentDbBinlogEnable);
                         dbBinlogMap.put(dbId, dbBinlog);
                     } else {
                         tableDummies.add(binlog);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
index 2b6e3cb8e1..48d5e04244 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
@@ -92,7 +92,7 @@ public class BinlogTombstone {
         return dbId;
     }
 
-    // TODO(deadlinefen): delete this code later
+    // TODO(deadlinefen): deprecate this code later
     public List<Long> getTableIds() {
         if (tableIds == null) {
             tableIds = Collections.emptyList();
@@ -102,7 +102,7 @@ public class BinlogTombstone {
 
     public Map<Long, Long> getTableCommitSeqMap() {
         if (tableCommitSeqMap == null) {
-            tableCommitSeqMap = Collections.emptyMap();
+            tableCommitSeqMap = Maps.newHashMap();
         }
         return tableCommitSeqMap;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index 30cbfd0e15..4e134104b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -17,24 +17,15 @@
 
 package org.apache.doris.binlog;
 
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Pair;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.util.TreeSet;
 
 public class BinlogUtils {
-    private static final Logger LOG = LogManager.getLogger(BinlogUtils.class);
-
     public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, 
long prevCommitSeq) {
         TStatus status = new TStatus(TStatusCode.OK);
         TBinlog firstBinlog = binlogs.first();
@@ -90,33 +81,6 @@ public class BinlogUtils {
         return dummy;
     }
 
-    public static boolean tableEnabledBinlog(long dbId, long tableId) {
-        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
-        if (db == null) {
-            LOG.error("db not found. dbId: {}", dbId);
-            return false;
-        }
-
-        OlapTable table;
-        try {
-            Table tbl = db.getTableOrMetaException(tableId);
-            if (tbl == null) {
-                LOG.warn("fail to get table. db: {}, table id: {}", 
db.getFullName(), tableId);
-                return false;
-            }
-            if (!(tbl instanceof OlapTable)) {
-                LOG.warn("table is not olap table. db: {}, table id: {}", 
db.getFullName(), tableId);
-                return false;
-            }
-            table = (OlapTable) tbl;
-        } catch (Exception e) {
-            LOG.warn("fail to get table. db: {}, table id: {}", 
db.getFullName(), tableId);
-            return false;
-        }
-
-        return table.getBinlogConfig().isEnable();
-    }
-
     public static long getExpiredMs(long ttlSeconds) {
         long currentSeconds = System.currentTimeMillis() / 1000;
         if (currentSeconds < ttlSeconds) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 151c5e5be9..35134eca87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -17,8 +17,7 @@
 
 package org.apache.doris.binlog;
 
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.BinlogConfig;
 import org.apache.doris.common.Pair;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
@@ -55,9 +54,12 @@ public class DBBinlog {
 
     private List<TBinlog> tableDummyBinlogs;
 
-    public DBBinlog(TBinlog binlog) {
+    private BinlogConfigCache binlogConfigCache;
+
+    public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
         lock = new ReentrantReadWriteLock();
         this.dbId = binlog.getDbId();
+        this.binlogConfigCache = binlogConfigCache;
 
         // allBinlogs treeset order by commitSeq
         allBinlogs = 
Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
@@ -74,14 +76,16 @@ public class DBBinlog {
         allBinlogs.add(dummy);
     }
 
-    public static DBBinlog recoverDbBinlog(TBinlog dbDummy, List<TBinlog> 
tableDummies, boolean dbBinlogEnable) {
-        DBBinlog dbBinlog = new DBBinlog(dbDummy);
+    public static DBBinlog recoverDbBinlog(BinlogConfigCache 
binlogConfigCache, TBinlog dbDummy,
+                                           List<TBinlog> tableDummies, boolean 
dbBinlogEnable) {
+        DBBinlog dbBinlog = new DBBinlog(binlogConfigCache, dbDummy);
+        long dbId = dbDummy.getDbId();
         for (TBinlog tableDummy : tableDummies) {
             long tableId = tableDummy.getBelong();
-            if (!dbBinlogEnable && 
!BinlogUtils.tableEnabledBinlog(dbBinlog.getDbId(), tableId)) {
+            if (!dbBinlogEnable && !binlogConfigCache.isEnableTable(dbId, 
tableId)) {
                 continue;
             }
-            dbBinlog.tableBinlogMap.put(tableId, new TableBinlog(tableDummy, 
tableId));
+            dbBinlog.tableBinlogMap.put(tableId, new 
TableBinlog(binlogConfigCache, tableDummy, dbId, tableId));
             dbBinlog.tableDummyBinlogs.add(tableDummy);
         }
 
@@ -111,11 +115,12 @@ public class DBBinlog {
         }
     }
 
+    // TODO(Drogon): remove the TableBinlog after DropTable; consider table drop && 
recovery
     private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean 
dbBinlogEnable) {
         TableBinlog tableBinlog = tableBinlogMap.get(tableId);
         if (tableBinlog == null) {
-            if (dbBinlogEnable || BinlogUtils.tableEnabledBinlog(dbId, 
tableId)) {
-                tableBinlog = new TableBinlog(binlog, tableId);
+            if (dbBinlogEnable || binlogConfigCache.isEnableTable(dbId, 
tableId)) {
+                tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, 
 tableId);
                 tableBinlogMap.put(tableId, tableBinlog);
                 tableDummyBinlogs.add(tableBinlog.getDummyBinlog());
             }
@@ -123,21 +128,25 @@ public class DBBinlog {
         return tableBinlog;
     }
 
-    public void addBinlog(TBinlog binlog, boolean dbBinlogEnable) {
+    // guarded by BinlogManager: when addBinlog is called, at least one of the 
db/tables has binlog enabled
+    public void addBinlog(TBinlog binlog) {
+        boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
         List<Long> tableIds = binlog.getTableIds();
+
         lock.writeLock().lock();
         try {
+            allBinlogs.add(binlog);
+
             if (binlog.getTimestamp() > 0 && dbBinlogEnable) {
                 timestamps.add(Pair.of(binlog.getCommitSeq(), 
binlog.getTimestamp()));
             }
 
-            allBinlogs.add(binlog);
-
             if (tableIds == null) {
                 return;
             }
 
             // HACK: for metadata fix
+            // we should not add binlog for create table and drop table in 
table binlog
             if (!binlog.isSetType()) {
                 return;
             }
@@ -205,22 +214,22 @@ public class DBBinlog {
 
     public BinlogTombstone gc() {
         // check db
-        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
-        if (db == null) {
+        BinlogConfig dbBinlogConfig = 
binlogConfigCache.getDBBinlogConfig(dbId);
+        if (dbBinlogConfig == null) {
             LOG.error("db not found. dbId: {}", dbId);
             return null;
         }
 
-        boolean dbBinlogEnable = db.getBinlogConfig().isEnable();
+        boolean dbBinlogEnable = dbBinlogConfig.isEnable();
         BinlogTombstone tombstone;
         if (dbBinlogEnable) {
             // db binlog is enabled, only one binlogTombstones
-            long ttlSeconds = db.getBinlogConfig().getTtlSeconds();
+            long ttlSeconds = dbBinlogConfig.getTtlSeconds();
             long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
 
             tombstone = dbBinlogEnableGc(expiredMs);
         } else {
-            tombstone = dbBinlogDisableGc(db);
+            tombstone = dbBinlogDisableGc();
         }
 
         return tombstone;
@@ -250,7 +259,7 @@ public class DBBinlog {
         return dbTombstone;
     }
 
-    private BinlogTombstone dbBinlogDisableGc(Database db) {
+    private BinlogTombstone dbBinlogDisableGc() {
         List<BinlogTombstone> tombstones = Lists.newArrayList();
         List<TableBinlog> tableBinlogs;
 
@@ -262,7 +271,7 @@ public class DBBinlog {
         }
 
         for (TableBinlog tableBinlog : tableBinlogs) {
-            BinlogTombstone tombstone = tableBinlog.gc(db);
+            BinlogTombstone tombstone = tableBinlog.ttlGc();
             if (tombstone != null) {
                 tombstones.add(tombstone);
             }
@@ -348,7 +357,7 @@ public class DBBinlog {
         List<BinlogTombstone> tableTombstones = Lists.newArrayList();
         for (TableBinlog tableBinlog : tableBinlogMap.values()) {
             // step 2.1: gc tableBinlog,and get table tombstone
-            BinlogTombstone tableTombstone = tableBinlog.gc(expiredCommitSeq);
+            BinlogTombstone tableTombstone = 
tableBinlog.commitSeqGc(expiredCommitSeq);
             if (tableTombstone != null) {
                 tableTombstones.add(tableTombstone);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 8934084e99..0857ae7abb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -17,9 +17,7 @@
 
 package org.apache.doris.binlog;
 
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.BinlogConfig;
 import org.apache.doris.common.Pair;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
@@ -37,11 +35,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class TableBinlog {
     private static final Logger LOG = LogManager.getLogger(TableBinlog.class);
 
+    private long dbId;
     private long tableId;
     private ReentrantReadWriteLock lock;
     private TreeSet<TBinlog> binlogs;
+    private BinlogConfigCache binlogConfigCache;
 
-    public TableBinlog(TBinlog binlog, long tableId) {
+    public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, 
long dbId, long tableId) {
+        this.dbId = dbId;
         this.tableId = tableId;
         lock = new ReentrantReadWriteLock();
         binlogs = 
Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
@@ -53,6 +54,7 @@ public class TableBinlog {
             dummy = BinlogUtils.newDummyBinlog(binlog.getDbId(), tableId);
         }
         binlogs.add(dummy);
+        this.binlogConfigCache = binlogConfigCache;
     }
 
     public TBinlog getDummyBinlog() {
@@ -100,7 +102,7 @@ public class TableBinlog {
         }
     }
 
-    private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(long expired, 
BinlogComparator check) {
+    private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(long expired, 
BinlogComparator checker) {
         if (binlogs.size() <= 1) {
             return null;
         }
@@ -111,7 +113,7 @@ public class TableBinlog {
         TBinlog lastExpiredBinlog = null;
         while (iter.hasNext()) {
             TBinlog binlog = iter.next();
-            if (check.isExpired(binlog, expired)) {
+            if (checker.isExpired(binlog, expired)) {
                 lastExpiredBinlog = binlog;
                 --binlog.table_ref;
                 if (binlog.getType() == TBinlogType.UPSERT) {
@@ -133,7 +135,7 @@ public class TableBinlog {
     }
 
     // this method call when db binlog enable
-    public BinlogTombstone gc(long expiredCommitSeq) {
+    public BinlogTombstone commitSeqGc(long expiredCommitSeq) {
         Pair<TBinlog, Long> tombstoneInfo;
 
         // step 1: get tombstoneUpsertBinlog and dummyBinlog
@@ -163,31 +165,20 @@ public class TableBinlog {
     }
 
     // this method call when db binlog disable
-    public BinlogTombstone gc(Database db) {
+    public BinlogTombstone ttlGc() {
         // step 1: get expire time
-        OlapTable table;
-        try {
-            Table tbl = db.getTableOrMetaException(tableId);
-            if (tbl == null) {
-                LOG.warn("fail to get table. db: {}, table id: {}", 
db.getFullName(), tableId);
-                return null;
-            }
-            if (!(tbl instanceof OlapTable)) {
-                LOG.warn("table is not olap table. db: {}, table id: {}", 
db.getFullName(), tableId);
-                return null;
-            }
-            table = (OlapTable) tbl;
-        } catch (Exception e) {
-            LOG.warn("fail to get table. db: {}, table id: {}", 
db.getFullName(), tableId);
+        BinlogConfig tableBinlogConfig = 
binlogConfigCache.getTableBinlogConfig(dbId, tableId);
+        if (tableBinlogConfig == null) {
             return null;
         }
 
-        long ttlSeconds = table.getBinlogConfig().getTtlSeconds();
+        long ttlSeconds = tableBinlogConfig.getTtlSeconds();
         long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
 
         if (expiredMs < 0) {
             return null;
         }
+        LOG.info("ttl gc. dbId: {}, tableId: {}, expiredMs: {}", dbId, 
tableId, expiredMs);
 
         // step 2: get tombstoneUpsertBinlog and dummyBinlog
         Pair<TBinlog, Long> tombstoneInfo;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 967e129728..e956cf9f0a 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -4400,8 +4400,9 @@ public class Env {
         DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), 
table, false);
         dynamicPartitionScheduler.createOrUpdateRuntimeInfo(table.getId(), 
DynamicPartitionScheduler.LAST_UPDATE_TIME,
                 TimeUtils.getCurrentFormatTime());
-        ModifyTablePropertyOperationLog info = new 
ModifyTablePropertyOperationLog(db.getId(), table.getId(),
-                logProperties);
+        ModifyTablePropertyOperationLog info =
+                new ModifyTablePropertyOperationLog(db.getId(), table.getId(), 
table.getName(),
+                        logProperties);
         editLog.logDynamicPartition(info);
     }
 
@@ -4473,9 +4474,9 @@ public class Env {
     public void modifyTableDefaultReplicaAllocation(Database db, OlapTable 
table, Map<String, String> properties) {
         Preconditions.checkArgument(table.isWriteLockHeldByCurrentThread());
         table.setReplicaAllocation(properties);
-        // log
-        ModifyTablePropertyOperationLog info = new 
ModifyTablePropertyOperationLog(db.getId(), table.getId(),
-                properties);
+        ModifyTablePropertyOperationLog info =
+                new ModifyTablePropertyOperationLog(db.getId(), table.getId(), 
table.getName(),
+                        properties);
         editLog.logModifyReplicationNum(info);
         LOG.debug("modify table[{}] replication num to {}", table.getName(),
                 properties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM));
@@ -4500,8 +4501,9 @@ public class Env {
             table.getPartitionInfo().setStoragePolicy(partition.getId(), tableProperty.getStoragePolicy());
         }
 
-        ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), table.getId(),
-                properties);
+        ModifyTablePropertyOperationLog info =
+                new ModifyTablePropertyOperationLog(db.getId(), table.getId(), table.getName(),
+                        properties);
         editLog.logModifyInMemory(info);
     }
 
@@ -4510,8 +4512,9 @@ public class Env {
 
         table.setBinlogConfig(newBinlogConfig);
 
-        ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), table.getId(),
-                newBinlogConfig.toProperties());
+        ModifyTablePropertyOperationLog info =
+                new ModifyTablePropertyOperationLog(db.getId(), table.getId(), table.getName(),
+                        newBinlogConfig.toProperties());
         editLog.logUpdateBinlogConfig(info);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index c030940a80..5c095636df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -759,6 +759,7 @@ public class InternalCatalog implements CatalogIf<Database> {
     public void alterDatabaseProperty(AlterDatabasePropertyStmt stmt) throws DdlException {
         String dbName = stmt.getDbName();
         Database db = (Database) getDbOrDdlException(dbName);
+        long dbId = db.getId();
         Map<String, String> properties = stmt.getProperties();
 
         db.writeLockOrDdlException();
@@ -768,7 +769,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                 return;
             }
 
-            AlterDatabasePropertyInfo info = new AlterDatabasePropertyInfo(dbName, properties);
+            AlterDatabasePropertyInfo info = new AlterDatabasePropertyInfo(dbId, dbName, properties);
             Env.getCurrentEnv().getEditLog().logAlterDatabaseProperty(info);
         } finally {
             db.writeUnlock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterDatabasePropertyInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterDatabasePropertyInfo.java
index 56858c86a6..5f1c2bec5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterDatabasePropertyInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterDatabasePropertyInfo.java
@@ -29,6 +29,9 @@ import java.io.IOException;
 import java.util.Map;
 
 public class AlterDatabasePropertyInfo implements Writable {
+    @SerializedName(value = "dbId")
+    private long dbId;
+
     @SerializedName(value = "dbName")
     private String dbName;
 
@@ -41,11 +44,16 @@ public class AlterDatabasePropertyInfo implements Writable {
         this.properties = null;
     }
 
-    public AlterDatabasePropertyInfo(String dbName, Map<String, String> properties) {
+    public AlterDatabasePropertyInfo(long dbId, String dbName, Map<String, String> properties) {
+        this.dbId = dbId;
         this.dbName = dbName;
         this.properties = properties;
     }
 
+    public long getDbId() {
+        return dbId;
+    }
+
     public String getDbName() {
         return dbName;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index eda39bc4f6..9b5a54323e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -750,6 +750,7 @@ public class EditLog {
                 case OperationType.OP_MODIFY_REPLICATION_NUM: {
                     ModifyTablePropertyOperationLog log = (ModifyTablePropertyOperationLog) journal.getData();
                     env.replayModifyTableProperty(opCode, log);
+                    env.getBinlogManager().addModifyTableProperty(log, logId);
                     break;
                 }
                 case OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM: {
@@ -1039,6 +1040,7 @@ public class EditLog {
                     LOG.info("replay alter database property: {}", alterDatabasePropertyInfo);
                     env.replayAlterDatabaseProperty(alterDatabasePropertyInfo.getDbName(),
                             alterDatabasePropertyInfo.getProperties());
+                    env.getBinlogManager().addAlterDatabaseProperty(alterDatabasePropertyInfo, logId);
                     break;
                 }
                 case OperationType.OP_GC_BINLOG: {
@@ -1623,24 +1625,30 @@ public class EditLog {
         logEdit(OperationType.OP_MODIFY_DISTRIBUTION_TYPE, tableInfo);
     }
 
+    private long logModifyTableProperty(short op, ModifyTablePropertyOperationLog info) {
+        long logId = logEdit(op, info);
+        Env.getCurrentEnv().getBinlogManager().addModifyTableProperty(info, logId);
+        return logId;
+    }
+
     public void logDynamicPartition(ModifyTablePropertyOperationLog info) {
-        logEdit(OperationType.OP_DYNAMIC_PARTITION, info);
+        logModifyTableProperty(OperationType.OP_DYNAMIC_PARTITION, info);
     }
 
-    public void logModifyReplicationNum(ModifyTablePropertyOperationLog info) {
-        logEdit(OperationType.OP_MODIFY_REPLICATION_NUM, info);
+    public long logModifyReplicationNum(ModifyTablePropertyOperationLog info) {
+        return logModifyTableProperty(OperationType.OP_MODIFY_REPLICATION_NUM, info);
     }
 
     public void logModifyDefaultDistributionBucketNum(ModifyTableDefaultDistributionBucketNumOperationLog info) {
         logEdit(OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM, info);
     }
 
-    public void logModifyInMemory(ModifyTablePropertyOperationLog info) {
-        logEdit(OperationType.OP_MODIFY_IN_MEMORY, info);
+    public long logModifyInMemory(ModifyTablePropertyOperationLog info) {
+        return logModifyTableProperty(OperationType.OP_MODIFY_IN_MEMORY, info);
     }
 
-    public void logUpdateBinlogConfig(ModifyTablePropertyOperationLog info) {
-        logEdit(OperationType.OP_UPDATE_BINLOG_CONFIG, info);
+    public long logUpdateBinlogConfig(ModifyTablePropertyOperationLog info) {
+        return logModifyTableProperty(OperationType.OP_UPDATE_BINLOG_CONFIG, info);
     }
 
     public void logAlterLightSchemaChange(AlterLightSchemaChangeInfo info) {
@@ -1829,7 +1837,9 @@ public class EditLog {
     }
 
     public long logAlterDatabaseProperty(AlterDatabasePropertyInfo log) {
-        return logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
+        long logId = logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
+        Env.getCurrentEnv().getBinlogManager().addAlterDatabaseProperty(log, logId);
+        return logId;
     }
 
     public long logGcBinlog(BinlogGcInfo log) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTablePropertyOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTablePropertyOperationLog.java
index f5a0a5d59d..a782db9f9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTablePropertyOperationLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ModifyTablePropertyOperationLog.java
@@ -35,13 +35,27 @@ public class ModifyTablePropertyOperationLog implements Writable {
     private long dbId;
     @SerializedName(value = "tableId")
     private long tableId;
+    @SerializedName(value = "tableName")
+    private String tableName;
     @SerializedName(value = "properties")
     private Map<String, String> properties = new HashMap<>();
+    @SerializedName(value = "sql")
+    private String sql;
 
-    public ModifyTablePropertyOperationLog(long dbId, long tableId, Map<String, String> properties) {
+    public ModifyTablePropertyOperationLog(long dbId, long tableId, String tableName, Map<String, String> properties) {
         this.dbId = dbId;
         this.tableId = tableId;
+        this.tableName = tableName;
         this.properties = properties;
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("SET (");
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            sb.append(entry.getKey()).append("=").append(entry.getValue()).append(",");
+        }
+        sb.deleteCharAt(sb.length() - 1); // remove last ','
+        sb.append(")");
+        this.sql = sb.toString();
     }
 
     public long getDbId() {
@@ -64,4 +78,8 @@ public class ModifyTablePropertyOperationLog implements Writable {
     public static ModifyTablePropertyOperationLog read(DataInput in) throws IOException {
         return GsonUtils.GSON.fromJson(Text.readString(in), ModifyTablePropertyOperationLog.class);
     }
+
+    public String toJson()  {
+        return GsonUtils.GSON.toJson(this);
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java
index 87ae23470a..bff50dcf76 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/persist/ModifyDynamicPartitionInfoTest.java
@@ -54,7 +54,7 @@ public class ModifyDynamicPartitionInfoTest {
         properties.put(DynamicPartitionProperty.END, "3");
         properties.put(DynamicPartitionProperty.PREFIX, "p");
         properties.put(DynamicPartitionProperty.BUCKETS, "30");
-        ModifyTablePropertyOperationLog modifyDynamicPartitionInfo = new ModifyTablePropertyOperationLog(100L, 200L, properties);
+        ModifyTablePropertyOperationLog modifyDynamicPartitionInfo = new ModifyTablePropertyOperationLog(100L, 200L, "test", properties);
         modifyDynamicPartitionInfo.write(out);
         out.flush();
         out.close();
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 06c3d96c72..0ea96a15f4 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -965,6 +965,8 @@ enum TBinlogType {
   ALTER_JOB = 5,
   MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 6,
   DUMMY = 7,
+  ALTER_DATABASE_PROPERTY = 8,
+  MODIFY_TABLE_PROPERTY = 9,
 }
 
 struct TBinlog {
@@ -976,6 +978,7 @@ struct TBinlog {
     6: optional string data
     7: optional i64 belong  // belong == -1 if type is not DUMMY
     8: optional i64 table_ref // only use for gc
+    9: optional bool remove_enable_cache
 }
 
 struct TGetBinlogResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to