This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 899f5f5cf5 [feature](multi-catalog) support hive metastore more events (#15702) 899f5f5cf5 is described below commit 899f5f5cf5acf007081aea19590d3dab75ebd34e Author: zhangdong <493738...@qq.com> AuthorDate: Mon Jan 16 14:16:12 2023 +0800 [feature](multi-catalog) support hive metastore more events (#15702) support hive metastore more events --- .../org/apache/doris/catalog/RefreshManager.java | 23 +- .../doris/catalog/external/ExternalDatabase.java | 4 + .../catalog/external/HMSExternalDatabase.java | 11 + .../main/java/org/apache/doris/common/Config.java | 2 +- .../org/apache/doris/datasource/CatalogMgr.java | 347 ++++++++++++++++++++- .../apache/doris/datasource/ExternalCatalog.java | 20 ++ .../doris/datasource/ExternalMetaCacheMgr.java | 44 +++ .../apache/doris/datasource/ExternalObjectLog.java | 10 + .../doris/datasource/HMSExternalCatalog.java | 30 ++ .../doris/datasource/hive/HiveMetaStoreCache.java | 193 +++++++++++- .../datasource/hive/event/AddPartitionEvent.java | 88 ++++++ .../{IgnoredEvent.java => AlterDatabaseEvent.java} | 19 +- .../datasource/hive/event/AlterPartitionEvent.java | 95 ++++++ .../datasource/hive/event/AlterTableEvent.java | 111 +++++++ ...{IgnoredEvent.java => CreateDatabaseEvent.java} | 28 +- .../datasource/hive/event/CreateTableEvent.java | 76 +++++ .../{IgnoredEvent.java => DropDatabaseEvent.java} | 28 +- .../datasource/hive/event/DropPartitionEvent.java | 88 ++++++ .../datasource/hive/event/DropTableEvent.java | 39 +-- .../doris/datasource/hive/event/IgnoredEvent.java | 6 +- .../datasource/hive/event/MetastoreEvent.java | 36 ++- .../hive/event/MetastoreEventFactory.java | 33 +- .../org/apache/doris/journal/JournalEntity.java | 6 + .../java/org/apache/doris/persist/EditLog.java | 54 ++++ .../org/apache/doris/persist/OperationType.java | 6 + .../doris/planner/ListPartitionPrunerV2.java | 77 +++-- .../apache/doris/datasource/CatalogMgrTest.java | 132 +++++++- 27 files changed, 1464 insertions(+), 142 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java index b8d6929994..ef95b3e5f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java @@ -56,7 +56,7 @@ public class RefreshManager { refreshInternalCtlIcebergTable(stmt, env); } else { // Process external catalog table refresh - refreshExternalCtlTable(dbName, tableName, catalog); + env.getCatalogMgr().refreshExternalTable(dbName, tableName, catalogName); } LOG.info("Successfully refresh table: {} from db: {}", tableName, dbName); } @@ -146,25 +146,4 @@ public class RefreshManager { stmt.getTableName(), "ICEBERG", icebergProperties, ""); env.createTable(createTableStmt); } - - private void refreshExternalCtlTable(String dbName, String tableName, CatalogIf catalog) throws DdlException { - if (!(catalog instanceof ExternalCatalog)) { - throw new DdlException("Only support refresh ExternalCatalog Tables"); - } - DatabaseIf db = catalog.getDbNullable(dbName); - if (db == null) { - throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); - } - - TableIf table = db.getTableNullable(tableName); - if (table == null) { - throw new DdlException("Table " + tableName + " does not exist in db " + dbName); - } - Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, 
tableName); - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableId(table.getId()); - Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java index 65c027713e..e4efcac408 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java @@ -263,4 +263,8 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>, public void dropTable(String tableName) { throw new NotImplementedException(); } + + public void createTable(String tableName, long tableId) { + throw new NotImplementedException(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java index a1f6bcddab..e379dd3c0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java @@ -174,10 +174,21 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl @Override public void dropTable(String tableName) { LOG.debug("drop table [{}]", tableName); + makeSureInitialized(); Long tableId = tableNameToId.remove(tableName); if (tableId == null) { LOG.warn("drop table [{}] failed", tableName); } idToTbl.remove(tableId); } + + @Override + public void createTable(String tableName, long tableId) { + LOG.debug("create table [{}]", tableName); + makeSureInitialized(); + tableNameToId.put(tableName, tableId); + HMSExternalTable table = new HMSExternalTable(tableId, tableName, name, + (HMSExternalCatalog) extCatalog); + idToTbl.put(tableId, table); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 5e03365b52..6aa43d1a86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1953,6 +1953,6 @@ public class Config extends ConfigBase { * HMS polling interval in milliseconds. 
*/ @ConfField(masterOnly = true) - public static int hms_events_polling_interval_ms = 20000; + public static int hms_events_polling_interval_ms = 10000; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index 8fe1e07425..b3c822c21c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -553,10 +553,47 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } } + public void refreshExternalTable(String dbName, String tableName, String catalogName) throws DdlException { + CatalogIf catalog = nameToCatalog.get(catalogName); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + catalogName); + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support refresh ExternalCatalog Tables"); + } + DatabaseIf db = catalog.getDbNullable(dbName); + if (db == null) { + throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + } + + TableIf table = db.getTableNullable(tableName); + if (table == null) { + throw new DdlException("Table " + tableName + " does not exist in db " + dbName); + } + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName); + ExternalObjectLog log = new ExternalObjectLog(); + log.setCatalogId(catalog.getId()); + log.setDbId(db.getId()); + log.setTableId(table.getId()); + Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log); + } + public void replayRefreshExternalTable(ExternalObjectLog log) { ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + if (catalog == null) { + LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); + return; + } ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); + if (db == null) { + LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); + return; + } ExternalTable table = db.getTableForReplay(log.getTableId()); + if (table == null) { + LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); + return; + } Env.getCurrentEnv().getExtMetaCacheMgr() .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName()); } @@ -590,13 +627,321 @@ public class CatalogMgr implements Writable, GsonPostProcessable { LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), log.getDbId(), log.getTableId()); ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + if (catalog == null) { + LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); + return; + } ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); + if (db == null) { + LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); + return; + } ExternalTable table = db.getTableForReplay(log.getTableId()); - db.dropTable(table.getName()); + if (table == null) { + LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); + return; + } + db.writeLock(); + try { + db.dropTable(table.getName()); + } finally { + db.writeUnlock(); + } + Env.getCurrentEnv().getExtMetaCacheMgr() .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName()); } + public boolean externalTableExistInLocal(String dbName, String tableName, String catalogName) throws DdlException { + 
CatalogIf catalog = nameToCatalog.get(catalogName); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + catalogName); + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support ExternalCatalog Tables"); + } + return ((ExternalCatalog) catalog).tableExistInLocal(dbName, tableName); + } + + public void createExternalTable(String dbName, String tableName, String catalogName) throws DdlException { + CatalogIf catalog = nameToCatalog.get(catalogName); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + catalogName); + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support create ExternalCatalog Tables"); + } + DatabaseIf db = catalog.getDbNullable(dbName); + if (db == null) { + throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + } + + TableIf table = db.getTableNullable(tableName); + if (table != null) { + throw new DdlException("Table " + tableName + " has exist in db " + dbName); + } + ExternalObjectLog log = new ExternalObjectLog(); + log.setCatalogId(catalog.getId()); + log.setDbId(db.getId()); + log.setTableName(tableName); + log.setTableId(Env.getCurrentEnv().getNextId()); + replayCreateExternalTable(log); + Env.getCurrentEnv().getEditLog().logCreateExternalTable(log); + } + + public void replayCreateExternalTable(ExternalObjectLog log) { + LOG.debug("ReplayCreateExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}],tableName:[{}]", log.getCatalogId(), + log.getDbId(), log.getTableId(), log.getTableName()); + ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + if (catalog == null) { + LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); + return; + } + ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); + if (db == null) { + LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); + return; + } + db.writeLock(); + try { + db.createTable(log.getTableName(), log.getTableId()); + } finally { + db.writeUnlock(); + } + } + + public void dropExternalDatabase(String dbName, String catalogName) throws DdlException { + CatalogIf catalog = nameToCatalog.get(catalogName); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + catalogName); + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support drop ExternalCatalog databases"); + } + DatabaseIf db = catalog.getDbNullable(dbName); + if (db == null) { + throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + } + + ExternalObjectLog log = new ExternalObjectLog(); + log.setCatalogId(catalog.getId()); + log.setDbId(db.getId()); + log.setInvalidCache(true); + replayDropExternalDatabase(log); + Env.getCurrentEnv().getEditLog().logDropExternalDatabase(log); + } + + public void replayDropExternalDatabase(ExternalObjectLog log) { + writeLock(); + try { + LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), + log.getDbId(), log.getTableId()); + ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + if (catalog == null) { + LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); + return; + } + ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); + if (db == null) { + LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); + return; + } + 
catalog.dropDatabase(db.getFullName()); + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(), db.getFullName()); + } finally { + writeUnlock(); + } + } + + public void createExternalDatabase(String dbName, String catalogName) throws DdlException { + CatalogIf catalog = nameToCatalog.get(catalogName); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + catalogName); + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support create ExternalCatalog databases"); + } + DatabaseIf db = catalog.getDbNullable(dbName); + if (db != null) { + throw new DdlException("Database " + dbName + " has exist in catalog " + catalog.getName()); + } + + ExternalObjectLog log = new ExternalObjectLog(); + log.setCatalogId(catalog.getId()); + log.setDbId(Env.getCurrentEnv().getNextId()); + log.setDbName(dbName); + replayCreateExternalDatabase(log); + Env.getCurrentEnv().getEditLog().logCreateExternalDatabase(log); + } + + public void replayCreateExternalDatabase(ExternalObjectLog log) { + writeLock(); + try { + LOG.debug("ReplayCreateExternalDatabase,catalogId:[{}],dbId:[{}],dbName:[{}]", log.getCatalogId(), + log.getDbId(), log.getDbName()); + ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + if (catalog == null) { + LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); + return; + } + catalog.createDatabase(log.getDbId(), log.getDbName()); + } finally { + writeUnlock(); + } + } + + public void addExternalPartitions(String catalogName, String dbName, String tableName, List<String> partitionNames) + throws DdlException { + CatalogIf catalog = nameToCatalog.get(catalogName); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + catalogName); + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support ExternalCatalog"); + } + DatabaseIf db = catalog.getDbNullable(dbName); + if (db == null) { + throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + } + + TableIf table = db.getTableNullable(tableName); + if (table == null) { + throw new DdlException("Table " + tableName + " does not exist in db " + dbName); + } + + ExternalObjectLog log = new ExternalObjectLog(); + log.setCatalogId(catalog.getId()); + log.setDbId(db.getId()); + log.setTableId(table.getId()); + log.setPartitionNames(partitionNames); + replayAddExternalPartitions(log); + Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log); + } + + public void replayAddExternalPartitions(ExternalObjectLog log) { + LOG.debug("ReplayAddExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), + log.getDbId(), log.getTableId()); + ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + if (catalog == null) { + LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); + return; + } + ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); + if (db == null) { + LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); + return; + } + ExternalTable table = db.getTableForReplay(log.getTableId()); + if (table == null) { + LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); + return; + } + Env.getCurrentEnv().getExtMetaCacheMgr() + .addPartitionsCache(catalog.getId(), table, log.getPartitionNames()); + } + + public void dropExternalPartitions(String catalogName, String dbName, 
String tableName, List<String> partitionNames) + throws DdlException { + CatalogIf catalog = nameToCatalog.get(catalogName); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + catalogName); + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support ExternalCatalog"); + } + DatabaseIf db = catalog.getDbNullable(dbName); + if (db == null) { + throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + } + + TableIf table = db.getTableNullable(tableName); + if (table == null) { + throw new DdlException("Table " + tableName + " does not exist in db " + dbName); + } + + ExternalObjectLog log = new ExternalObjectLog(); + log.setCatalogId(catalog.getId()); + log.setDbId(db.getId()); + log.setTableId(table.getId()); + log.setPartitionNames(partitionNames); + replayDropExternalPartitions(log); + Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log); + } + + public void replayDropExternalPartitions(ExternalObjectLog log) { + LOG.debug("ReplayDropExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), + log.getDbId(), log.getTableId()); + ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + if (catalog == null) { + LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); + return; + } + ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); + if (db == null) { + LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); + return; + } + ExternalTable table = db.getTableForReplay(log.getTableId()); + if (table == null) { + LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); + return; + } + Env.getCurrentEnv().getExtMetaCacheMgr() + .dropPartitionsCache(catalog.getId(), table, log.getPartitionNames()); + } + + public void refreshExternalPartitions(String catalogName, String dbName, String tableName, + List<String> partitionNames) + throws DdlException { + CatalogIf catalog = nameToCatalog.get(catalogName); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + catalogName); + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support ExternalCatalog"); + } + DatabaseIf db = catalog.getDbNullable(dbName); + if (db == null) { + throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + } + + TableIf table = db.getTableNullable(tableName); + if (table == null) { + throw new DdlException("Table " + tableName + " does not exist in db " + dbName); + } + + ExternalObjectLog log = new ExternalObjectLog(); + log.setCatalogId(catalog.getId()); + log.setDbId(db.getId()); + log.setTableId(table.getId()); + log.setPartitionNames(partitionNames); + replayRefreshExternalPartitions(log); + Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log); + } + + public void replayRefreshExternalPartitions(ExternalObjectLog log) { + LOG.debug("replayRefreshExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), + log.getDbId(), log.getTableId()); + ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + if (catalog == null) { + LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); + return; + } + ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); + if (db == null) { + LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); + return; + } + 
ExternalTable table = db.getTableForReplay(log.getTableId()); + if (table == null) { + LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); + return; + } + Env.getCurrentEnv().getExtMetaCacheMgr() + .invalidatePartitionsCache(catalog.getId(), db.getFullName(), table.getName(), + log.getPartitionNames()); + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 22726c3b96..3a4a711e1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -35,6 +35,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; import lombok.Data; +import org.apache.commons.lang.NotImplementedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; @@ -98,6 +99,17 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr */ public abstract boolean tableExist(SessionContext ctx, String dbName, String tblName); + /** + * check if the specified table exist in doris. + * + * @param dbName + * @param tblName + * @return true if table exists, false otherwise + */ + public boolean tableExistInLocal(String dbName, String tblName) { + throw new NotImplementedException(); + } + /** * Catalog can't be init when creating because the external catalog may depend on third system. * So you have to make sure the client of third system is initialized before any method was called. 
@@ -310,4 +322,12 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr idToDb.put(db.getId(), db); dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId()); } + + public void dropDatabase(String dbName) { + throw new NotImplementedException(); + } + + public void createDatabase(long dbId, String dbName) { + throw new NotImplementedException(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 934c3f8bbe..13875d084e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -17,6 +17,8 @@ package org.apache.doris.datasource; +import org.apache.doris.catalog.external.ExternalTable; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.datasource.hive.HiveMetaStoreCache; @@ -25,6 +27,7 @@ import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -118,4 +121,45 @@ public class ExternalMetaCacheMgr { } LOG.debug("invalid catalog cache for {}", catalogId); } + + public void addPartitionsCache(long catalogId, ExternalTable table, List<String> partitionNames) { + if (!(table instanceof HMSExternalTable)) { + LOG.warn("only support HMSTable"); + return; + } + String dbName = ClusterNamespace.getNameFromFullName(table.getDbName()); + HiveMetaStoreCache metaCache = cacheMap.get(catalogId); + if (metaCache != null) { + metaCache.addPartitionsCache(dbName, table.getName(), partitionNames, + ((HMSExternalTable) table).getPartitionColumnTypes()); + } + LOG.debug("add partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId); + } + + public void dropPartitionsCache(long catalogId, ExternalTable table, List<String> partitionNames) { + if (!(table instanceof HMSExternalTable)) { + LOG.warn("only support HMSTable"); + return; + } + String dbName = ClusterNamespace.getNameFromFullName(table.getDbName()); + HiveMetaStoreCache metaCache = cacheMap.get(catalogId); + if (metaCache != null) { + metaCache.dropPartitionsCache(dbName, table.getName(), partitionNames, + ((HMSExternalTable) table).getPartitionColumnTypes(), true); + } + LOG.debug("drop partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId); + } + + public void invalidatePartitionsCache(long catalogId, String dbName, String tableName, + List<String> partitionNames) { + HiveMetaStoreCache metaCache = cacheMap.get(catalogId); + if (metaCache != null) { + dbName = ClusterNamespace.getNameFromFullName(dbName); + for (String partitionName : partitionNames) { + metaCache.invalidatePartitionCache(dbName, tableName, partitionName); + } + + } + LOG.debug("invalidate partition cache for {}.{} in catalog {}", dbName, tableName, catalogId); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java index 030f981382..04d5b06036 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java @@ -29,6 +29,7 @@ import 
lombok.NoArgsConstructor; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; @NoArgsConstructor @Getter @@ -40,12 +41,21 @@ public class ExternalObjectLog implements Writable { @SerializedName(value = "dbId") private long dbId; + @SerializedName(value = "dbName") + private String dbName; + @SerializedName(value = "tableId") private long tableId; + @SerializedName(value = "tableName") + private String tableName; + @SerializedName(value = "invalidCache") private boolean invalidCache; + @SerializedName(value = "partitionNames") + private List<String> partitionNames; + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, GsonUtils.GSON.toJson(this)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index 5de9aec2eb..70ba9f9aa9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -154,6 +154,16 @@ public class HMSExternalCatalog extends ExternalCatalog { return client.tableExists(getRealTableName(dbName), tblName); } + @Override + public boolean tableExistInLocal(String dbName, String tblName) { + makeSureInitialized(); + HMSExternalDatabase hmsExternalDatabase = (HMSExternalDatabase) idToDb.get(dbNameToId.get(dbName)); + if (hmsExternalDatabase == null) { + return false; + } + return hmsExternalDatabase.getTable(getRealTableName(tblName)).isPresent(); + } + public PooledHiveMetaStoreClient getClient() { makeSureInitialized(); return client; @@ -215,4 +225,24 @@ public class HMSExternalCatalog extends ExternalCatalog { } return currentNotificationEventId.getEventId(); } + + @Override + public void dropDatabase(String dbName) { + LOG.debug("drop database [{}]", dbName); + makeSureInitialized(); + Long dbId = dbNameToId.remove(dbName); + if (dbId == null) { + LOG.warn("drop database [{}] failed", dbName); + } + idToDb.remove(dbId); + } + + @Override + public void createDatabase(long dbId, String dbName) { + makeSureInitialized(); + LOG.debug("create database [{}]", dbName); + dbNameToId.put(dbName, dbId); + HMSExternalDatabase db = new HMSExternalDatabase(this, dbId, dbName); + idToDb.put(dbId, db); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 6f4d3845e7..c51ef521a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -45,6 +45,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; import lombok.Data; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -57,6 +58,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -163,27 +165,36 @@ public class HiveMetaStoreCache { LOG.debug("load #{} partitions for {} in catalog {}", partitionNames.size(), key, catalog.getName()); } Map<Long, PartitionItem> idToPartitionItem = 
Maps.newHashMapWithExpectedSize(partitionNames.size()); + Map<String, Long> partitionNameToIdMap = Maps.newHashMapWithExpectedSize(partitionNames.size()); + Map<Long, List<UniqueId>> idToUniqueIdsMap = Maps.newHashMapWithExpectedSize(partitionNames.size()); long idx = 0; for (String partitionName : partitionNames) { - idToPartitionItem.put(idx++, toListPartitionItem(partitionName, key.types)); + long partitionId = idx++; + ListPartitionItem listPartitionItem = toListPartitionItem(partitionName, key.types); + idToPartitionItem.put(partitionId, listPartitionItem); + partitionNameToIdMap.put(partitionName, partitionId); } Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = null; Map<Range<PartitionKey>, UniqueId> rangeToId = null; RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null; + Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = null; if (key.types.size() > 1) { // uidToPartitionRange and rangeToId are only used for multi-column partition - uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem); + uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem, idToUniqueIdsMap); rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange); } else { Preconditions.checkState(key.types.size() == 1, key.types); // singleColumnRangeMap is only used for single-column partition - singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem); + singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem, idToUniqueIdsMap); + singleUidToColumnRangeMap = ListPartitionPrunerV2.genSingleUidToColumnRange(singleColumnRangeMap); } - return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap); + Map<Long, List<String>> partitionValuesMap = ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem); + return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap, idx, + partitionNameToIdMap, idToUniqueIdsMap, singleUidToColumnRangeMap, partitionValuesMap); } - private ListPartitionItem toListPartitionItem(String partitionName, List<Type> types) { + public ListPartitionItem toListPartitionItem(String partitionName, List<Type> types) { // Partition name will be in format: nation=cn/city=beijing // parse it to get values "cn" and "beijing" String[] parts = partitionName.split("/"); @@ -274,6 +285,10 @@ public class HiveMetaStoreCache { public HivePartitionValues getPartitionValues(String dbName, String tblName, List<Type> types) { PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, types); + return getPartitionValues(key); + } + + public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) { try { return partitionValuesCache.get(key); } catch (ExecutionException e) { @@ -350,6 +365,21 @@ public class HiveMetaStoreCache { } } + public void invalidatePartitionCache(String dbName, String tblName, String partitionName) { + PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, null); + HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key); + if (partitionValues != null) { + Long partitionId = partitionValues.partitionNameToIdMap.get(partitionName); + List<String> values = partitionValues.partitionValuesMap.get(partitionId); + PartitionCacheKey partKey = new PartitionCacheKey(dbName, tblName, values); + HivePartition partition = partitionCache.getIfPresent(partKey); + if (partition != null) { + 
fileCache.invalidate(new FileCacheKey(partition.getPath(), null)); + partitionCache.invalidate(partKey); + } + } + } + public void invalidateDbCache(String dbName) { long start = System.currentTimeMillis(); Set<PartitionValueCacheKey> keys = partitionValuesCache.asMap().keySet(); @@ -369,6 +399,113 @@ public class HiveMetaStoreCache { LOG.debug("invalid all meta cache in catalog {}", catalog.getName()); } + // partition name format: nation=cn/city=beijing + public void addPartitionsCache(String dbName, String tblName, List<String> partitionNames, + List<Type> partitionColumnTypes) { + PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, partitionColumnTypes); + HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key); + if (partitionValues == null) { + return; + } + HivePartitionValues copy = partitionValues.copy(); + Map<Long, PartitionItem> idToPartitionItemBefore = copy.getIdToPartitionItem(); + Map<String, Long> partitionNameToIdMapBefore = copy.getPartitionNameToIdMap(); + Map<Long, List<UniqueId>> idToUniqueIdsMap = copy.getIdToUniqueIdsMap(); + Map<Long, PartitionItem> idToPartitionItem = new HashMap<>(); + long idx = copy.getNextPartitionId(); + for (String partitionName : partitionNames) { + if (partitionNameToIdMapBefore.containsKey(partitionName)) { + LOG.info("addPartitionsCache partitionName:[{}] has exist in table:[{}]", partitionName, tblName); + continue; + } + long partitionId = idx++; + ListPartitionItem listPartitionItem = toListPartitionItem(partitionName, key.types); + idToPartitionItemBefore.put(partitionId, listPartitionItem); + idToPartitionItem.put(partitionId, listPartitionItem); + partitionNameToIdMapBefore.put(partitionName, partitionId); + } + Map<Long, List<String>> partitionValuesMapBefore = copy.getPartitionValuesMap(); + Map<Long, List<String>> partitionValuesMap = ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem); + partitionValuesMapBefore.putAll(partitionValuesMap); + copy.setNextPartitionId(idx); + if (key.types.size() > 1) { + Map<UniqueId, Range<PartitionKey>> uidToPartitionRangeBefore = copy.getUidToPartitionRange(); + // uidToPartitionRange and rangeToId are only used for multi-column partition + Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = ListPartitionPrunerV2 + .genUidToPartitionRange(idToPartitionItem, idToUniqueIdsMap); + uidToPartitionRangeBefore.putAll(uidToPartitionRange); + Map<Range<PartitionKey>, UniqueId> rangeToIdBefore = copy.getRangeToId(); + Map<Range<PartitionKey>, UniqueId> rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange); + rangeToIdBefore.putAll(rangeToId); + } else { + Preconditions.checkState(key.types.size() == 1, key.types); + // singleColumnRangeMap is only used for single-column partition + RangeMap<ColumnBound, UniqueId> singleColumnRangeMapBefore = copy.getSingleColumnRangeMap(); + RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = ListPartitionPrunerV2 + .genSingleColumnRangeMap(idToPartitionItem, idToUniqueIdsMap); + singleColumnRangeMapBefore.putAll(singleColumnRangeMap); + Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMapBefore = copy + .getSingleUidToColumnRangeMap(); + Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = ListPartitionPrunerV2 + .genSingleUidToColumnRange(singleColumnRangeMap); + singleUidToColumnRangeMapBefore.putAll(singleUidToColumnRangeMap); + } + HivePartitionValues partitionValuesCur = partitionValuesCache.getIfPresent(key); + if (partitionValuesCur == partitionValues) { + 
partitionValuesCache.put(key, copy); + } + } + + public void dropPartitionsCache(String dbName, String tblName, List<String> partitionNames, + List<Type> partitionColumnTypes, boolean invalidPartitionCache) { + PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, partitionColumnTypes); + HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key); + if (partitionValues == null) { + return; + } + HivePartitionValues copy = partitionValues.copy(); + Map<String, Long> partitionNameToIdMapBefore = copy.getPartitionNameToIdMap(); + Map<Long, PartitionItem> idToPartitionItemBefore = copy.getIdToPartitionItem(); + Map<Long, List<UniqueId>> idToUniqueIdsMapBefore = copy.getIdToUniqueIdsMap(); + Map<UniqueId, Range<PartitionKey>> uidToPartitionRangeBefore = copy.getUidToPartitionRange(); + Map<Range<PartitionKey>, UniqueId> rangeToIdBefore = copy.getRangeToId(); + RangeMap<ColumnBound, UniqueId> singleColumnRangeMapBefore = copy.getSingleColumnRangeMap(); + Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMapBefore = copy.getSingleUidToColumnRangeMap(); + Map<Long, List<String>> partitionValuesMap = copy.getPartitionValuesMap(); + for (String partitionName : partitionNames) { + if (!partitionNameToIdMapBefore.containsKey(partitionName)) { + LOG.info("dropPartitionsCache partitionName:[{}] not exist in table:[{}]", partitionName, tblName); + continue; + } + Long partitionId = partitionNameToIdMapBefore.remove(partitionName); + idToPartitionItemBefore.remove(partitionId); + partitionValuesMap.remove(partitionId); + List<UniqueId> uniqueIds = idToUniqueIdsMapBefore.remove(partitionId); + if (key.types.size() > 1) { + for (UniqueId uniqueId : uniqueIds) { + Range<PartitionKey> range = uidToPartitionRangeBefore.remove(uniqueId); + rangeToIdBefore.remove(range); + } + } else { + for (UniqueId uniqueId : uniqueIds) { + Range<ColumnBound> range = singleUidToColumnRangeMapBefore.remove(uniqueId); + singleColumnRangeMapBefore.remove(range); + } + } + if (invalidPartitionCache) { + invalidatePartitionCache(dbName, tblName, partitionName); + } + } + HivePartitionValues partitionValuesCur = partitionValuesCache.getIfPresent(key); + if (partitionValuesCur == partitionValues) { + partitionValuesCache.put(key, copy); + } + } + + public void putPartitionValuesCacheForTest(PartitionValueCacheKey key, HivePartitionValues values) { + partitionValuesCache.put(key, values); + } + /** * The Key of hive partition value cache */ @@ -480,24 +617,58 @@ public class HiveMetaStoreCache { @Data public static class HivePartitionValues { + private long nextPartitionId; + private Map<String, Long> partitionNameToIdMap; + private Map<Long, List<UniqueId>> idToUniqueIdsMap; private Map<Long, PartitionItem> idToPartitionItem; - private Map<Long, List<String>> partitionValuesMap = Maps.newHashMap(); + private Map<Long, List<String>> partitionValuesMap; + //multi pair private Map<UniqueId, Range<PartitionKey>> uidToPartitionRange; private Map<Range<PartitionKey>, UniqueId> rangeToId; + //single pair private RangeMap<ColumnBound, UniqueId> singleColumnRangeMap; + private Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap; + + public HivePartitionValues() { + } public HivePartitionValues(Map<Long, PartitionItem> idToPartitionItem, Map<UniqueId, Range<PartitionKey>> uidToPartitionRange, Map<Range<PartitionKey>, UniqueId> rangeToId, - RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) { + RangeMap<ColumnBound, UniqueId> singleColumnRangeMap, + long nextPartitionId, + Map<String, Long> 
partitionNameToIdMap, + Map<Long, List<UniqueId>> idToUniqueIdsMap, + Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap, + Map<Long, List<String>> partitionValuesMap) { this.idToPartitionItem = idToPartitionItem; - for (Map.Entry<Long, PartitionItem> entry : this.idToPartitionItem.entrySet()) { - partitionValuesMap.put(entry.getKey(), - ((ListPartitionItem) entry.getValue()).getItems().get(0).getPartitionValuesAsStringList()); - } this.uidToPartitionRange = uidToPartitionRange; this.rangeToId = rangeToId; this.singleColumnRangeMap = singleColumnRangeMap; + this.nextPartitionId = nextPartitionId; + this.partitionNameToIdMap = partitionNameToIdMap; + this.idToUniqueIdsMap = idToUniqueIdsMap; + this.singleUidToColumnRangeMap = singleUidToColumnRangeMap; + this.partitionValuesMap = partitionValuesMap; + } + + public HivePartitionValues copy() { + HivePartitionValues copy = new HivePartitionValues(); + copy.setNextPartitionId(nextPartitionId); + copy.setPartitionNameToIdMap(partitionNameToIdMap == null ? null : Maps.newHashMap(partitionNameToIdMap)); + copy.setIdToUniqueIdsMap(idToUniqueIdsMap == null ? null : Maps.newHashMap(idToUniqueIdsMap)); + copy.setIdToPartitionItem(idToPartitionItem == null ? null : Maps.newHashMap(idToPartitionItem)); + copy.setPartitionValuesMap(partitionValuesMap == null ? null : Maps.newHashMap(partitionValuesMap)); + copy.setUidToPartitionRange(uidToPartitionRange == null ? null : Maps.newHashMap(uidToPartitionRange)); + copy.setRangeToId(rangeToId == null ? null : Maps.newHashMap(rangeToId)); + copy.setSingleUidToColumnRangeMap( + singleUidToColumnRangeMap == null ? null : Maps.newHashMap(singleUidToColumnRangeMap)); + if (singleColumnRangeMap != null) { + RangeMap<ColumnBound, UniqueId> copySingleColumnRangeMap = TreeRangeMap.create(); + copySingleColumnRangeMap.putAll(singleColumnRangeMap); + copy.setSingleColumnRangeMap(copySingleColumnRangeMap); + } + return copy; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java new file mode 100644 index 0000000000..004efa3d15 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +package org.apache.doris.datasource.hive.event; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * MetastoreEvent for ADD_PARTITION event type + */ +public class AddPartitionEvent extends MetastoreTableEvent { + private final Table hmsTbl; + private final List<String> partitionNames; + + private AddPartitionEvent(NotificationEvent event, + String catalogName) { + super(event, catalogName); + Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ADD_PARTITION)); + Preconditions + .checkNotNull(event.getMessage(), debugString("Event message is null")); + try { + AddPartitionMessage addPartitionMessage = + MetastoreEventsProcessor.getMessageDeserializer() + .getAddPartitionMessage(event.getMessage()); + hmsTbl = Preconditions.checkNotNull(addPartitionMessage.getTableObj()); + Iterable<Partition> addedPartitions = addPartitionMessage.getPartitionObjs(); + partitionNames = new ArrayList<>(); + List<String> partitionColNames = hmsTbl.getPartitionKeys().stream() + .map(FieldSchema::getName).collect(Collectors.toList()); + addedPartitions.forEach(partition -> partitionNames.add( + FileUtils.makePartName(partitionColNames, partition.getValues()))); + } catch (Exception ex) { + throw new MetastoreNotificationException(ex); + } + } + + protected static List<MetastoreEvent> getEvents(NotificationEvent event, + String catalogName) { + return Lists.newArrayList(new AddPartitionEvent(event, catalogName)); + } + + @Override + protected void process() throws MetastoreNotificationException { + try { + infoLog("catalogName:[{}],dbName:[{}],tableName:[{}],partitionNames:[{}]", catalogName, dbName, tblName, + partitionNames.toString()); + // bail out early if there are not partitions to process + if (partitionNames.isEmpty()) { + infoLog("Partition list is empty. Ignoring this event."); + return; + } + Env.getCurrentEnv().getCatalogMgr() + .addExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames); + } catch (DdlException e) { + throw new MetastoreNotificationException( + debugString("Failed to process event")); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java similarity index 61% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java copy to fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java index 4d2dc1a178..59445a5dc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java @@ -18,26 +18,31 @@ package org.apache.doris.datasource.hive.event; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import java.util.List; /** - * An event type which is ignored. 
Useful for unsupported metastore event types + * MetastoreEvent for Alter_DATABASE event type */ -public class IgnoredEvent extends MetastoreEvent { - protected IgnoredEvent(NotificationEvent event, String catalogName) { +public class AlterDatabaseEvent extends MetastoreEvent { + + private AlterDatabaseEvent(NotificationEvent event, + String catalogName) { super(event, catalogName); + Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_DATABASE)); } - private static List<MetastoreEvent> getEvents(NotificationEvent event, + protected static List<MetastoreEvent> getEvents(NotificationEvent event, String catalogName) { - return Lists.newArrayList(new IgnoredEvent(event, catalogName)); + return Lists.newArrayList(new AlterDatabaseEvent(event, catalogName)); } @Override - public void process() { - debugLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType()); + protected void process() throws MetastoreNotificationException { + // only can change properties,we do nothing + infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java new file mode 100644 index 0000000000..b2ba44f842 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +package org.apache.doris.datasource.hive.event; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * MetastoreEvent for ALTER_PARTITION event type + */ +public class AlterPartitionEvent extends MetastoreTableEvent { + private final Table hmsTbl; + private final org.apache.hadoop.hive.metastore.api.Partition partitionAfter; + private final org.apache.hadoop.hive.metastore.api.Partition partitionBefore; + private final String partitionNameBefore; + private final String partitionNameAfter; + // true if this alter event was due to a rename operation + private final boolean isRename; + + private AlterPartitionEvent(NotificationEvent event, + String catalogName) { + super(event, catalogName); + Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_PARTITION)); + Preconditions + .checkNotNull(event.getMessage(), debugString("Event message is null")); + try { + AlterPartitionMessage alterPartitionMessage = + MetastoreEventsProcessor.getMessageDeserializer() + .getAlterPartitionMessage(event.getMessage()); + hmsTbl = Preconditions.checkNotNull(alterPartitionMessage.getTableObj()); + partitionBefore = Preconditions.checkNotNull(alterPartitionMessage.getPtnObjBefore()); + partitionAfter = Preconditions.checkNotNull(alterPartitionMessage.getPtnObjAfter()); + List<String> partitionColNames = hmsTbl.getPartitionKeys().stream() + .map(FieldSchema::getName).collect(Collectors.toList()); + partitionNameBefore = FileUtils.makePartName(partitionColNames, partitionBefore.getValues()); + partitionNameAfter = FileUtils.makePartName(partitionColNames, partitionAfter.getValues()); + isRename = !partitionNameBefore.equalsIgnoreCase(partitionNameAfter); + } catch (Exception ex) { + throw new MetastoreNotificationException(ex); + } + } + + protected static List<MetastoreEvent> getEvents(NotificationEvent event, + String catalogName) { + return Lists.newArrayList(new AlterPartitionEvent(event, catalogName)); + } + + @Override + protected void process() throws MetastoreNotificationException { + try { + infoLog("catalogName:[{}],dbName:[{}],tableName:[{}],partitionNameBefore:[{}],partitionNameAfter:[{}]", + catalogName, dbName, tblName, partitionNameBefore, partitionNameAfter); + if (isRename) { + Env.getCurrentEnv().getCatalogMgr() + .dropExternalPartitions(catalogName, dbName, tblName, Lists.newArrayList(partitionNameBefore)); + Env.getCurrentEnv().getCatalogMgr() + .addExternalPartitions(catalogName, dbName, tblName, Lists.newArrayList(partitionNameAfter)); + } else { + Env.getCurrentEnv().getCatalogMgr() + .refreshExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), + Lists.newArrayList(partitionNameAfter)); + } + } catch (DdlException e) { + throw new MetastoreNotificationException( + debugString("Failed to process event")); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java new file mode 100644 index 0000000000..b14a1c6577 --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +package org.apache.doris.datasource.hive.event; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage; + +import java.util.List; + +/** + * MetastoreEvent for ALTER_TABLE event type + */ +public class AlterTableEvent extends MetastoreTableEvent { + // the table object before alter operation + private final Table tableBefore; + // the table object after alter operation + private final Table tableAfter; + + // true if this alter event was due to a rename operation + private final boolean isRename; + + private AlterTableEvent(NotificationEvent event, String catalogName) { + super(event, catalogName); + Preconditions.checkArgument(MetastoreEventType.ALTER_TABLE.equals(getEventType())); + Preconditions + .checkNotNull(event.getMessage(), debugString("Event message is null")); + try { + JSONAlterTableMessage alterTableMessage = + (JSONAlterTableMessage) MetastoreEventsProcessor.getMessageDeserializer() + .getAlterTableMessage(event.getMessage()); + tableAfter = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter()); + tableBefore = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore()); + } catch (Exception e) { + throw new MetastoreNotificationException( + debugString("Unable to parse the alter table message"), e); + } + // this is a rename event if either dbName or tblName of before and after object changed + isRename = !tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName()) + || !tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName()); + + } + + public static List<MetastoreEvent> getEvents(NotificationEvent event, + String catalogName) { + return Lists.newArrayList(new AlterTableEvent(event, catalogName)); + } + + + private void processRename() throws DdlException { + if (!isRename) { + return; + } + boolean hasExist = Env.getCurrentEnv().getCatalogMgr() + .externalTableExistInLocal(tableAfter.getDbName(), tableAfter.getTableName(), catalogName); + if (hasExist) { + infoLog("AlterExternalTable canceled,because tableAfter has exist, " + + "catalogName:[{}],dbName:[{}],tableName:[{}]", + catalogName, dbName, tableAfter.getTableName()); + return; + } + Env.getCurrentEnv().getCatalogMgr() + .dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName); + Env.getCurrentEnv().getCatalogMgr() + 
.createExternalTable(tableAfter.getDbName(), tableAfter.getTableName(), catalogName); + + } + + /** + * If the ALTER_TABLE event is due a table rename, this method removes the old table + * and creates a new table with the new name. Else, we just refresh table + */ + @Override + protected void process() throws MetastoreNotificationException { + try { + infoLog("catalogName:[{}],dbName:[{}],tableBefore:[{}],tableAfter:[{}]", catalogName, dbName, + tableBefore.getTableName(), tableAfter.getTableName()); + if (isRename) { + processRename(); + return; + } + //The scope of refresh can be narrowed in the future + Env.getCurrentEnv().getCatalogMgr() + .refreshExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName); + } catch (Exception e) { + throw new MetastoreNotificationException( + debugString("Failed to process event")); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java similarity index 51% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java copy to fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java index 4d2dc1a178..48476bce57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java @@ -18,26 +18,40 @@ package org.apache.doris.datasource.hive.event; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; + +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import java.util.List; /** - * An event type which is ignored. 
Useful for unsupported metastore event types + * MetastoreEvent for CREATE_DATABASE event type */ -public class IgnoredEvent extends MetastoreEvent { - protected IgnoredEvent(NotificationEvent event, String catalogName) { +public class CreateDatabaseEvent extends MetastoreEvent { + + private CreateDatabaseEvent(NotificationEvent event, + String catalogName) { super(event, catalogName); + Preconditions.checkArgument(getEventType().equals(MetastoreEventType.CREATE_DATABASE)); } - private static List<MetastoreEvent> getEvents(NotificationEvent event, + protected static List<MetastoreEvent> getEvents(NotificationEvent event, String catalogName) { - return Lists.newArrayList(new IgnoredEvent(event, catalogName)); + return Lists.newArrayList(new CreateDatabaseEvent(event, catalogName)); } @Override - public void process() { - debugLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType()); + protected void process() throws MetastoreNotificationException { + try { + infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName); + Env.getCurrentEnv().getCatalogMgr() + .createExternalDatabase(dbName, catalogName); + } catch (DdlException e) { + throw new MetastoreNotificationException( + debugString("Failed to process event")); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java new file mode 100644 index 0000000000..e4e8e8bb4b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+
+import java.util.List;
+
+/**
+ * MetastoreEvent for CREATE_TABLE event type
+ */
+public class CreateTableEvent extends MetastoreTableEvent {
+    private final Table hmsTbl;
+
+    private CreateTableEvent(NotificationEvent event, String catalogName) throws MetastoreNotificationException {
+        super(event, catalogName);
+        Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(getEventType()));
+        Preconditions
+                .checkNotNull(event.getMessage(), debugString("Event message is null"));
+        try {
+            CreateTableMessage createTableMessage =
+                    MetastoreEventsProcessor.getMessageDeserializer().getCreateTableMessage(event.getMessage());
+            hmsTbl = Preconditions.checkNotNull(createTableMessage.getTableObj());
+        } catch (Exception e) {
+            throw new MetastoreNotificationException(
+                    debugString("Unable to deserialize the event message"), e);
+        }
+    }
+
+    public static List<MetastoreEvent> getEvents(NotificationEvent event, String catalogName) {
+        return Lists.newArrayList(new CreateTableEvent(event, catalogName));
+    }
+
+    @Override
+    protected void process() throws MetastoreNotificationException {
+        try {
+            infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tblName);
+            boolean hasExist = Env.getCurrentEnv().getCatalogMgr()
+                    .externalTableExistInLocal(dbName, hmsTbl.getTableName(), catalogName);
+            if (hasExist) {
+                infoLog(
+                        "CreateExternalTable canceled because the table already exists, "
+                                + "catalogName:[{}],dbName:[{}],tableName:[{}]",
+                        catalogName, dbName, tblName);
+                return;
+            }
+            Env.getCurrentEnv().getCatalogMgr().createExternalTable(dbName, hmsTbl.getTableName(), catalogName);
+        } catch (DdlException e) {
+            throw new MetastoreNotificationException(
+                    debugString("Failed to process event"));
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
similarity index 52%
copy from fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
copy to fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
index 4d2dc1a178..b51145a258 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java
@@ -18,26 +18,40 @@
 package org.apache.doris.datasource.hive.event;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 
 import java.util.List;
 
 /**
- * An event type which is ignored.
Useful for unsupported metastore event types + * MetastoreEvent for DROP_DATABASE event type */ -public class IgnoredEvent extends MetastoreEvent { - protected IgnoredEvent(NotificationEvent event, String catalogName) { +public class DropDatabaseEvent extends MetastoreEvent { + + private DropDatabaseEvent(NotificationEvent event, + String catalogName) { super(event, catalogName); + Preconditions.checkArgument(getEventType().equals(MetastoreEventType.DROP_DATABASE)); } - private static List<MetastoreEvent> getEvents(NotificationEvent event, + protected static List<MetastoreEvent> getEvents(NotificationEvent event, String catalogName) { - return Lists.newArrayList(new IgnoredEvent(event, catalogName)); + return Lists.newArrayList(new DropDatabaseEvent(event, catalogName)); } @Override - public void process() { - debugLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType()); + protected void process() throws MetastoreNotificationException { + try { + infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName); + Env.getCurrentEnv().getCatalogMgr() + .dropExternalDatabase(dbName, catalogName); + } catch (DdlException e) { + throw new MetastoreNotificationException( + debugString("Failed to process event")); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java new file mode 100644 index 0000000000..cd5ed5bfbd --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * MetastoreEvent for DROP_PARTITION event type
+ */
+public class DropPartitionEvent extends MetastoreTableEvent {
+    private final Table hmsTbl;
+    private final List<String> partitionNames;
+
+    private DropPartitionEvent(NotificationEvent event,
+            String catalogName) {
+        super(event, catalogName);
+        Preconditions.checkArgument(getEventType().equals(MetastoreEventType.DROP_PARTITION));
+        Preconditions
+                .checkNotNull(event.getMessage(), debugString("Event message is null"));
+        try {
+            DropPartitionMessage dropPartitionMessage =
+                    MetastoreEventsProcessor.getMessageDeserializer()
+                            .getDropPartitionMessage(event.getMessage());
+            hmsTbl = Preconditions.checkNotNull(dropPartitionMessage.getTableObj());
+            List<Map<String, String>> droppedPartitions = dropPartitionMessage.getPartitions();
+            partitionNames = new ArrayList<>();
+            List<String> partitionColNames = hmsTbl.getPartitionKeys().stream()
+                    .map(FieldSchema::getName).collect(Collectors.toList());
+            droppedPartitions.forEach(partition -> partitionNames.add(
+                    getPartitionName(partition, partitionColNames)));
+        } catch (Exception ex) {
+            throw new MetastoreNotificationException(ex);
+        }
+    }
+
+    protected static List<MetastoreEvent> getEvents(NotificationEvent event,
+            String catalogName) {
+        return Lists.newArrayList(
+                new DropPartitionEvent(event, catalogName));
+    }
+
+    @Override
+    protected void process() throws MetastoreNotificationException {
+        try {
+            infoLog("catalogName:[{}],dbName:[{}],tableName:[{}],partitionNames:[{}]", catalogName, dbName, tblName,
+                    partitionNames.toString());
+            // bail out early if there are no partitions to process
+            if (partitionNames.isEmpty()) {
+                infoLog("Partition list is empty.
Ignoring this event."); + return; + } + Env.getCurrentEnv().getCatalogMgr() + .dropExternalPartitions(catalogName, dbName, hmsTbl.getTableName(), partitionNames); + } catch (DdlException e) { + throw new MetastoreNotificationException( + debugString("Failed to process event")); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java index 8647e47b78..2b84c1bb42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java @@ -25,8 +25,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.List; @@ -34,25 +32,21 @@ import java.util.List; * MetastoreEvent for DROP_TABLE event type */ public class DropTableEvent extends MetastoreTableEvent { - private static final Logger LOG = LogManager.getLogger(DropTableEvent.class); - private final String dbName; private final String tableName; private DropTableEvent(NotificationEvent event, String catalogName) { super(event, catalogName); Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(getEventType())); - JSONDropTableMessage dropTableMessage = - (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer() - .getDropTableMessage(event.getMessage()); + Preconditions + .checkNotNull(event.getMessage(), debugString("Event message is null")); try { - dbName = dropTableMessage.getDB(); + JSONDropTableMessage dropTableMessage = + (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer() + .getDropTableMessage(event.getMessage()); tableName = dropTableMessage.getTable(); } catch (Exception e) { - throw new MetastoreNotificationException(debugString( - "Could not parse event message. 
" - + "Check if %s is set to true in metastore configuration", - MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e); + throw new MetastoreNotificationException(e); } } @@ -61,29 +55,14 @@ public class DropTableEvent extends MetastoreTableEvent { return Lists.newArrayList(new DropTableEvent(event, catalogName)); } - @Override - protected boolean existInCache() { - return true; - } - - @Override - protected boolean canBeSkipped() { - return false; - } - - protected boolean isSupported() { - return true; - } - @Override protected void process() throws MetastoreNotificationException { try { - LOG.info("DropTable event process,catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, - tableName); + infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tableName); Env.getCurrentEnv().getCatalogMgr().dropExternalTable(dbName, tableName, catalogName); } catch (DdlException e) { - LOG.warn("DropExternalTable failed,dbName:[{}],tableName:[{}],catalogName:[{}].", dbName, tableName, - catalogName, e); + throw new MetastoreNotificationException( + debugString("Failed to process event")); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java index 4d2dc1a178..d504c2917f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java @@ -27,17 +27,17 @@ import java.util.List; * An event type which is ignored. Useful for unsupported metastore event types */ public class IgnoredEvent extends MetastoreEvent { - protected IgnoredEvent(NotificationEvent event, String catalogName) { + private IgnoredEvent(NotificationEvent event, String catalogName) { super(event, catalogName); } - private static List<MetastoreEvent> getEvents(NotificationEvent event, + protected static List<MetastoreEvent> getEvents(NotificationEvent event, String catalogName) { return Lists.newArrayList(new IgnoredEvent(event, catalogName)); } @Override public void process() { - debugLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType()); + infoLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java index 5cc4594457..132496a9f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java @@ -22,6 +22,9 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; +import java.util.Map; + /** * Abstract base class for all MetastoreEvents. A MetastoreEvent is an object used to * process a NotificationEvent received from metastore. @@ -105,11 +108,6 @@ public abstract class MetastoreEvent { return null; } - - protected boolean existInCache() throws MetastoreNotificationException { - return false; - } - /** * Returns the number of events represented by this event. For most events this is 1. * In case of batch events this could be more than 1. 
@@ -128,14 +126,6 @@ public abstract class MetastoreEvent { return false; } - /** - * Whether the current version of FE supports processing of some events, some events are reserved, - * and may be processed later version. - */ - protected boolean isSupported() { - return false; - } - /** * Process the information available in the NotificationEvent. */ @@ -196,6 +186,26 @@ public abstract class MetastoreEvent { LOG.debug(formatString, formatArgs); } + protected String getPartitionName(Map<String, String> part, List<String> partitionColNames) { + if (part.size() == 0) { + return ""; + } + if (partitionColNames.size() != part.size()) { + return ""; + } + StringBuilder name = new StringBuilder(); + int i = 0; + for (String colName : partitionColNames) { + if (i++ > 0) { + name.append("/"); + } + name.append(colName); + name.append("="); + name.append(part.get(colName)); + } + return name.toString(); + } + @Override public String toString() { return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java index 2719158c8e..ce96ce62e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java @@ -26,9 +26,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; /** * Factory class to create various MetastoreEvents. @@ -42,31 +40,36 @@ public class MetastoreEventFactory implements EventFactory { Preconditions.checkNotNull(event.getEventType()); MetastoreEventType metastoreEventType = MetastoreEventType.from(event.getEventType()); switch (metastoreEventType) { + case CREATE_TABLE: + return CreateTableEvent.getEvents(event, catalogName); case DROP_TABLE: return DropTableEvent.getEvents(event, catalogName); + case ALTER_TABLE: + return AlterTableEvent.getEvents(event, catalogName); + case CREATE_DATABASE: + return CreateDatabaseEvent.getEvents(event, catalogName); + case DROP_DATABASE: + return DropDatabaseEvent.getEvents(event, catalogName); + case ALTER_DATABASE: + return AlterDatabaseEvent.getEvents(event, catalogName); + case ADD_PARTITION: + return AddPartitionEvent.getEvents(event, catalogName); + case DROP_PARTITION: + return DropPartitionEvent.getEvents(event, catalogName); + case ALTER_PARTITION: + return AlterPartitionEvent.getEvents(event, catalogName); default: // ignore all the unknown events by creating a IgnoredEvent - return Lists.newArrayList(new IgnoredEvent(event, catalogName)); + return IgnoredEvent.getEvents(event, catalogName); } } List<MetastoreEvent> getMetastoreEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) { List<MetastoreEvent> metastoreEvents = Lists.newArrayList(); - for (NotificationEvent event : events) { metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, hmsExternalCatalog.getName())); } - - List<MetastoreEvent> tobeProcessEvents = metastoreEvents.stream() - .filter(MetastoreEvent::isSupported) - .collect(Collectors.toList()); - - if (tobeProcessEvents.isEmpty()) { - LOG.info("The metastore events to process is empty on catalog {}", hmsExternalCatalog.getName()); - return Collections.emptyList(); 
- } - - return createBatchEvents(tobeProcessEvents); + return createBatchEvents(metastoreEvents); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 232509f60b..070eec0725 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -713,6 +713,12 @@ public class JournalEntity implements Writable { } case OperationType.OP_REFRESH_EXTERNAL_DB: case OperationType.OP_DROP_EXTERNAL_TABLE: + case OperationType.OP_CREATE_EXTERNAL_TABLE: + case OperationType.OP_DROP_EXTERNAL_DB: + case OperationType.OP_CREATE_EXTERNAL_DB: + case OperationType.OP_ADD_EXTERNAL_PARTITIONS: + case OperationType.OP_DROP_EXTERNAL_PARTITIONS: + case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: case OperationType.OP_REFRESH_EXTERNAL_TABLE: { data = ExternalObjectLog.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index fe757ff3aa..a3a54232bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -959,6 +959,36 @@ public class EditLog { env.getCatalogMgr().replayDropExternalTable(log); break; } + case OperationType.OP_CREATE_EXTERNAL_TABLE: { + final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); + env.getCatalogMgr().replayCreateExternalTable(log); + break; + } + case OperationType.OP_DROP_EXTERNAL_DB: { + final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); + env.getCatalogMgr().replayDropExternalDatabase(log); + break; + } + case OperationType.OP_CREATE_EXTERNAL_DB: { + final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); + env.getCatalogMgr().replayCreateExternalDatabase(log); + break; + } + case OperationType.OP_ADD_EXTERNAL_PARTITIONS: { + final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); + env.getCatalogMgr().replayAddExternalPartitions(log); + break; + } + case OperationType.OP_DROP_EXTERNAL_PARTITIONS: { + final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); + env.getCatalogMgr().replayDropExternalPartitions(log); + break; + } + case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: { + final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); + env.getCatalogMgr().replayRefreshExternalPartitions(log); + break; + } case OperationType.OP_INIT_EXTERNAL_TABLE: { // Do nothing. 
break; @@ -1650,6 +1680,30 @@ public class EditLog { logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log); } + public void logCreateExternalTable(ExternalObjectLog log) { + logEdit(OperationType.OP_CREATE_EXTERNAL_TABLE, log); + } + + public void logDropExternalDatabase(ExternalObjectLog log) { + logEdit(OperationType.OP_DROP_EXTERNAL_DB, log); + } + + public void logCreateExternalDatabase(ExternalObjectLog log) { + logEdit(OperationType.OP_CREATE_EXTERNAL_DB, log); + } + + public void logAddExternalPartitions(ExternalObjectLog log) { + logEdit(OperationType.OP_ADD_EXTERNAL_PARTITIONS, log); + } + + public void logDropExternalPartitions(ExternalObjectLog log) { + logEdit(OperationType.OP_DROP_EXTERNAL_PARTITIONS, log); + } + + public void logInvalidateExternalPartitions(ExternalObjectLog log) { + logEdit(OperationType.OP_REFRESH_EXTERNAL_PARTITIONS, log); + } + public Journal getJournal() { return this.journal; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 13009ba85d..813b4c4c49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -258,6 +258,12 @@ public class OperationType { public static final short OP_ALTER_MTMV_TASK = 342; public static final short OP_DROP_EXTERNAL_TABLE = 350; + public static final short OP_DROP_EXTERNAL_DB = 351; + public static final short OP_CREATE_EXTERNAL_TABLE = 352; + public static final short OP_CREATE_EXTERNAL_DB = 353; + public static final short OP_ADD_EXTERNAL_PARTITIONS = 354; + public static final short OP_DROP_EXTERNAL_PARTITIONS = 355; + public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356; public static final short OP_ALTER_USER = 400; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java index bbefea50e3..fcae3c4ecd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ListPartitionPrunerV2.java @@ -18,6 +18,7 @@ package org.apache.doris.planner; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.common.AnalysisException; @@ -33,8 +34,10 @@ import com.google.common.collect.TreeRangeMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -59,7 +62,7 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { super(idToPartitionItem, partitionColumns, columnNameToRange); this.uidToPartitionRange = Maps.newHashMap(); if (partitionColumns.size() > 1) { - this.uidToPartitionRange = genUidToPartitionRange(idToPartitionItem); + this.uidToPartitionRange = genUidToPartitionRange(idToPartitionItem, new HashMap<>()); this.rangeToId = genRangeToId(uidToPartitionRange); } } @@ -77,16 +80,21 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { } public static Map<UniqueId, Range<PartitionKey>> genUidToPartitionRange( - Map<Long, PartitionItem> idToPartitionItem) { + Map<Long, PartitionItem> idToPartitionItem, Map<Long, List<UniqueId>> 
idToUniqueIdsMap) { Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = Maps.newHashMap(); idToPartitionItem.forEach((id, item) -> { List<PartitionKey> keys = item.getItems(); List<Range<PartitionKey>> ranges = keys.stream() .map(key -> Range.closed(key, key)) .collect(Collectors.toList()); + List<UniqueId> uniqueIds = idToUniqueIdsMap.get(id) == null ? new ArrayList<>(ranges.size()) + : idToUniqueIdsMap.get(id); for (int i = 0; i < ranges.size(); i++) { - uidToPartitionRange.put(new ListPartitionUniqueId(id, i), ranges.get(i)); + ListPartitionUniqueId listPartitionUniqueId = new ListPartitionUniqueId(id, i); + uidToPartitionRange.put(listPartitionUniqueId, ranges.get(i)); + uniqueIds.add(listPartitionUniqueId); } + idToUniqueIdsMap.put(id, uniqueIds); }); return uidToPartitionRange; } @@ -94,21 +102,35 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { @Override void genSingleColumnRangeMap() { if (singleColumnRangeMap == null) { - singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem); + singleColumnRangeMap = genSingleColumnRangeMap(idToPartitionItem, new HashMap<>()); } } - public static RangeMap<ColumnBound, UniqueId> genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem) { + public static Map<Long, List<String>> getPartitionValuesMap(Map<Long, PartitionItem> idToPartitionItem) { + Map<Long, List<String>> partitionValuesMap = new HashMap<>(); + for (Map.Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) { + partitionValuesMap.put(entry.getKey(), + ((ListPartitionItem) entry.getValue()).getItems().get(0).getPartitionValuesAsStringList()); + } + return partitionValuesMap; + } + + public static RangeMap<ColumnBound, UniqueId> genSingleColumnRangeMap(Map<Long, PartitionItem> idToPartitionItem, + Map<Long, List<UniqueId>> idToUniqueIdsMap) { RangeMap<ColumnBound, UniqueId> candidate = TreeRangeMap.create(); idToPartitionItem.forEach((id, item) -> { List<PartitionKey> keys = item.getItems(); List<Range<PartitionKey>> ranges = keys.stream() .map(key -> Range.closed(key, key)) .collect(Collectors.toList()); + List<UniqueId> uniqueIds = idToUniqueIdsMap.get(id) == null ? 
new ArrayList<>(ranges.size()) + : idToUniqueIdsMap.get(id); for (int i = 0; i < ranges.size(); i++) { - candidate.put(mapPartitionKeyRange(ranges.get(i), 0), - new ListPartitionUniqueId(id, i)); + ListPartitionUniqueId listPartitionUniqueId = new ListPartitionUniqueId(id, i); + candidate.put(mapPartitionKeyRange(ranges.get(i), 0), listPartitionUniqueId); + uniqueIds.add(listPartitionUniqueId); } + idToUniqueIdsMap.put(id, uniqueIds); }); return candidate; } @@ -151,6 +173,13 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { return rangeToId; } + public static Map<UniqueId, Range<ColumnBound>> genSingleUidToColumnRange( + RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) { + Map<UniqueId, Range<ColumnBound>> uidToColumnRange = Maps.newHashMap(); + singleColumnRangeMap.asMapOfRanges().forEach((columnBound, uid) -> uidToColumnRange.put(uid, columnBound)); + return uidToColumnRange; + } + private Collection<Long> doPruneMultiple(Map<Column, FinalFilters> columnToFilters, Map<Range<PartitionKey>, UniqueId> partitionRangeToUid, int columnIdx) { @@ -178,20 +207,20 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { grouped.forEach(candidateRangeMap::put); return finalFilters.filters.stream() - .map(filter -> { - RangeMap<ColumnBound, List<UniqueId>> filtered = - candidateRangeMap.subRangeMap(filter); - // Find PartitionKey ranges according to filtered UniqueIds. - Map<Range<PartitionKey>, UniqueId> filteredPartitionRange = - filtered.asMapOfRanges().values() - .stream() - .flatMap(List::stream) - .collect(Collectors.toMap(uidToPartitionRange::get, Function.identity())); - return doPruneMultiple(columnToFilters, filteredPartitionRange, - columnIdx + 1); - }) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); + .map(filter -> { + RangeMap<ColumnBound, List<UniqueId>> filtered = + candidateRangeMap.subRangeMap(filter); + // Find PartitionKey ranges according to filtered UniqueIds. 
+ Map<Range<PartitionKey>, UniqueId> filteredPartitionRange = + filtered.asMapOfRanges().values() + .stream() + .flatMap(List::stream) + .collect(Collectors.toMap(uidToPartitionRange::get, Function.identity())); + return doPruneMultiple(columnToFilters, filteredPartitionRange, + columnIdx + 1); + }) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); case NO_FILTERS: default: return doPruneMultiple(columnToFilters, partitionRangeToUid, columnIdx + 1); @@ -215,9 +244,9 @@ public class ListPartitionPrunerV2 extends PartitionPrunerV2Base { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("partitionId", partitionId) - .add("partitionKeyIndex", partitionKeyIndex) - .toString(); + .add("partitionId", partitionId) + .add("partitionKeyIndex", partitionKeyIndex) + .toString(); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java index a75f332f70..f74bf032e5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java @@ -32,8 +32,12 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.EsResource; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ResourceMgr; +import org.apache.doris.catalog.Type; import org.apache.doris.catalog.external.EsExternalDatabase; import org.apache.doris.catalog.external.EsExternalTable; import org.apache.doris.catalog.external.HMSExternalDatabase; @@ -41,14 +45,23 @@ import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.datasource.hive.HiveMetaStoreCache; +import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues; +import org.apache.doris.datasource.hive.HiveMetaStoreCache.PartitionValueCacheKey; import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.planner.ColumnBound; +import org.apache.doris.planner.ListPartitionPrunerV2; +import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSet; import org.apache.doris.system.SystemInfoService; import org.apache.doris.utframe.TestWithFeService; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -58,6 +71,7 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -69,13 +83,14 @@ public class CatalogMgrTest extends TestWithFeService { private static UserIdentity user2; private CatalogMgr mgr; private ResourceMgr resourceMgr; + private ExternalMetaCacheMgr externalMetaCacheMgr; @Override protected void runBeforeAll() throws Exception { FeConstants.runningUnitTest = true; mgr = 
Env.getCurrentEnv().getCatalogMgr(); resourceMgr = Env.getCurrentEnv().getResourceMgr(); - + externalMetaCacheMgr = Env.getCurrentEnv().getExtMetaCacheMgr(); ConnectContext rootCtx = createDefaultCtx(); env = Env.getCurrentEnv(); auth = env.getAuth(); @@ -417,4 +432,119 @@ public class CatalogMgrTest extends TestWithFeService { } } + @Test + public void testAddMultiColumnPartitionsCache() { + HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive"); + HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog); + PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable", + Lists.newArrayList(Type.INT, Type.SMALLINT)); + HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey, + Lists.newArrayList("y=2020/m=1", "y=2020/m=2"), metaStoreCache); + metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues); + metaStoreCache.addPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("y=2020/m=3", "y=2020/m=4"), + partitionValueCacheKey.getTypes()); + HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey); + Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(), 4); + } + + @Test + public void testDropMultiColumnPartitionsCache() { + HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive"); + HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog); + PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable", + Lists.newArrayList(Type.INT, Type.SMALLINT)); + HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey, + Lists.newArrayList("y=2020/m=1", "y=2020/m=2"), metaStoreCache); + metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues); + metaStoreCache.dropPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("y=2020/m=1", "y=2020/m=2"), + partitionValueCacheKey.getTypes(), false); + HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey); + Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(), 0); + } + + @Test + public void testAddSingleColumnPartitionsCache() { + HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive"); + HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog); + PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable", + Lists.newArrayList(Type.SMALLINT)); + HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey, + Lists.newArrayList("m=1", "m=2"), metaStoreCache); + metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues); + metaStoreCache.addPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("m=3", "m=4"), + partitionValueCacheKey.getTypes()); + HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey); + Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(), 4); + } + + @Test + public void testDropSingleColumnPartitionsCache() { + HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive"); + HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog); + PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable", + Lists.newArrayList(Type.SMALLINT)); + 
HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey,
+                Lists.newArrayList("m=1", "m=2"), metaStoreCache);
+        metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
+        metaStoreCache.dropPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("m=1", "m=2"),
+                partitionValueCacheKey.getTypes(), false);
+        HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
+        Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(), 0);
+    }
+
+    @Test
+    public void testAddPartitionsCacheToLargeTable() {
+        HMSExternalCatalog hiveCatalog = (HMSExternalCatalog) mgr.getCatalog("hive");
+        HiveMetaStoreCache metaStoreCache = externalMetaCacheMgr.getMetaStoreCache(hiveCatalog);
+        PartitionValueCacheKey partitionValueCacheKey = new PartitionValueCacheKey("hiveDb", "hiveTable",
+                Lists.newArrayList(Type.INT));
+        List<String> pNames = new ArrayList<>(100000);
+        for (int i = 1; i <= 100000; i++) {
+            pNames.add("m=" + i);
+        }
+        HivePartitionValues hivePartitionValues = loadPartitionValues(partitionValueCacheKey,
+                pNames, metaStoreCache);
+        metaStoreCache.putPartitionValuesCacheForTest(partitionValueCacheKey, hivePartitionValues);
+        long start = System.currentTimeMillis();
+        metaStoreCache.addPartitionsCache("hiveDb", "hiveTable", Lists.newArrayList("m=100001"),
+                partitionValueCacheKey.getTypes());
+        // took about 387 ms on a 4c16g machine
+        System.out.println("testAddPartitionsCacheToLargeTable use time millis:" + (System.currentTimeMillis() - start));
+        HivePartitionValues partitionValues = metaStoreCache.getPartitionValues(partitionValueCacheKey);
+        Assert.assertEquals(partitionValues.getPartitionNameToIdMap().size(), 100001);
+    }
+
+    private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key, List<String> partitionNames,
+            HiveMetaStoreCache metaStoreCache) {
+        // partition name format: nation=cn/city=beijing
+        Map<Long, PartitionItem> idToPartitionItem = Maps.newHashMapWithExpectedSize(partitionNames.size());
+        Map<String, Long> partitionNameToIdMap = Maps.newHashMapWithExpectedSize(partitionNames.size());
+        Map<Long, List<UniqueId>> idToUniqueIdsMap = Maps.newHashMapWithExpectedSize(partitionNames.size());
+        long idx = 0;
+        for (String partitionName : partitionNames) {
+            long partitionId = idx++;
+            ListPartitionItem listPartitionItem = metaStoreCache.toListPartitionItem(partitionName, key.getTypes());
+            idToPartitionItem.put(partitionId, listPartitionItem);
+            partitionNameToIdMap.put(partitionName, partitionId);
+        }
+
+        Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = null;
+        Map<Range<PartitionKey>, UniqueId> rangeToId = null;
+        RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null;
+        Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = null;
+        if (key.getTypes().size() > 1) {
+            // uidToPartitionRange and rangeToId are only used for multi-column partitions
+            uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem, idToUniqueIdsMap);
+            rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
+        } else {
+            Preconditions.checkState(key.getTypes().size() == 1, key.getTypes());
+            // singleColumnRangeMap is only used for single-column partitions
+            singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem, idToUniqueIdsMap);
+            singleUidToColumnRangeMap = ListPartitionPrunerV2.genSingleUidToColumnRange(singleColumnRangeMap);
+        }
+        Map<Long, List<String>> partitionValuesMap =
+                ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
+        return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap, idx,
+                partitionNameToIdMap, idToUniqueIdsMap, singleUidToColumnRangeMap, partitionValuesMap);
+    }
+
 }
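
The new MetastoreEvent.getPartitionName helper that AddPartitionEvent and DropPartitionEvent rely on joins the table's partition columns with the key/value map carried in the HMS message into Hive's "k1=v1/k2=v2" form, and returns an empty string when the map and the column list do not line up. A minimal standalone sketch of that behavior (the demo class and its main method are illustrative, not part of the commit):

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class PartitionNameDemo {
        // Mirrors the helper's contract: "" on any size mismatch, otherwise the
        // column/value pairs joined as "k1=v1/k2=v2" in partition-column order.
        static String getPartitionName(Map<String, String> part, List<String> partitionColNames) {
            if (part.size() == 0 || partitionColNames.size() != part.size()) {
                return "";
            }
            StringBuilder name = new StringBuilder();
            int i = 0;
            for (String colName : partitionColNames) {
                if (i++ > 0) {
                    name.append("/");
                }
                name.append(colName).append("=").append(part.get(colName));
            }
            return name.toString();
        }

        public static void main(String[] args) {
            Map<String, String> part = new LinkedHashMap<>();
            part.put("y", "2020");
            part.put("m", "1");
            // Prints "y=2020/m=1", the same name format used by the cache tests above.
            System.out.println(getPartitionName(part, Arrays.asList("y", "m")));
        }
    }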
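
The persistence side follows the FE's usual edit-log pattern: the master serializes an ExternalObjectLog entry under one of the new OP_* codes, and non-master nodes replay it through the matching CatalogMgr method so external metadata stays consistent across the cluster. A minimal, self-contained sketch of that dispatch pattern (the op-code values match the commit, but the ReplayDemo class, the handler map, and the String payload are illustrative stand-ins for the Doris classes):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    public class ReplayDemo {
        // Op codes as added to OperationType in this commit.
        static final short OP_DROP_EXTERNAL_DB = 351;
        static final short OP_CREATE_EXTERNAL_DB = 353;

        // Each code maps to a replay handler, the way EditLog.loadJournal routes
        // the new codes to CatalogMgr.replayCreateExternalDatabase and friends.
        static final Map<Short, Consumer<String>> REPLAYERS = new HashMap<>();
        static {
            REPLAYERS.put(OP_CREATE_EXTERNAL_DB, db -> System.out.println("replay: create external db " + db));
            REPLAYERS.put(OP_DROP_EXTERNAL_DB, db -> System.out.println("replay: drop external db " + db));
        }

        static void replay(short opCode, String payload) {
            // Unknown codes are silently skipped here; the real loadJournal
            // treats an unknown op type as an error instead.
            REPLAYERS.getOrDefault(opCode, p -> { }).accept(payload);
        }

        public static void main(String[] args) {
            // A follower applying the master's "create database" edit for an HMS catalog.
            replay(OP_CREATE_EXTERNAL_DB, "hiveDb");
        }
    }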
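
The ListPartitionPrunerV2 changes are easiest to read as a bookkeeping addition: genUidToPartitionRange and genSingleColumnRangeMap now also fill the caller's idToUniqueIdsMap, so HiveMetaStoreCache can locate exactly the ranges owned by a dropped partition instead of rebuilding the whole map. A simplified sketch of that side table, with plain strings standing in for PartitionKey/ColumnBound and ListPartitionUniqueId (assumes only Guava on the classpath; all names here are illustrative):

    import com.google.common.collect.Lists;
    import com.google.common.collect.Range;
    import com.google.common.collect.RangeMap;
    import com.google.common.collect.TreeRangeMap;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RangeMapDemo {
        public static void main(String[] args) {
            // Two single-column partitions, m=1 and m=2, keyed by partition id.
            Map<Long, List<String>> idToValues = new HashMap<>();
            idToValues.put(0L, Lists.newArrayList("1"));
            idToValues.put(1L, Lists.newArrayList("2"));

            // candidate plays the role of singleColumnRangeMap; idToUniqueIds is
            // the new side table mapping each partition id to its unique ids.
            RangeMap<String, String> candidate = TreeRangeMap.create();
            Map<Long, List<String>> idToUniqueIds = new HashMap<>();
            idToValues.forEach((id, values) -> {
                List<String> uids = Lists.newArrayList();
                for (int i = 0; i < values.size(); i++) {
                    String uid = id + "#" + i; // stands in for ListPartitionUniqueId(id, i)
                    candidate.put(Range.closed(values.get(i), values.get(i)), uid);
                    uids.add(uid);
                }
                idToUniqueIds.put(id, uids);
            });

            // Dropping partition 0 only touches the ranges its unique ids own;
            // the real dropPartitionsCache resolves each id back to its range
            // through the maps populated above.
            candidate.remove(Range.closed("1", "1"));
            idToUniqueIds.remove(0L);
            System.out.println(candidate);      // prints something like [[2..2]=1#0]
            System.out.println(idToUniqueIds);  // prints something like {1=[1#0]}
        }
    }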