This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 901f2711a3 [feature](multi-catalog) support more hive metastore events (#15702)
901f2711a3 is described below

commit 901f2711a3fdd1037653a1c8c25a346580b645ed
Author: zhangdong <493738...@qq.com>
AuthorDate: Mon Jan 16 14:16:12 2023 +0800

    [feature](multi-catalog) support more hive metastore events (#15702)
    
    Support more hive metastore events: create/alter/drop database, create/alter table, and add/alter/drop partition.
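    
    For orientation: the master FE polls HMS notification events,
    MetastoreEventFactory maps each event type to a MetastoreEvent subclass
    (AddPartitionEvent, AlterTableEvent, and so on; unsupported types fall
    back to IgnoredEvent), and each event updates the external catalog caches
    and writes an edit log entry for follower FEs. A minimal, self-contained
    sketch of that dispatch idea (Event and HANDLERS are illustrative names,
    not the actual Doris API):

        import java.util.List;
        import java.util.Map;
        import java.util.function.Consumer;

        public class EventDispatchSketch {
            // Illustrative stand-in for an HMS NotificationEvent.
            record Event(String type, String db, String table) {}

            // Map supported event types to cache actions; anything else is a
            // no-op, mirroring the role of IgnoredEvent.
            private static final Map<String, Consumer<Event>> HANDLERS = Map.of(
                    "CREATE_TABLE", e -> System.out.println("register " + e.table()),
                    "DROP_TABLE", e -> System.out.println("unregister " + e.table()),
                    "ADD_PARTITION", e -> System.out.println("cache partitions of " + e.table()));

            public static void dispatch(List<Event> events) {
                for (Event e : events) {
                    HANDLERS.getOrDefault(e.type(), ev -> { }).accept(e);
                }
            }

            public static void main(String[] args) {
                dispatch(List.of(new Event("ADD_PARTITION", "db1", "t1")));
            }
        }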
---
 .../org/apache/doris/catalog/RefreshManager.java   |  23 +-
 .../doris/catalog/external/ExternalDatabase.java   |   4 +
 .../catalog/external/HMSExternalDatabase.java      |  11 +
 .../main/java/org/apache/doris/common/Config.java  |   2 +-
 .../org/apache/doris/datasource/CatalogMgr.java    | 347 ++++++++++++++++++++-
 .../apache/doris/datasource/ExternalCatalog.java   |  20 ++
 .../doris/datasource/ExternalMetaCacheMgr.java     |  44 +++
 .../apache/doris/datasource/ExternalObjectLog.java |  10 +
 .../doris/datasource/HMSExternalCatalog.java       |  30 ++
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 193 +++++++++++-
 .../datasource/hive/event/AddPartitionEvent.java   |  88 ++++++
 .../{IgnoredEvent.java => AlterDatabaseEvent.java} |  19 +-
 .../datasource/hive/event/AlterPartitionEvent.java |  95 ++++++
 .../datasource/hive/event/AlterTableEvent.java     | 111 +++++++
 ...{IgnoredEvent.java => CreateDatabaseEvent.java} |  28 +-
 .../datasource/hive/event/CreateTableEvent.java    |  76 +++++
 .../{IgnoredEvent.java => DropDatabaseEvent.java}  |  28 +-
 .../datasource/hive/event/DropPartitionEvent.java  |  88 ++++++
 .../datasource/hive/event/DropTableEvent.java      |  39 +--
 .../doris/datasource/hive/event/IgnoredEvent.java  |   6 +-
 .../datasource/hive/event/MetastoreEvent.java      |  36 ++-
 .../hive/event/MetastoreEventFactory.java          |  33 +-
 .../org/apache/doris/journal/JournalEntity.java    |   6 +
 .../java/org/apache/doris/persist/EditLog.java     |  54 ++++
 .../org/apache/doris/persist/OperationType.java    |   6 +
 .../doris/planner/ListPartitionPrunerV2.java       |  77 +++--
 .../apache/doris/datasource/CatalogMgrTest.java    | 132 +++++++-
 27 files changed, 1464 insertions(+), 142 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index b8d6929994..ef95b3e5f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -56,7 +56,7 @@ public class RefreshManager {
             refreshInternalCtlIcebergTable(stmt, env);
         } else {
             // Process external catalog table refresh
-            refreshExternalCtlTable(dbName, tableName, catalog);
+            env.getCatalogMgr().refreshExternalTable(dbName, tableName, catalogName);
         }
         LOG.info("Successfully refresh table: {} from db: {}", tableName, 
dbName);
     }
@@ -146,25 +146,4 @@ public class RefreshManager {
                 stmt.getTableName(), "ICEBERG", icebergProperties, "");
         env.createTable(createTableStmt);
     }
-
-    private void refreshExternalCtlTable(String dbName, String tableName, CatalogIf catalog) throws DdlException {
-        if (!(catalog instanceof ExternalCatalog)) {
-            throw new DdlException("Only support refresh ExternalCatalog 
Tables");
-        }
-        DatabaseIf db = catalog.getDbNullable(dbName);
-        if (db == null) {
-            throw new DdlException("Database " + dbName + " does not exist in 
catalog " + catalog.getName());
-        }
-
-        TableIf table = db.getTableNullable(tableName);
-        if (table == null) {
-            throw new DdlException("Table " + tableName + " does not exist in 
db " + dbName);
-        }
-        Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName);
-        ExternalObjectLog log = new ExternalObjectLog();
-        log.setCatalogId(catalog.getId());
-        log.setDbId(db.getId());
-        log.setTableId(table.getId());
-        Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index 65c027713e..e4efcac408 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -263,4 +263,8 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
     public void dropTable(String tableName) {
         throw new NotImplementedException();
     }
+
+    public void createTable(String tableName, long tableId) {
+        throw new NotImplementedException();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
index a1f6bcddab..e379dd3c0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
@@ -174,10 +174,21 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl
     @Override
     public void dropTable(String tableName) {
         LOG.debug("drop table [{}]", tableName);
+        makeSureInitialized();
         Long tableId = tableNameToId.remove(tableName);
         if (tableId == null) {
             LOG.warn("drop table [{}] failed", tableName);
         }
         idToTbl.remove(tableId);
     }
+
+    @Override
+    public void createTable(String tableName, long tableId) {
+        LOG.debug("create table [{}]", tableName);
+        makeSureInitialized();
+        tableNameToId.put(tableName, tableId);
+        HMSExternalTable table = new HMSExternalTable(tableId, tableName, name,
+                (HMSExternalCatalog) extCatalog);
+        idToTbl.put(tableId, table);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index e611614f36..3d3ae9226a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1958,6 +1958,6 @@ public class Config extends ConfigBase {
      * HMS polling interval in milliseconds.
      */
     @ConfField(masterOnly = true)
-    public static int hms_events_polling_interval_ms = 20000;
+    public static int hms_events_polling_interval_ms = 10000;
 }
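
The hunk above halves the default polling interval of the HMS event processor. For illustration, a plausible fe.conf on the master FE could look like this; the enable_hms_events_incremental_sync switch is an assumption here, not part of this diff:

    # fe.conf sketch; only the 10000 ms default comes from this diff
    enable_hms_events_incremental_sync = true
    hms_events_polling_interval_ms = 10000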
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index 17b71aefaa..8054fca3e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -549,10 +549,47 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
         }
     }
 
+    public void refreshExternalTable(String dbName, String tableName, String catalogName) throws DdlException {
+        CatalogIf catalog = nameToCatalog.get(catalogName);
+        if (catalog == null) {
+            throw new DdlException("No catalog found with name: " + 
catalogName);
+        }
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new DdlException("Only support refresh ExternalCatalog 
Tables");
+        }
+        DatabaseIf db = catalog.getDbNullable(dbName);
+        if (db == null) {
+            throw new DdlException("Database " + dbName + " does not exist in 
catalog " + catalog.getName());
+        }
+
+        TableIf table = db.getTableNullable(tableName);
+        if (table == null) {
+            throw new DdlException("Table " + tableName + " does not exist in 
db " + dbName);
+        }
+        Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName);
+        ExternalObjectLog log = new ExternalObjectLog();
+        log.setCatalogId(catalog.getId());
+        log.setDbId(db.getId());
+        log.setTableId(table.getId());
+        Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
+    }
+
     public void replayRefreshExternalTable(ExternalObjectLog log) {
        ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+        if (catalog == null) {
+            LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
+            return;
+        }
         ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+        if (db == null) {
+            LOG.warn("No db found with id:[{}], it may have been dropped.", 
log.getDbId());
+            return;
+        }
         ExternalTable table = db.getTableForReplay(log.getTableId());
+        if (table == null) {
+            LOG.warn("No table found with id:[{}], it may have been dropped.", 
log.getTableId());
+            return;
+        }
         Env.getCurrentEnv().getExtMetaCacheMgr()
                .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
     }
@@ -586,13 +623,321 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
        LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), log.getDbId(),
                log.getTableId());
        ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+        if (catalog == null) {
+            LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
+            return;
+        }
         ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+        if (db == null) {
+            LOG.warn("No db found with id:[{}], it may have been dropped.", 
log.getDbId());
+            return;
+        }
         ExternalTable table = db.getTableForReplay(log.getTableId());
-        db.dropTable(table.getName());
+        if (table == null) {
+            LOG.warn("No table found with id:[{}], it may have been dropped.", 
log.getTableId());
+            return;
+        }
+        db.writeLock();
+        try {
+            db.dropTable(table.getName());
+        } finally {
+            db.writeUnlock();
+        }
+
         Env.getCurrentEnv().getExtMetaCacheMgr()
                .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
     }
 
+    public boolean externalTableExistInLocal(String dbName, String tableName, String catalogName) throws DdlException {
+        CatalogIf catalog = nameToCatalog.get(catalogName);
+        if (catalog == null) {
+            throw new DdlException("No catalog found with name: " + 
catalogName);
+        }
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new DdlException("Only support ExternalCatalog Tables");
+        }
+        return ((ExternalCatalog) catalog).tableExistInLocal(dbName, tableName);
+    }
+
+    public void createExternalTable(String dbName, String tableName, String catalogName) throws DdlException {
+        CatalogIf catalog = nameToCatalog.get(catalogName);
+        if (catalog == null) {
+            throw new DdlException("No catalog found with name: " + 
catalogName);
+        }
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new DdlException("Only support create ExternalCatalog 
Tables");
+        }
+        DatabaseIf db = catalog.getDbNullable(dbName);
+        if (db == null) {
+            throw new DdlException("Database " + dbName + " does not exist in 
catalog " + catalog.getName());
+        }
+
+        TableIf table = db.getTableNullable(tableName);
+        if (table != null) {
+            throw new DdlException("Table " + tableName + " has exist in db " 
+ dbName);
+        }
+        ExternalObjectLog log = new ExternalObjectLog();
+        log.setCatalogId(catalog.getId());
+        log.setDbId(db.getId());
+        log.setTableName(tableName);
+        log.setTableId(Env.getCurrentEnv().getNextId());
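+        // Apply the change locally via the replay path first, then persist it
+        // to the edit log so follower FEs replay the same ExternalObjectLog.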
+        replayCreateExternalTable(log);
+        Env.getCurrentEnv().getEditLog().logCreateExternalTable(log);
+    }
+
+    public void replayCreateExternalTable(ExternalObjectLog log) {
+        LOG.debug("ReplayCreateExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}],tableName:[{}]", log.getCatalogId(),
+                log.getDbId(), log.getTableId(), log.getTableName());
+        ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+        if (catalog == null) {
+            LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
+            return;
+        }
+        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+        if (db == null) {
+            LOG.warn("No db found with id:[{}], it may have been dropped.", 
log.getDbId());
+            return;
+        }
+        db.writeLock();
+        try {
+            db.createTable(log.getTableName(), log.getTableId());
+        } finally {
+            db.writeUnlock();
+        }
+    }
+
+    public void dropExternalDatabase(String dbName, String catalogName) throws DdlException {
+        CatalogIf catalog = nameToCatalog.get(catalogName);
+        if (catalog == null) {
+            throw new DdlException("No catalog found with name: " + 
catalogName);
+        }
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new DdlException("Only support drop ExternalCatalog 
databases");
+        }
+        DatabaseIf db = catalog.getDbNullable(dbName);
+        if (db == null) {
+            throw new DdlException("Database " + dbName + " does not exist in 
catalog " + catalog.getName());
+        }
+
+        ExternalObjectLog log = new ExternalObjectLog();
+        log.setCatalogId(catalog.getId());
+        log.setDbId(db.getId());
+        log.setInvalidCache(true);
+        replayDropExternalDatabase(log);
+        Env.getCurrentEnv().getEditLog().logDropExternalDatabase(log);
+    }
+
+    public void replayDropExternalDatabase(ExternalObjectLog log) {
+        writeLock();
+        try {
+            LOG.debug("ReplayDropExternalDatabase,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(),
+                    log.getDbId(), log.getTableId());
+            ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+            if (catalog == null) {
+                LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
+                return;
+            }
+            ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+            if (db == null) {
+                LOG.warn("No db found with id:[{}], it may have been 
dropped.", log.getDbId());
+                return;
+            }
+            catalog.dropDatabase(db.getFullName());
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(), db.getFullName());
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public void createExternalDatabase(String dbName, String catalogName) throws DdlException {
+        CatalogIf catalog = nameToCatalog.get(catalogName);
+        if (catalog == null) {
+            throw new DdlException("No catalog found with name: " + 
catalogName);
+        }
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new DdlException("Only support create ExternalCatalog 
databases");
+        }
+        DatabaseIf db = catalog.getDbNullable(dbName);
+        if (db != null) {
+            throw new DdlException("Database " + dbName + " has exist in 
catalog " + catalog.getName());
+        }
+
+        ExternalObjectLog log = new ExternalObjectLog();
+        log.setCatalogId(catalog.getId());
+        log.setDbId(Env.getCurrentEnv().getNextId());
+        log.setDbName(dbName);
+        replayCreateExternalDatabase(log);
+        Env.getCurrentEnv().getEditLog().logCreateExternalDatabase(log);
+    }
+
+    public void replayCreateExternalDatabase(ExternalObjectLog log) {
+        writeLock();
+        try {
+            LOG.debug("ReplayCreateExternalDatabase,catalogId:[{}],dbId:[{}],dbName:[{}]", log.getCatalogId(),
+                    log.getDbId(), log.getDbName());
+            ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+            if (catalog == null) {
+                LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
+                return;
+            }
+            catalog.createDatabase(log.getDbId(), log.getDbName());
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public void addExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames)
+            throws DdlException {
+        CatalogIf catalog = nameToCatalog.get(catalogName);
+        if (catalog == null) {
+            throw new DdlException("No catalog found with name: " + 
catalogName);
+        }
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new DdlException("Only support ExternalCatalog");
+        }
+        DatabaseIf db = catalog.getDbNullable(dbName);
+        if (db == null) {
+            throw new DdlException("Database " + dbName + " does not exist in 
catalog " + catalog.getName());
+        }
+
+        TableIf table = db.getTableNullable(tableName);
+        if (table == null) {
+            throw new DdlException("Table " + tableName + " does not exist in 
db " + dbName);
+        }
+
+        ExternalObjectLog log = new ExternalObjectLog();
+        log.setCatalogId(catalog.getId());
+        log.setDbId(db.getId());
+        log.setTableId(table.getId());
+        log.setPartitionNames(partitionNames);
+        replayAddExternalPartitions(log);
+        Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log);
+    }
+
+    public void replayAddExternalPartitions(ExternalObjectLog log) {
+        LOG.debug("ReplayAddExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(),
+                log.getDbId(), log.getTableId());
+        ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+        if (catalog == null) {
+            LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
+            return;
+        }
+        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+        if (db == null) {
+            LOG.warn("No db found with id:[{}], it may have been dropped.", 
log.getDbId());
+            return;
+        }
+        ExternalTable table = db.getTableForReplay(log.getTableId());
+        if (table == null) {
+            LOG.warn("No table found with id:[{}], it may have been dropped.", 
log.getTableId());
+            return;
+        }
+        Env.getCurrentEnv().getExtMetaCacheMgr()
+                .addPartitionsCache(catalog.getId(), table, log.getPartitionNames());
+    }
+
+    public void dropExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames)
+            throws DdlException {
+        CatalogIf catalog = nameToCatalog.get(catalogName);
+        if (catalog == null) {
+            throw new DdlException("No catalog found with name: " + 
catalogName);
+        }
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new DdlException("Only support ExternalCatalog");
+        }
+        DatabaseIf db = catalog.getDbNullable(dbName);
+        if (db == null) {
+            throw new DdlException("Database " + dbName + " does not exist in 
catalog " + catalog.getName());
+        }
+
+        TableIf table = db.getTableNullable(tableName);
+        if (table == null) {
+            throw new DdlException("Table " + tableName + " does not exist in 
db " + dbName);
+        }
+
+        ExternalObjectLog log = new ExternalObjectLog();
+        log.setCatalogId(catalog.getId());
+        log.setDbId(db.getId());
+        log.setTableId(table.getId());
+        log.setPartitionNames(partitionNames);
+        replayDropExternalPartitions(log);
+        Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log);
+    }
+
+    public void replayDropExternalPartitions(ExternalObjectLog log) {
+        LOG.debug("ReplayDropExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(),
+                log.getDbId(), log.getTableId());
+        ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+        if (catalog == null) {
+            LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
+            return;
+        }
+        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+        if (db == null) {
+            LOG.warn("No db found with id:[{}], it may have been dropped.", 
log.getDbId());
+            return;
+        }
+        ExternalTable table = db.getTableForReplay(log.getTableId());
+        if (table == null) {
+            LOG.warn("No table found with id:[{}], it may have been dropped.", 
log.getTableId());
+            return;
+        }
+        Env.getCurrentEnv().getExtMetaCacheMgr()
+                .dropPartitionsCache(catalog.getId(), table, log.getPartitionNames());
+    }
+
+    public void refreshExternalPartitions(String catalogName, String dbName, String tableName,
+            List<String> partitionNames)
+            throws DdlException {
+        CatalogIf catalog = nameToCatalog.get(catalogName);
+        if (catalog == null) {
+            throw new DdlException("No catalog found with name: " + 
catalogName);
+        }
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new DdlException("Only support ExternalCatalog");
+        }
+        DatabaseIf db = catalog.getDbNullable(dbName);
+        if (db == null) {
+            throw new DdlException("Database " + dbName + " does not exist in 
catalog " + catalog.getName());
+        }
+
+        TableIf table = db.getTableNullable(tableName);
+        if (table == null) {
+            throw new DdlException("Table " + tableName + " does not exist in 
db " + dbName);
+        }
+
+        ExternalObjectLog log = new ExternalObjectLog();
+        log.setCatalogId(catalog.getId());
+        log.setDbId(db.getId());
+        log.setTableId(table.getId());
+        log.setPartitionNames(partitionNames);
+        replayRefreshExternalPartitions(log);
+        Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log);
+    }
+
+    public void replayRefreshExternalPartitions(ExternalObjectLog log) {
+        LOG.debug("replayRefreshExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(),
+                log.getDbId(), log.getTableId());
+        ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+        if (catalog == null) {
+            LOG.warn("No catalog found with id:[{}], it may have been 
dropped.", log.getCatalogId());
+            return;
+        }
+        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+        if (db == null) {
+            LOG.warn("No db found with id:[{}], it may have been dropped.", 
log.getDbId());
+            return;
+        }
+        ExternalTable table = db.getTableForReplay(log.getTableId());
+        if (table == null) {
+            LOG.warn("No table found with id:[{}], it may have been dropped.", 
log.getTableId());
+            return;
+        }
+        Env.getCurrentEnv().getExtMetaCacheMgr()
+                .invalidatePartitionsCache(catalog.getId(), db.getFullName(), table.getName(),
+                        log.getPartitionNames());
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 22726c3b96..3a4a711e1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import lombok.Data;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
@@ -98,6 +99,17 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
      */
     public abstract boolean tableExist(SessionContext ctx, String dbName, String tblName);
 
+    /**
+     * Check whether the specified table exists locally in Doris.
+     *
+     * @param dbName
+     * @param tblName
+     * @return true if table exists, false otherwise
+     */
+    public boolean tableExistInLocal(String dbName, String tblName) {
+        throw new NotImplementedException();
+    }
+
     /**
      * Catalog can't be init when creating because the external catalog may depend on third system.
      * So you have to make sure the client of third system is initialized before any method was called.
@@ -310,4 +322,12 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
         idToDb.put(db.getId(), db);
        dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
     }
+
+    public void dropDatabase(String dbName) {
+        throw new NotImplementedException();
+    }
+
+    public void createDatabase(long dbId, String dbName) {
+        throw new NotImplementedException();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 934c3f8bbe..13875d084e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.datasource;
 
+import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -25,6 +27,7 @@ import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
@@ -118,4 +121,45 @@ public class ExternalMetaCacheMgr {
         }
         LOG.debug("invalid catalog cache for {}", catalogId);
     }
+
+    public void addPartitionsCache(long catalogId, ExternalTable table, List<String> partitionNames) {
+        if (!(table instanceof HMSExternalTable)) {
+            LOG.warn("only support HMSTable");
+            return;
+        }
+        String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
+        HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+        if (metaCache != null) {
+            metaCache.addPartitionsCache(dbName, table.getName(), partitionNames,
+                    ((HMSExternalTable) table).getPartitionColumnTypes());
+        }
+        LOG.debug("add partition cache for {}.{} in catalog {}", dbName, 
table.getName(), catalogId);
+    }
+
+    public void dropPartitionsCache(long catalogId, ExternalTable table, List<String> partitionNames) {
+        if (!(table instanceof HMSExternalTable)) {
+            LOG.warn("only support HMSTable");
+            return;
+        }
+        String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
+        HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+        if (metaCache != null) {
+            metaCache.dropPartitionsCache(dbName, table.getName(), partitionNames,
+                    ((HMSExternalTable) table).getPartitionColumnTypes(), true);
+        }
+        LOG.debug("drop partition cache for {}.{} in catalog {}", dbName, 
table.getName(), catalogId);
+    }
+
+    public void invalidatePartitionsCache(long catalogId, String dbName, String tableName,
+            List<String> partitionNames) {
+        HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+        if (metaCache != null) {
+            dbName = ClusterNamespace.getNameFromFullName(dbName);
+            for (String partitionName : partitionNames) {
+                metaCache.invalidatePartitionCache(dbName, tableName, partitionName);
+            }
+
+        }
+        LOG.debug("invalidate partition cache for {}.{} in catalog {}", 
dbName, tableName, catalogId);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
index 030f981382..04d5b06036 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -29,6 +29,7 @@ import lombok.NoArgsConstructor;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
 @NoArgsConstructor
 @Getter
@@ -40,12 +41,21 @@ public class ExternalObjectLog implements Writable {
     @SerializedName(value = "dbId")
     private long dbId;
 
+    @SerializedName(value = "dbName")
+    private String dbName;
+
     @SerializedName(value = "tableId")
     private long tableId;
 
+    @SerializedName(value = "tableName")
+    private String tableName;
+
     @SerializedName(value = "invalidCache")
     private boolean invalidCache;
 
+    @SerializedName(value = "partitionNames")
+    private List<String> partitionNames;
+
     @Override
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, GsonUtils.GSON.toJson(this));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 5de9aec2eb..70ba9f9aa9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -154,6 +154,16 @@ public class HMSExternalCatalog extends ExternalCatalog {
         return client.tableExists(getRealTableName(dbName), tblName);
     }
 
+    @Override
+    public boolean tableExistInLocal(String dbName, String tblName) {
+        makeSureInitialized();
+        HMSExternalDatabase hmsExternalDatabase = (HMSExternalDatabase) idToDb.get(dbNameToId.get(dbName));
+        if (hmsExternalDatabase == null) {
+            return false;
+        }
+        return hmsExternalDatabase.getTable(getRealTableName(tblName)).isPresent();
+    }
+
     public PooledHiveMetaStoreClient getClient() {
         makeSureInitialized();
         return client;
@@ -215,4 +225,24 @@ public class HMSExternalCatalog extends ExternalCatalog {
         }
         return currentNotificationEventId.getEventId();
     }
+
+    @Override
+    public void dropDatabase(String dbName) {
+        LOG.debug("drop database [{}]", dbName);
+        makeSureInitialized();
+        Long dbId = dbNameToId.remove(dbName);
+        if (dbId == null) {
+            LOG.warn("drop database [{}] failed", dbName);
+        }
+        idToDb.remove(dbId);
+    }
+
+    @Override
+    public void createDatabase(long dbId, String dbName) {
+        makeSureInitialized();
+        LOG.debug("create database [{}]", dbName);
+        dbNameToId.put(dbName, dbId);
+        HMSExternalDatabase db = new HMSExternalDatabase(this, dbId, dbName);
+        idToDb.put(dbId, db);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 6f4d3845e7..c51ef521a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -45,6 +45,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
 import lombok.Data;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -57,6 +58,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -163,27 +165,36 @@ public class HiveMetaStoreCache {
             LOG.debug("load #{} partitions for {} in catalog {}", 
partitionNames.size(), key, catalog.getName());
         }
         Map<Long, PartitionItem> idToPartitionItem = 
Maps.newHashMapWithExpectedSize(partitionNames.size());
+        Map<String, Long> partitionNameToIdMap = 
Maps.newHashMapWithExpectedSize(partitionNames.size());
+        Map<Long, List<UniqueId>> idToUniqueIdsMap = 
Maps.newHashMapWithExpectedSize(partitionNames.size());
         long idx = 0;
         for (String partitionName : partitionNames) {
-            idToPartitionItem.put(idx++, toListPartitionItem(partitionName, key.types));
+            long partitionId = idx++;
+            ListPartitionItem listPartitionItem = toListPartitionItem(partitionName, key.types);
+            idToPartitionItem.put(partitionId, listPartitionItem);
+            partitionNameToIdMap.put(partitionName, partitionId);
         }
 
         Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = null;
         Map<Range<PartitionKey>, UniqueId> rangeToId = null;
         RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null;
+        Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = null;
         if (key.types.size() > 1) {
             // uidToPartitionRange and rangeToId are only used for multi-column partition
-            uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem);
+            uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem, idToUniqueIdsMap);
             rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
         } else {
             Preconditions.checkState(key.types.size() == 1, key.types);
             // singleColumnRangeMap is only used for single-column partition
-            singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem);
+            singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem, idToUniqueIdsMap);
+            singleUidToColumnRangeMap = ListPartitionPrunerV2.genSingleUidToColumnRange(singleColumnRangeMap);
         }
-        return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap);
+        Map<Long, List<String>> partitionValuesMap = 
ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
+        return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap, idx,
+                partitionNameToIdMap, idToUniqueIdsMap, singleUidToColumnRangeMap, partitionValuesMap);
     }
 
-    private ListPartitionItem toListPartitionItem(String partitionName, List<Type> types) {
+    public ListPartitionItem toListPartitionItem(String partitionName, List<Type> types) {
         // Partition name will be in format: nation=cn/city=beijing
         // parse it to get values "cn" and "beijing"
         String[] parts = partitionName.split("/");
@@ -274,6 +285,10 @@ public class HiveMetaStoreCache {
 
     public HivePartitionValues getPartitionValues(String dbName, String tblName, List<Type> types) {
         PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, types);
+        return getPartitionValues(key);
+    }
+
+    public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) {
         try {
             return partitionValuesCache.get(key);
         } catch (ExecutionException e) {
@@ -350,6 +365,21 @@ public class HiveMetaStoreCache {
         }
     }
 
+    public void invalidatePartitionCache(String dbName, String tblName, String partitionName) {
+        PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, null);
+        HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
+        if (partitionValues != null) {
+            Long partitionId = partitionValues.partitionNameToIdMap.get(partitionName);
+            List<String> values = 
partitionValues.partitionValuesMap.get(partitionId);
+            PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values);
+            HivePartition partition = partitionCache.getIfPresent(partKey);
+            if (partition != null) {
+                fileCache.invalidate(new FileCacheKey(partition.getPath(), null));
+                partitionCache.invalidate(partKey);
+            }
+        }
+    }
+
     public void invalidateDbCache(String dbName) {
         long start = System.currentTimeMillis();
         Set<PartitionValueCacheKey> keys = 
partitionValuesCache.asMap().keySet();
@@ -369,6 +399,113 @@ public class HiveMetaStoreCache {
         LOG.debug("invalid all meta cache in catalog {}", catalog.getName());
     }
 
+    // partition name format: nation=cn/city=beijing
+    public void addPartitionsCache(String dbName, String tblName, List<String> partitionNames,
+            List<Type> partitionColumnTypes) {
+        PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, partitionColumnTypes);
+        HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
+        if (partitionValues == null) {
+            return;
+        }
+        HivePartitionValues copy = partitionValues.copy();
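+        // Copy-on-write: mutate a copy of the cached value and, at the end of
+        // this method, publish it only if the cache still holds the original.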
+        Map<Long, PartitionItem> idToPartitionItemBefore = 
copy.getIdToPartitionItem();
+        Map<String, Long> partitionNameToIdMapBefore = 
copy.getPartitionNameToIdMap();
+        Map<Long, List<UniqueId>> idToUniqueIdsMap = 
copy.getIdToUniqueIdsMap();
+        Map<Long, PartitionItem> idToPartitionItem = new HashMap<>();
+        long idx = copy.getNextPartitionId();
+        for (String partitionName : partitionNames) {
+            if (partitionNameToIdMapBefore.containsKey(partitionName)) {
+                LOG.info("addPartitionsCache partitionName:[{}] has exist in 
table:[{}]", partitionName, tblName);
+                continue;
+            }
+            long partitionId = idx++;
+            ListPartitionItem listPartitionItem = toListPartitionItem(partitionName, key.types);
+            idToPartitionItemBefore.put(partitionId, listPartitionItem);
+            idToPartitionItem.put(partitionId, listPartitionItem);
+            partitionNameToIdMapBefore.put(partitionName, partitionId);
+        }
+        Map<Long, List<String>> partitionValuesMapBefore = 
copy.getPartitionValuesMap();
+        Map<Long, List<String>> partitionValuesMap = 
ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
+        partitionValuesMapBefore.putAll(partitionValuesMap);
+        copy.setNextPartitionId(idx);
+        if (key.types.size() > 1) {
+            Map<UniqueId, Range<PartitionKey>> uidToPartitionRangeBefore = 
copy.getUidToPartitionRange();
+            // uidToPartitionRange and rangeToId are only used for multi-column partition
+            Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = 
ListPartitionPrunerV2
+                    .genUidToPartitionRange(idToPartitionItem, 
idToUniqueIdsMap);
+            uidToPartitionRangeBefore.putAll(uidToPartitionRange);
+            Map<Range<PartitionKey>, UniqueId> rangeToIdBefore = 
copy.getRangeToId();
+            Map<Range<PartitionKey>, UniqueId> rangeToId = 
ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
+            rangeToIdBefore.putAll(rangeToId);
+        } else {
+            Preconditions.checkState(key.types.size() == 1, key.types);
+            // singleColumnRangeMap is only used for single-column partition
+            RangeMap<ColumnBound, UniqueId> singleColumnRangeMapBefore = 
copy.getSingleColumnRangeMap();
+            RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = 
ListPartitionPrunerV2
+                    .genSingleColumnRangeMap(idToPartitionItem, 
idToUniqueIdsMap);
+            singleColumnRangeMapBefore.putAll(singleColumnRangeMap);
+            Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMapBefore 
= copy
+                    .getSingleUidToColumnRangeMap();
+            Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = 
ListPartitionPrunerV2
+                    .genSingleUidToColumnRange(singleColumnRangeMap);
+            singleUidToColumnRangeMapBefore.putAll(singleUidToColumnRangeMap);
+        }
+        HivePartitionValues partitionValuesCur = partitionValuesCache.getIfPresent(key);
+        if (partitionValuesCur == partitionValues) {
+            partitionValuesCache.put(key, copy);
+        }
+    }
+
+    public void dropPartitionsCache(String dbName, String tblName, List<String> partitionNames,
+            List<Type> partitionColumnTypes, boolean invalidPartitionCache) {
+        PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, partitionColumnTypes);
+        HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
+        if (partitionValues == null) {
+            return;
+        }
+        HivePartitionValues copy = partitionValues.copy();
+        Map<String, Long> partitionNameToIdMapBefore = 
copy.getPartitionNameToIdMap();
+        Map<Long, PartitionItem> idToPartitionItemBefore = 
copy.getIdToPartitionItem();
+        Map<Long, List<UniqueId>> idToUniqueIdsMapBefore = 
copy.getIdToUniqueIdsMap();
+        Map<UniqueId, Range<PartitionKey>> uidToPartitionRangeBefore = 
copy.getUidToPartitionRange();
+        Map<Range<PartitionKey>, UniqueId> rangeToIdBefore = 
copy.getRangeToId();
+        RangeMap<ColumnBound, UniqueId> singleColumnRangeMapBefore = 
copy.getSingleColumnRangeMap();
+        Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMapBefore = 
copy.getSingleUidToColumnRangeMap();
+        Map<Long, List<String>> partitionValuesMap = 
copy.getPartitionValuesMap();
+        for (String partitionName : partitionNames) {
+            if (!partitionNameToIdMapBefore.containsKey(partitionName)) {
+                LOG.info("dropPartitionsCache partitionName:[{}] not exist in 
table:[{}]", partitionName, tblName);
+                continue;
+            }
+            Long partitionId = partitionNameToIdMapBefore.remove(partitionName);
+            idToPartitionItemBefore.remove(partitionId);
+            partitionValuesMap.remove(partitionId);
+            List<UniqueId> uniqueIds = 
idToUniqueIdsMapBefore.remove(partitionId);
+            if (key.types.size() > 1) {
+                for (UniqueId uniqueId : uniqueIds) {
+                    Range<PartitionKey> range = 
uidToPartitionRangeBefore.remove(uniqueId);
+                    rangeToIdBefore.remove(range);
+                }
+            } else {
+                for (UniqueId uniqueId : uniqueIds) {
+                    Range<ColumnBound> range = 
singleUidToColumnRangeMapBefore.remove(uniqueId);
+                    singleColumnRangeMapBefore.remove(range);
+                }
+            }
+            if (invalidPartitionCache) {
+                invalidatePartitionCache(dbName, tblName, partitionName);
+            }
+        }
+        HivePartitionValues partitionValuesCur = partitionValuesCache.getIfPresent(key);
+        if (partitionValuesCur == partitionValues) {
+            partitionValuesCache.put(key, copy);
+        }
+    }
+
+    public void putPartitionValuesCacheForTest(PartitionValueCacheKey key, HivePartitionValues values) {
+        partitionValuesCache.put(key, values);
+    }
+
     /**
      * The Key of hive partition value cache
      */
@@ -480,24 +617,58 @@ public class HiveMetaStoreCache {
 
     @Data
     public static class HivePartitionValues {
+        private long nextPartitionId;
+        private Map<String, Long> partitionNameToIdMap;
+        private Map<Long, List<UniqueId>> idToUniqueIdsMap;
         private Map<Long, PartitionItem> idToPartitionItem;
-        private Map<Long, List<String>> partitionValuesMap = Maps.newHashMap();
+        private Map<Long, List<String>> partitionValuesMap;
+        // mappings for multi-column partitions
         private Map<UniqueId, Range<PartitionKey>> uidToPartitionRange;
         private Map<Range<PartitionKey>, UniqueId> rangeToId;
+        // mappings for single-column partitions
         private RangeMap<ColumnBound, UniqueId> singleColumnRangeMap;
+        private Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap;
+
+        public HivePartitionValues() {
+        }
 
         public HivePartitionValues(Map<Long, PartitionItem> idToPartitionItem,
                 Map<UniqueId, Range<PartitionKey>> uidToPartitionRange,
                 Map<Range<PartitionKey>, UniqueId> rangeToId,
-                RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) {
+                RangeMap<ColumnBound, UniqueId> singleColumnRangeMap,
+                long nextPartitionId,
+                Map<String, Long> partitionNameToIdMap,
+                Map<Long, List<UniqueId>> idToUniqueIdsMap,
+                Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap,
+                Map<Long, List<String>> partitionValuesMap) {
             this.idToPartitionItem = idToPartitionItem;
-            for (Map.Entry<Long, PartitionItem> entry : 
this.idToPartitionItem.entrySet()) {
-                partitionValuesMap.put(entry.getKey(),
-                        ((ListPartitionItem) 
entry.getValue()).getItems().get(0).getPartitionValuesAsStringList());
-            }
             this.uidToPartitionRange = uidToPartitionRange;
             this.rangeToId = rangeToId;
             this.singleColumnRangeMap = singleColumnRangeMap;
+            this.nextPartitionId = nextPartitionId;
+            this.partitionNameToIdMap = partitionNameToIdMap;
+            this.idToUniqueIdsMap = idToUniqueIdsMap;
+            this.singleUidToColumnRangeMap = singleUidToColumnRangeMap;
+            this.partitionValuesMap = partitionValuesMap;
+        }
+
+        public HivePartitionValues copy() {
+            HivePartitionValues copy = new HivePartitionValues();
+            copy.setNextPartitionId(nextPartitionId);
+            copy.setPartitionNameToIdMap(partitionNameToIdMap == null ? null : Maps.newHashMap(partitionNameToIdMap));
+            copy.setIdToUniqueIdsMap(idToUniqueIdsMap == null ? null : Maps.newHashMap(idToUniqueIdsMap));
+            copy.setIdToPartitionItem(idToPartitionItem == null ? null : Maps.newHashMap(idToPartitionItem));
+            copy.setPartitionValuesMap(partitionValuesMap == null ? null : Maps.newHashMap(partitionValuesMap));
+            copy.setUidToPartitionRange(uidToPartitionRange == null ? null : Maps.newHashMap(uidToPartitionRange));
+            copy.setRangeToId(rangeToId == null ? null : Maps.newHashMap(rangeToId));
+            copy.setSingleUidToColumnRangeMap(
+                    singleUidToColumnRangeMap == null ? null : Maps.newHashMap(singleUidToColumnRangeMap));
+            if (singleColumnRangeMap != null) {
+                RangeMap<ColumnBound, UniqueId> copySingleColumnRangeMap = 
TreeRangeMap.create();
+                copySingleColumnRangeMap.putAll(singleColumnRangeMap);
+                copy.setSingleColumnRangeMap(copySingleColumnRangeMap);
+            }
+            return copy;
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
new file mode 100644
index 0000000000..004efa3d15
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * MetastoreEvent for ADD_PARTITION event type
+ */
+public class AddPartitionEvent extends MetastoreTableEvent {
+    private final Table hmsTbl;
+    private final List<String> partitionNames;
+
+    private AddPartitionEvent(NotificationEvent event,
+            String catalogName) {
+        super(event, catalogName);
+        Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ADD_PARTITION));
+        Preconditions
+                .checkNotNull(event.getMessage(), debugString("Event message is null"));
+        try {
+            AddPartitionMessage addPartitionMessage =
+                    MetastoreEventsProcessor.getMessageDeserializer()
+                            .getAddPartitionMessage(event.getMessage());
+            hmsTbl = Preconditions.checkNotNull(addPartitionMessage.getTableObj());
+            Iterable<Partition> addedPartitions = 
addPartitionMessage.getPartitionObjs();
+            partitionNames = new ArrayList<>();
+            List<String> partitionColNames = hmsTbl.getPartitionKeys().stream()
+                    .map(FieldSchema::getName).collect(Collectors.toList());
+            addedPartitions.forEach(partition -> partitionNames.add(
+                    FileUtils.makePartName(partitionColNames, partition.getValues())));
+        } catch (Exception ex) {
+            throw new MetastoreNotificationException(ex);
+        }
+    }
+
+    protected static List<MetastoreEvent> getEvents(NotificationEvent event,
+            String catalogName) {
+        return Lists.newArrayList(new AddPartitionEvent(event, catalogName));
+    }
+
+    @Override
+    protected void process() throws MetastoreNotificationException {
+        try {
+            infoLog("catalogName:[{}],dbName:[{}],tableName:[{}],partitionNames:[{}]", catalogName, dbName, tblName,
+                    partitionNames.toString());
+            // bail out early if there are no partitions to process
+            if (partitionNames.isEmpty()) {
+                infoLog("Partition list is empty. Ignoring this event.");
+                return;
+            }
+            Env.getCurrentEnv().getCatalogMgr()
+                    .addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames);
+        } catch (DdlException e) {
+            throw new MetastoreNotificationException(
+                    debugString("Failed to process event"));
+        }
+    }
+}
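
For reference, FileUtils.makePartName (org.apache.hadoop.hive.common) used above renders a partition in the "nation=cn/city=beijing" form that HiveMetaStoreCache.toListPartitionItem parses back. A minimal sketch, with PartNameDemo as an illustrative name:

    import org.apache.hadoop.hive.common.FileUtils;

    import java.util.Arrays;

    public class PartNameDemo {
        public static void main(String[] args) {
            // Join partition column names and values into Hive's canonical form.
            String name = FileUtils.makePartName(
                    Arrays.asList("nation", "city"),
                    Arrays.asList("cn", "beijing"));
            System.out.println(name); // prints nation=cn/city=beijing
        }
    }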
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
similarity index 61%
copy from fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
copy to fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
index 4d2dc1a178..59445a5dc4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
@@ -18,26 +18,31 @@
 
 package org.apache.doris.datasource.hive.event;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
 import java.util.List;
 
 /**
- * An event type which is ignored. Useful for unsupported metastore event types
+ * MetastoreEvent for ALTER_DATABASE event type
  */
-public class IgnoredEvent extends MetastoreEvent {
-    protected IgnoredEvent(NotificationEvent event, String catalogName) {
+public class AlterDatabaseEvent extends MetastoreEvent {
+
+    private AlterDatabaseEvent(NotificationEvent event,
+            String catalogName) {
         super(event, catalogName);
+        Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_DATABASE));
     }
 
-    private static List<MetastoreEvent> getEvents(NotificationEvent event,
+    protected static List<MetastoreEvent> getEvents(NotificationEvent event,
             String catalogName) {
-        return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+        return Lists.newArrayList(new AlterDatabaseEvent(event, catalogName));
     }
 
     @Override
-    public void process() {
-        debugLog("Ignoring unknown event type " + 
metastoreNotificationEvent.getEventType());
+    protected void process() throws MetastoreNotificationException {
+        // an ALTER DATABASE event can only change properties, so there is nothing to do
+        infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
new file mode 100644
index 0000000000..b2ba44f842
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * MetastoreEvent for ALTER_PARTITION event type
+ */
+public class AlterPartitionEvent extends MetastoreTableEvent {
+    private final Table hmsTbl;
+    private final org.apache.hadoop.hive.metastore.api.Partition partitionAfter;
+    private final org.apache.hadoop.hive.metastore.api.Partition partitionBefore;
+    private final String partitionNameBefore;
+    private final String partitionNameAfter;
+    // true if this alter event was due to a rename operation
+    private final boolean isRename;
+
+    private AlterPartitionEvent(NotificationEvent event,
+            String catalogName) {
+        super(event, catalogName);
+        Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_PARTITION));
+        Preconditions
+                .checkNotNull(event.getMessage(), debugString("Event message is null"));
+        try {
+            AlterPartitionMessage alterPartitionMessage =
+                    MetastoreEventsProcessor.getMessageDeserializer()
+                            .getAlterPartitionMessage(event.getMessage());
+            hmsTbl = Preconditions.checkNotNull(alterPartitionMessage.getTableObj());
+            partitionBefore = Preconditions.checkNotNull(alterPartitionMessage.getPtnObjBefore());
+            partitionAfter = Preconditions.checkNotNull(alterPartitionMessage.getPtnObjAfter());
+            List<String> partitionColNames = hmsTbl.getPartitionKeys().stream()
+                    .map(FieldSchema::getName).collect(Collectors.toList());
+            partitionNameBefore = FileUtils.makePartName(partitionColNames, partitionBefore.getValues());
+            partitionNameAfter = FileUtils.makePartName(partitionColNames, partitionAfter.getValues());
+            isRename = !partitionNameBefore.equalsIgnoreCase(partitionNameAfter);
+        } catch (Exception ex) {
+            throw new MetastoreNotificationException(ex);
+        }
+    }
+
+    protected static List<MetastoreEvent> getEvents(NotificationEvent event,
+            String catalogName) {
+        return Lists.newArrayList(new AlterPartitionEvent(event, catalogName));
+    }
+
+    @Override
+    protected void process() throws MetastoreNotificationException {
+        try {
+            infoLog("catalogName:[{}],dbName:[{}],tableName:[{}],partitionNameBefore:[{}],partitionNameAfter:[{}]",
+                    catalogName, dbName, tblName, partitionNameBefore, partitionNameAfter);
+            if (isRename) {
+                Env.getCurrentEnv().getCatalogMgr()
+                        .dropExternalPartitions(catalogName, dbName, tblName, Lists.newArrayList(partitionNameBefore));
+                Env.getCurrentEnv().getCatalogMgr()
+                        .addExternalPartitions(catalogName, dbName, tblName, Lists.newArrayList(partitionNameAfter));
+            } else {
+                Env.getCurrentEnv().getCatalogMgr()
+                        .refreshExternalPartitions(catalogName, dbName, hmsTbl.getTableName(),
+                                Lists.newArrayList(partitionNameAfter));
+            }
+        } catch (DdlException e) {
+            throw new MetastoreNotificationException(
+                    debugString("Failed to process event"), e);
+        }
+    }
+}
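
For readers less familiar with HMS partition naming, here is a minimal, self-contained sketch of the rename check above. FileUtils.makePartName is the actual Hive helper the event uses; the demo class and the column/value data are illustrative only:

    import org.apache.hadoop.hive.common.FileUtils;

    import java.util.Arrays;
    import java.util.List;

    public class PartitionRenameDemo {
        public static void main(String[] args) {
            // Partition columns of the table, in declaration order.
            List<String> cols = Arrays.asList("y", "m");
            // makePartName joins columns and values into the "y=2020/m=1" form.
            String before = FileUtils.makePartName(cols, Arrays.asList("2020", "1"));
            String after = FileUtils.makePartName(cols, Arrays.asList("2020", "2"));
            // AlterPartitionEvent treats any case-insensitive name change as a rename
            // and replays it as drop(partitionNameBefore) + add(partitionNameAfter).
            boolean isRename = !before.equalsIgnoreCase(after);
            System.out.println(before + " -> " + after + ", rename=" + isRename);
        }
    }
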
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
new file mode 100644
index 0000000000..b14a1c6577
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;
+
+import java.util.List;
+
+/**
+ * MetastoreEvent for ALTER_TABLE event type
+ */
+public class AlterTableEvent extends MetastoreTableEvent {
+    // the table object before alter operation
+    private final Table tableBefore;
+    // the table object after alter operation
+    private final Table tableAfter;
+
+    // true if this alter event was due to a rename operation
+    private final boolean isRename;
+
+    private AlterTableEvent(NotificationEvent event, String catalogName) {
+        super(event, catalogName);
+        Preconditions.checkArgument(MetastoreEventType.ALTER_TABLE.equals(getEventType()));
+        Preconditions
+                .checkNotNull(event.getMessage(), debugString("Event message is null"));
+        try {
+            JSONAlterTableMessage alterTableMessage =
+                    (JSONAlterTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
+                            .getAlterTableMessage(event.getMessage());
+            tableAfter = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
+            tableBefore = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
+        } catch (Exception e) {
+            throw new MetastoreNotificationException(
+                    debugString("Unable to parse the alter table message"), e);
+        }
+        // This is a rename event if either the dbName or the tblName of the before and after objects changed.
+        isRename = !tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName())
+                || !tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName());
+
+    }
+
+    public static List<MetastoreEvent> getEvents(NotificationEvent event,
+            String catalogName) {
+        return Lists.newArrayList(new AlterTableEvent(event, catalogName));
+    }
+
+
+    private void processRename() throws DdlException {
+        if (!isRename) {
+            return;
+        }
+        boolean hasExist = Env.getCurrentEnv().getCatalogMgr()
+                .externalTableExistInLocal(tableAfter.getDbName(), tableAfter.getTableName(), catalogName);
+        if (hasExist) {
+            infoLog("AlterExternalTable canceled because the renamed-to table already exists, "
+                            + "catalogName:[{}],dbName:[{}],tableName:[{}]",
+                    catalogName, dbName, tableAfter.getTableName());
+            return;
+        }
+        Env.getCurrentEnv().getCatalogMgr()
+                .dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName);
+        Env.getCurrentEnv().getCatalogMgr()
+                .createExternalTable(tableAfter.getDbName(), tableAfter.getTableName(), catalogName);
+
+    }
+
+    /**
+     * If the ALTER_TABLE event is due to a table rename, this method removes the old table
+     * and creates a new table with the new name. Otherwise, we simply refresh the table.
+     */
+    @Override
+    protected void process() throws MetastoreNotificationException {
+        try {
+            infoLog("catalogName:[{}],dbName:[{}],tableBefore:[{}],tableAfter:[{}]", catalogName, dbName,
+                    tableBefore.getTableName(), tableAfter.getTableName());
+            if (isRename) {
+                processRename();
+                return;
+            }
+            // The scope of the refresh can be narrowed in the future.
+            Env.getCurrentEnv().getCatalogMgr()
+                    .refreshExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName);
+        } catch (Exception e) {
+            throw new MetastoreNotificationException(
+                    debugString("Failed to process event"), e);
+        }
+    }
+}
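
A JDK-only sketch of the rename path above, with the CatalogMgr calls reduced to set operations (the TableRef record and all names are illustrative, not part of the patch). The early return when the target table already exists locally means re-processing the same ALTER_TABLE notification is harmless:

    import java.util.HashSet;
    import java.util.Set;

    public class RenameFlowDemo {
        // Illustrative stand-in for the HMS Table objects carried by the event.
        record TableRef(String db, String tbl) {}

        // Mirrors AlterTableEvent: a rename is any change of db name or table name.
        static boolean isRename(TableRef before, TableRef after) {
            return !before.db().equalsIgnoreCase(after.db())
                    || !before.tbl().equalsIgnoreCase(after.tbl());
        }

        static void processRename(TableRef before, TableRef after, Set<String> localTables) {
            if (localTables.contains(after.db() + "." + after.tbl())) {
                return; // target already exists locally, e.g. the event was applied before
            }
            localTables.remove(before.db() + "." + before.tbl()); // dropExternalTable
            localTables.add(after.db() + "." + after.tbl());      // createExternalTable
        }

        public static void main(String[] args) {
            Set<String> local = new HashSet<>(Set.of("db1.t_old"));
            TableRef before = new TableRef("db1", "t_old");
            TableRef after = new TableRef("db1", "t_new");
            if (isRename(before, after)) {
                processRename(before, after, local);
            }
            System.out.println(local); // [db1.t_new]
        }
    }
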
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
similarity index 51%
copy from fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
copy to fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
index 4d2dc1a178..48476bce57 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
@@ -18,26 +18,40 @@
 
 package org.apache.doris.datasource.hive.event;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
 import java.util.List;
 
 /**
- * An event type which is ignored. Useful for unsupported metastore event types
+ * MetastoreEvent for CREATE_DATABASE event type
  */
-public class IgnoredEvent extends MetastoreEvent {
-    protected IgnoredEvent(NotificationEvent event, String catalogName) {
+public class CreateDatabaseEvent extends MetastoreEvent {
+
+    private CreateDatabaseEvent(NotificationEvent event,
+            String catalogName) {
         super(event, catalogName);
+        Preconditions.checkArgument(getEventType().equals(MetastoreEventType.CREATE_DATABASE));
     }
 
-    private static List<MetastoreEvent> getEvents(NotificationEvent event,
+    protected static List<MetastoreEvent> getEvents(NotificationEvent event,
             String catalogName) {
-        return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+        return Lists.newArrayList(new CreateDatabaseEvent(event, catalogName));
     }
 
     @Override
-    public void process() {
-        debugLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType());
+    protected void process() throws MetastoreNotificationException {
+        try {
+            infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
+            Env.getCurrentEnv().getCatalogMgr()
+                    .createExternalDatabase(dbName, catalogName);
+        } catch (DdlException e) {
+            throw new MetastoreNotificationException(
+                    debugString("Failed to process event"), e);
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
new file mode 100644
index 0000000000..e4e8e8bb4b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+
+import java.util.List;
+
+/**
+ * MetastoreEvent for CREATE_TABLE event type
+ */
+public class CreateTableEvent extends MetastoreTableEvent {
+    private final Table hmsTbl;
+
+    private CreateTableEvent(NotificationEvent event, String catalogName) throws MetastoreNotificationException {
+        super(event, catalogName);
+        Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(getEventType()));
+        Preconditions
+                .checkNotNull(event.getMessage(), debugString("Event message is null"));
+        try {
+            CreateTableMessage createTableMessage =
+                    MetastoreEventsProcessor.getMessageDeserializer().getCreateTableMessage(event.getMessage());
+            hmsTbl = Preconditions.checkNotNull(createTableMessage.getTableObj());
+        } catch (Exception e) {
+            throw new MetastoreNotificationException(
+                    debugString("Unable to deserialize the event message"), e);
+        }
+    }
+
+    public static List<MetastoreEvent> getEvents(NotificationEvent event, String catalogName) {
+        return Lists.newArrayList(new CreateTableEvent(event, catalogName));
+    }
+
+    @Override
+    protected void process() throws MetastoreNotificationException {
+        try {
+            infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tblName);
+            boolean hasExist = Env.getCurrentEnv().getCatalogMgr()
+                    .externalTableExistInLocal(dbName, hmsTbl.getTableName(), catalogName);
+            if (hasExist) {
+                infoLog(
+                        "CreateExternalTable canceled because the table already exists, "
+                                + "catalogName:[{}],dbName:[{}],tableName:[{}]",
+                        catalogName, dbName, tblName);
+                return;
+            }
+            Env.getCurrentEnv().getCatalogMgr().createExternalTable(dbName, hmsTbl.getTableName(), catalogName);
+        } catch (DdlException e) {
+            throw new MetastoreNotificationException(
+                    debugString("Failed to process event"), e);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
similarity index 52%
copy from fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
copy to fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
index 4d2dc1a178..b51145a258 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
@@ -18,26 +18,40 @@
 
 package org.apache.doris.datasource.hive.event;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
 import java.util.List;
 
 /**
- * An event type which is ignored. Useful for unsupported metastore event types
+ * MetastoreEvent for DROP_DATABASE event type
  */
-public class IgnoredEvent extends MetastoreEvent {
-    protected IgnoredEvent(NotificationEvent event, String catalogName) {
+public class DropDatabaseEvent extends MetastoreEvent {
+
+    private DropDatabaseEvent(NotificationEvent event,
+            String catalogName) {
         super(event, catalogName);
+        Preconditions.checkArgument(getEventType().equals(MetastoreEventType.DROP_DATABASE));
     }
 
-    private static List<MetastoreEvent> getEvents(NotificationEvent event,
+    protected static List<MetastoreEvent> getEvents(NotificationEvent event,
             String catalogName) {
-        return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+        return Lists.newArrayList(new DropDatabaseEvent(event, catalogName));
     }
 
     @Override
-    public void process() {
-        debugLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType());
+    protected void process() throws MetastoreNotificationException {
+        try {
+            infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
+            Env.getCurrentEnv().getCatalogMgr()
+                    .dropExternalDatabase(dbName, catalogName);
+        } catch (DdlException e) {
+            throw new MetastoreNotificationException(
+                    debugString("Failed to process event"), e);
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
new file mode 100644
index 0000000000..cd5ed5bfbd
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * MetastoreEvent for DROP_PARTITION event type
+ */
+public class DropPartitionEvent extends MetastoreTableEvent {
+    private final Table hmsTbl;
+    private final List<String> partitionNames;
+
+    private DropPartitionEvent(NotificationEvent event,
+            String catalogName) {
+        super(event, catalogName);
+        Preconditions.checkArgument(getEventType().equals(MetastoreEventType.DROP_PARTITION));
+        Preconditions
+                .checkNotNull(event.getMessage(), debugString("Event message is null"));
+        try {
+            DropPartitionMessage dropPartitionMessage =
+                    MetastoreEventsProcessor.getMessageDeserializer()
+                            .getDropPartitionMessage(event.getMessage());
+            hmsTbl = Preconditions.checkNotNull(dropPartitionMessage.getTableObj());
+            List<Map<String, String>> droppedPartitions = dropPartitionMessage.getPartitions();
+            partitionNames = new ArrayList<>();
+            List<String> partitionColNames = hmsTbl.getPartitionKeys().stream()
+                    .map(FieldSchema::getName).collect(Collectors.toList());
+            droppedPartitions.forEach(partition -> partitionNames.add(
+                    getPartitionName(partition, partitionColNames)));
+        } catch (Exception ex) {
+            throw new MetastoreNotificationException(ex);
+        }
+    }
+
+    protected static List<MetastoreEvent> getEvents(NotificationEvent event,
+            String catalogName) {
+        return Lists.newArrayList(
+                new DropPartitionEvent(event, catalogName));
+    }
+
+    @Override
+    protected void process() throws MetastoreNotificationException {
+        try {
+            infoLog("catalogName:[{}],dbName:[{}],tableName:[{}],partitionNames:[{}]", catalogName, dbName, tblName,
+                    partitionNames.toString());
+            // bail out early if there are no partitions to process
+            if (partitionNames.isEmpty()) {
+                infoLog("Partition list is empty. Ignoring this event.");
+                return;
+            }
+            Env.getCurrentEnv().getCatalogMgr()
+                    .dropExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames);
+        } catch (DdlException e) {
+            throw new MetastoreNotificationException(
+                    debugString("Failed to process event"), e);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
index 8647e47b78..2b84c1bb42 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
@@ -25,8 +25,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 
@@ -34,25 +32,21 @@ import java.util.List;
  * MetastoreEvent for DROP_TABLE event type
  */
 public class DropTableEvent extends MetastoreTableEvent {
-    private static final Logger LOG = LogManager.getLogger(DropTableEvent.class);
-    private final String dbName;
     private final String tableName;
 
     private DropTableEvent(NotificationEvent event,
             String catalogName) {
         super(event, catalogName);
         Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(getEventType()));
-        JSONDropTableMessage dropTableMessage =
-                (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
-                        .getDropTableMessage(event.getMessage());
+        Preconditions
+                .checkNotNull(event.getMessage(), debugString("Event message is null"));
         try {
-            dbName = dropTableMessage.getDB();
+            JSONDropTableMessage dropTableMessage =
+                    (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
+                            .getDropTableMessage(event.getMessage());
             tableName = dropTableMessage.getTable();
         } catch (Exception e) {
-            throw new MetastoreNotificationException(debugString(
-                    "Could not parse event message. "
-                            + "Check if %s is set to true in metastore configuration",
-                    MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
+            throw new MetastoreNotificationException(e);
         }
     }
 
@@ -61,29 +55,14 @@ public class DropTableEvent extends MetastoreTableEvent {
         return Lists.newArrayList(new DropTableEvent(event, catalogName));
     }
 
-    @Override
-    protected boolean existInCache() {
-        return true;
-    }
-
-    @Override
-    protected boolean canBeSkipped() {
-        return false;
-    }
-
-    protected boolean isSupported() {
-        return true;
-    }
-
     @Override
     protected void process() throws MetastoreNotificationException {
         try {
-            LOG.info("DropTable event process,catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName,
-                    tableName);
+            infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tableName);
             Env.getCurrentEnv().getCatalogMgr().dropExternalTable(dbName, tableName, catalogName);
         } catch (DdlException e) {
-            LOG.warn("DropExternalTable failed,dbName:[{}],tableName:[{}],catalogName:[{}].", dbName, tableName,
-                    catalogName, e);
+            throw new MetastoreNotificationException(
+                    debugString("Failed to process event"), e);
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
index 4d2dc1a178..d504c2917f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
@@ -27,17 +27,17 @@ import java.util.List;
  * An event type which is ignored. Useful for unsupported metastore event types
  */
 public class IgnoredEvent extends MetastoreEvent {
-    protected IgnoredEvent(NotificationEvent event, String catalogName) {
+    private IgnoredEvent(NotificationEvent event, String catalogName) {
         super(event, catalogName);
     }
 
-    private static List<MetastoreEvent> getEvents(NotificationEvent event,
+    protected static List<MetastoreEvent> getEvents(NotificationEvent event,
             String catalogName) {
         return Lists.newArrayList(new IgnoredEvent(event, catalogName));
     }
 
     @Override
     public void process() {
-        debugLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType());
+        infoLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType());
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
index 5cc4594457..132496a9f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
@@ -22,6 +22,9 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * Abstract base class for all MetastoreEvents. A MetastoreEvent is an object used to
  * process a NotificationEvent received from metastore.
@@ -105,11 +108,6 @@ public abstract class MetastoreEvent {
         return null;
     }
 
-
-    protected boolean existInCache() throws MetastoreNotificationException {
-        return false;
-    }
-
     /**
      * Returns the number of events represented by this event. For most events this is 1.
      * In case of batch events this could be more than 1.
@@ -128,14 +126,6 @@ public abstract class MetastoreEvent {
         return false;
     }
 
-    /**
-     * Whether the current version of FE supports processing of some events, some events are reserved,
-     * and may be processed later version.
-     */
-    protected boolean isSupported() {
-        return false;
-    }
-
     /**
      * Process the information available in the NotificationEvent.
      */
@@ -196,6 +186,26 @@ public abstract class MetastoreEvent {
         LOG.debug(formatString, formatArgs);
     }
 
+    protected String getPartitionName(Map<String, String> part, List<String> partitionColNames) {
+        if (part.size() == 0) {
+            return "";
+        }
+        if (partitionColNames.size() != part.size()) {
+            return "";
+        }
+        StringBuilder name = new StringBuilder();
+        int i = 0;
+        for (String colName : partitionColNames) {
+            if (i++ > 0) {
+                name.append("/");
+            }
+            name.append(colName);
+            name.append("=");
+            name.append(part.get(colName));
+        }
+        return name.toString();
+    }
+
     @Override
     public String toString() {
         return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType);
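
For clarity, a JDK-only replica of the getPartitionName helper added above (the wrapper class and main are illustrative): it emits "col=value" segments joined by "/", and returns an empty string when the event's value map does not line up with the table's partition columns. Note that, unlike Hive's FileUtils.makePartName, it does not escape special characters in the values:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.StringJoiner;

    public class PartitionNameSketch {
        static String getPartitionName(Map<String, String> part, List<String> partitionColNames) {
            if (part.size() == 0 || partitionColNames.size() != part.size()) {
                return ""; // size mismatch: the caller simply skips this partition
            }
            StringJoiner name = new StringJoiner("/");
            for (String colName : partitionColNames) {
                name.add(colName + "=" + part.get(colName));
            }
            return name.toString();
        }

        public static void main(String[] args) {
            Map<String, String> part = new LinkedHashMap<>();
            part.put("y", "2020");
            part.put("m", "1");
            System.out.println(getPartitionName(part, List.of("y", "m"))); // y=2020/m=1
        }
    }
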
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
index 2719158c8e..ce96ce62e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
@@ -26,9 +26,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Factory class to create various MetastoreEvents.
@@ -42,31 +40,36 @@ public class MetastoreEventFactory implements EventFactory {
         Preconditions.checkNotNull(event.getEventType());
         MetastoreEventType metastoreEventType = MetastoreEventType.from(event.getEventType());
         switch (metastoreEventType) {
+            case CREATE_TABLE:
+                return CreateTableEvent.getEvents(event, catalogName);
             case DROP_TABLE:
                 return DropTableEvent.getEvents(event, catalogName);
+            case ALTER_TABLE:
+                return AlterTableEvent.getEvents(event, catalogName);
+            case CREATE_DATABASE:
+                return CreateDatabaseEvent.getEvents(event, catalogName);
+            case DROP_DATABASE:
+                return DropDatabaseEvent.getEvents(event, catalogName);
+            case ALTER_DATABASE:
+                return AlterDatabaseEvent.getEvents(event, catalogName);
+            case ADD_PARTITION:
+                return AddPartitionEvent.getEvents(event, catalogName);
+            case DROP_PARTITION:
+                return DropPartitionEvent.getEvents(event, catalogName);
+            case ALTER_PARTITION:
+                return AlterPartitionEvent.getEvents(event, catalogName);
             default:
                 // ignore all the unknown events by creating a IgnoredEvent
-                return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+                return IgnoredEvent.getEvents(event, catalogName);
         }
     }
 
     List<MetastoreEvent> getMetastoreEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) {
         List<MetastoreEvent> metastoreEvents = Lists.newArrayList();
-
         for (NotificationEvent event : events) {
             metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, hmsExternalCatalog.getName()));
         }
-
-        List<MetastoreEvent> tobeProcessEvents = metastoreEvents.stream()
-                .filter(MetastoreEvent::isSupported)
-                .collect(Collectors.toList());
-
-        if (tobeProcessEvents.isEmpty()) {
-            LOG.info("The metastore events to process is empty on catalog {}", hmsExternalCatalog.getName());
-            return Collections.emptyList();
-        }
-
-        return createBatchEvents(tobeProcessEvents);
+        return createBatchEvents(metastoreEvents);
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index fb8fbb5b99..d8aff54d11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -707,6 +707,12 @@ public class JournalEntity implements Writable {
             }
             case OperationType.OP_REFRESH_EXTERNAL_DB:
             case OperationType.OP_DROP_EXTERNAL_TABLE:
+            case OperationType.OP_CREATE_EXTERNAL_TABLE:
+            case OperationType.OP_DROP_EXTERNAL_DB:
+            case OperationType.OP_CREATE_EXTERNAL_DB:
+            case OperationType.OP_ADD_EXTERNAL_PARTITIONS:
+            case OperationType.OP_DROP_EXTERNAL_PARTITIONS:
+            case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS:
             case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
                 data = ExternalObjectLog.read(in);
                 isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 9254583294..66671c184f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -954,6 +954,36 @@ public class EditLog {
                     env.getCatalogMgr().replayDropExternalTable(log);
                     break;
                 }
+                case OperationType.OP_CREATE_EXTERNAL_TABLE: {
+                    final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
+                    env.getCatalogMgr().replayCreateExternalTable(log);
+                    break;
+                }
+                case OperationType.OP_DROP_EXTERNAL_DB: {
+                    final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
+                    env.getCatalogMgr().replayDropExternalDatabase(log);
+                    break;
+                }
+                case OperationType.OP_CREATE_EXTERNAL_DB: {
+                    final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
+                    env.getCatalogMgr().replayCreateExternalDatabase(log);
+                    break;
+                }
+                case OperationType.OP_ADD_EXTERNAL_PARTITIONS: {
+                    final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
+                    env.getCatalogMgr().replayAddExternalPartitions(log);
+                    break;
+                }
+                case OperationType.OP_DROP_EXTERNAL_PARTITIONS: {
+                    final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
+                    env.getCatalogMgr().replayDropExternalPartitions(log);
+                    break;
+                }
+                case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: {
+                    final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
+                    env.getCatalogMgr().replayRefreshExternalPartitions(log);
+                    break;
+                }
                 case OperationType.OP_INIT_EXTERNAL_TABLE: {
                     // Do nothing.
                     break;
@@ -1633,6 +1663,30 @@ public class EditLog {
         logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log);
     }
 
+    public void logCreateExternalTable(ExternalObjectLog log) {
+        logEdit(OperationType.OP_CREATE_EXTERNAL_TABLE, log);
+    }
+
+    public void logDropExternalDatabase(ExternalObjectLog log) {
+        logEdit(OperationType.OP_DROP_EXTERNAL_DB, log);
+    }
+
+    public void logCreateExternalDatabase(ExternalObjectLog log) {
+        logEdit(OperationType.OP_CREATE_EXTERNAL_DB, log);
+    }
+
+    public void logAddExternalPartitions(ExternalObjectLog log) {
+        logEdit(OperationType.OP_ADD_EXTERNAL_PARTITIONS, log);
+    }
+
+    public void logDropExternalPartitions(ExternalObjectLog log) {
+        logEdit(OperationType.OP_DROP_EXTERNAL_PARTITIONS, log);
+    }
+
+    public void logInvalidateExternalPartitions(ExternalObjectLog log) {
+        logEdit(OperationType.OP_REFRESH_EXTERNAL_PARTITIONS, log);
+    }
+
     public Journal getJournal() {
         return this.journal;
     }
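
Each new log* method above pairs with a replay* case in the journal switch; a deliberately simplified, JDK-only sketch of that write/replay symmetry (the op-code values are copied from OperationType in this patch, everything else is illustrative):

    import java.util.ArrayList;
    import java.util.List;

    public class EditLogSketch {
        // Op codes mirroring the new OperationType constants.
        static final short OP_DROP_EXTERNAL_DB = 351;
        static final short OP_CREATE_EXTERNAL_DB = 353;

        record JournalRecord(short opCode, String payload) {}

        static final List<JournalRecord> journal = new ArrayList<>();

        // Write path: the mutation is appended to the journal when it happens.
        static void logCreateExternalDatabase(String db) {
            journal.add(new JournalRecord(OP_CREATE_EXTERNAL_DB, db));
        }

        // Replay path (e.g. on a restarting or non-master FE): dispatch on the op code.
        static void replay(JournalRecord r) {
            switch (r.opCode()) {
                case OP_CREATE_EXTERNAL_DB -> System.out.println("replay create db " + r.payload());
                case OP_DROP_EXTERNAL_DB -> System.out.println("replay drop db " + r.payload());
                default -> throw new IllegalStateException("unknown op " + r.opCode());
            }
        }

        public static void main(String[] args) {
            logCreateExternalDatabase("hiveDb");
            journal.forEach(EditLogSketch::replay);
        }
    }
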
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index c5889a8001..ee145f06f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -256,6 +256,12 @@ public class OperationType {
     public static final short OP_ALTER_MTMV_TASK = 342;
 
     public static final short OP_DROP_EXTERNAL_TABLE = 350;
+    public static final short OP_DROP_EXTERNAL_DB = 351;
+    public static final short OP_CREATE_EXTERNAL_TABLE = 352;
+    public static final short OP_CREATE_EXTERNAL_DB = 353;
+    public static final short OP_ADD_EXTERNAL_PARTITIONS = 354;
+    public static final short OP_DROP_EXTERNAL_PARTITIONS = 355;
+    public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356;
 
     public static final short OP_ALTER_USER = 400;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java
index bbefea50e3..fcae3c4ecd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java
@@ -18,6 +18,7 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.common.AnalysisException;
@@ -33,8 +34,10 @@ import com.google.common.collect.TreeRangeMap;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -59,7 +62,7 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
         super(idToPartitionItem, partitionColumns, columnNameToRange);
         this.uidToPartitionRange = Maps.newHashMap();
         if (partitionColumns.size() > 1) {
-            this.uidToPartitionRange = genUidToPartitionRange(idToPartitionItem);
+            this.uidToPartitionRange = genUidToPartitionRange(idToPartitionItem, new HashMap<>());
             this.rangeToId = genRangeToId(uidToPartitionRange);
         }
     }
@@ -77,16 +80,21 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
     }
 
     public static Map<UniqueId, Range<PartitionKey>> genUidToPartitionRange(
-            Map<Long, PartitionItem> idToPartitionItem) {
+            Map<Long, PartitionItem> idToPartitionItem, Map<Long, List<UniqueId>> idToUniqueIdsMap) {
         Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = Maps.newHashMap();
         idToPartitionItem.forEach((id, item) -> {
             List<PartitionKey> keys = item.getItems();
             List<Range<PartitionKey>> ranges = keys.stream()
                     .map(key -> Range.closed(key, key))
                     .collect(Collectors.toList());
+            List<UniqueId> uniqueIds = idToUniqueIdsMap.get(id) == null ? new ArrayList<>(ranges.size())
+                    : idToUniqueIdsMap.get(id);
             for (int i = 0; i < ranges.size(); i++) {
-                uidToPartitionRange.put(new ListPartitionUniqueId(id, i), ranges.get(i));
+                ListPartitionUniqueId listPartitionUniqueId = new ListPartitionUniqueId(id, i);
+                uidToPartitionRange.put(listPartitionUniqueId, ranges.get(i));
+                uniqueIds.add(listPartitionUniqueId);
             }
+            idToUniqueIdsMap.put(id, uniqueIds);
         });
         return uidToPartitionRange;
     }
@@ -94,21 +102,35 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
     @Override
     void genSingleColumnRangeMap() {
         if (singleColumnRangeMap == null) {
-            singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem);
+            singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem, new HashMap<>());
         }
     }
 
-    public static RangeMap<ColumnBound, UniqueId> genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem) {
+    public static Map<Long, List<String>> getPartitionValuesMap(Map<Long, PartitionItem> idToPartitionItem) {
+        Map<Long, List<String>> partitionValuesMap = new HashMap<>();
+        for (Map.Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
+            partitionValuesMap.put(entry.getKey(),
+                    ((ListPartitionItem) entry.getValue()).getItems().get(0).getPartitionValuesAsStringList());
+        }
+        return partitionValuesMap;
+    }
+
+    public static RangeMap<ColumnBound, UniqueId> genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem,
+            Map<Long, List<UniqueId>> idToUniqueIdsMap) {
         RangeMap<ColumnBound, UniqueId> candidate = TreeRangeMap.create();
         idToPartitionItem.forEach((id, item) -> {
             List<PartitionKey> keys = item.getItems();
             List<Range<PartitionKey>> ranges = keys.stream()
                     .map(key -> Range.closed(key, key))
                     .collect(Collectors.toList());
+            List<UniqueId> uniqueIds = idToUniqueIdsMap.get(id) == null ? new ArrayList<>(ranges.size())
+                    : idToUniqueIdsMap.get(id);
             for (int i = 0; i < ranges.size(); i++) {
-                candidate.put(mapPartitionKeyRange(ranges.get(i), 0),
-                        new ListPartitionUniqueId(id, i));
+                ListPartitionUniqueId listPartitionUniqueId = new ListPartitionUniqueId(id, i);
+                candidate.put(mapPartitionKeyRange(ranges.get(i), 0), listPartitionUniqueId);
+                uniqueIds.add(listPartitionUniqueId);
             }
+            idToUniqueIdsMap.put(id, uniqueIds);
         });
         return candidate;
     }
@@ -151,6 +173,13 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
         return rangeToId;
     }
 
+    public static Map<UniqueId, Range<ColumnBound>> genSingleUidToColumnRange(
+            RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) {
+        Map<UniqueId, Range<ColumnBound>> uidToColumnRange = Maps.newHashMap();
+        singleColumnRangeMap.asMapOfRanges().forEach((columnBound, uid) -> uidToColumnRange.put(uid, columnBound));
+        return uidToColumnRange;
+    }
+
     private Collection<Long> doPruneMultiple(Map<Column, FinalFilters> columnToFilters,
             Map<Range<PartitionKey>, UniqueId> partitionRangeToUid,
             int columnIdx) {
@@ -178,20 +207,20 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
                 grouped.forEach(candidateRangeMap::put);
 
                 return finalFilters.filters.stream()
-                    .map(filter -> {
-                        RangeMap<ColumnBound, List<UniqueId>> filtered =
-                                candidateRangeMap.subRangeMap(filter);
-                        // Find PartitionKey ranges according to filtered UniqueIds.
-                        Map<Range<PartitionKey>, UniqueId> filteredPartitionRange =
-                                filtered.asMapOfRanges().values()
-                                        .stream()
-                                        .flatMap(List::stream)
-                                        .collect(Collectors.toMap(uidToPartitionRange::get, Function.identity()));
-                        return doPruneMultiple(columnToFilters, filteredPartitionRange,
-                            columnIdx + 1);
-                    })
-                    .flatMap(Collection::stream)
-                    .collect(Collectors.toSet());
+                        .map(filter -> {
+                            RangeMap<ColumnBound, List<UniqueId>> filtered =
+                                    candidateRangeMap.subRangeMap(filter);
+                            // Find PartitionKey ranges according to filtered UniqueIds.
+                            Map<Range<PartitionKey>, UniqueId> filteredPartitionRange =
+                                    filtered.asMapOfRanges().values()
+                                            .stream()
+                                            .flatMap(List::stream)
+                                            .collect(Collectors.toMap(uidToPartitionRange::get, Function.identity()));
+                            return doPruneMultiple(columnToFilters, filteredPartitionRange,
+                                    columnIdx + 1);
+                        })
+                        .flatMap(Collection::stream)
+                        .collect(Collectors.toSet());
             case NO_FILTERS:
             default:
                 return doPruneMultiple(columnToFilters, partitionRangeToUid, columnIdx + 1);
@@ -215,9 +244,9 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base {
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(this)
-                .add("partitionId", partitionId)
-                .add("partitionKeyIndex", partitionKeyIndex)
-                .toString();
+                    .add("partitionId", partitionId)
+                    .add("partitionKeyIndex", partitionKeyIndex)
+                    .toString();
         }
 
         @Override
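
The idToUniqueIdsMap parameter threaded through genUidToPartitionRange and genSingleColumnRangeMap above is filled as a side effect while the range maps are built, so the partition cache can later locate every UniqueId that belongs to one partition id without rescanning the whole range map. A JDK-only sketch of that contract (all names illustrative):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SideMapDemo {
        // Illustrative stand-in for ListPartitionUniqueId.
        record Uid(long partitionId, int keyIndex) {}

        static Map<String, Uid> buildRangeMap(Map<Long, List<String>> idToValues,
                                              Map<Long, List<Uid>> idToUids) {
            Map<String, Uid> valueToUid = new HashMap<>();
            idToValues.forEach((id, values) -> {
                List<Uid> uids = idToUids.computeIfAbsent(id, k -> new ArrayList<>(values.size()));
                for (int i = 0; i < values.size(); i++) {
                    Uid uid = new Uid(id, i);
                    valueToUid.put(values.get(i), uid);
                    uids.add(uid); // filled in the same pass, mirroring the patch
                }
            });
            return valueToUid;
        }

        public static void main(String[] args) {
            Map<Long, List<Uid>> idToUids = new HashMap<>();
            buildRangeMap(Map.of(1L, List.of("m=1", "m=2")), idToUids);
            System.out.println(idToUids); // {1=[Uid[partitionId=1, keyIndex=0], Uid[partitionId=1, keyIndex=1]]}
        }
    }
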
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
index a75f332f70..f74bf032e5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java
@@ -32,8 +32,12 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EsResource;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ResourceMgr;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.EsExternalDatabase;
 import org.apache.doris.catalog.external.EsExternalTable;
 import org.apache.doris.catalog.external.HMSExternalDatabase;
@@ -41,14 +45,23 @@ import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache.PartitionValueCacheKey;
 import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.planner.ColumnBound;
+import org.apache.doris.planner.ListPartitionPrunerV2;
+import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSet;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.utframe.TestWithFeService;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
 import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -58,6 +71,7 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -69,13 +83,14 @@ public class CatalogMgrTest extends TestWithFeService {
     private static UserIdentity user2;
     private CatalogMgr mgr;
     private ResourceMgr resourceMgr;
+    private ExternalMetaCacheMgr externalMetaCacheMgr;
 
     @Override
     protected void runBeforeAll() throws Exception {
         FeConstants.runningUnitTest = true;
         mgr = Env.getCurrentEnv().getCatalogMgr();
         resourceMgr = Env.getCurrentEnv().getResourceMgr();
-
+        externalMetaCacheMgr = Env.getCurrentEnv().getExtMetaCacheMgr();
         ConnectContext rootCtx = createDefaultCtx();
         env = Env.getCurrentEnv();
         auth = env.getAuth();
@@ -417,4 +432,119 @@ public class CatalogMgrTest extends TestWithFeService {
         }
     }
 
+    @Test
+    public void testAddMultiColumnPartitionsCache() {
+        HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive");
+        HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
+        PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable",
+                Lists.newArrayList(Type.INT, Type.SMALLINT));
+        HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey,
+                Lists.newArrayList("y=2020/m=1", "y=2020/m=2"), metaStoreCache);
+        metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
+        metaStoreCache.addPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("y=2020/m=3", "y=2020/m=4"),
+                partitionValueCacheKey.getTypes());
+        HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
+        Assert.assertEquals(4, partitionValues.getPartitionNameToIdMap().size());
+    }
+
+    @Test
+    public void testDropMultiColumnPartitionsCache() {
+        HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive");
+        HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
+        PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable",
+                Lists.newArrayList(Type.INT, Type.SMALLINT));
+        HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey,
+                Lists.newArrayList("y=2020/m=1", "y=2020/m=2"), metaStoreCache);
+        metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
+        metaStoreCache.dropPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("y=2020/m=1", "y=2020/m=2"),
+                partitionValueCacheKey.getTypes(), false);
+        HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
+        Assert.assertEquals(0, partitionValues.getPartitionNameToIdMap().size());
+    }
+
+    @Test
+    public void testAddSingleColumnPartitionsCache() {
+        HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive");
+        HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
+        PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable",
+                Lists.newArrayList(Type.SMALLINT));
+        HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey,
+                Lists.newArrayList("m=1", "m=2"), metaStoreCache);
+        metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
+        metaStoreCache.addPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("m=3", "m=4"),
+                partitionValueCacheKey.getTypes());
+        HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
+        Assert.assertEquals(4, partitionValues.getPartitionNameToIdMap().size());
+    }
+
+    @Test
+    public void testDropSingleColumnPartitionsCache() {
+        HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive");
+        HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
+        PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable",
+                Lists.newArrayList(Type.SMALLINT));
+        HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey,
+                Lists.newArrayList("m=1", "m=2"), metaStoreCache);
+        metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
+        metaStoreCache.dropPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("m=1", "m=2"),
+                partitionValueCacheKey.getTypes(), false);
+        HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
+        Assert.assertEquals(0, partitionValues.getPartitionNameToIdMap().size());
+    }
+
+    @Test
+    public void testAddPartitionsCacheToLargeTable() {
+        HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive");
+        HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
+        PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable",
+                Lists.newArrayList(Type.INT));
+        List<String> pNames = new ArrayList<>(100000);
+        for (int i = 1; i <= 100000; i++) {
+            pNames.add("m=" + i);
+        }
+        HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey,
+                pNames, metaStoreCache);
+        metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
+        long start = System.currentTimeMillis();
+        metaStoreCache.addPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("m=100001"),
+                partitionValueCacheKey.getTypes());
+        // took ~387 ms on a 4c16g machine
+        System.out.println("testAddPartitionsCacheToLargeTable took millis:" + (System.currentTimeMillis() - start));
+        HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
+        Assert.assertEquals(100001, partitionValues.getPartitionNameToIdMap().size());
+    }
+
+    private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key, List<String> partitionNames,
+            HiveMetaStoreCache metaStoreCache) {
+        // partition name format: nation=cn/city=beijing
+        Map<Long, PartitionItem> idToPartitionItem = Maps.newHashMapWithExpectedSize(partitionNames.size());
+        Map<String, Long> partitionNameToIdMap = Maps.newHashMapWithExpectedSize(partitionNames.size());
+        Map<Long, List<UniqueId>> idToUniqueIdsMap = Maps.newHashMapWithExpectedSize(partitionNames.size());
+        long idx = 0;
+        for (String partitionName : partitionNames) {
+            long partitionId = idx++;
+            ListPartitionItem listPartitionItem = metaStoreCache.toListPartitionItem(partitionName, key.getTypes());
+            idToPartitionItem.put(partitionId, listPartitionItem);
+            partitionNameToIdMap.put(partitionName, partitionId);
+        }
+
+        Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = null;
+        Map<Range<PartitionKey>, UniqueId> rangeToId = null;
+        RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null;
+        Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = null;
+        if (key.getTypes().size() > 1) {
+            // uidToPartitionRange and rangeToId are only used for multi-column partition
+            uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem, idToUniqueIdsMap);
+            rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
+        } else {
+            Preconditions.checkState(key.getTypes().size() == 1, key.getTypes());
+            // singleColumnRangeMap is only used for single-column partition
+            singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem, idToUniqueIdsMap);
+            singleUidToColumnRangeMap = ListPartitionPrunerV2.genSingleUidToColumnRange(singleColumnRangeMap);
+        }
+        Map<Long, List<String>> partitionValuesMap = ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
+        return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap, idx,
+                partitionNameToIdMap, idToUniqueIdsMap, singleUidToColumnRangeMap, partitionValuesMap);
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
