This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 92176c46bf9 branch-2.1: [feat](binlog) filter the async mv binlogs 
#49028 (#49099)
92176c46bf9 is described below

commit 92176c46bf9bf3c8474b2a24605a828d7b66234e
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 28 10:01:00 2025 +0800

    branch-2.1: [feat](binlog) filter the async mv binlogs #49028 (#49099)
    
    Cherry-picked from #49028
    
    Co-authored-by: walter <maoch...@selectdb.com>
---
 .../org/apache/doris/binlog/BinlogConfigCache.java | 70 +++++++++++++++-------
 .../org/apache/doris/binlog/BinlogManager.java     | 22 +++++++
 2 files changed, 69 insertions(+), 23 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
index b07f5e5d87c..0bce3f375aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -34,10 +35,12 @@ public class BinlogConfigCache {
     private static final Logger LOG = 
LogManager.getLogger(BinlogConfigCache.class);
 
     private Map<Long, BinlogConfig> dbTableBinlogEnableMap; // db or table all 
use id
+    private Map<Long, TableIf.TableType> tableTypeMap;
     private ReentrantReadWriteLock lock;
 
     public BinlogConfigCache() {
         dbTableBinlogEnableMap = new HashMap<Long, BinlogConfig>();
+        tableTypeMap = new HashMap<Long, TableIf.TableType>();
         lock = new ReentrantReadWriteLock();
     }
 
@@ -93,29 +96,8 @@ public class BinlogConfigCache {
 
         lock.writeLock().lock();
         try {
-            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
-            if (db == null) {
-                LOG.warn("db not found. dbId: {}", dbId);
-                return null;
-            }
-
-            Table table = db.getTableNullable(tableId);
-            if (table == null) {
-                LOG.warn("fail to get table. db: {}, table id: {}", 
db.getFullName(), tableId);
-                return null;
-            }
-            if (!(table instanceof OlapTable)) {
-                LOG.warn("table is not olap table. db: {}, table id: {}", 
db.getFullName(), tableId);
-                return null;
-            }
-
-            OlapTable olapTable = (OlapTable) table;
-            tableBinlogConfig = olapTable.getBinlogConfig();
-            // get table binlog config, when table modify binlogConfig
-            // it create a new binlog, not update inplace, so we don't need to 
clone
-            // binlogConfig
-            dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
-            return tableBinlogConfig;
+            loadTableBinlogConfig(dbId, tableId);
+            return dbTableBinlogEnableMap.get(tableId); // null if not exists
         } catch (Exception e) {
             LOG.warn("fail to get table. db: {}, table id: {}", dbId, tableId);
             return null;
@@ -124,6 +106,48 @@ public class BinlogConfigCache {
         }
     }
 
+    private void loadTableBinlogConfig(long dbId, long tableId) {
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            LOG.warn("db not found. dbId: {}", dbId);
+            return;
+        }
+
+        Table table = db.getTableNullable(tableId);
+        if (table == null) {
+            LOG.warn("fail to get table. db: {}, table id: {}", 
db.getFullName(), tableId);
+            return;
+        }
+        if (!(table instanceof OlapTable)) { // MTMV is an instance of 
OlapTable
+            LOG.warn("table is not olap table. db: {}, table id: {}", 
db.getFullName(), tableId);
+            return;
+        }
+
+        OlapTable olapTable = (OlapTable) table;
+        // get table binlog config, when table modify binlogConfig
+        // it create a new binlog, not update inplace, so we don't need to 
clone
+        // binlogConfig
+        dbTableBinlogEnableMap.put(tableId, olapTable.getBinlogConfig());
+        tableTypeMap.put(tableId, table.getType());
+    }
+
+    public boolean isAsyncMvTable(long dbId, long tableId) {
+        lock.readLock().lock();
+        TableIf.TableType tableType = tableTypeMap.get(tableId);
+        lock.readLock().unlock();
+        if (tableType != null) {
+            return tableType == TableIf.TableType.MATERIALIZED_VIEW;
+        }
+
+        lock.writeLock().lock();
+        try {
+            loadTableBinlogConfig(dbId, tableId);
+            return tableTypeMap.get(tableId) == 
TableIf.TableType.MATERIALIZED_VIEW;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     public boolean isEnableTable(long dbId, long tableId) {
         BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
         if (tableBinlogConfig == null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index b35092a830d..094a4b27c9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -107,11 +107,33 @@ public class BinlogManager {
         }
     }
 
+    private boolean isAsyncMvBinlog(TBinlog binlog) {
+        if (!binlog.isSetTableIds()) {
+            return false;
+        }
+
+        // Filter the binlogs belong to async materialized view, since we 
don't support async mv right now.
+        for (long tableId : binlog.getTableIds()) {
+            if (binlogConfigCache.isAsyncMvTable(binlog.getDbId(), tableId)) {
+                LOG.debug("filter the async mv binlog, db {}, table {}, commit 
seq {}, ts {}, type {}, data {}",
+                        binlog.getDbId(), binlog.getTableIds(), 
binlog.getCommitSeq(), binlog.getTimestamp(),
+                        binlog.getType(), binlog.getData());
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     private void addBinlog(TBinlog binlog, Object raw) {
         if (!Config.enable_feature_binlog) {
             return;
         }
 
+        if (isAsyncMvBinlog(binlog)) {
+            return;
+        }
+
         LOG.debug("add binlog, db {}, table {}, commitSeq {}, timestamp {}, 
type {}, data {}",
                 binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), 
binlog.getTimestamp(), binlog.getType(),
                 binlog.getData());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to