This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 92176c46bf9 branch-2.1: [feat](binlog) filter the async mv binlogs #49028 (#49099) 92176c46bf9 is described below commit 92176c46bf9bf3c8474b2a24605a828d7b66234e Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Mar 28 10:01:00 2025 +0800 branch-2.1: [feat](binlog) filter the async mv binlogs #49028 (#49099) Cherry-picked from #49028 Co-authored-by: walter <maoch...@selectdb.com> --- .../org/apache/doris/binlog/BinlogConfigCache.java | 70 +++++++++++++++------- .../org/apache/doris/binlog/BinlogManager.java | 22 +++++++ 2 files changed, 69 insertions(+), 23 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java index b07f5e5d87c..0bce3f375aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,10 +35,12 @@ public class BinlogConfigCache { private static final Logger LOG = LogManager.getLogger(BinlogConfigCache.class); private Map<Long, BinlogConfig> dbTableBinlogEnableMap; // db or table all use id + private Map<Long, TableIf.TableType> tableTypeMap; private ReentrantReadWriteLock lock; public BinlogConfigCache() { dbTableBinlogEnableMap = new HashMap<Long, BinlogConfig>(); + tableTypeMap = new HashMap<Long, TableIf.TableType>(); lock = new ReentrantReadWriteLock(); } @@ -93,29 +96,8 @@ public class BinlogConfigCache { lock.writeLock().lock(); try { - Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); - if (db == null) { - LOG.warn("db not found. dbId: {}", dbId); - return null; - } - - Table table = db.getTableNullable(tableId); - if (table == null) { - LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId); - return null; - } - if (!(table instanceof OlapTable)) { - LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId); - return null; - } - - OlapTable olapTable = (OlapTable) table; - tableBinlogConfig = olapTable.getBinlogConfig(); - // get table binlog config, when table modify binlogConfig - // it create a new binlog, not update inplace, so we don't need to clone - // binlogConfig - dbTableBinlogEnableMap.put(tableId, tableBinlogConfig); - return tableBinlogConfig; + loadTableBinlogConfig(dbId, tableId); + return dbTableBinlogEnableMap.get(tableId); // null if not exists } catch (Exception e) { LOG.warn("fail to get table. db: {}, table id: {}", dbId, tableId); return null; @@ -124,6 +106,48 @@ public class BinlogConfigCache { } } + private void loadTableBinlogConfig(long dbId, long tableId) { + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + LOG.warn("db not found. dbId: {}", dbId); + return; + } + + Table table = db.getTableNullable(tableId); + if (table == null) { + LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId); + return; + } + if (!(table instanceof OlapTable)) { // MTMV is an instance of OlapTable + LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId); + return; + } + + OlapTable olapTable = (OlapTable) table; + // get table binlog config, when table modify binlogConfig + // it create a new binlog, not update inplace, so we don't need to clone + // binlogConfig + dbTableBinlogEnableMap.put(tableId, olapTable.getBinlogConfig()); + tableTypeMap.put(tableId, table.getType()); + } + + public boolean isAsyncMvTable(long dbId, long tableId) { + lock.readLock().lock(); + TableIf.TableType tableType = tableTypeMap.get(tableId); + lock.readLock().unlock(); + if (tableType != null) { + return tableType == TableIf.TableType.MATERIALIZED_VIEW; + } + + lock.writeLock().lock(); + try { + loadTableBinlogConfig(dbId, tableId); + return tableTypeMap.get(tableId) == TableIf.TableType.MATERIALIZED_VIEW; + } finally { + lock.writeLock().unlock(); + } + } + public boolean isEnableTable(long dbId, long tableId) { BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId); if (tableBinlogConfig == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index b35092a830d..094a4b27c9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -107,11 +107,33 @@ public class BinlogManager { } } + private boolean isAsyncMvBinlog(TBinlog binlog) { + if (!binlog.isSetTableIds()) { + return false; + } + + // Filter the binlogs belong to async materialized view, since we don't support async mv right now. + for (long tableId : binlog.getTableIds()) { + if (binlogConfigCache.isAsyncMvTable(binlog.getDbId(), tableId)) { + LOG.debug("filter the async mv binlog, db {}, table {}, commit seq {}, ts {}, type {}, data {}", + binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), binlog.getTimestamp(), + binlog.getType(), binlog.getData()); + return true; + } + } + + return false; + } + private void addBinlog(TBinlog binlog, Object raw) { if (!Config.enable_feature_binlog) { return; } + if (isAsyncMvBinlog(binlog)) { + return; + } + LOG.debug("add binlog, db {}, table {}, commitSeq {}, timestamp {}, type {}, data {}", binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), binlog.getTimestamp(), binlog.getType(), binlog.getData()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org