This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a261c2788e0 [enhance-wip](multi-catalog) Speed up consume rate of hms events. (#27666)
a261c2788e0 is described below

commit a261c2788e03472e4ee8f46831ad4e2f4762559a
Author: Xiangyu Wang <dut.xian...@gmail.com>
AuthorDate: Fri Jan 19 15:24:12 2024 +0800

    [enhance-wip](multi-catalog) Speed up consume rate of hms events. (#27666)
    
    ## Proposed changes
    
    The current implementation persists all databases/tables of external
    catalogs, and only the master FE can handle hms events, making all slave
    nodes replay these events. This brings several problems:
    
    - The hms event processor (`MetastoreEventsProcessor`) cannot consume the
    events in time: writing a journal log is a synchronized operation, so the
    consume rate cannot be sped up by concurrent processing, and each
    add-journal-log operation costs tens of milliseconds. As a result, the meta
    info of hive may be out of date.
    
    - Slave FE nodes may crash if replaying the journal logs of hms events
    fails. (We have already fixed several issues of this kind, but we cannot
    guarantee that all of them have been resolved.)
    
    - Hms events produce a large number of journal logs, but these logs are
    never used again after an FE restart; they make the startup time of all FE
    nodes very long.
    
    Currently, Doris persists all databases/tables of external catalogs only to
    make sure that the dbId/tableId of databases/tables are the same across all
    FE nodes, which is required by analysis jobs.
    
    In this pr, we introduce a meta id manager called `ExternalMetaIdMgr` to
    manage these meta ids. On every loop in which the master fetches a batch of
    hms events, it handles the meta ids first and produces only one meta id
    mappings log; slave FE nodes replay this log to sync the changes to these
    meta ids. `MetastoreEventsProcessor` now starts on every FE node and
    consumes these hms events as soon as possible.
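
    Below is a minimal, self-contained sketch of the id-mapping idea described
    above, for illustration only. The class and method names used here
    (`SimpleMetaIdMgr`, `IdMapping`, `replay`) are hypothetical simplifications;
    the actual classes added by this patch are `ExternalMetaIdMgr` and
    `MetaIdMappingsLog` (see the diff below).

    ```java
    // Hypothetical, simplified model of the id-mapping approach in this pr:
    // the master bundles the id mappings of one event batch into a single log
    // entry, and every FE node applies that batch to a local in-memory map.
    // The real implementation (ExternalMetaIdMgr + MetaIdMappingsLog) keeps
    // nested per-catalog/db/table managers instead of a flat map.
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SimpleMetaIdMgr {
        public static final long META_ID_FOR_NOT_EXISTS = -1L;

        enum OpType { ADD, DELETE }

        // one mapping produced by the master: (catalogId, dbName, tblName) -> id
        record IdMapping(OpType op, long catalogId, String dbName, String tblName, long id) { }

        // key: "catalogId#dbName#tblName"
        private final Map<String, Long> tblIds = new ConcurrentHashMap<>();

        private static String key(long catalogId, String dbName, String tblName) {
            return catalogId + "#" + dbName + "#" + tblName;
        }

        // applied on the master when the log is written, and on slaves when it is replayed
        public void replay(List<IdMapping> batch) {
            for (IdMapping m : batch) {
                String k = key(m.catalogId(), m.dbName(), m.tblName());
                if (m.op() == OpType.ADD) {
                    tblIds.put(k, m.id());
                } else {
                    tblIds.remove(k);
                }
            }
        }

        // -1 means no id mapping is recorded for this table
        public long getTblId(long catalogId, String dbName, String tblName) {
            return tblIds.getOrDefault(key(catalogId, dbName, tblName), META_ID_FOR_NOT_EXISTS);
        }

        public static void main(String[] args) {
            SimpleMetaIdMgr mgr = new SimpleMetaIdMgr();
            mgr.replay(List.of(new IdMapping(OpType.ADD, 1L, "db1", "tbl1", 10001L)));
            System.out.println(mgr.getTblId(1L, "db1", "tbl1"));   // 10001
            System.out.println(mgr.getTblId(1L, "db1", "absent")); // -1
        }
    }
    ```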
    
    ## Further comments
    
    I have previously submitted two prs ( #22869 #21589 ) to speed up the
    consume rate of hms events. They work well when there are many
    `AlterTableEvent` / `DropTableEvent` on the hive cluster, but the
    improvement is not that significant when most of the hms events are
    partition events. Unfortunately, we performed a cluster upgrade (from Spark
    2.x to Spark 3.x), which may be the reason why the majority of Hive
    Metastore events became partition events. This is also the reason for the  [...]
    
    Based on our observations, after merging this pull request Doris is capable
    of processing thousands of Hive Metastore events per second, compared with
    only a few dozen before.
    
    ```
    2023-12-07 05:17:03,518 INFO (replayer|105) [Env.replayJournal():2614] 
replayed journal id is 18287902, replay to journal id is 18287903
    2023-12-07 05:17:03,735 INFO 
(org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) 
[MetastoreEventFactory.mergeEvents():188] Event size on catalog [xxx] before 
merge is [1947], after merge is [1849]
    2023-12-07 05:17:03,735 INFO 
(org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) 
[MetastoreEvent.infoLog():193] EventId: 357955309 EventType: ALTER_PARTITION 
catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2022-05-27],partitionNameAfter:[partitions=2022-05-27]
    2023-12-07 05:17:03,735 INFO 
(org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) 
[MetastoreEvent.infoLog():193] EventId: 357955310 EventType: ALTER_PARTITION 
catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20230318],partitionNameAfter:[pday=20230318]
    2023-12-07 05:17:03,735 INFO 
(org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) 
[MetastoreEvent.infoLog():193] EventId: 357955311 EventType: ALTER_PARTITION 
catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20190826],partitionNameAfter:[pday=20190826]
    2023-12-07 05:17:03,735 INFO 
(org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) 
[MetastoreEvent.infoLog():193] EventId: 357955312 EventType: ALTER_PARTITION 
catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2021-09-16],partitionNameAfter:[partitions=2021-09-16]
    2023-12-07 05:17:03,735 INFO 
(org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) 
[MetastoreEvent.infoLog():193] EventId: 357955314 EventType: ALTER_PARTITION 
catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2020-04-26],partitionNameAfter:[partitions=2020-04-26]
    2023-12-07 05:17:03,735 INFO 
(org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) 
[MetastoreEvent.infoLog():193] EventId: 357955315 EventType: ALTER_PARTITION 
catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20230702],partitionNameAfter:[pday=20230702]
    2023-12-07 05:17:03,735 INFO 
(org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) 
[MetastoreEvent.infoLog():193] EventId: 357955317 EventType: ALTER_PARTITION 
catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20211019],partitionNameAfter:[pday=20211019]
    ...
    2023-12-07 05:17:03,989 INFO 
(org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) 
[MetastoreEvent.infoLog():193] EventId: 357957252 EventType: ALTER_PARTITION 
catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2021-08-27],partitionNameAfter:[partitions=2021-08-27]
    2023-12-07 05:17:03,989 INFO 
(org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) 
[MetastoreEvent.infoLog():193] EventId: 357957253 EventType: ALTER_PARTITION 
catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2022-02-05],partitionNameAfter:[partitions=2022-02-05]
    2023-12-07 05:17:04,661 INFO (replayer|105) [Env.replayJournal():2614] 
replayed journal id is 18287903, replay to journal id is 18287904
    2023-12-07 05:17:05,028 INFO 
(org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) 
[MetastoreEventsProcessor.realRun():116] Events size are 587 on catalog [xxx]
    2023-12-07 05:17:05,662 INFO 
(org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) 
[MetastoreEventFactory.mergeEvents():188] Event size on catalog [xxx] before 
merge is [587], after merge is [587]
    ```
---
 .../main/java/org/apache/doris/catalog/Env.java    |  17 +-
 .../doris/catalog/external/ExternalDatabase.java   |  20 +-
 .../catalog/external/HMSExternalDatabase.java      |  13 +-
 .../catalog/external/IcebergExternalDatabase.java  |   4 +-
 .../catalog/external/PaimonExternalDatabase.java   |   4 +-
 .../org/apache/doris/datasource/CatalogMgr.java    | 257 +++----------------
 .../apache/doris/datasource/ExternalCatalog.java   |  14 +-
 .../apache/doris/datasource/ExternalMetaIdMgr.java | 263 ++++++++++++++++++++
 .../doris/datasource/HMSExternalCatalog.java       | 107 ++------
 .../org/apache/doris/datasource/InitTableLog.java  |  67 -----
 .../apache/doris/datasource/MetaIdMappingsLog.java | 274 +++++++++++++++++++++
 .../datasource/hive/event/AddPartitionEvent.java   |  16 ++
 .../datasource/hive/event/AlterTableEvent.java     |   6 +-
 .../datasource/hive/event/CreateDatabaseEvent.java |  12 +
 .../datasource/hive/event/CreateTableEvent.java    |  14 +-
 .../datasource/hive/event/DropDatabaseEvent.java   |  11 +
 .../datasource/hive/event/DropPartitionEvent.java  |  17 +-
 .../datasource/hive/event/DropTableEvent.java      |  15 +-
 .../datasource/hive/event/MetastoreEvent.java      |   9 +
 .../hive/event/MetastoreEventFactory.java          |  26 +-
 .../hive/event/MetastoreEventsProcessor.java       | 225 ++++++++++++++---
 .../org/apache/doris/journal/JournalEntity.java    |  20 +-
 .../java/org/apache/doris/persist/EditLog.java     |  29 +--
 .../org/apache/doris/persist/OperationType.java    |   9 +
 .../doris/datasource/ExternalMetaIdMgrTest.java    |  76 ++++++
 .../doris/datasource/MetaIdMappingsLogTest.java    |  97 ++++++++
 .../external/hms/MetastoreEventFactoryTest.java    |   2 +-
 27 files changed, 1138 insertions(+), 486 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 222eb195b34..37930708cc8 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -132,6 +132,7 @@ import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.CatalogMgr;
 import org.apache.doris.datasource.EsExternalCatalog;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
+import org.apache.doris.datasource.ExternalMetaIdMgr;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.hive.HiveTransactionMgr;
 import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
@@ -362,6 +363,7 @@ public class Env {
     private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
     private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
     private CooldownConfHandler cooldownConfHandler;
+    private ExternalMetaIdMgr externalMetaIdMgr;
     private MetastoreEventsProcessor metastoreEventsProcessor;
 
     private ExportTaskRegister exportTaskRegister;
@@ -649,6 +651,7 @@ public class Env {
         if (Config.enable_storage_policy) {
             this.cooldownConfHandler = new CooldownConfHandler();
         }
+        this.externalMetaIdMgr = new ExternalMetaIdMgr();
         this.metastoreEventsProcessor = new MetastoreEventsProcessor();
         this.jobManager = new JobManager<>();
         this.labelProcessor = new LabelProcessor();
@@ -844,6 +847,14 @@ public class Env {
         return workloadRuntimeStatusMgr;
     }
 
+    public ExternalMetaIdMgr getExternalMetaIdMgr() {
+        return externalMetaIdMgr;
+    }
+
+    public MetastoreEventsProcessor getMetastoreEventsProcessor() {
+        return metastoreEventsProcessor;
+    }
+
     // use this to get correct ClusterInfoService instance
     public static SystemInfoService getCurrentSystemInfo() {
         return getCurrentEnv().getClusterInfo();
@@ -1638,9 +1649,6 @@ public class Env {
         streamLoadRecordMgr.start();
         tabletLoadIndexRecorderMgr.start();
         new InternalSchemaInitializer().start();
-        if (Config.enable_hms_events_incremental_sync) {
-            metastoreEventsProcessor.start();
-        }
         getRefreshManager().start();
 
         // binlog gcer
@@ -1662,6 +1670,9 @@ public class Env {
         domainResolver.start();
         // fe disk updater
         feDiskUpdater.start();
+        if (Config.enable_hms_events_incremental_sync) {
+            metastoreEventsProcessor.start();
+        }
     }
 
     private void transferToNonMaster(FrontendNodeType newType) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index e6584d4a0b6..4af08e9b384 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -146,8 +146,15 @@ public abstract class ExternalDatabase<T extends 
ExternalTable>
         Map<Long, T> tmpIdToTbl = Maps.newConcurrentMap();
         for (int i = 0; i < log.getRefreshCount(); i++) {
             T table = getTableForReplay(log.getRefreshTableIds().get(i));
-            tmpTableNameToId.put(table.getName(), table.getId());
-            tmpIdToTbl.put(table.getId(), table);
+            // When upgrade cluster with this pr: 
https://github.com/apache/doris/pull/27666
+            // Maybe there are some create table events will be skipped
+            // if the cluster has any hms catalog(s) with hms event listener 
enabled.
+            // So we need add a validation here to avoid table(s) not found, 
this is just a temporary solution
+            // because later we will remove all the logics about 
InitCatalogLog/InitDatabaseLog.
+            if (table != null) {
+                tmpTableNameToId.put(table.getName(), table.getId());
+                tmpIdToTbl.put(table.getId(), table);
+            }
         }
         for (int i = 0; i < log.getCreateCount(); i++) {
             T table = getExternalTable(log.getCreateTableNames().get(i), 
log.getCreateTableIds().get(i), catalog);
@@ -195,8 +202,7 @@ public abstract class ExternalDatabase<T extends 
ExternalTable>
             idToTbl = tmpIdToTbl;
         }
 
-        long currentTime = System.currentTimeMillis();
-        lastUpdateTime = currentTime;
+        lastUpdateTime = System.currentTimeMillis();
         initDatabaseLog.setLastUpdateTime(lastUpdateTime);
         initialized = true;
         Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
@@ -370,17 +376,13 @@ public abstract class ExternalDatabase<T extends 
ExternalTable>
         throw new NotImplementedException("dropTable() is not implemented");
     }
 
-    public void dropTableForReplay(String tableName) {
-        throw new NotImplementedException("replayDropTableFromEvent() is not 
implemented");
-    }
-
     @Override
     public CatalogIf getCatalog() {
         return extCatalog;
     }
 
     // Only used for sync hive metastore event
-    public void createTableForReplay(String tableName, long tableId) {
+    public void createTable(String tableName, long tableId) {
         throw new NotImplementedException("createTable() is not implemented");
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
index d75f86bd088..f586ea7ed8a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
@@ -64,17 +64,6 @@ public class HMSExternalDatabase extends 
ExternalDatabase<HMSExternalTable> {
 
     @Override
     public void dropTable(String tableName) {
-        LOG.debug("drop table [{}]", tableName);
-        makeSureInitialized();
-        Long tableId = tableNameToId.remove(tableName);
-        if (tableId == null) {
-            LOG.warn("drop table [{}] failed", tableName);
-        }
-        idToTbl.remove(tableId);
-    }
-
-    @Override
-    public void dropTableForReplay(String tableName) {
         LOG.debug("replayDropTableFromEvent [{}]", tableName);
         Long tableId = tableNameToId.remove(tableName);
         if (tableId == null) {
@@ -85,7 +74,7 @@ public class HMSExternalDatabase extends 
ExternalDatabase<HMSExternalTable> {
     }
 
     @Override
-    public void createTableForReplay(String tableName, long tableId) {
+    public void createTable(String tableName, long tableId) {
         LOG.debug("create table [{}]", tableName);
         tableNameToId.put(tableName, tableId);
         HMSExternalTable table = getExternalTable(tableName, tableId, 
extCatalog);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
index a915b3b2418..1b5cd805a0f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
@@ -49,7 +49,7 @@ public class IcebergExternalDatabase extends 
ExternalDatabase<IcebergExternalTab
     }
 
     @Override
-    public void dropTableForReplay(String tableName) {
+    public void dropTable(String tableName) {
         LOG.debug("drop table [{}]", tableName);
         Long tableId = tableNameToId.remove(tableName);
         if (tableId == null) {
@@ -59,7 +59,7 @@ public class IcebergExternalDatabase extends 
ExternalDatabase<IcebergExternalTab
     }
 
     @Override
-    public void createTableForReplay(String tableName, long tableId) {
+    public void createTable(String tableName, long tableId) {
         LOG.debug("create table [{}]", tableName);
         tableNameToId.put(tableName, tableId);
         IcebergExternalTable table = new IcebergExternalTable(tableId, 
tableName, name,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
index a839b31298f..0e6ee7332d0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
@@ -49,7 +49,7 @@ public class PaimonExternalDatabase extends 
ExternalDatabase<PaimonExternalTable
     }
 
     @Override
-    public void dropTableForReplay(String tableName) {
+    public void dropTable(String tableName) {
         LOG.debug("drop table [{}]", tableName);
         Long tableId = tableNameToId.remove(tableName);
         if (tableId == null) {
@@ -59,7 +59,7 @@ public class PaimonExternalDatabase extends 
ExternalDatabase<PaimonExternalTable
     }
 
     @Override
-    public void createTableForReplay(String tableName, long tableId) {
+    public void createTable(String tableName, long tableId) {
         LOG.debug("create table [{}]", tableName);
         tableNameToId.put(tableName, tableId);
         PaimonExternalTable table = new PaimonExternalTable(tableId, 
tableName, name,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index 2ad27c87f22..76d7702fef4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -33,6 +33,7 @@ import org.apache.doris.catalog.Resource.ReferenceType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.HMSExternalDatabase;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
@@ -345,10 +346,10 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
         writeLock();
         try {
             CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName());
-            Map<String, String> oldProperties = catalog.getProperties();
             if (catalog == null) {
                 throw new DdlException("No catalog found with name: " + 
stmt.getCatalogName());
             }
+            Map<String, String> oldProperties = catalog.getProperties();
             if (stmt.getNewProperties().containsKey("type") && 
!catalog.getType()
                     .equalsIgnoreCase(stmt.getNewProperties().get("type"))) {
                 throw new DdlException("Can't modify the type of catalog 
property with name: " + stmt.getCatalogName());
@@ -682,12 +683,6 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
         ((HMSExternalTable) table).unsetObjectCreated();
         ((HMSExternalTable) table).setEventUpdateTime(updateTime);
         
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), 
dbName, tableName);
-        ExternalObjectLog log = new ExternalObjectLog();
-        log.setCatalogId(catalog.getId());
-        log.setDbId(db.getId());
-        log.setTableId(table.getId());
-        log.setLastUpdateTime(updateTime);
-        Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
     }
 
     public void refreshExternalTable(String dbName, String tableName, String 
catalogName, boolean ignoreIfNotExists)
@@ -773,43 +768,16 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
             }
             return;
         }
-        ExternalObjectLog log = new ExternalObjectLog();
-        log.setCatalogId(catalog.getId());
-        log.setDbId(db.getId());
-        log.setTableId(table.getId());
-        log.setLastUpdateTime(System.currentTimeMillis());
-        replayDropExternalTable(log);
-        Env.getCurrentEnv().getEditLog().logDropExternalTable(log);
-    }
 
-    public void replayDropExternalTable(ExternalObjectLog log) {
-        
LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", 
log.getCatalogId(), log.getDbId(),
-                log.getTableId());
-        ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
-        if (catalog == null) {
-            LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
-            return;
-        }
-        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
-        if (db == null) {
-            LOG.warn("No db found with id:[{}], it may have been dropped.", 
log.getDbId());
-            return;
-        }
-        ExternalTable table = db.getTableForReplay(log.getTableId());
-        if (table == null) {
-            LOG.warn("No table found with id:[{}], it may have been dropped.", 
log.getTableId());
-            return;
-        }
         db.writeLock();
         try {
-            db.dropTableForReplay(table.getName());
-            db.setLastUpdateTime(log.getLastUpdateTime());
+            db.dropTable(table.getName());
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(
+                        catalog.getId(), db.getFullName(), table.getName());
+            ((HMSExternalDatabase) 
db).setLastUpdateTime(System.currentTimeMillis());
         } finally {
             db.writeUnlock();
         }
-
-        Env.getCurrentEnv().getExtMetaCacheMgr()
-                .invalidateTableCache(catalog.getId(), db.getFullName(), 
table.getName());
     }
 
     public boolean externalTableExistInLocal(String dbName, String tableName, 
String catalogName) throws DdlException {
@@ -823,9 +791,9 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
         return ((ExternalCatalog) catalog).tableExistInLocal(dbName, 
tableName);
     }
 
-    public void createExternalTableFromEvent(String dbName, String tableName, 
String catalogName,
-            boolean ignoreIfExists)
-            throws DdlException {
+    public void createExternalTableFromEvent(String dbName, String tableName,
+                                             String catalogName, long 
updateTime,
+                                             boolean ignoreIfExists) throws 
DdlException {
         CatalogIf catalog = nameToCatalog.get(catalogName);
         if (catalog == null) {
             throw new DdlException("No catalog found with name: " + 
catalogName);
@@ -848,33 +816,21 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
             }
             return;
         }
-        ExternalObjectLog log = new ExternalObjectLog();
-        log.setCatalogId(catalog.getId());
-        log.setDbId(db.getId());
-        log.setTableName(tableName);
-        log.setTableId(Env.getCurrentEnv().getNextId());
-        log.setLastUpdateTime(System.currentTimeMillis());
-        replayCreateExternalTableFromEvent(log);
-        Env.getCurrentEnv().getEditLog().logCreateExternalTable(log);
-    }
 
-    public void replayCreateExternalTableFromEvent(ExternalObjectLog log) {
-        
LOG.debug("ReplayCreateExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}],tableName:[{}]",
 log.getCatalogId(),
-                log.getDbId(), log.getTableId(), log.getTableName());
-        ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
-        if (catalog == null) {
-            LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
-            return;
-        }
-        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
-        if (db == null) {
-            LOG.warn("No db found with id:[{}], it may have been dropped.", 
log.getDbId());
+        long tblId = 
Env.getCurrentEnv().getExternalMetaIdMgr().getTblId(catalog.getId(), dbName, 
tableName);
+        // -1L means it will be dropped later, ignore
+        if (tblId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) {
             return;
         }
+
         db.writeLock();
         try {
-            db.createTableForReplay(log.getTableName(), log.getTableId());
-            db.setLastUpdateTime(log.getLastUpdateTime());
+            ((HMSExternalDatabase) db).createTable(tableName, tblId);
+            ((HMSExternalDatabase) 
db).setLastUpdateTime(System.currentTimeMillis());
+            table = db.getTableNullable(tableName);
+            if (table != null) {
+                ((HMSExternalTable) table).setEventUpdateTime(updateTime);
+            }
         } finally {
             db.writeUnlock();
         }
@@ -896,34 +852,8 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
             return;
         }
 
-        ExternalObjectLog log = new ExternalObjectLog();
-        log.setCatalogId(catalog.getId());
-        log.setDbId(db.getId());
-        log.setInvalidCache(true);
-        replayDropExternalDatabase(log);
-        Env.getCurrentEnv().getEditLog().logDropExternalDatabase(log);
-    }
-
-    public void replayDropExternalDatabase(ExternalObjectLog log) {
-        writeLock();
-        try {
-            
LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", 
log.getCatalogId(),
-                    log.getDbId(), log.getTableId());
-            ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
-            if (catalog == null) {
-                LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
-                return;
-            }
-            ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
-            if (db == null) {
-                LOG.warn("No db found with id:[{}], it may have been 
dropped.", log.getDbId());
-                return;
-            }
-            catalog.dropDatabaseForReplay(db.getFullName());
-            
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(), 
db.getFullName());
-        } finally {
-            writeUnlock();
-        }
+        ((HMSExternalCatalog) catalog).dropDatabase(dbName);
+        
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(), 
dbName);
     }
 
     public void createExternalDatabase(String dbName, String catalogName, 
boolean ignoreIfExists) throws DdlException {
@@ -942,28 +872,13 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
             return;
         }
 
-        ExternalObjectLog log = new ExternalObjectLog();
-        log.setCatalogId(catalog.getId());
-        log.setDbId(Env.getCurrentEnv().getNextId());
-        log.setDbName(dbName);
-        replayCreateExternalDatabase(log);
-        Env.getCurrentEnv().getEditLog().logCreateExternalDatabase(log);
-    }
-
-    public void replayCreateExternalDatabase(ExternalObjectLog log) {
-        writeLock();
-        try {
-            
LOG.debug("ReplayCreateExternalDatabase,catalogId:[{}],dbId:[{}],dbName:[{}]", 
log.getCatalogId(),
-                    log.getDbId(), log.getDbName());
-            ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
-            if (catalog == null) {
-                LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
-                return;
-            }
-            catalog.createDatabaseForReplay(log.getDbId(), log.getDbName());
-        } finally {
-            writeUnlock();
+        long dbId = 
Env.getCurrentEnv().getExternalMetaIdMgr().getDbId(catalog.getId(), dbName);
+        // -1L means it will be dropped later, ignore
+        if (dbId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) {
+            return;
         }
+
+        ((HMSExternalCatalog) catalog).createDatabase(dbId, dbName);
     }
 
     public void addExternalPartitions(String catalogName, String dbName, 
String tableName,
@@ -999,48 +914,6 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
         HMSExternalTable hmsTable = (HMSExternalTable) table;
         
Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), 
hmsTable, partitionNames);
         hmsTable.setEventUpdateTime(updateTime);
-        ExternalObjectLog log = new ExternalObjectLog();
-        log.setCatalogId(catalog.getId());
-        log.setDbId(db.getId());
-        log.setTableId(table.getId());
-        log.setPartitionNames(partitionNames);
-        log.setLastUpdateTime(updateTime);
-        Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log);
-    }
-
-    public void replayAddExternalPartitions(ExternalObjectLog log) {
-        
LOG.debug("ReplayAddExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", 
log.getCatalogId(),
-                log.getDbId(), log.getTableId());
-        ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
-        if (catalog == null) {
-            LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
-            return;
-        }
-        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
-        if (db == null) {
-            LOG.warn("No db found with id:[{}], it may have been dropped.", 
log.getDbId());
-            return;
-        }
-        ExternalTable table = db.getTableForReplay(log.getTableId());
-        if (table == null) {
-            LOG.warn("No table found with id:[{}], it may have been dropped.", 
log.getTableId());
-            return;
-        }
-        if (!(table instanceof HMSExternalTable)) {
-            LOG.warn("only support HMSTable");
-            return;
-        }
-
-        HMSExternalTable hmsTable = (HMSExternalTable) table;
-        try {
-            Env.getCurrentEnv().getExtMetaCacheMgr()
-                        .addPartitionsCache(catalog.getId(), hmsTable, 
log.getPartitionNames());
-            hmsTable.setEventUpdateTime(log.getLastUpdateTime());
-        } catch (HMSClientException e) {
-            LOG.warn("Network problem occurs or hms table has been deleted, 
fallback to invalidate table cache", e);
-            
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(),
-                    db.getFullName(), table.getName());
-        }
     }
 
     public void dropExternalPartitions(String catalogName, String dbName, 
String tableName,
@@ -1069,42 +942,9 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
             return;
         }
 
-        ExternalObjectLog log = new ExternalObjectLog();
-        log.setCatalogId(catalog.getId());
-        log.setDbId(db.getId());
-        log.setTableId(table.getId());
-        log.setPartitionNames(partitionNames);
-        log.setLastUpdateTime(updateTime);
-        replayDropExternalPartitions(log);
-        Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log);
-    }
-
-    public void replayDropExternalPartitions(ExternalObjectLog log) {
-        
LOG.debug("ReplayDropExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", 
log.getCatalogId(),
-                log.getDbId(), log.getTableId());
-        ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
-        if (catalog == null) {
-            LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
-            return;
-        }
-        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
-        if (db == null) {
-            LOG.warn("No db found with id:[{}], it may have been dropped.", 
log.getDbId());
-            return;
-        }
-        ExternalTable table = db.getTableForReplay(log.getTableId());
-        if (table == null) {
-            LOG.warn("No table found with id:[{}], it may have been dropped.", 
log.getTableId());
-            return;
-        }
-        if (!(table instanceof HMSExternalTable)) {
-            LOG.warn("only support HMSTable");
-            return;
-        }
         HMSExternalTable hmsTable = (HMSExternalTable) table;
-        Env.getCurrentEnv().getExtMetaCacheMgr()
-                .dropPartitionsCache(catalog.getId(), hmsTable, 
log.getPartitionNames());
-        hmsTable.setEventUpdateTime(log.getLastUpdateTime());
+        
Env.getCurrentEnv().getExtMetaCacheMgr().dropPartitionsCache(catalog.getId(), 
hmsTable, partitionNames);
+        hmsTable.setEventUpdateTime(updateTime);
     }
 
     public void refreshExternalPartitions(String catalogName, String dbName, 
String tableName,
@@ -1136,42 +976,9 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
             return;
         }
 
-        ExternalObjectLog log = new ExternalObjectLog();
-        log.setCatalogId(catalog.getId());
-        log.setDbId(db.getId());
-        log.setTableId(table.getId());
-        log.setPartitionNames(partitionNames);
-        log.setLastUpdateTime(updateTime);
-        replayRefreshExternalPartitions(log);
-        Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log);
-    }
-
-    public void replayRefreshExternalPartitions(ExternalObjectLog log) {
-        
LOG.debug("replayRefreshExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]",
 log.getCatalogId(),
-                log.getDbId(), log.getTableId());
-        ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
-        if (catalog == null) {
-            LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
-            return;
-        }
-        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
-        if (db == null) {
-            LOG.warn("No db found with id:[{}], it may have been dropped.", 
log.getDbId());
-            return;
-        }
-        ExternalTable table = db.getTableForReplay(log.getTableId());
-        if (table == null) {
-            LOG.warn("No table found with id:[{}], it may have been dropped.", 
log.getTableId());
-            return;
-        }
-        if (!(table instanceof HMSExternalTable)) {
-            LOG.warn("only support HMSTable");
-            return;
-        }
-        Env.getCurrentEnv().getExtMetaCacheMgr()
-                .invalidatePartitionsCache(catalog.getId(), db.getFullName(), 
table.getName(),
-                        log.getPartitionNames());
-        ((HMSExternalTable) table).setEventUpdateTime(log.getLastUpdateTime());
+        Env.getCurrentEnv().getExtMetaCacheMgr().invalidatePartitionsCache(
+                    catalog.getId(), db.getFullName(), table.getName(), 
partitionNames);
+        ((HMSExternalTable) table).setEventUpdateTime(updateTime);
     }
 
     public void registerCatalogRefreshListener(Env env) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 3ff599d0ab4..6fcd495b67f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -521,13 +521,6 @@ public abstract class ExternalCatalog
         return null;
     }
 
-    /**
-     * External catalog has no cluster semantics.
-     */
-    protected static String getRealTableName(String tableName) {
-        return ClusterNamespace.getNameFromFullName(tableName);
-    }
-
     public static ExternalCatalog read(DataInput in) throws IOException {
         String json = Text.readString(in);
         return GsonUtils.GSON.fromJson(json, ExternalCatalog.class);
@@ -546,9 +539,6 @@ public abstract class ExternalCatalog
             db.setTableExtCatalog(this);
         }
         objectCreated = false;
-        if (this instanceof HMSExternalCatalog) {
-            ((HMSExternalCatalog) this).setLastSyncedEventId(-1L);
-        }
         // TODO: This code is to compatible with older version of metadata.
         //  Could only remove after all users upgrate to the new version.
         if (logType == null) {
@@ -569,11 +559,11 @@ public abstract class ExternalCatalog
         dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), 
db.getId());
     }
 
-    public void dropDatabaseForReplay(String dbName) {
+    public void dropDatabase(String dbName) {
         throw new NotImplementedException("dropDatabase not implemented");
     }
 
-    public void createDatabaseForReplay(long dbId, String dbName) {
+    public void createDatabase(long dbId, String dbName) {
         throw new NotImplementedException("createDatabase not implemented");
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaIdMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaIdMgr.java
new file mode 100644
index 00000000000..621c25b3698
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaIdMgr.java
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+/**
+ * <pre>
+ * ExternalMetaIdMgr is responsible for managing external meta ids.
+ * Now it just manages the external meta ids of hms events,
+ * but it will be extended to manage other external meta ids in the future.
+ * </pre>
+ * TODO: remove InitCatalogLog and InitDatabaseLog, manage external meta ids 
at ExternalMetaIdMgr
+ */
+public class ExternalMetaIdMgr {
+
+    private static final Logger LOG = 
LogManager.getLogger(ExternalMetaIdMgr.class);
+
+    public static final long META_ID_FOR_NOT_EXISTS = -1L;
+
+    private final Map<Long, CtlMetaIdMgr> idToCtlMgr = Maps.newConcurrentMap();
+
+    public ExternalMetaIdMgr() {
+    }
+
+    // invoke this method only on master
+    public static long nextMetaId() {
+        return Env.getCurrentEnv().getNextId();
+    }
+
+    // return the db id of the specified db, -1 means not exists
+    public long getDbId(long catalogId, String dbName) {
+        DbMetaIdMgr dbMetaIdMgr = getDbMetaIdMgr(catalogId, dbName);
+        if (dbMetaIdMgr == null) {
+            return META_ID_FOR_NOT_EXISTS;
+        }
+        return dbMetaIdMgr.dbId;
+    }
+
+    // return the tbl id of the specified tbl, -1 means not exists
+    public long getTblId(long catalogId, String dbName, String tblName) {
+        TblMetaIdMgr tblMetaIdMgr = getTblMetaIdMgr(catalogId, dbName, 
tblName);
+        if (tblMetaIdMgr == null) {
+            return META_ID_FOR_NOT_EXISTS;
+        }
+        return tblMetaIdMgr.tblId;
+    }
+
+    // return the partition id of the specified partition, -1 means not exists
+    public long getPartitionId(long catalogId, String dbName,
+                               String tblName, String partitionName) {
+        PartitionMetaIdMgr partitionMetaIdMgr = 
getPartitionMetaIdMgr(catalogId, dbName, tblName, partitionName);
+        if (partitionMetaIdMgr == null) {
+            return META_ID_FOR_NOT_EXISTS;
+        }
+        return partitionMetaIdMgr.partitionId;
+    }
+
+    private @Nullable DbMetaIdMgr getDbMetaIdMgr(long catalogId, String 
dbName) {
+        CtlMetaIdMgr ctlMetaIdMgr = idToCtlMgr.get(catalogId);
+        if (ctlMetaIdMgr == null) {
+            return null;
+        }
+        return ctlMetaIdMgr.dbNameToMgr.get(dbName);
+    }
+
+    private @Nullable TblMetaIdMgr getTblMetaIdMgr(long catalogId, String 
dbName, String tblName) {
+        DbMetaIdMgr dbMetaIdMgr = getDbMetaIdMgr(catalogId, dbName);
+        if (dbMetaIdMgr == null) {
+            return null;
+        }
+        return dbMetaIdMgr.tblNameToMgr.get(tblName);
+    }
+
+    private PartitionMetaIdMgr getPartitionMetaIdMgr(long catalogId, String 
dbName,
+                                                    String tblName, String 
partitionName) {
+        TblMetaIdMgr tblMetaIdMgr = getTblMetaIdMgr(catalogId, dbName, 
tblName);
+        if (tblMetaIdMgr == null) {
+            return null;
+        }
+        return tblMetaIdMgr.partitionNameToMgr.get(partitionName);
+    }
+
+    public void replayMetaIdMappingsLog(@NotNull MetaIdMappingsLog log) {
+        Preconditions.checkNotNull(log);
+        long catalogId = log.getCatalogId();
+        CtlMetaIdMgr ctlMetaIdMgr = idToCtlMgr.computeIfAbsent(catalogId, 
CtlMetaIdMgr::new);
+        for (MetaIdMappingsLog.MetaIdMapping mapping : 
log.getMetaIdMappings()) {
+            handleMetaIdMapping(mapping, ctlMetaIdMgr);
+        }
+        if (log.isFromHmsEvent()) {
+            CatalogIf<?> catalogIf = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(log.getCatalogId());
+            if (catalogIf != null) {
+                MetastoreEventsProcessor metastoreEventsProcessor = 
Env.getCurrentEnv().getMetastoreEventsProcessor();
+                metastoreEventsProcessor.updateMasterLastSyncedEventId(
+                            (HMSExternalCatalog) catalogIf, 
log.getLastSyncedEventId());
+            }
+        }
+    }
+
+    // no lock because the operations is serialized currently
+    private void handleMetaIdMapping(MetaIdMappingsLog.MetaIdMapping mapping, 
CtlMetaIdMgr ctlMetaIdMgr) {
+        MetaIdMappingsLog.OperationType opType = 
MetaIdMappingsLog.getOperationType(mapping.getOpType());
+        MetaIdMappingsLog.MetaObjectType objType = 
MetaIdMappingsLog.getMetaObjectType(mapping.getMetaObjType());
+        switch (opType) {
+            case ADD:
+                handleAddMetaIdMapping(mapping, ctlMetaIdMgr, objType);
+                break;
+
+            case DELETE:
+                handleDelMetaIdMapping(mapping, ctlMetaIdMgr, objType);
+                break;
+
+            default:
+                break;
+        }
+    }
+
+    private static void handleDelMetaIdMapping(MetaIdMappingsLog.MetaIdMapping 
mapping,
+                                               CtlMetaIdMgr ctlMetaIdMgr,
+                                               
MetaIdMappingsLog.MetaObjectType objType) {
+        TblMetaIdMgr tblMetaIdMgr;
+        DbMetaIdMgr dbMetaIdMgr;
+        switch (objType) {
+            case DATABASE:
+                ctlMetaIdMgr.dbNameToMgr.remove(mapping.getDbName());
+                break;
+
+            case TABLE:
+                dbMetaIdMgr = 
ctlMetaIdMgr.dbNameToMgr.get(mapping.getDbName());
+                if (dbMetaIdMgr != null) {
+                    dbMetaIdMgr.tblNameToMgr.remove(mapping.getTblName());
+                }
+                break;
+
+            case PARTITION:
+                dbMetaIdMgr = 
ctlMetaIdMgr.dbNameToMgr.get(mapping.getDbName());
+                if (dbMetaIdMgr != null) {
+                    tblMetaIdMgr = 
dbMetaIdMgr.tblNameToMgr.get(mapping.getTblName());
+                    if (tblMetaIdMgr != null) {
+                        
tblMetaIdMgr.partitionNameToMgr.remove(mapping.getPartitionName());
+                    }
+                }
+                break;
+
+            default:
+                break;
+        }
+    }
+
+    private static void handleAddMetaIdMapping(MetaIdMappingsLog.MetaIdMapping 
mapping,
+                                               CtlMetaIdMgr ctlMetaIdMgr,
+                                               
MetaIdMappingsLog.MetaObjectType objType) {
+        DbMetaIdMgr dbMetaIdMgr;
+        TblMetaIdMgr tblMetaIdMgr;
+        switch (objType) {
+            case DATABASE:
+                ctlMetaIdMgr.dbNameToMgr.put(mapping.getDbName(),
+                            new DbMetaIdMgr(mapping.getId(), 
mapping.getDbName()));
+                break;
+
+            case TABLE:
+                dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr
+                            .computeIfAbsent(mapping.getDbName(), 
DbMetaIdMgr::new);
+                dbMetaIdMgr.tblNameToMgr.put(mapping.getTblName(),
+                            new TblMetaIdMgr(mapping.getId(), 
mapping.getTblName()));
+                break;
+
+            case PARTITION:
+                dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr
+                            .computeIfAbsent(mapping.getDbName(), 
DbMetaIdMgr::new);
+                tblMetaIdMgr = dbMetaIdMgr.tblNameToMgr
+                            .computeIfAbsent(mapping.getTblName(), 
TblMetaIdMgr::new);
+                tblMetaIdMgr.partitionNameToMgr.put(mapping.getPartitionName(),
+                            new PartitionMetaIdMgr(mapping.getId(), 
mapping.getPartitionName()));
+                break;
+
+            default:
+                break;
+        }
+    }
+
+    public static class CtlMetaIdMgr {
+        protected final long catalogId;
+
+        protected CtlMetaIdMgr(long catalogId) {
+            this.catalogId = catalogId;
+        }
+
+        protected Map<String, DbMetaIdMgr> dbNameToMgr = 
Maps.newConcurrentMap();
+    }
+
+    public static class DbMetaIdMgr {
+        protected volatile long dbId = META_ID_FOR_NOT_EXISTS;
+        protected final String dbName;
+
+        protected DbMetaIdMgr(long dbId, String dbName) {
+            this.dbId = dbId;
+            this.dbName = dbName;
+        }
+
+        protected DbMetaIdMgr(String dbName) {
+            this.dbName = dbName;
+        }
+
+        protected Map<String, TblMetaIdMgr> tblNameToMgr = 
Maps.newConcurrentMap();
+    }
+
+    public static class TblMetaIdMgr {
+        protected volatile long tblId = META_ID_FOR_NOT_EXISTS;
+        protected final String tblName;
+
+        protected TblMetaIdMgr(long tblId, String tblName) {
+            this.tblId = tblId;
+            this.tblName = tblName;
+        }
+
+        protected TblMetaIdMgr(String tblName) {
+            this.tblName = tblName;
+        }
+
+        protected Map<String, PartitionMetaIdMgr> partitionNameToMgr = 
Maps.newConcurrentMap();
+    }
+
+    public static class PartitionMetaIdMgr {
+        protected volatile long partitionId = META_ID_FOR_NOT_EXISTS;
+        protected final String partitionName;
+
+        protected PartitionMetaIdMgr(long partitionId, String partitionName) {
+            this.partitionId = partitionId;
+            this.partitionName = partitionName;
+        }
+
+        protected PartitionMetaIdMgr(String partitionName) {
+            this.partitionName = partitionName;
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index ae6b5f04738..dd6788ade2c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -23,23 +23,19 @@ import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.catalog.external.HMSExternalDatabase;
+import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.datasource.hive.HMSCachedClient;
 import org.apache.doris.datasource.hive.HMSCachedClientFactory;
-import 
org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.HMSProperties;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -57,10 +53,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
 
     private static final int MIN_CLIENT_POOL_SIZE = 8;
     protected HMSCachedClient client;
-    // Record the latest synced event id when processing hive events
-    // Must set to -1 otherwise client.getNextNotification will throw exception
-    // Reference to https://github.com/apDdlache/doris/issues/18251
-    private long lastSyncedEventId = -1L;
+
     public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
     public static final String FILE_META_CACHE_TTL_SECOND = 
"file.meta.cache.ttl-second";
     // broker name for file split and query scan.
@@ -132,18 +125,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
         }
     }
 
-    public String getHiveMetastoreUris() {
-        return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, 
"");
-    }
-
-    public String getHiveVersion() {
-        return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, "");
-    }
-
-    protected List<String> listDatabaseNames() {
-        return client.getAllDatabases();
-    }
-
     @Override
     protected void initLocalObjectsImpl() {
         HiveConf hiveConf = null;
@@ -195,13 +176,13 @@ public class HMSExternalCatalog extends ExternalCatalog {
             hmsExternalDatabase.getTables().forEach(table -> 
names.add(table.getName()));
             return names;
         } else {
-            return client.getAllTables(getRealTableName(dbName));
+            return 
client.getAllTables(ClusterNamespace.getNameFromFullName(dbName));
         }
     }
 
     @Override
     public boolean tableExist(SessionContext ctx, String dbName, String 
tblName) {
-        return client.tableExists(getRealTableName(dbName), tblName);
+        return 
client.tableExists(ClusterNamespace.getNameFromFullName(dbName), tblName);
     }
 
     @Override
@@ -211,7 +192,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
         if (hmsExternalDatabase == null) {
             return false;
         }
-        return 
hmsExternalDatabase.getTable(getRealTableName(tblName)).isPresent();
+        return 
hmsExternalDatabase.getTable(ClusterNamespace.getNameFromFullName(tblName)).isPresent();
     }
 
     public HMSCachedClient getClient() {
@@ -219,69 +200,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
         return client;
     }
 
-    public void setLastSyncedEventId(long lastSyncedEventId) {
-        this.lastSyncedEventId = lastSyncedEventId;
-    }
-
-    public NotificationEventResponse getNextEventResponse(HMSExternalCatalog 
hmsExternalCatalog)
-            throws MetastoreNotificationFetchException {
-        makeSureInitialized();
-        long currentEventId = getCurrentEventId();
-        if (lastSyncedEventId < 0) {
-            refreshCatalog(hmsExternalCatalog);
-            // invoke getCurrentEventId() and save the event id before refresh 
catalog to avoid missing events
-            // but set lastSyncedEventId to currentEventId only if there is 
not any problems when refreshing catalog
-            lastSyncedEventId = currentEventId;
-            LOG.info(
-                    "First pulling events on catalog [{}],refreshCatalog and 
init lastSyncedEventId,"
-                            + "lastSyncedEventId is [{}]",
-                    hmsExternalCatalog.getName(), lastSyncedEventId);
-            return null;
-        }
-
-        LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is 
{},lastSyncedEventId is {}",
-                hmsExternalCatalog.getName(), currentEventId, 
lastSyncedEventId);
-        if (currentEventId == lastSyncedEventId) {
-            LOG.info("Event id not updated when pulling events on catalog 
[{}]", hmsExternalCatalog.getName());
-            return null;
-        }
-
-        try {
-            return client.getNextNotification(lastSyncedEventId, 
Config.hms_events_batch_size_per_rpc, null);
-        } catch (MetastoreNotificationFetchException e) {
-            // Need a fallback to handle this because this error state can not 
be recovered until restarting FE
-            if (StringUtils.isNotEmpty(e.getMessage())
-                        && 
e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) {
-                refreshCatalog(hmsExternalCatalog);
-                // set lastSyncedEventId to currentEventId after refresh 
catalog successfully
-                lastSyncedEventId = currentEventId;
-                LOG.warn("Notification events are missing, maybe an event can 
not be handled "
-                        + "or processing rate is too low, fallback to refresh 
the catalog");
-                return null;
-            }
-            throw e;
-        }
-    }
-
-    private void refreshCatalog(HMSExternalCatalog hmsExternalCatalog) {
-        CatalogLog log = new CatalogLog();
-        log.setCatalogId(hmsExternalCatalog.getId());
-        log.setInvalidCache(true);
-        Env.getCurrentEnv().getCatalogMgr().refreshCatalog(log);
-    }
-
-    private long getCurrentEventId() {
-        makeSureInitialized();
-        CurrentNotificationEventId currentNotificationEventId = 
client.getCurrentNotificationEventId();
-        if (currentNotificationEventId == null) {
-            LOG.warn("Get currentNotificationEventId is null");
-            return -1;
-        }
-        return currentNotificationEventId.getEventId();
-    }
-
     @Override
-    public void dropDatabaseForReplay(String dbName) {
+    public void dropDatabase(String dbName) {
         LOG.debug("drop database [{}]", dbName);
         Long dbId = dbNameToId.remove(dbName);
         if (dbId == null) {
@@ -291,7 +211,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
     }
 
     @Override
-    public void createDatabaseForReplay(long dbId, String dbName) {
+    public void createDatabase(long dbId, String dbName) {
         LOG.debug("create database [{}]", dbName);
         dbNameToId.put(dbName, dbId);
         ExternalDatabase<? extends ExternalTable> db = getDbForInit(dbName, 
dbId, logType);
@@ -317,4 +237,17 @@ public class HMSExternalCatalog extends ExternalCatalog {
             catalogProperty.addProperty(PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, 
"true");
         }
     }
+
+    public String getHiveMetastoreUris() {
+        return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, 
"");
+    }
+
+    public String getHiveVersion() {
+        return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, "");
+    }
+
+    protected List<String> listDatabaseNames() {
+        return client.getAllDatabases();
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java
deleted file mode 100644
index 2f462b551c7..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
-import org.apache.doris.persist.gson.GsonUtils;
-
-import com.google.gson.annotations.SerializedName;
-import lombok.Data;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-@Data
-public class InitTableLog implements Writable {
-    enum Type {
-        HMS,
-        ES,
-        UNKNOWN;
-    }
-
-    @SerializedName(value = "catalogId")
-    private long catalogId;
-
-    @SerializedName(value = "dbId")
-    private long dbId;
-
-    @SerializedName(value = "tableId")
-    private long tableId;
-
-    @SerializedName(value = "type")
-    private Type type;
-
-    @SerializedName(value = "schema")
-    protected volatile List<Column> schema;
-
-    public InitTableLog() {}
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        Text.writeString(out, GsonUtils.GSON.toJson(this));
-    }
-
-    public static InitTableLog read(DataInput in) throws IOException {
-        String json = Text.readString(in);
-        return GsonUtils.GSON.fromJson(json, InitTableLog.class);
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/MetaIdMappingsLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/MetaIdMappingsLog.java
new file mode 100644
index 00000000000..629b4d13a40
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/MetaIdMappingsLog.java
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+import lombok.Getter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Data
+public class MetaIdMappingsLog implements Writable {
+
+    public static final short OPERATION_TYPE_IGNORE = 0;
+    public static final short OPERATION_TYPE_ADD = 1;
+    public static final short OPERATION_TYPE_DELETE = 2;
+
+    public static final short META_OBJECT_TYPE_IGNORE = 0;
+    public static final short META_OBJECT_TYPE_DATABASE = 1;
+    public static final short META_OBJECT_TYPE_TABLE = 2;
+    public static final short META_OBJECT_TYPE_PARTITION = 3;
+
+    @SerializedName(value = "ctlId")
+    private long catalogId = -1L;
+
+    @SerializedName(value = "fromEvent")
+    private boolean fromHmsEvent = false;
+
+    // The synced event id of master
+    @SerializedName(value = "lastEventId")
+    private long lastSyncedEventId = -1L;
+
+    @SerializedName(value = "metaIdMappings")
+    private List<MetaIdMapping> metaIdMappings = Lists.newLinkedList();
+
+    public MetaIdMappingsLog() {
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(catalogId, lastSyncedEventId,
+                    metaIdMappings == null ? 0 : 
Arrays.hashCode(metaIdMappings.toArray()));
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof MetaIdMappingsLog)) {
+            return false;
+        }
+        return Objects.equals(this.catalogId, ((MetaIdMappingsLog) 
obj).catalogId)
+                    && Objects.equals(this.fromHmsEvent, ((MetaIdMappingsLog) 
obj).fromHmsEvent)
+                    && Objects.equals(this.lastSyncedEventId, 
((MetaIdMappingsLog) obj).lastSyncedEventId)
+                    && Objects.equals(this.metaIdMappings, 
((MetaIdMappingsLog) obj).metaIdMappings);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static MetaIdMappingsLog read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, MetaIdMappingsLog.class);
+    }
+
+    public void addMetaIdMapping(MetaIdMapping metaIdMapping) {
+        this.metaIdMappings.add(metaIdMapping);
+    }
+
+    public void addMetaIdMappings(List<MetaIdMapping> metaIdMappings) {
+        this.metaIdMappings.addAll(metaIdMappings);
+    }
+
+    public static OperationType getOperationType(short opType) {
+        switch (opType) {
+            case OPERATION_TYPE_ADD:
+                return OperationType.ADD;
+            case OPERATION_TYPE_DELETE:
+                return OperationType.DELETE;
+            default:
+                return OperationType.IGNORE;
+        }
+    }
+
+    public static MetaObjectType getMetaObjectType(short metaObjType) {
+        switch (metaObjType) {
+            case META_OBJECT_TYPE_DATABASE:
+                return MetaObjectType.DATABASE;
+            case META_OBJECT_TYPE_TABLE:
+                return MetaObjectType.TABLE;
+            case META_OBJECT_TYPE_PARTITION:
+                return MetaObjectType.PARTITION;
+            default:
+                return MetaObjectType.IGNORE;
+        }
+    }
+
+    @Getter
+    public static class MetaIdMapping {
+
+        @SerializedName(value = "opType")
+        private short opType;
+        @SerializedName(value = "metaObjType")
+        private short metaObjType;
+        // name of Database
+        @SerializedName(value = "dbName")
+        private String dbName;
+        // name of Table
+        @SerializedName(value = "tblName")
+        private String tblName;
+        // name of Partition
+        @SerializedName(value = "pName")
+        private String partitionName;
+        // id of Database/Table/Partition
+        @SerializedName(value = "id")
+        private long id;
+
+        public MetaIdMapping() {}
+
+        public MetaIdMapping(short opType,
+                             short metaObjType,
+                             String dbName,
+                             String tblName,
+                             String partitionName,
+                             long id) {
+            this.opType = opType;
+            this.metaObjType = metaObjType;
+            this.dbName = dbName;
+            this.tblName = tblName;
+            this.partitionName = partitionName;
+            this.id = id;
+        }
+
+        public MetaIdMapping(short opType,
+                             short metaObjType,
+                             String dbName,
+                             String tblName,
+                             String partitionName) {
+            this.opType = opType;
+            this.metaObjType = metaObjType;
+            this.dbName = dbName;
+            this.tblName = tblName;
+            this.partitionName = partitionName;
+            this.id = -1L;
+        }
+
+        public MetaIdMapping(short opType,
+                             short metaObjType,
+                             String dbName,
+                             String tblName,
+                             long id) {
+            this.opType = opType;
+            this.metaObjType = metaObjType;
+            this.dbName = dbName;
+            this.tblName = tblName;
+            this.partitionName = null;
+            this.id = id;
+        }
+
+        public MetaIdMapping(short opType,
+                             short metaObjType,
+                             String dbName,
+                             String tblName) {
+            this.opType = opType;
+            this.metaObjType = metaObjType;
+            this.dbName = dbName;
+            this.tblName = tblName;
+            this.partitionName = null;
+            this.id = -1L;
+        }
+
+        public MetaIdMapping(short opType,
+                             short metaObjType,
+                             String dbName,
+                             long id) {
+            this.opType = opType;
+            this.metaObjType = metaObjType;
+            this.dbName = dbName;
+            this.tblName = null;
+            this.partitionName = null;
+            this.id = id;
+        }
+
+        public MetaIdMapping(short opType,
+                             short metaObjType,
+                             String dbName) {
+            this.opType = opType;
+            this.metaObjType = metaObjType;
+            this.dbName = dbName;
+            this.tblName = null;
+            this.partitionName = null;
+            this.id = -1L;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(opType, metaObjType, dbName, tblName, 
partitionName, id);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof MetaIdMapping)) {
+                return false;
+            }
+            return Objects.equals(this.opType, ((MetaIdMapping) obj).opType)
+                        && Objects.equals(this.metaObjType, ((MetaIdMapping) 
obj).metaObjType)
+                        && Objects.equals(this.dbName, ((MetaIdMapping) 
obj).dbName)
+                        && Objects.equals(this.tblName, ((MetaIdMapping) 
obj).tblName)
+                        && Objects.equals(this.partitionName, ((MetaIdMapping) 
obj).partitionName)
+                        && Objects.equals(this.id, ((MetaIdMapping) obj).id);
+        }
+
+    }
+
+    public enum OperationType {
+        IGNORE(OPERATION_TYPE_IGNORE),
+        // Add a Database/Table/Partition
+        ADD(OPERATION_TYPE_ADD),
+        // Delete Database/Table/Partition
+        DELETE(OPERATION_TYPE_DELETE);
+
+        private final short opType;
+
+        OperationType(short opType) {
+            this.opType = opType;
+        }
+
+        public short getOperationType() {
+            return opType;
+        }
+    }
+
+    public enum MetaObjectType {
+        IGNORE(META_OBJECT_TYPE_IGNORE),
+        DATABASE(META_OBJECT_TYPE_DATABASE),
+        TABLE(META_OBJECT_TYPE_TABLE),
+        PARTITION(META_OBJECT_TYPE_PARTITION);
+
+        private final short metaObjType;
+
+        MetaObjectType(short metaObjType) {
+            this.metaObjType = metaObjType;
+        }
+
+        public short getMetaObjectType() {
+            return metaObjType;
+        }
+    }
+}
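
As a quick illustration of how this new log is meant to be used, the sketch below (illustrative only, not part of the patch) builds a `MetaIdMappingsLog`, serializes it through its `Writable` implementation, and reads it back, mirroring the `MetaIdMappingsLogTest` added later in this change. The file path and ids are hypothetical; `ExternalMetaIdMgr.nextMetaId()` is the id generator introduced elsewhere in this patch.

```java
// Illustrative sketch only: round-trip a MetaIdMappingsLog through its Writable/GSON serialization.
import org.apache.doris.datasource.ExternalMetaIdMgr;
import org.apache.doris.datasource.MetaIdMappingsLog;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class MetaIdMappingsLogSketch {
    public static void main(String[] args) throws Exception {
        MetaIdMappingsLog log = new MetaIdMappingsLog();
        log.setCatalogId(1L);                 // hypothetical catalog id
        log.setFromHmsEvent(true);
        log.setLastSyncedEventId(1000L);      // hypothetical event id
        // one ADD mapping for a database and one DELETE mapping for a table
        log.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
                MetaIdMappingsLog.OPERATION_TYPE_ADD,
                MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
                "db1", ExternalMetaIdMgr.nextMetaId()));
        log.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
                MetaIdMappingsLog.OPERATION_TYPE_DELETE,
                MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
                "db1", "tbl1"));

        Path path = Paths.get("./metaIdMappingsLogSketch.bin");
        try (DataOutputStream dos = new DataOutputStream(Files.newOutputStream(path))) {
            log.write(dos);   // serialized as a JSON string via GsonUtils
        }
        try (DataInputStream dis = new DataInputStream(Files.newInputStream(path))) {
            MetaIdMappingsLog replayed = MetaIdMappingsLog.read(dis);
            System.out.println(replayed.equals(log));  // expected: true
        }
        Files.deleteIfExists(path);
    }
}
```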
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
index e1dacbd0b29..ffc7b95ff59 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
@@ -20,8 +20,11 @@ package org.apache.doris.datasource.hive.event;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.ExternalMetaIdMgr;
+import org.apache.doris.datasource.MetaIdMappingsLog;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.common.FileUtils;
@@ -109,4 +112,17 @@ public class AddPartitionEvent extends 
MetastorePartitionEvent {
                     debugString("Failed to process event"), e);
         }
     }
+
+    @Override
+    protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() 
{
+        List<MetaIdMappingsLog.MetaIdMapping> metaIdMappings = 
Lists.newArrayList();
+        for (String partitionName : this.getAllPartitionNames()) {
+            MetaIdMappingsLog.MetaIdMapping metaIdMapping = new 
MetaIdMappingsLog.MetaIdMapping(
+                        MetaIdMappingsLog.OPERATION_TYPE_ADD,
+                        MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION,
+                        dbName, tblName, partitionName, 
ExternalMetaIdMgr.nextMetaId());
+            metaIdMappings.add(metaIdMapping);
+        }
+        return ImmutableList.copyOf(metaIdMappings);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
index 6de71fbbc59..c69812a22b3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
@@ -100,7 +100,8 @@ public class AlterTableEvent extends MetastoreTableEvent {
         Env.getCurrentEnv().getCatalogMgr()
                 .dropExternalTable(tableBefore.getDbName(), 
tableBefore.getTableName(), catalogName, true);
         Env.getCurrentEnv().getCatalogMgr()
-                .createExternalTableFromEvent(tableAfter.getDbName(), 
tableAfter.getTableName(), catalogName, true);
+                .createExternalTableFromEvent(
+                            tableAfter.getDbName(), tableAfter.getTableName(), 
catalogName, eventTime, true);
     }
 
     private void processRename() throws DdlException {
@@ -118,7 +119,8 @@ public class AlterTableEvent extends MetastoreTableEvent {
         Env.getCurrentEnv().getCatalogMgr()
                 .dropExternalTable(tableBefore.getDbName(), 
tableBefore.getTableName(), catalogName, true);
         Env.getCurrentEnv().getCatalogMgr()
-                .createExternalTableFromEvent(tableAfter.getDbName(), 
tableAfter.getTableName(), catalogName, true);
+                .createExternalTableFromEvent(
+                            tableAfter.getDbName(), tableAfter.getTableName(), 
catalogName, eventTime, true);
 
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
index 42d813319cc..d79d23824ab 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
@@ -20,8 +20,11 @@ package org.apache.doris.datasource.hive.event;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.ExternalMetaIdMgr;
+import org.apache.doris.datasource.MetaIdMappingsLog;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
@@ -59,4 +62,13 @@ public class CreateDatabaseEvent extends MetastoreEvent {
                     debugString("Failed to process event"), e);
         }
     }
+
+    @Override
+    protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() 
{
+        MetaIdMappingsLog.MetaIdMapping metaIdMapping = new 
MetaIdMappingsLog.MetaIdMapping(
+                    MetaIdMappingsLog.OPERATION_TYPE_ADD,
+                    MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
+                    dbName, ExternalMetaIdMgr.nextMetaId());
+        return ImmutableList.of(metaIdMapping);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
index 4c3615fbda8..246ce8626f4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
@@ -19,8 +19,11 @@ package org.apache.doris.datasource.hive.event;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.ExternalMetaIdMgr;
+import org.apache.doris.datasource.MetaIdMappingsLog;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -77,10 +80,19 @@ public class CreateTableEvent extends MetastoreTableEvent {
         try {
             infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", 
catalogName, dbName, tblName);
             Env.getCurrentEnv().getCatalogMgr()
-                    .createExternalTableFromEvent(dbName, 
hmsTbl.getTableName(), catalogName, true);
+                    .createExternalTableFromEvent(dbName, 
hmsTbl.getTableName(), catalogName, eventTime, true);
         } catch (DdlException e) {
             throw new MetastoreNotificationException(
                     debugString("Failed to process event"), e);
         }
     }
+
+    @Override
+    protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() 
{
+        MetaIdMappingsLog.MetaIdMapping metaIdMapping = new 
MetaIdMappingsLog.MetaIdMapping(
+                    MetaIdMappingsLog.OPERATION_TYPE_ADD,
+                    MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
+                    dbName, tblName, ExternalMetaIdMgr.nextMetaId());
+        return ImmutableList.of(metaIdMapping);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
index 3481f832fe0..ca69e6f14d0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
@@ -20,8 +20,10 @@ package org.apache.doris.datasource.hive.event;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.MetaIdMappingsLog;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
@@ -59,4 +61,13 @@ public class DropDatabaseEvent extends MetastoreEvent {
                     debugString("Failed to process event"), e);
         }
     }
+
+    @Override
+    protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() 
{
+        MetaIdMappingsLog.MetaIdMapping metaIdMapping = new 
MetaIdMappingsLog.MetaIdMapping(
+                    MetaIdMappingsLog.OPERATION_TYPE_DELETE,
+                    MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
+                    dbName);
+        return ImmutableList.of(metaIdMapping);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
index f71f44cf5ab..dd443010289 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
@@ -20,8 +20,10 @@ package org.apache.doris.datasource.hive.event;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.MetaIdMappingsLog;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -124,7 +126,7 @@ public class DropPartitionEvent extends 
MetastorePartitionEvent {
             return false;
         }
 
-        // `that` event can be batched if this event's partitions contains all 
of the partitions which `that` event has
+        // `that` event can be batched if this event's partitions contains all 
the partitions which `that` event has
         // else just remove `that` event's relevant partitions
         for (String partitionName : getAllPartitionNames()) {
             if (thatPartitionEvent instanceof AddPartitionEvent) {
@@ -136,4 +138,17 @@ public class DropPartitionEvent extends 
MetastorePartitionEvent {
 
         return 
getAllPartitionNames().containsAll(thatPartitionEvent.getAllPartitionNames());
     }
+
+    @Override
+    protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() 
{
+        List<MetaIdMappingsLog.MetaIdMapping> metaIdMappings = 
Lists.newArrayList();
+        for (String partitionName : this.getAllPartitionNames()) {
+            MetaIdMappingsLog.MetaIdMapping metaIdMapping = new 
MetaIdMappingsLog.MetaIdMapping(
+                        MetaIdMappingsLog.OPERATION_TYPE_DELETE,
+                        MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION,
+                        dbName, tblName, partitionName);
+            metaIdMappings.add(metaIdMapping);
+        }
+        return ImmutableList.copyOf(metaIdMappings);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
index c333506cad2..0f62e246082 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
@@ -20,8 +20,10 @@ package org.apache.doris.datasource.hive.event;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.MetaIdMappingsLog;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
@@ -89,14 +91,23 @@ public class DropTableEvent extends MetastoreTableEvent {
             return false;
         }
 
-        /**
+        /*
          * Check if `that` event is a rename event, a rename event can not be 
batched
          * because the process of `that` event will change the reference 
relation of this table,
          * otherwise it can be batched because this event is a drop-table event
          * and the process of this event will drop the whole table,
          * and `that` event must be a MetastoreTableEvent event otherwise 
`isSameTable` will return false
-         * */
+         */
         MetastoreTableEvent thatTblEvent = (MetastoreTableEvent) that;
         return !thatTblEvent.willChangeTableName();
     }
+
+    @Override
+    protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() 
{
+        MetaIdMappingsLog.MetaIdMapping metaIdMapping = new 
MetaIdMappingsLog.MetaIdMapping(
+                    MetaIdMappingsLog.OPERATION_TYPE_DELETE,
+                    MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
+                    dbName, tblName);
+        return ImmutableList.of(metaIdMapping);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
index f9771562ed4..a9d165b4d03 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
@@ -17,8 +17,10 @@
 
 package org.apache.doris.datasource.hive.event;
 
+import org.apache.doris.datasource.MetaIdMappingsLog;
 import org.apache.doris.datasource.hive.HMSCachedClient;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -227,6 +229,13 @@ public abstract class MetastoreEvent {
         return name.toString();
     }
 
+    /**
+     * Create a MetaIdMapping list from the event if the event is a 
create/add/drop event
+     */
+    protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() 
{
+        return ImmutableList.of();
+    }
+
     @Override
     public String toString() {
         return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
index aabc562dba1..a3ba092703b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
@@ -18,7 +18,9 @@
 
 package org.apache.doris.datasource.hive.event;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.MetaIdMappingsLog;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -77,23 +79,39 @@ public class MetastoreEventFactory implements EventFactory {
         for (NotificationEvent event : events) {
             
metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, 
catalogName));
         }
-        return createBatchEvents(catalogName, metastoreEvents);
+        List<MetastoreEvent> mergedEvents = mergeEvents(catalogName, 
metastoreEvents);
+        if (Env.getCurrentEnv().isMaster()) {
+            logMetaIdMappings(hmsExternalCatalog.getId(), 
events.get(events.size() - 1).getEventId(), mergedEvents);
+        }
+        return mergedEvents;
+    }
+
+    private void logMetaIdMappings(long catalogId, long lastSyncedEventId, 
List<MetastoreEvent> mergedEvents) {
+        MetaIdMappingsLog log = new MetaIdMappingsLog();
+        log.setCatalogId(catalogId);
+        log.setFromHmsEvent(true);
+        log.setLastSyncedEventId(lastSyncedEventId);
+        for (MetastoreEvent event : mergedEvents) {
+            log.addMetaIdMappings(event.transferToMetaIdMappings());
+        }
+        
Env.getCurrentEnv().getExternalMetaIdMgr().replayMetaIdMappingsLog(log);
+        Env.getCurrentEnv().getEditLog().logMetaIdMappingsLog(log);
     }
 
     /**
      * Merge events to reduce the time cost of event processing. Currently it mainly handles MetastoreTableEvent
      * because merging MetastoreTableEvent is simple and cost-effective.
      * For example, consider there are some events as following:
-     *
+     * <pre>
      *    event1: alter table db1.t1 add partition p1;
      *    event2: alter table db1.t1 drop partition p2;
      *    event3: alter table db1.t2 add partition p3;
      *    event4: alter table db2.t3 rename to t4;
      *    event5: drop table db1.t1;
-     *
+     * </pre>
      * Only `event3 event4 event5` will be retained and other events will be skipped.
      * */
-    public List<MetastoreEvent> createBatchEvents(String catalogName, 
List<MetastoreEvent> events) {
+    public List<MetastoreEvent> mergeEvents(String catalogName, 
List<MetastoreEvent> events) {
         List<MetastoreEvent> eventsCopy = Lists.newArrayList(events);
         Map<MetastoreTableEvent.TableKey, List<Integer>> indexMap = 
Maps.newLinkedHashMap();
         for (int i = 0; i < events.size(); i++) {
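
The javadoc example above can be made concrete with a small, self-contained toy program. This is illustrative only and is not the `MetastoreEventFactory` implementation; it only shows why `event3 event4 event5` survive: earlier events on a table become redundant once a later drop-table event arrives for the same table.

```java
// Toy illustration of the merge idea: events fully superseded by a later drop-table are skipped.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MergeSketch {
    public static void main(String[] args) {
        // pairs of (table, event), in arrival order, matching the javadoc example
        String[][] events = {
                {"db1.t1", "add partition p1"},
                {"db1.t1", "drop partition p2"},
                {"db1.t2", "add partition p3"},
                {"db2.t3", "rename to t4"},
                {"db1.t1", "drop table"},
        };
        Set<String> droppedTables = new HashSet<>();
        List<String[]> merged = new ArrayList<>();
        // walk backwards: once a drop-table is seen, earlier events on that table are redundant
        for (int i = events.length - 1; i >= 0; i--) {
            String[] e = events[i];
            if (droppedTables.contains(e[0])) {
                continue;
            }
            if ("drop table".equals(e[1])) {
                droppedTables.add(e[0]);
            }
            merged.add(0, e);
        }
        // prints only event3, event4 and event5
        merged.forEach(e -> System.out.println(e[0] + ": " + e[1]));
    }
}
```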
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
index 622d84428fa..28793aecf21 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
@@ -18,13 +18,22 @@
 
 package org.apache.doris.datasource.hive.event;
 
+import org.apache.doris.analysis.RedirectStatus;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.CatalogLog;
 import org.apache.doris.datasource.HMSClientException;
 import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.MasterOpExecutor;
+import org.apache.doris.qe.OriginStatement;
 
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
@@ -35,6 +44,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A metastore event is an instance of the class
@@ -68,6 +78,13 @@ public class MetastoreEventsProcessor extends MasterDaemon {
     // event factory which is used to get or create MetastoreEvents
     private final MetastoreEventFactory metastoreEventFactory;
 
+    // manage the lastSyncedEventId of hms catalogs
+    // using a HashMap is fine because all operations run in one thread
+    private final Map<Long, Long> lastSyncedEventIdMap = Maps.newHashMap();
+
+    // manage the masterLastSyncedEventId of hms catalogs
+    private final Map<Long, Long> masterLastSyncedEventIdMap = 
Maps.newHashMap();
+
     private boolean isRunning;
 
     public MetastoreEventsProcessor() {
@@ -76,13 +93,56 @@ public class MetastoreEventsProcessor extends MasterDaemon {
         this.isRunning = false;
     }
 
+    @Override
+    protected void runAfterCatalogReady() {
+        if (isRunning) {
+            LOG.warn("Last task not finished, ignore current task.");
+            return;
+        }
+        isRunning = true;
+        try {
+            realRun();
+        } catch (Exception ex) {
+            LOG.warn("Task failed", ex);
+        }
+        isRunning = false;
+    }
+
+    private void realRun() {
+        List<Long> catalogIds = 
Env.getCurrentEnv().getCatalogMgr().getCatalogIds();
+        for (Long catalogId : catalogIds) {
+            CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
+            if (catalog instanceof HMSExternalCatalog) {
+                HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) 
catalog;
+                try {
+                    List<NotificationEvent> events = 
getNextHMSEvents(hmsExternalCatalog);
+                    if (!events.isEmpty()) {
+                        LOG.info("Events size is {} on catalog [{}]", events.size(),
+                                hmsExternalCatalog.getName());
+                        processEvents(events, hmsExternalCatalog);
+                    }
+                } catch (MetastoreNotificationFetchException e) {
+                    LOG.warn("Failed to fetch hms events on {}. msg: ", 
hmsExternalCatalog.getName(), e);
+                } catch (Exception ex) {
+                    LOG.warn("Failed to process hive metastore [{}] events.",
+                            hmsExternalCatalog.getName(), ex);
+                }
+            }
+        }
+    }
+
     /**
      * Fetch the next batch of NotificationEvents from metastore. The default 
batch size is
      * <code>{@link Config#hms_events_batch_size_per_rpc}</code>
      */
-    private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog 
hmsExternalCatalog) {
+    private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog 
hmsExternalCatalog) throws Exception {
         LOG.debug("Start to pull events on catalog [{}]", 
hmsExternalCatalog.getName());
-        NotificationEventResponse response = 
hmsExternalCatalog.getNextEventResponse(hmsExternalCatalog);
+        NotificationEventResponse response;
+        if (Env.getCurrentEnv().isMaster()) {
+            response = getNextEventResponseForMaster(hmsExternalCatalog);
+        } else {
+            response = getNextEventResponseForSlave(hmsExternalCatalog);
+        }
 
         if (response == null) {
             return Collections.emptyList();
@@ -99,11 +159,11 @@ public class MetastoreEventsProcessor extends MasterDaemon 
{
                         && hmsClientException.getCause() instanceof 
NoSuchObjectException) {
                     LOG.warn(event.debugString("Failed to process event and 
skip"), hmsClientException);
                 } else {
-                    hmsExternalCatalog.setLastSyncedEventId(event.getEventId() 
- 1);
+                    updateLastSyncedEventId(hmsExternalCatalog, 
event.getEventId() - 1);
                     throw hmsClientException;
                 }
             } catch (Exception e) {
-                hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 
1);
+                updateLastSyncedEventId(hmsExternalCatalog, event.getEventId() 
- 1);
                 throw e;
             }
         }
@@ -116,45 +176,142 @@ public class MetastoreEventsProcessor extends 
MasterDaemon {
         //transfer
         List<MetastoreEvent> metastoreEvents = 
metastoreEventFactory.getMetastoreEvents(events, hmsExternalCatalog);
         doExecute(metastoreEvents, hmsExternalCatalog);
-        hmsExternalCatalog.setLastSyncedEventId(events.get(events.size() - 
1).getEventId());
+        updateLastSyncedEventId(hmsExternalCatalog, events.get(events.size() - 
1).getEventId());
     }
 
-    @Override
-    protected void runAfterCatalogReady() {
-        if (isRunning) {
-            LOG.warn("Last task not finished,ignore current task.");
-            return;
+    private NotificationEventResponse 
getNextEventResponseForMaster(HMSExternalCatalog hmsExternalCatalog)
+            throws MetastoreNotificationFetchException {
+        long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog);
+        long currentEventId = getCurrentHmsEventId(hmsExternalCatalog);
+        if (lastSyncedEventId < 0) {
+            refreshCatalogForMaster(hmsExternalCatalog);
+            // invoke getCurrentHmsEventId() and save the event id before refreshing the catalog to avoid missing events,
+            // but set lastSyncedEventId to currentEventId only if there are no problems when refreshing the catalog
+            updateLastSyncedEventId(hmsExternalCatalog, currentEventId);
+            LOG.info(
+                    "First pulling events on catalog [{}], refresh catalog and init lastSyncedEventId, "
+                            + "lastSyncedEventId is [{}]",
+                    hmsExternalCatalog.getName(), currentEventId);
+            return null;
         }
-        isRunning = true;
+
+        LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {}, 
lastSyncedEventId is {}",
+                hmsExternalCatalog.getName(), currentEventId, 
lastSyncedEventId);
+        if (currentEventId == lastSyncedEventId) {
+            LOG.info("Event id not updated when pulling events on catalog 
[{}]", hmsExternalCatalog.getName());
+            return null;
+        }
+
         try {
-            realRun();
-        } catch (Exception ex) {
-            LOG.warn("Task failed", ex);
+            return 
hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId,
+                        Config.hms_events_batch_size_per_rpc, null);
+        } catch (MetastoreNotificationFetchException e) {
+            // Need a fallback to handle this because this error state can not 
be recovered until restarting FE
+            if (StringUtils.isNotEmpty(e.getMessage())
+                    && 
e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) {
+                refreshCatalogForMaster(hmsExternalCatalog);
+                // set lastSyncedEventId to currentEventId after refresh 
catalog successfully
+                updateLastSyncedEventId(hmsExternalCatalog, currentEventId);
+                LOG.warn("Notification events are missing, maybe an event can not be handled "
+                        + "or the processing rate is too low, fall back to refreshing the catalog");
+                return null;
+            }
+            throw e;
         }
-        isRunning = false;
     }
 
-    private void realRun() {
-        List<Long> catalogIds = 
Env.getCurrentEnv().getCatalogMgr().getCatalogIds();
-        for (Long catalogId : catalogIds) {
-            CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
-            if (catalog instanceof HMSExternalCatalog) {
-                HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) 
catalog;
-                try {
-                    List<NotificationEvent> events = 
getNextHMSEvents(hmsExternalCatalog);
-                    if (!events.isEmpty()) {
-                        LOG.info("Events size are {} on catalog [{}]", 
events.size(),
-                                hmsExternalCatalog.getName());
-                        processEvents(events, hmsExternalCatalog);
-                    }
-                } catch (MetastoreNotificationFetchException e) {
-                    LOG.warn("Failed to fetch hms events on {}. msg: ", 
hmsExternalCatalog.getName(), e);
-                } catch (Exception ex) {
-                    LOG.warn("Failed to process hive metastore [{}] events .",
-                            hmsExternalCatalog.getName(), ex);
-                }
+    private NotificationEventResponse 
getNextEventResponseForSlave(HMSExternalCatalog hmsExternalCatalog)
+                throws Exception {
+        long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog);
+        long masterLastSyncedEventId = 
getMasterLastSyncedEventId(hmsExternalCatalog);
+        // do nothing if masterLastSyncedEventId has not been synced
+        if (masterLastSyncedEventId == -1L) {
+            LOG.info("LastSyncedEventId of master has not been synced on 
catalog [{}]", hmsExternalCatalog.getName());
+            return null;
+        }
+        // do nothing if lastSyncedEventId equals masterLastSyncedEventId
+        if (lastSyncedEventId == masterLastSyncedEventId) {
+            LOG.info("Event id not updated when pulling events on catalog 
[{}]", hmsExternalCatalog.getName());
+            return null;
+        }
+
+        if (lastSyncedEventId < 0) {
+            refreshCatalogForSlave(hmsExternalCatalog);
+            // Use masterLastSyncedEventId to avoid missing events
+            updateLastSyncedEventId(hmsExternalCatalog, 
masterLastSyncedEventId);
+            LOG.info(
+                    "First pulling events on catalog [{}], refresh catalog and init lastSyncedEventId, "
+                            + "lastSyncedEventId is [{}]",
+                    hmsExternalCatalog.getName(), masterLastSyncedEventId);
+            return null;
+        }
+
+        LOG.debug("Catalog [{}] getNextEventResponse, masterLastSyncedEventId 
is {}, lastSyncedEventId is {}",
+                hmsExternalCatalog.getName(), masterLastSyncedEventId, 
lastSyncedEventId);
+
+        // For slave FE nodes, only fetch events whose id is not greater than masterLastSyncedEventId
+        int maxEventSize = Math.min((int) (masterLastSyncedEventId - 
lastSyncedEventId),
+                Config.hms_events_batch_size_per_rpc);
+        try {
+            return 
hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, 
maxEventSize, null);
+        } catch (MetastoreNotificationFetchException e) {
+            // Need a fallback to handle this because this error state can not 
be recovered until restarting FE
+            if (StringUtils.isNotEmpty(e.getMessage())
+                    && 
e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) {
+                refreshCatalogForMaster(hmsExternalCatalog);
+                // set lastSyncedEventId to masterLastSyncedEventId after refreshing the catalog successfully
+                updateLastSyncedEventId(hmsExternalCatalog, 
masterLastSyncedEventId);
+                LOG.warn("Notification events are missing, maybe an event can not be handled "
+                        + "or the processing rate is too low, fall back to refreshing the catalog");
+                return null;
             }
+            throw e;
+        }
+    }
+
+    private long getCurrentHmsEventId(HMSExternalCatalog hmsExternalCatalog) {
+        CurrentNotificationEventId currentNotificationEventId = 
hmsExternalCatalog.getClient()
+                .getCurrentNotificationEventId();
+        if (currentNotificationEventId == null) {
+            LOG.warn("Get currentNotificationEventId is null");
+            return -1L;
         }
+        return currentNotificationEventId.getEventId();
+    }
+
+    private long getLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog) {
+        // Return -1 if it does not exist, otherwise client.getNextNotification will throw an exception
+        // Reference: https://github.com/apache/doris/issues/18251
+        return lastSyncedEventIdMap.getOrDefault(hmsExternalCatalog.getId(), -1L);
+    }
+
+    private void updateLastSyncedEventId(HMSExternalCatalog 
hmsExternalCatalog, long eventId) {
+        lastSyncedEventIdMap.put(hmsExternalCatalog.getId(), eventId);
+    }
+
+    private long getMasterLastSyncedEventId(HMSExternalCatalog 
hmsExternalCatalog) {
+        return 
masterLastSyncedEventIdMap.getOrDefault(hmsExternalCatalog.getId(), -1L);
+    }
+
+    public void updateMasterLastSyncedEventId(HMSExternalCatalog 
hmsExternalCatalog, long eventId) {
+        masterLastSyncedEventIdMap.put(hmsExternalCatalog.getId(), eventId);
+    }
+
+    private void refreshCatalogForMaster(HMSExternalCatalog 
hmsExternalCatalog) {
+        CatalogLog log = new CatalogLog();
+        log.setCatalogId(hmsExternalCatalog.getId());
+        log.setInvalidCache(true);
+        Env.getCurrentEnv().getCatalogMgr().replayRefreshCatalog(log);
+    }
+
+    private void refreshCatalogForSlave(HMSExternalCatalog hmsExternalCatalog) 
throws Exception {
+        // Transfer to master to refresh catalog
+        String sql = "REFRESH CATALOG " + hmsExternalCatalog.getName();
+        OriginStatement originStmt = new OriginStatement(sql, 0);
+        MasterOpExecutor masterOpExecutor = new MasterOpExecutor(originStmt, 
new ConnectContext(),
+                    RedirectStatus.FORWARD_WITH_SYNC, false);
+        LOG.debug("Transfer to master to refresh catalog, stmt: {}", sql);
+        masterOpExecutor.execute();
     }
 
     public static MessageDeserializer getMessageDeserializer(String 
messageFormat) {
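
To make the follower-side batching rule above concrete, here is a tiny sketch with hypothetical numbers; `Config.hms_events_batch_size_per_rpc` is replaced by a local constant so the snippet is self-contained.

```java
// Hypothetical numbers only: how a follower bounds its fetch so it never reads past the
// event id the master has already synced and logged via MetaIdMappingsLog.
public class SlaveBatchSizeSketch {
    public static void main(String[] args) {
        long masterLastSyncedEventId = 18_000_100L; // replayed from the master's MetaIdMappingsLog
        long lastSyncedEventId = 18_000_040L;       // this follower's local progress
        int hmsEventsBatchSizePerRpc = 500;         // stands in for Config.hms_events_batch_size_per_rpc
        int maxEventSize = Math.min((int) (masterLastSyncedEventId - lastSyncedEventId),
                hmsEventsBatchSizePerRpc);
        // the follower asks the metastore for at most 60 events this round
        System.out.println(maxEventSize);
    }
}
```

A follower therefore catches up to the master's `lastSyncedEventId` in bounded steps, while the master keeps pulling new events directly from the metastore.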
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 1bedda0a7e9..d74c519407f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -42,7 +42,7 @@ import org.apache.doris.datasource.CatalogLog;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.InitDatabaseLog;
-import org.apache.doris.datasource.InitTableLog;
+import org.apache.doris.datasource.MetaIdMappingsLog;
 import org.apache.doris.ha.MasterInfo;
 import org.apache.doris.insertoverwrite.InsertOverwriteLog;
 import org.apache.doris.job.base.AbstractJob;
@@ -746,19 +746,18 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
-            case OperationType.OP_INIT_EXTERNAL_TABLE: {
-                data = InitTableLog.read(in);
-                isRead = true;
-                break;
-            }
-            case OperationType.OP_REFRESH_EXTERNAL_DB:
+            case OperationType.OP_INIT_EXTERNAL_TABLE:
             case OperationType.OP_DROP_EXTERNAL_TABLE:
             case OperationType.OP_CREATE_EXTERNAL_TABLE:
             case OperationType.OP_DROP_EXTERNAL_DB:
             case OperationType.OP_CREATE_EXTERNAL_DB:
             case OperationType.OP_ADD_EXTERNAL_PARTITIONS:
             case OperationType.OP_DROP_EXTERNAL_PARTITIONS:
-            case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS:
+            case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: {
+                isRead = true;
+                break;
+            }
+            case OperationType.OP_REFRESH_EXTERNAL_DB:
             case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
                 data = ExternalObjectLog.read(in);
                 isRead = true;
@@ -906,6 +905,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_ADD_META_ID_MAPPINGS: {
+                data = MetaIdMappingsLog.read(in);
+                isRead = true;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 8c2424d4837..958277dd6b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -51,6 +51,7 @@ import org.apache.doris.datasource.CatalogLog;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.datasource.MetaIdMappingsLog;
 import org.apache.doris.ha.MasterInfo;
 import org.apache.doris.insertoverwrite.InsertOverwriteLog;
 import org.apache.doris.job.base.AbstractJob;
@@ -1011,38 +1012,24 @@ public class EditLog {
                     break;
                 }
                 case OperationType.OP_DROP_EXTERNAL_TABLE: {
-                    final ExternalObjectLog log = (ExternalObjectLog) 
journal.getData();
-                    env.getCatalogMgr().replayDropExternalTable(log);
                     break;
                 }
                 case OperationType.OP_CREATE_EXTERNAL_TABLE: {
-                    final ExternalObjectLog log = (ExternalObjectLog) 
journal.getData();
-                    
env.getCatalogMgr().replayCreateExternalTableFromEvent(log);
                     break;
                 }
                 case OperationType.OP_DROP_EXTERNAL_DB: {
-                    final ExternalObjectLog log = (ExternalObjectLog) 
journal.getData();
-                    env.getCatalogMgr().replayDropExternalDatabase(log);
                     break;
                 }
                 case OperationType.OP_CREATE_EXTERNAL_DB: {
-                    final ExternalObjectLog log = (ExternalObjectLog) 
journal.getData();
-                    env.getCatalogMgr().replayCreateExternalDatabase(log);
                     break;
                 }
                 case OperationType.OP_ADD_EXTERNAL_PARTITIONS: {
-                    final ExternalObjectLog log = (ExternalObjectLog) 
journal.getData();
-                    env.getCatalogMgr().replayAddExternalPartitions(log);
                     break;
                 }
                 case OperationType.OP_DROP_EXTERNAL_PARTITIONS: {
-                    final ExternalObjectLog log = (ExternalObjectLog) 
journal.getData();
-                    env.getCatalogMgr().replayDropExternalPartitions(log);
                     break;
                 }
                 case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: {
-                    final ExternalObjectLog log = (ExternalObjectLog) 
journal.getData();
-                    env.getCatalogMgr().replayRefreshExternalPartitions(log);
                     break;
                 }
                 case OperationType.OP_CREATE_WORKLOAD_GROUP: {
@@ -1165,6 +1152,10 @@ public class EditLog {
                     env.getBackupHandler().getRepoMgr().alterRepo(repository, 
true);
                     break;
                 }
+                case OperationType.OP_ADD_META_ID_MAPPINGS: {
+                    
env.getExternalMetaIdMgr().replayMetaIdMappingsLog((MetaIdMappingsLog) 
journal.getData());
+                    break;
+                }
                 case OperationType.OP_LOG_UPDATE_ROWS:
                 case OperationType.OP_LOG_NEW_PARTITION_LOADED:
                 case OperationType.OP_LOG_ALTER_COLUMN_STATS: {
@@ -1886,26 +1877,32 @@ public class EditLog {
         logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log);
     }
 
+    @Deprecated
     public void logDropExternalTable(ExternalObjectLog log) {
         logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log);
     }
 
+    @Deprecated
     public void logCreateExternalTable(ExternalObjectLog log) {
         logEdit(OperationType.OP_CREATE_EXTERNAL_TABLE, log);
     }
 
+    @Deprecated
     public void logDropExternalDatabase(ExternalObjectLog log) {
         logEdit(OperationType.OP_DROP_EXTERNAL_DB, log);
     }
 
+    @Deprecated
     public void logCreateExternalDatabase(ExternalObjectLog log) {
         logEdit(OperationType.OP_CREATE_EXTERNAL_DB, log);
     }
 
+    @Deprecated
     public void logAddExternalPartitions(ExternalObjectLog log) {
         logEdit(OperationType.OP_ADD_EXTERNAL_PARTITIONS, log);
     }
 
+    @Deprecated
     public void logDropExternalPartitions(ExternalObjectLog log) {
         logEdit(OperationType.OP_DROP_EXTERNAL_PARTITIONS, log);
     }
@@ -2005,6 +2002,10 @@ public class EditLog {
         logEdit(OperationType.OP_INSERT_OVERWRITE, log);
     }
 
+    public void logMetaIdMappingsLog(MetaIdMappingsLog log) {
+        logEdit(OperationType.OP_ADD_META_ID_MAPPINGS, log);
+    }
+
     public String getNotReadyReason() {
         if (journal == null) {
             return "journal is null";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 0945dc0f151..0868d7f371b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -288,12 +288,19 @@ public class OperationType {
     public static final short OP_ADD_CONSTRAINT = 346;
     public  static final short OP_DROP_CONSTRAINT = 347;
 
+    @Deprecated
     public static final short OP_DROP_EXTERNAL_TABLE = 350;
+    @Deprecated
     public static final short OP_DROP_EXTERNAL_DB = 351;
+    @Deprecated
     public static final short OP_CREATE_EXTERNAL_TABLE = 352;
+    @Deprecated
     public static final short OP_CREATE_EXTERNAL_DB = 353;
+    @Deprecated
     public static final short OP_ADD_EXTERNAL_PARTITIONS = 354;
+    @Deprecated
     public static final short OP_DROP_EXTERNAL_PARTITIONS = 355;
+    @Deprecated
     public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356;
 
     public static final short OP_ALTER_USER = 400;
@@ -364,6 +371,8 @@ public class OperationType {
 
     public static final short OP_LOG_ALTER_COLUMN_STATS = 464;
 
+    public static final short OP_ADD_META_ID_MAPPINGS = 470;
+
     /**
      * Get opcode name by op code.
      **/
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalMetaIdMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalMetaIdMgrTest.java
new file mode 100644
index 00000000000..12e018a4cff
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalMetaIdMgrTest.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ExternalMetaIdMgrTest {
+
+    @Test
+    public void testReplayMetaIdMappingsLog() {
+        ExternalMetaIdMgr mgr = new ExternalMetaIdMgr();
+        MetaIdMappingsLog log1 = new MetaIdMappingsLog();
+        log1.setCatalogId(1L);
+        log1.setFromHmsEvent(false);
+        log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                    MetaIdMappingsLog.OPERATION_TYPE_ADD,
+                    MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
+                    "db1", ExternalMetaIdMgr.nextMetaId()));
+        mgr.replayMetaIdMappingsLog(log1);
+        Assertions.assertNotEquals(-1L, mgr.getDbId(1L, "db1"));
+
+        MetaIdMappingsLog log2 = new MetaIdMappingsLog();
+        log2.setCatalogId(1L);
+        log2.setFromHmsEvent(false);
+        log2.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                    MetaIdMappingsLog.OPERATION_TYPE_DELETE,
+                    MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
+                    "db1"));
+        mgr.replayMetaIdMappingsLog(log2);
+        Assertions.assertEquals(-1L, mgr.getDbId(1L, "db1"));
+
+        MetaIdMappingsLog log3 = new MetaIdMappingsLog();
+        log3.setCatalogId(1L);
+        log3.setFromHmsEvent(false);
+        log3.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                    MetaIdMappingsLog.OPERATION_TYPE_ADD,
+                    MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
+                    "db1", "tbl1", ExternalMetaIdMgr.nextMetaId()));
+        mgr.replayMetaIdMappingsLog(log3);
+        Assertions.assertEquals(-1L, mgr.getDbId(1L, "db1"));
+        Assertions.assertNotEquals(-1L, mgr.getTblId(1L, "db1", "tbl1"));
+
+        MetaIdMappingsLog log4 = new MetaIdMappingsLog();
+        log4.setCatalogId(1L);
+        log4.setFromHmsEvent(false);
+        log4.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                    MetaIdMappingsLog.OPERATION_TYPE_DELETE,
+                    MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
+                    "db1", "tbl1"));
+        log4.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                    MetaIdMappingsLog.OPERATION_TYPE_ADD,
+                    MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION,
+                    "db1", "tbl1", "p1", ExternalMetaIdMgr.nextMetaId()));
+        mgr.replayMetaIdMappingsLog(log4);
+        Assertions.assertEquals(-1L, mgr.getDbId(1L, "db1"));
+        Assertions.assertEquals(-1L, mgr.getTblId(1L, "db1", "tbl1"));
+        Assertions.assertNotEquals(-1L, mgr.getPartitionId(1L, "db1", "tbl1", 
"p1"));
+    }
+
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/MetaIdMappingsLogTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/MetaIdMappingsLogTest.java
new file mode 100644
index 00000000000..fec57c29eda
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/MetaIdMappingsLogTest.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class MetaIdMappingsLogTest {
+
+    @Test
+    public void testSerialization() throws Exception {
+        // 1. Write objects to file
+        MetaIdMappingsLog log1 = new MetaIdMappingsLog();
+        Path path = Files.createFile(Paths.get("./metaIdMappingsLogTest.txt"));
+        try (DataOutputStream dos = new DataOutputStream(Files.newOutputStream(path))) {
+            log1.setFromHmsEvent(true);
+            log1.setLastSyncedEventId(-1L);
+            log1.setCatalogId(1L);
+            log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                        MetaIdMappingsLog.OPERATION_TYPE_ADD,
+                        MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
+                        "db1", ExternalMetaIdMgr.nextMetaId()));
+            log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                        MetaIdMappingsLog.OPERATION_TYPE_DELETE,
+                        MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
+                        "db1", "tbl1", ExternalMetaIdMgr.nextMetaId()));
+            log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                        MetaIdMappingsLog.OPERATION_TYPE_DELETE,
+                        MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
+                        "db1", "tbl2", ExternalMetaIdMgr.nextMetaId()));
+            log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                        MetaIdMappingsLog.OPERATION_TYPE_DELETE,
+                        MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
+                        "db2"));
+            log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                        MetaIdMappingsLog.OPERATION_TYPE_ADD,
+                        MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION,
+                        "db1", "tbl1", "p1", ExternalMetaIdMgr.nextMetaId()));
+            log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                        MetaIdMappingsLog.OPERATION_TYPE_ADD,
+                        MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION,
+                        "db1", "tbl1", "p2", ExternalMetaIdMgr.nextMetaId()));
+            log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                        MetaIdMappingsLog.OPERATION_TYPE_DELETE,
+                        MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE,
+                        "db2"));
+            log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                        MetaIdMappingsLog.OPERATION_TYPE_DELETE,
+                        MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
+                        "db1", "tbl1", ExternalMetaIdMgr.nextMetaId()));
+            log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
+                        MetaIdMappingsLog.OPERATION_TYPE_DELETE,
+                        MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION,
+                        "db1", "tbl1", "p2", ExternalMetaIdMgr.nextMetaId()));
+            log1.write(dos);
+            dos.flush();
+        } catch (Throwable throwable) {
+            throwable.printStackTrace();
+            Files.deleteIfExists(path);
+            Assertions.fail();
+        }
+
+        // 2. Read objects from file
+        MetaIdMappingsLog log2;
+        try (DataInputStream dis = new DataInputStream(Files.newInputStream(path))) {
+            log2 = MetaIdMappingsLog.read(dis);
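+            // Round-tripping through write()/read() should yield an object equal to the original.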
+            Assertions.assertEquals(log1, log2);
+        } catch (Throwable throwable) {
+            throwable.printStackTrace();
+            Assertions.fail();
+        } finally {
+            Files.deleteIfExists(path);
+        }
+    }
+
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
index c9e566dc9d8..136cac6b712 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
@@ -454,7 +454,7 @@ public class MetastoreEventFactoryTest {
             for (int j = 0; j < 1000; j++) {
                 events.add(producer.produceOneEvent(j));
             }
-            List<MetastoreEvent> mergedEvents = factory.createBatchEvents(testCtl, events);
+            List<MetastoreEvent> mergedEvents = factory.mergeEvents(testCtl, events);
 
             for (MetastoreEvent event : events) {
                 processEvent(validateCatalog, event);

