This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new a261c2788e0 [enhance-wip](multi-catalog) Speed up consume rate of hms events. (#27666)
a261c2788e0 is described below

commit a261c2788e03472e4ee8f46831ad4e2f4762559a
Author: Xiangyu Wang <dut.xian...@gmail.com>
AuthorDate: Fri Jan 19 15:24:12 2024 +0800

    [enhance-wip](multi-catalog) Speed up consume rate of hms events. (#27666)

## Proposed changes

The current implementation persists all databases/tables of external catalogs, and only the master FE handles HMS events while all slave nodes replay them from the journal. This causes several problems:

- The HMS event processor (`MetastoreEventsProcessor`) cannot consume events in time. The add-journal-log operation is a synchronized method, so we cannot speed up the consume rate with concurrent processing, and each add-journal-log call costs tens of milliseconds. As a result, the Hive metadata may be out of date.
- Slave FE nodes may crash if replaying the journal logs of HMS events fails. (We have already fixed several issues of this kind, but we cannot guarantee that all of them are resolved.)
- HMS events produce many journal logs that are never used again after an FE restart, which makes the startup time of every FE node very long.

Today Doris persists all databases/tables of external catalogs only to ensure that the dbId/tableId of each database/table is the same across all FE nodes, which is what analysis jobs rely on. In this PR, we introduce a meta id manager called `ExternalMetaIdMgr` to manage these meta ids. On every loop in which the master fetches a batch of HMS events, it handles the meta ids first and produces only one meta id mappings log; slave FE nodes replay this log to sync the changes to these meta ids (a rough sketch of this flow follows below). `MetastoreEventsProcessor` now starts on every FE node and consumes HMS events as soon as possible.

## Further comments

I previously submitted two PRs (#22869, #21589) to speed up the consume rate of HMS events. They work fine when there are many `AlterTableEvent` / `DropTableEvent` on the Hive cluster, but the improvement is not significant when most HMS events are partition events. Unfortunately, we performed a cluster upgrade (Spark 2.x to Spark 3.x), and that is probably why the majority of Hive Metastore events became partition events. This is also the reason for the [...]

Based on our observation, after merging this pull request, Doris can process thousands of Hive Metastore events per second, compared to only a few dozen events before.
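To make the flow described under "Proposed changes" concrete, here is a rough sketch of how the master could turn one created table from a batch of HMS events into a single `MetaIdMappingsLog` entry, and how a follower replays it. Only the class, constant, and method names that appear in this commit (`MetaIdMappingsLog`, `MetaIdMapping`, `ExternalMetaIdMgr`) are real; the helper methods and the surrounding wiring are simplified assumptions, not the actual code path.

```java
import org.apache.doris.datasource.ExternalMetaIdMgr;
import org.apache.doris.datasource.MetaIdMappingsLog;

// Hypothetical helper, not part of this commit; it only condenses the idea of the new flow.
// Both sides are assumed to run inside an FE process where Env is already initialized.
public class MetaIdMappingSketch {

    // Master side: allocate meta ids for objects newly seen in the batch and collect them
    // into one MetaIdMappingsLog, which is then written to the edit log as a single journal
    // entry (instead of one journal entry per HMS event as before).
    static MetaIdMappingsLog buildLogForBatch(long catalogId, long lastSyncedEventId,
                                              String dbName, String createdTblName) {
        MetaIdMappingsLog log = new MetaIdMappingsLog();
        log.setCatalogId(catalogId);
        log.setFromHmsEvent(true);
        log.setLastSyncedEventId(lastSyncedEventId);
        log.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping(
                MetaIdMappingsLog.OPERATION_TYPE_ADD,
                MetaIdMappingsLog.META_OBJECT_TYPE_TABLE,
                dbName, createdTblName, ExternalMetaIdMgr.nextMetaId()));
        return log;
    }

    // Follower side: replaying the log makes the same dbId/tableId visible locally, so every
    // FE node can consume HMS events itself instead of waiting for per-event journal replay.
    static void replayOnFollower(ExternalMetaIdMgr mgr, MetaIdMappingsLog log,
                                 long catalogId, String dbName, String createdTblName) {
        mgr.replayMetaIdMappingsLog(log);
        long tblId = mgr.getTblId(catalogId, dbName, createdTblName);
        // META_ID_FOR_NOT_EXISTS (-1) means the object is unknown, e.g. it is dropped again
        // later in the same batch, so the follower simply skips it.
        if (tblId != ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) {
            System.out.println("table " + createdTblName + " resolved to id " + tblId);
        }
    }
}
```

The FE log excerpt below illustrates the consumption rate after this change.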
```java
2023-12-07 05:17:03,518 INFO (replayer|105) [Env.replayJournal():2614] replayed journal id is 18287902, replay to journal id is 18287903
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEventFactory.mergeEvents():188] Event size on catalog [xxx] before merge is [1947], after merge is [1849]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955309 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2022-05-27],partitionNameAfter:[partitions=2022-05-27]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955310 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20230318],partitionNameAfter:[pday=20230318]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955311 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20190826],partitionNameAfter:[pday=20190826]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955312 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2021-09-16],partitionNameAfter:[partitions=2021-09-16]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955314 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2020-04-26],partitionNameAfter:[partitions=2020-04-26]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955315 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20230702],partitionNameAfter:[pday=20230702]
2023-12-07 05:17:03,735 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357955317 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[pday=20211019],partitionNameAfter:[pday=20211019]
...
2023-12-07 05:17:03,989 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357957252 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2021-08-27],partitionNameAfter:[partitions=2021-08-27]
2023-12-07 05:17:03,989 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEvent.infoLog():193] EventId: 357957253 EventType: ALTER_PARTITION catalogName:[xxx],dbName:[xxx],tableName:[xxx],partitionNameBefore:[partitions=2022-02-05],partitionNameAfter:[partitions=2022-02-05]
2023-12-07 05:17:04,661 INFO (replayer|105) [Env.replayJournal():2614] replayed journal id is 18287903, replay to journal id is 18287904
2023-12-07 05:17:05,028 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEventsProcessor.realRun():116] Events size are 587 on catalog [xxx]
2023-12-07 05:17:05,662 INFO (org.apache.doris.datasource.hive.event.MetastoreEventsProcessor|37) [MetastoreEventFactory.mergeEvents():188] Event size on catalog [xxx] before merge is [587], after merge is [587]
```
---
 .../main/java/org/apache/doris/catalog/Env.java    |  17 +-
 .../doris/catalog/external/ExternalDatabase.java   |  20 +-
 .../catalog/external/HMSExternalDatabase.java      |  13 +-
 .../catalog/external/IcebergExternalDatabase.java  |   4 +-
 .../catalog/external/PaimonExternalDatabase.java   |   4 +-
 .../org/apache/doris/datasource/CatalogMgr.java    | 257 +++----------------
 .../apache/doris/datasource/ExternalCatalog.java   |  14 +-
 .../apache/doris/datasource/ExternalMetaIdMgr.java | 263 ++++++++++++++++++++
 .../doris/datasource/HMSExternalCatalog.java       | 107 ++------
 .../org/apache/doris/datasource/InitTableLog.java  |  67 -----
 .../apache/doris/datasource/MetaIdMappingsLog.java | 274 +++++++++++++++++++++
 .../datasource/hive/event/AddPartitionEvent.java   |  16 ++
 .../datasource/hive/event/AlterTableEvent.java     |   6 +-
 .../datasource/hive/event/CreateDatabaseEvent.java |  12 +
 .../datasource/hive/event/CreateTableEvent.java    |  14 +-
 .../datasource/hive/event/DropDatabaseEvent.java   |  11 +
 .../datasource/hive/event/DropPartitionEvent.java  |  17 +-
 .../datasource/hive/event/DropTableEvent.java      |  15 +-
 .../datasource/hive/event/MetastoreEvent.java      |   9 +
 .../hive/event/MetastoreEventFactory.java          |  26 +-
 .../hive/event/MetastoreEventsProcessor.java       | 225 ++++++++++++++---
 .../org/apache/doris/journal/JournalEntity.java    |  20 +-
 .../java/org/apache/doris/persist/EditLog.java     |  29 +--
 .../org/apache/doris/persist/OperationType.java    |   9 +
 .../doris/datasource/ExternalMetaIdMgrTest.java    |  76 ++++++
 .../doris/datasource/MetaIdMappingsLogTest.java    |  97 ++++++++
 .../external/hms/MetastoreEventFactoryTest.java    |   2 +-
 27 files changed, 1138 insertions(+), 486 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 222eb195b34..37930708cc8 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -132,6 +132,7 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.EsExternalCatalog; import org.apache.doris.datasource.ExternalMetaCacheMgr; +import org.apache.doris.datasource.ExternalMetaIdMgr; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HiveTransactionMgr; import
org.apache.doris.datasource.hive.event.MetastoreEventsProcessor; @@ -362,6 +363,7 @@ public class Env { private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector; private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector; private CooldownConfHandler cooldownConfHandler; + private ExternalMetaIdMgr externalMetaIdMgr; private MetastoreEventsProcessor metastoreEventsProcessor; private ExportTaskRegister exportTaskRegister; @@ -649,6 +651,7 @@ public class Env { if (Config.enable_storage_policy) { this.cooldownConfHandler = new CooldownConfHandler(); } + this.externalMetaIdMgr = new ExternalMetaIdMgr(); this.metastoreEventsProcessor = new MetastoreEventsProcessor(); this.jobManager = new JobManager<>(); this.labelProcessor = new LabelProcessor(); @@ -844,6 +847,14 @@ public class Env { return workloadRuntimeStatusMgr; } + public ExternalMetaIdMgr getExternalMetaIdMgr() { + return externalMetaIdMgr; + } + + public MetastoreEventsProcessor getMetastoreEventsProcessor() { + return metastoreEventsProcessor; + } + // use this to get correct ClusterInfoService instance public static SystemInfoService getCurrentSystemInfo() { return getCurrentEnv().getClusterInfo(); @@ -1638,9 +1649,6 @@ public class Env { streamLoadRecordMgr.start(); tabletLoadIndexRecorderMgr.start(); new InternalSchemaInitializer().start(); - if (Config.enable_hms_events_incremental_sync) { - metastoreEventsProcessor.start(); - } getRefreshManager().start(); // binlog gcer @@ -1662,6 +1670,9 @@ public class Env { domainResolver.start(); // fe disk updater feDiskUpdater.start(); + if (Config.enable_hms_events_incremental_sync) { + metastoreEventsProcessor.start(); + } } private void transferToNonMaster(FrontendNodeType newType) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java index e6584d4a0b6..4af08e9b384 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java @@ -146,8 +146,15 @@ public abstract class ExternalDatabase<T extends ExternalTable> Map<Long, T> tmpIdToTbl = Maps.newConcurrentMap(); for (int i = 0; i < log.getRefreshCount(); i++) { T table = getTableForReplay(log.getRefreshTableIds().get(i)); - tmpTableNameToId.put(table.getName(), table.getId()); - tmpIdToTbl.put(table.getId(), table); + // When upgrade cluster with this pr: https://github.com/apache/doris/pull/27666 + // Maybe there are some create table events will be skipped + // if the cluster has any hms catalog(s) with hms event listener enabled. + // So we need add a validation here to avoid table(s) not found, this is just a temporary solution + // because later we will remove all the logics about InitCatalogLog/InitDatabaseLog. 
+ if (table != null) { + tmpTableNameToId.put(table.getName(), table.getId()); + tmpIdToTbl.put(table.getId(), table); + } } for (int i = 0; i < log.getCreateCount(); i++) { T table = getExternalTable(log.getCreateTableNames().get(i), log.getCreateTableIds().get(i), catalog); @@ -195,8 +202,7 @@ public abstract class ExternalDatabase<T extends ExternalTable> idToTbl = tmpIdToTbl; } - long currentTime = System.currentTimeMillis(); - lastUpdateTime = currentTime; + lastUpdateTime = System.currentTimeMillis(); initDatabaseLog.setLastUpdateTime(lastUpdateTime); initialized = true; Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog); @@ -370,17 +376,13 @@ public abstract class ExternalDatabase<T extends ExternalTable> throw new NotImplementedException("dropTable() is not implemented"); } - public void dropTableForReplay(String tableName) { - throw new NotImplementedException("replayDropTableFromEvent() is not implemented"); - } - @Override public CatalogIf getCatalog() { return extCatalog; } // Only used for sync hive metastore event - public void createTableForReplay(String tableName, long tableId) { + public void createTable(String tableName, long tableId) { throw new NotImplementedException("createTable() is not implemented"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java index d75f86bd088..f586ea7ed8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java @@ -64,17 +64,6 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> { @Override public void dropTable(String tableName) { - LOG.debug("drop table [{}]", tableName); - makeSureInitialized(); - Long tableId = tableNameToId.remove(tableName); - if (tableId == null) { - LOG.warn("drop table [{}] failed", tableName); - } - idToTbl.remove(tableId); - } - - @Override - public void dropTableForReplay(String tableName) { LOG.debug("replayDropTableFromEvent [{}]", tableName); Long tableId = tableNameToId.remove(tableName); if (tableId == null) { @@ -85,7 +74,7 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> { } @Override - public void createTableForReplay(String tableName, long tableId) { + public void createTable(String tableName, long tableId) { LOG.debug("create table [{}]", tableName); tableNameToId.put(tableName, tableId); HMSExternalTable table = getExternalTable(tableName, tableId, extCatalog); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java index a915b3b2418..1b5cd805a0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java @@ -49,7 +49,7 @@ public class IcebergExternalDatabase extends ExternalDatabase<IcebergExternalTab } @Override - public void dropTableForReplay(String tableName) { + public void dropTable(String tableName) { LOG.debug("drop table [{}]", tableName); Long tableId = tableNameToId.remove(tableName); if (tableId == null) { @@ -59,7 +59,7 @@ public class IcebergExternalDatabase extends ExternalDatabase<IcebergExternalTab } @Override - public void createTableForReplay(String tableName, long tableId) { + public 
void createTable(String tableName, long tableId) { LOG.debug("create table [{}]", tableName); tableNameToId.put(tableName, tableId); IcebergExternalTable table = new IcebergExternalTable(tableId, tableName, name, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java index a839b31298f..0e6ee7332d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java @@ -49,7 +49,7 @@ public class PaimonExternalDatabase extends ExternalDatabase<PaimonExternalTable } @Override - public void dropTableForReplay(String tableName) { + public void dropTable(String tableName) { LOG.debug("drop table [{}]", tableName); Long tableId = tableNameToId.remove(tableName); if (tableId == null) { @@ -59,7 +59,7 @@ public class PaimonExternalDatabase extends ExternalDatabase<PaimonExternalTable } @Override - public void createTableForReplay(String tableName, long tableId) { + public void createTable(String tableName, long tableId) { LOG.debug("create table [{}]", tableName); tableNameToId.put(tableName, tableId); PaimonExternalTable table = new PaimonExternalTable(tableId, tableName, name, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index 2ad27c87f22..76d7702fef4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -33,6 +33,7 @@ import org.apache.doris.catalog.Resource.ReferenceType; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.ExternalTable; +import org.apache.doris.catalog.external.HMSExternalDatabase; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; @@ -345,10 +346,10 @@ public class CatalogMgr implements Writable, GsonPostProcessable { writeLock(); try { CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName()); - Map<String, String> oldProperties = catalog.getProperties(); if (catalog == null) { throw new DdlException("No catalog found with name: " + stmt.getCatalogName()); } + Map<String, String> oldProperties = catalog.getProperties(); if (stmt.getNewProperties().containsKey("type") && !catalog.getType() .equalsIgnoreCase(stmt.getNewProperties().get("type"))) { throw new DdlException("Can't modify the type of catalog property with name: " + stmt.getCatalogName()); @@ -682,12 +683,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable { ((HMSExternalTable) table).unsetObjectCreated(); ((HMSExternalTable) table).setEventUpdateTime(updateTime); Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tableName); - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableId(table.getId()); - log.setLastUpdateTime(updateTime); - Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log); } public void refreshExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfNotExists) @@ -773,43 +768,16 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } return; } - ExternalObjectLog 
log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableId(table.getId()); - log.setLastUpdateTime(System.currentTimeMillis()); - replayDropExternalTable(log); - Env.getCurrentEnv().getEditLog().logDropExternalTable(log); - } - public void replayDropExternalTable(ExternalObjectLog log) { - LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), log.getDbId(), - log.getTableId()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); - return; - } - ExternalTable table = db.getTableForReplay(log.getTableId()); - if (table == null) { - LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); - return; - } db.writeLock(); try { - db.dropTableForReplay(table.getName()); - db.setLastUpdateTime(log.getLastUpdateTime()); + db.dropTable(table.getName()); + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache( + catalog.getId(), db.getFullName(), table.getName()); + ((HMSExternalDatabase) db).setLastUpdateTime(System.currentTimeMillis()); } finally { db.writeUnlock(); } - - Env.getCurrentEnv().getExtMetaCacheMgr() - .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName()); } public boolean externalTableExistInLocal(String dbName, String tableName, String catalogName) throws DdlException { @@ -823,9 +791,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return ((ExternalCatalog) catalog).tableExistInLocal(dbName, tableName); } - public void createExternalTableFromEvent(String dbName, String tableName, String catalogName, - boolean ignoreIfExists) - throws DdlException { + public void createExternalTableFromEvent(String dbName, String tableName, + String catalogName, long updateTime, + boolean ignoreIfExists) throws DdlException { CatalogIf catalog = nameToCatalog.get(catalogName); if (catalog == null) { throw new DdlException("No catalog found with name: " + catalogName); @@ -848,33 +816,21 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } return; } - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableName(tableName); - log.setTableId(Env.getCurrentEnv().getNextId()); - log.setLastUpdateTime(System.currentTimeMillis()); - replayCreateExternalTableFromEvent(log); - Env.getCurrentEnv().getEditLog().logCreateExternalTable(log); - } - public void replayCreateExternalTableFromEvent(ExternalObjectLog log) { - LOG.debug("ReplayCreateExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}],tableName:[{}]", log.getCatalogId(), - log.getDbId(), log.getTableId(), log.getTableName()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); + long tblId = Env.getCurrentEnv().getExternalMetaIdMgr().getTblId(catalog.getId(), dbName, tableName); + // -1L means it will be dropped later, ignore + if (tblId == 
ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) { return; } + db.writeLock(); try { - db.createTableForReplay(log.getTableName(), log.getTableId()); - db.setLastUpdateTime(log.getLastUpdateTime()); + ((HMSExternalDatabase) db).createTable(tableName, tblId); + ((HMSExternalDatabase) db).setLastUpdateTime(System.currentTimeMillis()); + table = db.getTableNullable(tableName); + if (table != null) { + ((HMSExternalTable) table).setEventUpdateTime(updateTime); + } } finally { db.writeUnlock(); } @@ -896,34 +852,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return; } - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setInvalidCache(true); - replayDropExternalDatabase(log); - Env.getCurrentEnv().getEditLog().logDropExternalDatabase(log); - } - - public void replayDropExternalDatabase(ExternalObjectLog log) { - writeLock(); - try { - LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), - log.getDbId(), log.getTableId()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); - return; - } - catalog.dropDatabaseForReplay(db.getFullName()); - Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(), db.getFullName()); - } finally { - writeUnlock(); - } + ((HMSExternalCatalog) catalog).dropDatabase(dbName); + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(), dbName); } public void createExternalDatabase(String dbName, String catalogName, boolean ignoreIfExists) throws DdlException { @@ -942,28 +872,13 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return; } - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(Env.getCurrentEnv().getNextId()); - log.setDbName(dbName); - replayCreateExternalDatabase(log); - Env.getCurrentEnv().getEditLog().logCreateExternalDatabase(log); - } - - public void replayCreateExternalDatabase(ExternalObjectLog log) { - writeLock(); - try { - LOG.debug("ReplayCreateExternalDatabase,catalogId:[{}],dbId:[{}],dbName:[{}]", log.getCatalogId(), - log.getDbId(), log.getDbName()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - catalog.createDatabaseForReplay(log.getDbId(), log.getDbName()); - } finally { - writeUnlock(); + long dbId = Env.getCurrentEnv().getExternalMetaIdMgr().getDbId(catalog.getId(), dbName); + // -1L means it will be dropped later, ignore + if (dbId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) { + return; } + + ((HMSExternalCatalog) catalog).createDatabase(dbId, dbName); } public void addExternalPartitions(String catalogName, String dbName, String tableName, @@ -999,48 +914,6 @@ public class CatalogMgr implements Writable, GsonPostProcessable { HMSExternalTable hmsTable = (HMSExternalTable) table; Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames); hmsTable.setEventUpdateTime(updateTime); - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - 
log.setDbId(db.getId()); - log.setTableId(table.getId()); - log.setPartitionNames(partitionNames); - log.setLastUpdateTime(updateTime); - Env.getCurrentEnv().getEditLog().logAddExternalPartitions(log); - } - - public void replayAddExternalPartitions(ExternalObjectLog log) { - LOG.debug("ReplayAddExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), - log.getDbId(), log.getTableId()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); - return; - } - ExternalTable table = db.getTableForReplay(log.getTableId()); - if (table == null) { - LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); - return; - } - if (!(table instanceof HMSExternalTable)) { - LOG.warn("only support HMSTable"); - return; - } - - HMSExternalTable hmsTable = (HMSExternalTable) table; - try { - Env.getCurrentEnv().getExtMetaCacheMgr() - .addPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames()); - hmsTable.setEventUpdateTime(log.getLastUpdateTime()); - } catch (HMSClientException e) { - LOG.warn("Network problem occurs or hms table has been deleted, fallback to invalidate table cache", e); - Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), - db.getFullName(), table.getName()); - } } public void dropExternalPartitions(String catalogName, String dbName, String tableName, @@ -1069,42 +942,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable { return; } - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableId(table.getId()); - log.setPartitionNames(partitionNames); - log.setLastUpdateTime(updateTime); - replayDropExternalPartitions(log); - Env.getCurrentEnv().getEditLog().logDropExternalPartitions(log); - } - - public void replayDropExternalPartitions(ExternalObjectLog log) { - LOG.debug("ReplayDropExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), - log.getDbId(), log.getTableId()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); - return; - } - ExternalTable table = db.getTableForReplay(log.getTableId()); - if (table == null) { - LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); - return; - } - if (!(table instanceof HMSExternalTable)) { - LOG.warn("only support HMSTable"); - return; - } HMSExternalTable hmsTable = (HMSExternalTable) table; - Env.getCurrentEnv().getExtMetaCacheMgr() - .dropPartitionsCache(catalog.getId(), hmsTable, log.getPartitionNames()); - hmsTable.setEventUpdateTime(log.getLastUpdateTime()); + Env.getCurrentEnv().getExtMetaCacheMgr().dropPartitionsCache(catalog.getId(), hmsTable, partitionNames); + hmsTable.setEventUpdateTime(updateTime); } public void refreshExternalPartitions(String catalogName, String dbName, String tableName, @@ -1136,42 +976,9 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable { return; } - ExternalObjectLog log = new ExternalObjectLog(); - log.setCatalogId(catalog.getId()); - log.setDbId(db.getId()); - log.setTableId(table.getId()); - log.setPartitionNames(partitionNames); - log.setLastUpdateTime(updateTime); - replayRefreshExternalPartitions(log); - Env.getCurrentEnv().getEditLog().logInvalidateExternalPartitions(log); - } - - public void replayRefreshExternalPartitions(ExternalObjectLog log) { - LOG.debug("replayRefreshExternalPartitions,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), - log.getDbId(), log.getTableId()); - ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); - if (catalog == null) { - LOG.warn("No catalog found with id:[{}], it may have been dropped.", log.getCatalogId()); - return; - } - ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); - if (db == null) { - LOG.warn("No db found with id:[{}], it may have been dropped.", log.getDbId()); - return; - } - ExternalTable table = db.getTableForReplay(log.getTableId()); - if (table == null) { - LOG.warn("No table found with id:[{}], it may have been dropped.", log.getTableId()); - return; - } - if (!(table instanceof HMSExternalTable)) { - LOG.warn("only support HMSTable"); - return; - } - Env.getCurrentEnv().getExtMetaCacheMgr() - .invalidatePartitionsCache(catalog.getId(), db.getFullName(), table.getName(), - log.getPartitionNames()); - ((HMSExternalTable) table).setEventUpdateTime(log.getLastUpdateTime()); + Env.getCurrentEnv().getExtMetaCacheMgr().invalidatePartitionsCache( + catalog.getId(), db.getFullName(), table.getName(), partitionNames); + ((HMSExternalTable) table).setEventUpdateTime(updateTime); } public void registerCatalogRefreshListener(Env env) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 3ff599d0ab4..6fcd495b67f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -521,13 +521,6 @@ public abstract class ExternalCatalog return null; } - /** - * External catalog has no cluster semantics. - */ - protected static String getRealTableName(String tableName) { - return ClusterNamespace.getNameFromFullName(tableName); - } - public static ExternalCatalog read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, ExternalCatalog.class); @@ -546,9 +539,6 @@ public abstract class ExternalCatalog db.setTableExtCatalog(this); } objectCreated = false; - if (this instanceof HMSExternalCatalog) { - ((HMSExternalCatalog) this).setLastSyncedEventId(-1L); - } // TODO: This code is to compatible with older version of metadata. // Could only remove after all users upgrate to the new version. 
if (logType == null) { @@ -569,11 +559,11 @@ public abstract class ExternalCatalog dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId()); } - public void dropDatabaseForReplay(String dbName) { + public void dropDatabase(String dbName) { throw new NotImplementedException("dropDatabase not implemented"); } - public void createDatabaseForReplay(long dbId, String dbName) { + public void createDatabase(long dbId, String dbName) { throw new NotImplementedException("createDatabase not implemented"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaIdMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaIdMgr.java new file mode 100644 index 00000000000..621c25b3698 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaIdMgr.java @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.catalog.Env; +import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +/** + * <pre> + * ExternalMetaIdMgr is responsible for managing external meta ids. + * Now it just manages the external meta ids of hms events, + * but it will be extended to manage other external meta ids in the future. 
+ * </pre> + * TODO: remove InitCatalogLog and InitDatabaseLog, manage external meta ids at ExternalMetaIdMgr + */ +public class ExternalMetaIdMgr { + + private static final Logger LOG = LogManager.getLogger(ExternalMetaIdMgr.class); + + public static final long META_ID_FOR_NOT_EXISTS = -1L; + + private final Map<Long, CtlMetaIdMgr> idToCtlMgr = Maps.newConcurrentMap(); + + public ExternalMetaIdMgr() { + } + + // invoke this method only on master + public static long nextMetaId() { + return Env.getCurrentEnv().getNextId(); + } + + // return the db id of the specified db, -1 means not exists + public long getDbId(long catalogId, String dbName) { + DbMetaIdMgr dbMetaIdMgr = getDbMetaIdMgr(catalogId, dbName); + if (dbMetaIdMgr == null) { + return META_ID_FOR_NOT_EXISTS; + } + return dbMetaIdMgr.dbId; + } + + // return the tbl id of the specified tbl, -1 means not exists + public long getTblId(long catalogId, String dbName, String tblName) { + TblMetaIdMgr tblMetaIdMgr = getTblMetaIdMgr(catalogId, dbName, tblName); + if (tblMetaIdMgr == null) { + return META_ID_FOR_NOT_EXISTS; + } + return tblMetaIdMgr.tblId; + } + + // return the partition id of the specified partition, -1 means not exists + public long getPartitionId(long catalogId, String dbName, + String tblName, String partitionName) { + PartitionMetaIdMgr partitionMetaIdMgr = getPartitionMetaIdMgr(catalogId, dbName, tblName, partitionName); + if (partitionMetaIdMgr == null) { + return META_ID_FOR_NOT_EXISTS; + } + return partitionMetaIdMgr.partitionId; + } + + private @Nullable DbMetaIdMgr getDbMetaIdMgr(long catalogId, String dbName) { + CtlMetaIdMgr ctlMetaIdMgr = idToCtlMgr.get(catalogId); + if (ctlMetaIdMgr == null) { + return null; + } + return ctlMetaIdMgr.dbNameToMgr.get(dbName); + } + + private @Nullable TblMetaIdMgr getTblMetaIdMgr(long catalogId, String dbName, String tblName) { + DbMetaIdMgr dbMetaIdMgr = getDbMetaIdMgr(catalogId, dbName); + if (dbMetaIdMgr == null) { + return null; + } + return dbMetaIdMgr.tblNameToMgr.get(tblName); + } + + private PartitionMetaIdMgr getPartitionMetaIdMgr(long catalogId, String dbName, + String tblName, String partitionName) { + TblMetaIdMgr tblMetaIdMgr = getTblMetaIdMgr(catalogId, dbName, tblName); + if (tblMetaIdMgr == null) { + return null; + } + return tblMetaIdMgr.partitionNameToMgr.get(partitionName); + } + + public void replayMetaIdMappingsLog(@NotNull MetaIdMappingsLog log) { + Preconditions.checkNotNull(log); + long catalogId = log.getCatalogId(); + CtlMetaIdMgr ctlMetaIdMgr = idToCtlMgr.computeIfAbsent(catalogId, CtlMetaIdMgr::new); + for (MetaIdMappingsLog.MetaIdMapping mapping : log.getMetaIdMappings()) { + handleMetaIdMapping(mapping, ctlMetaIdMgr); + } + if (log.isFromHmsEvent()) { + CatalogIf<?> catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(log.getCatalogId()); + if (catalogIf != null) { + MetastoreEventsProcessor metastoreEventsProcessor = Env.getCurrentEnv().getMetastoreEventsProcessor(); + metastoreEventsProcessor.updateMasterLastSyncedEventId( + (HMSExternalCatalog) catalogIf, log.getLastSyncedEventId()); + } + } + } + + // no lock because the operations is serialized currently + private void handleMetaIdMapping(MetaIdMappingsLog.MetaIdMapping mapping, CtlMetaIdMgr ctlMetaIdMgr) { + MetaIdMappingsLog.OperationType opType = MetaIdMappingsLog.getOperationType(mapping.getOpType()); + MetaIdMappingsLog.MetaObjectType objType = MetaIdMappingsLog.getMetaObjectType(mapping.getMetaObjType()); + switch (opType) { + case ADD: + handleAddMetaIdMapping(mapping, 
ctlMetaIdMgr, objType); + break; + + case DELETE: + handleDelMetaIdMapping(mapping, ctlMetaIdMgr, objType); + break; + + default: + break; + } + } + + private static void handleDelMetaIdMapping(MetaIdMappingsLog.MetaIdMapping mapping, + CtlMetaIdMgr ctlMetaIdMgr, + MetaIdMappingsLog.MetaObjectType objType) { + TblMetaIdMgr tblMetaIdMgr; + DbMetaIdMgr dbMetaIdMgr; + switch (objType) { + case DATABASE: + ctlMetaIdMgr.dbNameToMgr.remove(mapping.getDbName()); + break; + + case TABLE: + dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr.get(mapping.getDbName()); + if (dbMetaIdMgr != null) { + dbMetaIdMgr.tblNameToMgr.remove(mapping.getTblName()); + } + break; + + case PARTITION: + dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr.get(mapping.getDbName()); + if (dbMetaIdMgr != null) { + tblMetaIdMgr = dbMetaIdMgr.tblNameToMgr.get(mapping.getTblName()); + if (tblMetaIdMgr != null) { + tblMetaIdMgr.partitionNameToMgr.remove(mapping.getPartitionName()); + } + } + break; + + default: + break; + } + } + + private static void handleAddMetaIdMapping(MetaIdMappingsLog.MetaIdMapping mapping, + CtlMetaIdMgr ctlMetaIdMgr, + MetaIdMappingsLog.MetaObjectType objType) { + DbMetaIdMgr dbMetaIdMgr; + TblMetaIdMgr tblMetaIdMgr; + switch (objType) { + case DATABASE: + ctlMetaIdMgr.dbNameToMgr.put(mapping.getDbName(), + new DbMetaIdMgr(mapping.getId(), mapping.getDbName())); + break; + + case TABLE: + dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr + .computeIfAbsent(mapping.getDbName(), DbMetaIdMgr::new); + dbMetaIdMgr.tblNameToMgr.put(mapping.getTblName(), + new TblMetaIdMgr(mapping.getId(), mapping.getTblName())); + break; + + case PARTITION: + dbMetaIdMgr = ctlMetaIdMgr.dbNameToMgr + .computeIfAbsent(mapping.getDbName(), DbMetaIdMgr::new); + tblMetaIdMgr = dbMetaIdMgr.tblNameToMgr + .computeIfAbsent(mapping.getTblName(), TblMetaIdMgr::new); + tblMetaIdMgr.partitionNameToMgr.put(mapping.getPartitionName(), + new PartitionMetaIdMgr(mapping.getId(), mapping.getPartitionName())); + break; + + default: + break; + } + } + + public static class CtlMetaIdMgr { + protected final long catalogId; + + protected CtlMetaIdMgr(long catalogId) { + this.catalogId = catalogId; + } + + protected Map<String, DbMetaIdMgr> dbNameToMgr = Maps.newConcurrentMap(); + } + + public static class DbMetaIdMgr { + protected volatile long dbId = META_ID_FOR_NOT_EXISTS; + protected final String dbName; + + protected DbMetaIdMgr(long dbId, String dbName) { + this.dbId = dbId; + this.dbName = dbName; + } + + protected DbMetaIdMgr(String dbName) { + this.dbName = dbName; + } + + protected Map<String, TblMetaIdMgr> tblNameToMgr = Maps.newConcurrentMap(); + } + + public static class TblMetaIdMgr { + protected volatile long tblId = META_ID_FOR_NOT_EXISTS; + protected final String tblName; + + protected TblMetaIdMgr(long tblId, String tblName) { + this.tblId = tblId; + this.tblName = tblName; + } + + protected TblMetaIdMgr(String tblName) { + this.tblName = tblName; + } + + protected Map<String, PartitionMetaIdMgr> partitionNameToMgr = Maps.newConcurrentMap(); + } + + public static class PartitionMetaIdMgr { + protected volatile long partitionId = META_ID_FOR_NOT_EXISTS; + protected final String partitionName; + + protected PartitionMetaIdMgr(long partitionId, String partitionName) { + this.partitionId = partitionId; + this.partitionName = partitionName; + } + + protected PartitionMetaIdMgr(String partitionName) { + this.partitionName = partitionName; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index ae6b5f04738..dd6788ade2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -23,23 +23,19 @@ import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.catalog.external.HMSExternalDatabase; +import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.datasource.hive.HMSCachedClient; import org.apache.doris.datasource.hive.HMSCachedClientFactory; -import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.HMSProperties; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -57,10 +53,7 @@ public class HMSExternalCatalog extends ExternalCatalog { private static final int MIN_CLIENT_POOL_SIZE = 8; protected HMSCachedClient client; - // Record the latest synced event id when processing hive events - // Must set to -1 otherwise client.getNextNotification will throw exception - // Reference to https://github.com/apDdlache/doris/issues/18251 - private long lastSyncedEventId = -1L; + public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter"; public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second"; // broker name for file split and query scan. 
@@ -132,18 +125,6 @@ public class HMSExternalCatalog extends ExternalCatalog { } } - public String getHiveMetastoreUris() { - return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, ""); - } - - public String getHiveVersion() { - return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, ""); - } - - protected List<String> listDatabaseNames() { - return client.getAllDatabases(); - } - @Override protected void initLocalObjectsImpl() { HiveConf hiveConf = null; @@ -195,13 +176,13 @@ public class HMSExternalCatalog extends ExternalCatalog { hmsExternalDatabase.getTables().forEach(table -> names.add(table.getName())); return names; } else { - return client.getAllTables(getRealTableName(dbName)); + return client.getAllTables(ClusterNamespace.getNameFromFullName(dbName)); } } @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { - return client.tableExists(getRealTableName(dbName), tblName); + return client.tableExists(ClusterNamespace.getNameFromFullName(dbName), tblName); } @Override @@ -211,7 +192,7 @@ public class HMSExternalCatalog extends ExternalCatalog { if (hmsExternalDatabase == null) { return false; } - return hmsExternalDatabase.getTable(getRealTableName(tblName)).isPresent(); + return hmsExternalDatabase.getTable(ClusterNamespace.getNameFromFullName(tblName)).isPresent(); } public HMSCachedClient getClient() { @@ -219,69 +200,8 @@ public class HMSExternalCatalog extends ExternalCatalog { return client; } - public void setLastSyncedEventId(long lastSyncedEventId) { - this.lastSyncedEventId = lastSyncedEventId; - } - - public NotificationEventResponse getNextEventResponse(HMSExternalCatalog hmsExternalCatalog) - throws MetastoreNotificationFetchException { - makeSureInitialized(); - long currentEventId = getCurrentEventId(); - if (lastSyncedEventId < 0) { - refreshCatalog(hmsExternalCatalog); - // invoke getCurrentEventId() and save the event id before refresh catalog to avoid missing events - // but set lastSyncedEventId to currentEventId only if there is not any problems when refreshing catalog - lastSyncedEventId = currentEventId; - LOG.info( - "First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId," - + "lastSyncedEventId is [{}]", - hmsExternalCatalog.getName(), lastSyncedEventId); - return null; - } - - LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {},lastSyncedEventId is {}", - hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId); - if (currentEventId == lastSyncedEventId) { - LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName()); - return null; - } - - try { - return client.getNextNotification(lastSyncedEventId, Config.hms_events_batch_size_per_rpc, null); - } catch (MetastoreNotificationFetchException e) { - // Need a fallback to handle this because this error state can not be recovered until restarting FE - if (StringUtils.isNotEmpty(e.getMessage()) - && e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) { - refreshCatalog(hmsExternalCatalog); - // set lastSyncedEventId to currentEventId after refresh catalog successfully - lastSyncedEventId = currentEventId; - LOG.warn("Notification events are missing, maybe an event can not be handled " - + "or processing rate is too low, fallback to refresh the catalog"); - return null; - } - throw e; - } - } - - private void refreshCatalog(HMSExternalCatalog hmsExternalCatalog) { - CatalogLog log = new CatalogLog(); - 
log.setCatalogId(hmsExternalCatalog.getId()); - log.setInvalidCache(true); - Env.getCurrentEnv().getCatalogMgr().refreshCatalog(log); - } - - private long getCurrentEventId() { - makeSureInitialized(); - CurrentNotificationEventId currentNotificationEventId = client.getCurrentNotificationEventId(); - if (currentNotificationEventId == null) { - LOG.warn("Get currentNotificationEventId is null"); - return -1; - } - return currentNotificationEventId.getEventId(); - } - @Override - public void dropDatabaseForReplay(String dbName) { + public void dropDatabase(String dbName) { LOG.debug("drop database [{}]", dbName); Long dbId = dbNameToId.remove(dbName); if (dbId == null) { @@ -291,7 +211,7 @@ public class HMSExternalCatalog extends ExternalCatalog { } @Override - public void createDatabaseForReplay(long dbId, String dbName) { + public void createDatabase(long dbId, String dbName) { LOG.debug("create database [{}]", dbName); dbNameToId.put(dbName, dbId); ExternalDatabase<? extends ExternalTable> db = getDbForInit(dbName, dbId, logType); @@ -317,4 +237,17 @@ public class HMSExternalCatalog extends ExternalCatalog { catalogProperty.addProperty(PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "true"); } } + + public String getHiveMetastoreUris() { + return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, ""); + } + + public String getHiveVersion() { + return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, ""); + } + + protected List<String> listDatabaseNames() { + return client.getAllDatabases(); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java deleted file mode 100644 index 2f462b551c7..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitTableLog.java +++ /dev/null @@ -1,67 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.datasource; - -import org.apache.doris.catalog.Column; -import org.apache.doris.common.io.Text; -import org.apache.doris.common.io.Writable; -import org.apache.doris.persist.gson.GsonUtils; - -import com.google.gson.annotations.SerializedName; -import lombok.Data; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; - -@Data -public class InitTableLog implements Writable { - enum Type { - HMS, - ES, - UNKNOWN; - } - - @SerializedName(value = "catalogId") - private long catalogId; - - @SerializedName(value = "dbId") - private long dbId; - - @SerializedName(value = "tableId") - private long tableId; - - @SerializedName(value = "type") - private Type type; - - @SerializedName(value = "schema") - protected volatile List<Column> schema; - - public InitTableLog() {} - - @Override - public void write(DataOutput out) throws IOException { - Text.writeString(out, GsonUtils.GSON.toJson(this)); - } - - public static InitTableLog read(DataInput in) throws IOException { - String json = Text.readString(in); - return GsonUtils.GSON.fromJson(json, InitTableLog.class); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MetaIdMappingsLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MetaIdMappingsLog.java new file mode 100644 index 00000000000..629b4d13a40 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MetaIdMappingsLog.java @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; +import lombok.Data; +import lombok.Getter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +@Data +public class MetaIdMappingsLog implements Writable { + + public static final short OPERATION_TYPE_IGNORE = 0; + public static final short OPERATION_TYPE_ADD = 1; + public static final short OPERATION_TYPE_DELETE = 2; + + public static final short META_OBJECT_TYPE_IGNORE = 0; + public static final short META_OBJECT_TYPE_DATABASE = 1; + public static final short META_OBJECT_TYPE_TABLE = 2; + public static final short META_OBJECT_TYPE_PARTITION = 3; + + @SerializedName(value = "ctlId") + private long catalogId = -1L; + + @SerializedName(value = "fromEvent") + private boolean fromHmsEvent = false; + + // The synced event id of master + @SerializedName(value = "lastEventId") + private long lastSyncedEventId = -1L; + + @SerializedName(value = "metaIdMappings") + private List<MetaIdMapping> metaIdMappings = Lists.newLinkedList(); + + public MetaIdMappingsLog() { + } + + @Override + public int hashCode() { + return Objects.hash(catalogId, lastSyncedEventId, + metaIdMappings == null ? 0 : Arrays.hashCode(metaIdMappings.toArray())); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof MetaIdMappingsLog)) { + return false; + } + return Objects.equals(this.catalogId, ((MetaIdMappingsLog) obj).catalogId) + && Objects.equals(this.fromHmsEvent, ((MetaIdMappingsLog) obj).fromHmsEvent) + && Objects.equals(this.lastSyncedEventId, ((MetaIdMappingsLog) obj).lastSyncedEventId) + && Objects.equals(this.metaIdMappings, ((MetaIdMappingsLog) obj).metaIdMappings); + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static MetaIdMappingsLog read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, MetaIdMappingsLog.class); + } + + public void addMetaIdMapping(MetaIdMapping metaIdMapping) { + this.metaIdMappings.add(metaIdMapping); + } + + public void addMetaIdMappings(List<MetaIdMapping> metaIdMappings) { + this.metaIdMappings.addAll(metaIdMappings); + } + + public static OperationType getOperationType(short opType) { + switch (opType) { + case OPERATION_TYPE_ADD: + return OperationType.ADD; + case OPERATION_TYPE_DELETE: + return OperationType.DELETE; + default: + return OperationType.IGNORE; + } + } + + public static MetaObjectType getMetaObjectType(short metaObjType) { + switch (metaObjType) { + case META_OBJECT_TYPE_DATABASE: + return MetaObjectType.DATABASE; + case META_OBJECT_TYPE_TABLE: + return MetaObjectType.TABLE; + case META_OBJECT_TYPE_PARTITION: + return MetaObjectType.PARTITION; + default: + return MetaObjectType.IGNORE; + } + } + + @Getter + public static class MetaIdMapping { + + @SerializedName(value = "opType") + private short opType; + @SerializedName(value = "metaObjType") + private short metaObjType; + // name of Database + @SerializedName(value = "dbName") + private String dbName; + // name of Table + @SerializedName(value = "tblName") + private String tblName; + // name of Partition + @SerializedName(value = "pName") + private String 
partitionName; + // id of Database/Table/Partition + @SerializedName(value = "id") + private long id; + + public MetaIdMapping() {} + + public MetaIdMapping(short opType, + short metaObjType, + String dbName, + String tblName, + String partitionName, + long id) { + this.opType = opType; + this.metaObjType = metaObjType; + this.dbName = dbName; + this.tblName = tblName; + this.partitionName = partitionName; + this.id = id; + } + + public MetaIdMapping(short opType, + short metaObjType, + String dbName, + String tblName, + String partitionName) { + this.opType = opType; + this.metaObjType = metaObjType; + this.dbName = dbName; + this.tblName = tblName; + this.partitionName = partitionName; + this.id = -1L; + } + + public MetaIdMapping(short opType, + short metaObjType, + String dbName, + String tblName, + long id) { + this.opType = opType; + this.metaObjType = metaObjType; + this.dbName = dbName; + this.tblName = tblName; + this.partitionName = null; + this.id = id; + } + + public MetaIdMapping(short opType, + short metaObjType, + String dbName, + String tblName) { + this.opType = opType; + this.metaObjType = metaObjType; + this.dbName = dbName; + this.tblName = tblName; + this.partitionName = null; + this.id = -1L; + } + + public MetaIdMapping(short opType, + short metaObjType, + String dbName, + long id) { + this.opType = opType; + this.metaObjType = metaObjType; + this.dbName = dbName; + this.tblName = null; + this.partitionName = null; + this.id = id; + } + + public MetaIdMapping(short opType, + short metaObjType, + String dbName) { + this.opType = opType; + this.metaObjType = metaObjType; + this.dbName = dbName; + this.tblName = null; + this.partitionName = null; + this.id = -1L; + } + + @Override + public int hashCode() { + return Objects.hash(opType, metaObjType, dbName, tblName, partitionName, id); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof MetaIdMapping)) { + return false; + } + return Objects.equals(this.opType, ((MetaIdMapping) obj).opType) + && Objects.equals(this.metaObjType, ((MetaIdMapping) obj).metaObjType) + && Objects.equals(this.dbName, ((MetaIdMapping) obj).dbName) + && Objects.equals(this.tblName, ((MetaIdMapping) obj).tblName) + && Objects.equals(this.partitionName, ((MetaIdMapping) obj).partitionName) + && Objects.equals(this.id, ((MetaIdMapping) obj).id); + } + + } + + public enum OperationType { + IGNORE(OPERATION_TYPE_IGNORE), + // Add a Database/Table/Partition + ADD(OPERATION_TYPE_ADD), + // Delete Database/Table/Partition + DELETE(OPERATION_TYPE_DELETE); + + private final short opType; + + OperationType(short opType) { + this.opType = opType; + } + + public short getOperationType() { + return opType; + } + } + + public enum MetaObjectType { + IGNORE(META_OBJECT_TYPE_IGNORE), + DATABASE(META_OBJECT_TYPE_DATABASE), + TABLE(META_OBJECT_TYPE_TABLE), + PARTITION(META_OBJECT_TYPE_PARTITION); + + private final short metaObjType; + + MetaObjectType(short metaObjType) { + this.metaObjType = metaObjType; + } + + public short getMetaObjectType() { + return metaObjType; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java index e1dacbd0b29..ffc7b95ff59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java @@ -20,8 +20,11 @@ package 
org.apache.doris.datasource.hive.event; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalMetaIdMgr; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.hadoop.hive.common.FileUtils; @@ -109,4 +112,17 @@ public class AddPartitionEvent extends MetastorePartitionEvent { debugString("Failed to process event"), e); } } + + @Override + protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() { + List<MetaIdMappingsLog.MetaIdMapping> metaIdMappings = Lists.newArrayList(); + for (String partitionName : this.getAllPartitionNames()) { + MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION, + dbName, tblName, partitionName, ExternalMetaIdMgr.nextMetaId()); + metaIdMappings.add(metaIdMapping); + } + return ImmutableList.copyOf(metaIdMappings); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java index 6de71fbbc59..c69812a22b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java @@ -100,7 +100,8 @@ public class AlterTableEvent extends MetastoreTableEvent { Env.getCurrentEnv().getCatalogMgr() .dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true); Env.getCurrentEnv().getCatalogMgr() - .createExternalTableFromEvent(tableAfter.getDbName(), tableAfter.getTableName(), catalogName, true); + .createExternalTableFromEvent( + tableAfter.getDbName(), tableAfter.getTableName(), catalogName, eventTime, true); } private void processRename() throws DdlException { @@ -118,7 +119,8 @@ public class AlterTableEvent extends MetastoreTableEvent { Env.getCurrentEnv().getCatalogMgr() .dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true); Env.getCurrentEnv().getCatalogMgr() - .createExternalTableFromEvent(tableAfter.getDbName(), tableAfter.getTableName(), catalogName, true); + .createExternalTableFromEvent( + tableAfter.getDbName(), tableAfter.getTableName(), catalogName, eventTime, true); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java index 42d813319cc..d79d23824ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java @@ -20,8 +20,11 @@ package org.apache.doris.datasource.hive.event; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalMetaIdMgr; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -59,4 +62,13 @@ public class CreateDatabaseEvent extends MetastoreEvent { debugString("Failed to process event"), e); } } + 
+ @Override + protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() { + MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + dbName, ExternalMetaIdMgr.nextMetaId()); + return ImmutableList.of(metaIdMapping); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java index 4c3615fbda8..246ce8626f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java @@ -19,8 +19,11 @@ package org.apache.doris.datasource.hive.event; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalMetaIdMgr; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.Table; @@ -77,10 +80,19 @@ public class CreateTableEvent extends MetastoreTableEvent { try { infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tblName); Env.getCurrentEnv().getCatalogMgr() - .createExternalTableFromEvent(dbName, hmsTbl.getTableName(), catalogName, true); + .createExternalTableFromEvent(dbName, hmsTbl.getTableName(), catalogName, eventTime, true); } catch (DdlException e) { throw new MetastoreNotificationException( debugString("Failed to process event"), e); } } + + @Override + protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() { + MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + dbName, tblName, ExternalMetaIdMgr.nextMetaId()); + return ImmutableList.of(metaIdMapping); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java index 3481f832fe0..ca69e6f14d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java @@ -20,8 +20,10 @@ package org.apache.doris.datasource.hive.event; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -59,4 +61,13 @@ public class DropDatabaseEvent extends MetastoreEvent { debugString("Failed to process event"), e); } } + + @Override + protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() { + MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + dbName); + return ImmutableList.of(metaIdMapping); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java index f71f44cf5ab..dd443010289 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java @@ -20,8 +20,10 @@ package org.apache.doris.datasource.hive.event; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -124,7 +126,7 @@ public class DropPartitionEvent extends MetastorePartitionEvent { return false; } - // `that` event can be batched if this event's partitions contains all of the partitions which `that` event has + // `that` event can be batched if this event's partitions contains all the partitions which `that` event has // else just remove `that` event's relevant partitions for (String partitionName : getAllPartitionNames()) { if (thatPartitionEvent instanceof AddPartitionEvent) { @@ -136,4 +138,17 @@ public class DropPartitionEvent extends MetastorePartitionEvent { return getAllPartitionNames().containsAll(thatPartitionEvent.getAllPartitionNames()); } + + @Override + protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() { + List<MetaIdMappingsLog.MetaIdMapping> metaIdMappings = Lists.newArrayList(); + for (String partitionName : this.getAllPartitionNames()) { + MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION, + dbName, tblName, partitionName); + metaIdMappings.add(metaIdMapping); + } + return ImmutableList.copyOf(metaIdMappings); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java index c333506cad2..0f62e246082 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java @@ -20,8 +20,10 @@ package org.apache.doris.datasource.hive.event; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage; @@ -89,14 +91,23 @@ public class DropTableEvent extends MetastoreTableEvent { return false; } - /** + /* * Check if `that` event is a rename event, a rename event can not be batched * because the process of `that` event will change the reference relation of this table, * otherwise it can be batched because this event is a drop-table event * and the process of this event will drop the whole table, * and `that` event must be a MetastoreTableEvent event otherwise `isSameTable` will return false - * */ + */ MetastoreTableEvent thatTblEvent = (MetastoreTableEvent) that; return !thatTblEvent.willChangeTableName(); } + + @Override + protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() { +
MetaIdMappingsLog.MetaIdMapping metaIdMapping = new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + dbName, tblName); + return ImmutableList.of(metaIdMapping); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java index f9771562ed4..a9d165b4d03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java @@ -17,8 +17,10 @@ package org.apache.doris.datasource.hive.event; +import org.apache.doris.datasource.MetaIdMappingsLog; import org.apache.doris.datasource.hive.HMSCachedClient; +import com.google.common.collect.ImmutableList; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -227,6 +229,13 @@ public abstract class MetastoreEvent { return name.toString(); } + /** + * Create a MetaIdMapping list from the event if the event is a create/add/drop event + */ + protected List<MetaIdMappingsLog.MetaIdMapping> transferToMetaIdMappings() { + return ImmutableList.of(); + } + @Override public String toString() { return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java index aabc562dba1..a3ba092703b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java @@ -18,7 +18,9 @@ package org.apache.doris.datasource.hive.event; +import org.apache.doris.catalog.Env; import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.datasource.MetaIdMappingsLog; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -77,23 +79,39 @@ public class MetastoreEventFactory implements EventFactory { for (NotificationEvent event : events) { metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, catalogName)); } - return createBatchEvents(catalogName, metastoreEvents); + List<MetastoreEvent> mergedEvents = mergeEvents(catalogName, metastoreEvents); + if (Env.getCurrentEnv().isMaster()) { + logMetaIdMappings(hmsExternalCatalog.getId(), events.get(events.size() - 1).getEventId(), mergedEvents); + } + return mergedEvents; + } + + private void logMetaIdMappings(long catalogId, long lastSyncedEventId, List<MetastoreEvent> mergedEvents) { + MetaIdMappingsLog log = new MetaIdMappingsLog(); + log.setCatalogId(catalogId); + log.setFromHmsEvent(true); + log.setLastSyncedEventId(lastSyncedEventId); + for (MetastoreEvent event : mergedEvents) { + log.addMetaIdMappings(event.transferToMetaIdMappings()); + } + Env.getCurrentEnv().getExternalMetaIdMgr().replayMetaIdMappingsLog(log); + Env.getCurrentEnv().getEditLog().logMetaIdMappingsLog(log); } /** * Merge events to reduce the cost time on event processing, currently mainly handles MetastoreTableEvent * because merge MetastoreTableEvent is simple and cost-effective. 
* For example, consider there are some events as following: - * + * <pre> * event1: alter table db1.t1 add partition p1; * event2: alter table db1.t1 drop partition p2; * event3: alter table db1.t2 add partition p3; * event4: alter table db2.t3 rename to t4; * event5: drop table db1.t1; - * + * </pre> * Only `event3 event4 event5` will be reserved and other events will be skipped. * */ - public List<MetastoreEvent> createBatchEvents(String catalogName, List<MetastoreEvent> events) { + public List<MetastoreEvent> mergeEvents(String catalogName, List<MetastoreEvent> events) { List<MetastoreEvent> eventsCopy = Lists.newArrayList(events); Map<MetastoreTableEvent.TableKey, List<Integer>> indexMap = Maps.newLinkedHashMap(); for (int i = 0; i < events.size(); i++) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index 622d84428fa..28793aecf21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -18,13 +18,22 @@ package org.apache.doris.datasource.hive.event; +import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.CatalogLog; import org.apache.doris.datasource.HMSClientException; import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.MasterOpExecutor; +import org.apache.doris.qe.OriginStatement; +import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; @@ -35,6 +44,7 @@ import org.apache.logging.log4j.Logger; import java.util.Collections; import java.util.List; +import java.util.Map; /** * A metastore event is a instance of the class @@ -68,6 +78,13 @@ public class MetastoreEventsProcessor extends MasterDaemon { // event factory which is used to get or create MetastoreEvents private final MetastoreEventFactory metastoreEventFactory; + // manager the lastSyncedEventId of hms catalogs + // use HashMap is fine because all operations are in one thread + private final Map<Long, Long> lastSyncedEventIdMap = Maps.newHashMap(); + + // manager the masterLastSyncedEventId of hms catalogs + private final Map<Long, Long> masterLastSyncedEventIdMap = Maps.newHashMap(); + private boolean isRunning; public MetastoreEventsProcessor() { @@ -76,13 +93,56 @@ public class MetastoreEventsProcessor extends MasterDaemon { this.isRunning = false; } + @Override + protected void runAfterCatalogReady() { + if (isRunning) { + LOG.warn("Last task not finished,ignore current task."); + return; + } + isRunning = true; + try { + realRun(); + } catch (Exception ex) { + LOG.warn("Task failed", ex); + } + isRunning = false; + } + + private void realRun() { + List<Long> catalogIds = Env.getCurrentEnv().getCatalogMgr().getCatalogIds(); + for (Long catalogId : catalogIds) { + CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); + if (catalog instanceof HMSExternalCatalog) { + HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog; + try { + List<NotificationEvent> events = getNextHMSEvents(hmsExternalCatalog); + if (!events.isEmpty()) { + LOG.info("Events size are {} on catalog [{}]", events.size(), + hmsExternalCatalog.getName()); + processEvents(events, hmsExternalCatalog); + } + } catch (MetastoreNotificationFetchException e) { + LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e); + } catch (Exception ex) { + LOG.warn("Failed to process hive metastore [{}] events .", + hmsExternalCatalog.getName(), ex); + } + } + } + } + /** * Fetch the next batch of NotificationEvents from metastore. The default batch size is * <code>{@link Config#hms_events_batch_size_per_rpc}</code> */ - private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog hmsExternalCatalog) { + private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog hmsExternalCatalog) throws Exception { LOG.debug("Start to pull events on catalog [{}]", hmsExternalCatalog.getName()); - NotificationEventResponse response = hmsExternalCatalog.getNextEventResponse(hmsExternalCatalog); + NotificationEventResponse response; + if (Env.getCurrentEnv().isMaster()) { + response = getNextEventResponseForMaster(hmsExternalCatalog); + } else { + response = getNextEventResponseForSlave(hmsExternalCatalog); + } if (response == null) { return Collections.emptyList(); @@ -99,11 +159,11 @@ public class MetastoreEventsProcessor extends MasterDaemon { && hmsClientException.getCause() instanceof NoSuchObjectException) { LOG.warn(event.debugString("Failed to process event and skip"), hmsClientException); } else { - hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1); + updateLastSyncedEventId(hmsExternalCatalog, event.getEventId() - 1); throw hmsClientException; } } catch (Exception e) { - hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1); + updateLastSyncedEventId(hmsExternalCatalog, event.getEventId() - 1); throw e; } } @@ -116,45 +176,142 @@ public class MetastoreEventsProcessor extends MasterDaemon { //transfer List<MetastoreEvent> metastoreEvents = metastoreEventFactory.getMetastoreEvents(events, hmsExternalCatalog); doExecute(metastoreEvents, hmsExternalCatalog); - hmsExternalCatalog.setLastSyncedEventId(events.get(events.size() - 1).getEventId()); + updateLastSyncedEventId(hmsExternalCatalog, events.get(events.size() - 1).getEventId()); } - @Override - protected void runAfterCatalogReady() { - if (isRunning) { - LOG.warn("Last task not finished,ignore current task."); - return; + private NotificationEventResponse getNextEventResponseForMaster(HMSExternalCatalog hmsExternalCatalog) + throws MetastoreNotificationFetchException { + long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog); + long currentEventId = getCurrentHmsEventId(hmsExternalCatalog); + if (lastSyncedEventId < 0) { + refreshCatalogForMaster(hmsExternalCatalog); + // invoke getCurrentEventId() and save the event id before refresh catalog to avoid missing events + // but set lastSyncedEventId to currentEventId only if there is not any problems when refreshing catalog + updateLastSyncedEventId(hmsExternalCatalog, currentEventId); + LOG.info( + "First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId," + + "lastSyncedEventId is [{}]", + hmsExternalCatalog.getName(), lastSyncedEventId); + return null; } - isRunning = true; + + 
LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {}, lastSyncedEventId is {}", + hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId); + if (currentEventId == lastSyncedEventId) { + LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName()); + return null; + } + try { - realRun(); - } catch (Exception ex) { - LOG.warn("Task failed", ex); + return hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, + Config.hms_events_batch_size_per_rpc, null); + } catch (MetastoreNotificationFetchException e) { + // Need a fallback to handle this because this error state can not be recovered until restarting FE + if (StringUtils.isNotEmpty(e.getMessage()) + && e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) { + refreshCatalogForMaster(hmsExternalCatalog); + // set lastSyncedEventId to currentEventId after refresh catalog successfully + updateLastSyncedEventId(hmsExternalCatalog, currentEventId); + LOG.warn("Notification events are missing, maybe an event can not be handled " + + "or processing rate is too low, fallback to refresh the catalog"); + return null; + } + throw e; } - isRunning = false; } - private void realRun() { - List<Long> catalogIds = Env.getCurrentEnv().getCatalogMgr().getCatalogIds(); - for (Long catalogId : catalogIds) { - CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); - if (catalog instanceof HMSExternalCatalog) { - HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog; - try { - List<NotificationEvent> events = getNextHMSEvents(hmsExternalCatalog); - if (!events.isEmpty()) { - LOG.info("Events size are {} on catalog [{}]", events.size(), - hmsExternalCatalog.getName()); - processEvents(events, hmsExternalCatalog); - } - } catch (MetastoreNotificationFetchException e) { - LOG.warn("Failed to fetch hms events on {}. 
msg: ", hmsExternalCatalog.getName(), e); - } catch (Exception ex) { - LOG.warn("Failed to process hive metastore [{}] events .", - hmsExternalCatalog.getName(), ex); - } + private NotificationEventResponse getNextEventResponseForSlave(HMSExternalCatalog hmsExternalCatalog) + throws Exception { + long lastSyncedEventId = getLastSyncedEventId(hmsExternalCatalog); + long masterLastSyncedEventId = getMasterLastSyncedEventId(hmsExternalCatalog); + // do nothing if masterLastSyncedEventId has not been synced + if (masterLastSyncedEventId == -1L) { + LOG.info("LastSyncedEventId of master has not been synced on catalog [{}]", hmsExternalCatalog.getName()); + return null; + } + // do nothing if lastSyncedEventId is equals to masterLastSyncedEventId + if (lastSyncedEventId == masterLastSyncedEventId) { + LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName()); + return null; + } + + if (lastSyncedEventId < 0) { + refreshCatalogForSlave(hmsExternalCatalog); + // Use masterLastSyncedEventId to avoid missing events + updateLastSyncedEventId(hmsExternalCatalog, masterLastSyncedEventId); + LOG.info( + "First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId," + + "lastSyncedEventId is [{}]", + hmsExternalCatalog.getName(), lastSyncedEventId); + return null; + } + + LOG.debug("Catalog [{}] getNextEventResponse, masterLastSyncedEventId is {}, lastSyncedEventId is {}", + hmsExternalCatalog.getName(), masterLastSyncedEventId, lastSyncedEventId); + + // For slave FE nodes, only fetch events which id is lower than masterLastSyncedEventId + int maxEventSize = Math.min((int) (masterLastSyncedEventId - lastSyncedEventId), + Config.hms_events_batch_size_per_rpc); + try { + return hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, maxEventSize, null); + } catch (MetastoreNotificationFetchException e) { + // Need a fallback to handle this because this error state can not be recovered until restarting FE + if (StringUtils.isNotEmpty(e.getMessage()) + && e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) { + refreshCatalogForMaster(hmsExternalCatalog); + // set masterLastSyncedEventId to lastSyncedEventId after refresh catalog successfully + updateLastSyncedEventId(hmsExternalCatalog, masterLastSyncedEventId); + LOG.warn("Notification events are missing, maybe an event can not be handled " + + "or processing rate is too low, fallback to refresh the catalog"); + return null; } + throw e; + } + } + + private long getCurrentHmsEventId(HMSExternalCatalog hmsExternalCatalog) { + CurrentNotificationEventId currentNotificationEventId = hmsExternalCatalog.getClient() + .getCurrentNotificationEventId(); + if (currentNotificationEventId == null) { + LOG.warn("Get currentNotificationEventId is null"); + return -1L; } + return currentNotificationEventId.getEventId(); + } + + private long getLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog) { + // Returns to -1 if not exists, otherwise client.getNextNotification will throw exception + // Reference to https://github.com/apDdlache/doris/issues/18251 + return lastSyncedEventIdMap.getOrDefault(hmsExternalCatalog.getId(), -1L); + } + + private void updateLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog, long eventId) { + lastSyncedEventIdMap.put(hmsExternalCatalog.getId(), eventId); + } + + private long getMasterLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog) { + return masterLastSyncedEventIdMap.getOrDefault(hmsExternalCatalog.getId(), -1L); + } + + 
public void updateMasterLastSyncedEventId(HMSExternalCatalog hmsExternalCatalog, long eventId) { + masterLastSyncedEventIdMap.put(hmsExternalCatalog.getId(), eventId); + } + + private void refreshCatalogForMaster(HMSExternalCatalog hmsExternalCatalog) { + CatalogLog log = new CatalogLog(); + log.setCatalogId(hmsExternalCatalog.getId()); + log.setInvalidCache(true); + Env.getCurrentEnv().getCatalogMgr().replayRefreshCatalog(log); + } + + private void refreshCatalogForSlave(HMSExternalCatalog hmsExternalCatalog) throws Exception { + // Transfer to master to refresh catalog + String sql = "REFRESH CATALOG " + hmsExternalCatalog.getName(); + OriginStatement originStmt = new OriginStatement(sql, 0); + MasterOpExecutor masterOpExecutor = new MasterOpExecutor(originStmt, new ConnectContext(), + RedirectStatus.FORWARD_WITH_SYNC, false); + LOG.debug("Transfer to master to refresh catalog, stmt: {}", sql); + masterOpExecutor.execute(); } public static MessageDeserializer getMessageDeserializer(String messageFormat) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 1bedda0a7e9..d74c519407f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -42,7 +42,7 @@ import org.apache.doris.datasource.CatalogLog; import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.InitDatabaseLog; -import org.apache.doris.datasource.InitTableLog; +import org.apache.doris.datasource.MetaIdMappingsLog; import org.apache.doris.ha.MasterInfo; import org.apache.doris.insertoverwrite.InsertOverwriteLog; import org.apache.doris.job.base.AbstractJob; @@ -746,19 +746,18 @@ public class JournalEntity implements Writable { isRead = true; break; } - case OperationType.OP_INIT_EXTERNAL_TABLE: { - data = InitTableLog.read(in); - isRead = true; - break; - } - case OperationType.OP_REFRESH_EXTERNAL_DB: + case OperationType.OP_INIT_EXTERNAL_TABLE: case OperationType.OP_DROP_EXTERNAL_TABLE: case OperationType.OP_CREATE_EXTERNAL_TABLE: case OperationType.OP_DROP_EXTERNAL_DB: case OperationType.OP_CREATE_EXTERNAL_DB: case OperationType.OP_ADD_EXTERNAL_PARTITIONS: case OperationType.OP_DROP_EXTERNAL_PARTITIONS: - case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: + case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: { + isRead = true; + break; + } + case OperationType.OP_REFRESH_EXTERNAL_DB: case OperationType.OP_REFRESH_EXTERNAL_TABLE: { data = ExternalObjectLog.read(in); isRead = true; @@ -906,6 +905,11 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_ADD_META_ID_MAPPINGS: { + data = MetaIdMappingsLog.read(in); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 8c2424d4837..958277dd6b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -51,6 +51,7 @@ import org.apache.doris.datasource.CatalogLog; import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.InitDatabaseLog; +import 
org.apache.doris.datasource.MetaIdMappingsLog; import org.apache.doris.ha.MasterInfo; import org.apache.doris.insertoverwrite.InsertOverwriteLog; import org.apache.doris.job.base.AbstractJob; @@ -1011,38 +1012,24 @@ public class EditLog { break; } case OperationType.OP_DROP_EXTERNAL_TABLE: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayDropExternalTable(log); break; } case OperationType.OP_CREATE_EXTERNAL_TABLE: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayCreateExternalTableFromEvent(log); break; } case OperationType.OP_DROP_EXTERNAL_DB: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayDropExternalDatabase(log); break; } case OperationType.OP_CREATE_EXTERNAL_DB: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayCreateExternalDatabase(log); break; } case OperationType.OP_ADD_EXTERNAL_PARTITIONS: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayAddExternalPartitions(log); break; } case OperationType.OP_DROP_EXTERNAL_PARTITIONS: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayDropExternalPartitions(log); break; } case OperationType.OP_REFRESH_EXTERNAL_PARTITIONS: { - final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); - env.getCatalogMgr().replayRefreshExternalPartitions(log); break; } case OperationType.OP_CREATE_WORKLOAD_GROUP: { @@ -1165,6 +1152,10 @@ public class EditLog { env.getBackupHandler().getRepoMgr().alterRepo(repository, true); break; } + case OperationType.OP_ADD_META_ID_MAPPINGS: { + env.getExternalMetaIdMgr().replayMetaIdMappingsLog((MetaIdMappingsLog) journal.getData()); + break; + } case OperationType.OP_LOG_UPDATE_ROWS: case OperationType.OP_LOG_NEW_PARTITION_LOADED: case OperationType.OP_LOG_ALTER_COLUMN_STATS: { @@ -1886,26 +1877,32 @@ public class EditLog { logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log); } + @Deprecated public void logDropExternalTable(ExternalObjectLog log) { logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log); } + @Deprecated public void logCreateExternalTable(ExternalObjectLog log) { logEdit(OperationType.OP_CREATE_EXTERNAL_TABLE, log); } + @Deprecated public void logDropExternalDatabase(ExternalObjectLog log) { logEdit(OperationType.OP_DROP_EXTERNAL_DB, log); } + @Deprecated public void logCreateExternalDatabase(ExternalObjectLog log) { logEdit(OperationType.OP_CREATE_EXTERNAL_DB, log); } + @Deprecated public void logAddExternalPartitions(ExternalObjectLog log) { logEdit(OperationType.OP_ADD_EXTERNAL_PARTITIONS, log); } + @Deprecated public void logDropExternalPartitions(ExternalObjectLog log) { logEdit(OperationType.OP_DROP_EXTERNAL_PARTITIONS, log); } @@ -2005,6 +2002,10 @@ public class EditLog { logEdit(OperationType.OP_INSERT_OVERWRITE, log); } + public void logMetaIdMappingsLog(MetaIdMappingsLog log) { + logEdit(OperationType.OP_ADD_META_ID_MAPPINGS, log); + } + public String getNotReadyReason() { if (journal == null) { return "journal is null"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 0945dc0f151..0868d7f371b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -288,12 +288,19 @@ public 
class OperationType { public static final short OP_ADD_CONSTRAINT = 346; public static final short OP_DROP_CONSTRAINT = 347; + @Deprecated public static final short OP_DROP_EXTERNAL_TABLE = 350; + @Deprecated public static final short OP_DROP_EXTERNAL_DB = 351; + @Deprecated public static final short OP_CREATE_EXTERNAL_TABLE = 352; + @Deprecated public static final short OP_CREATE_EXTERNAL_DB = 353; + @Deprecated public static final short OP_ADD_EXTERNAL_PARTITIONS = 354; + @Deprecated public static final short OP_DROP_EXTERNAL_PARTITIONS = 355; + @Deprecated public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356; public static final short OP_ALTER_USER = 400; @@ -364,6 +371,8 @@ public class OperationType { public static final short OP_LOG_ALTER_COLUMN_STATS = 464; + public static final short OP_ADD_META_ID_MAPPINGS = 470; + /** * Get opcode name by op code. **/ diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalMetaIdMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalMetaIdMgrTest.java new file mode 100644 index 00000000000..12e018a4cff --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalMetaIdMgrTest.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ExternalMetaIdMgrTest { + + @Test + public void testReplayMetaIdMappingsLog() { + ExternalMetaIdMgr mgr = new ExternalMetaIdMgr(); + MetaIdMappingsLog log1 = new MetaIdMappingsLog(); + log1.setCatalogId(1L); + log1.setFromHmsEvent(false); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + "db1", ExternalMetaIdMgr.nextMetaId())); + mgr.replayMetaIdMappingsLog(log1); + Assertions.assertNotEquals(-1L, mgr.getDbId(1L, "db1")); + + MetaIdMappingsLog log2 = new MetaIdMappingsLog(); + log2.setCatalogId(1L); + log2.setFromHmsEvent(false); + log2.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + "db1")); + mgr.replayMetaIdMappingsLog(log2); + Assertions.assertEquals(-1L, mgr.getDbId(1L, "db1")); + + MetaIdMappingsLog log3 = new MetaIdMappingsLog(); + log3.setCatalogId(1L); + log3.setFromHmsEvent(false); + log3.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + "db1", "tbl1", ExternalMetaIdMgr.nextMetaId())); + mgr.replayMetaIdMappingsLog(log3); + Assertions.assertEquals(-1L, mgr.getDbId(1L, "db1")); + Assertions.assertNotEquals(-1L, mgr.getTblId(1L, "db1", "tbl1")); + + MetaIdMappingsLog log4 = new MetaIdMappingsLog(); + log4.setCatalogId(1L); + log4.setFromHmsEvent(false); + log4.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + "db1", "tbl1")); + log4.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION, + "db1", "tbl1", "p1", ExternalMetaIdMgr.nextMetaId())); + mgr.replayMetaIdMappingsLog(log4); + Assertions.assertEquals(-1L, mgr.getDbId(1L, "db1")); + Assertions.assertEquals(-1L, mgr.getTblId(1L, "db1", "tbl1")); + Assertions.assertNotEquals(-1L, mgr.getPartitionId(1L, "db1", "tbl1", "p1")); + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/MetaIdMappingsLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/MetaIdMappingsLogTest.java new file mode 100644 index 00000000000..fec57c29eda --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/MetaIdMappingsLogTest.java @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class MetaIdMappingsLogTest { + + @Test + public void testSerialization() throws Exception { + // 1. Write objects to file + MetaIdMappingsLog log1 = new MetaIdMappingsLog(); + Path path = Files.createFile(Paths.get("./metaIdMappingsLogTest.txt")); + try (DataOutputStream dos = new DataOutputStream(Files.newOutputStream(path))) { + log1.setFromHmsEvent(true); + log1.setLastSyncedEventId(-1L); + log1.setCatalogId(1L); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + "db1", ExternalMetaIdMgr.nextMetaId())); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + "db1", "tbl1", ExternalMetaIdMgr.nextMetaId())); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + "db1", "tbl2", ExternalMetaIdMgr.nextMetaId())); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + "db2")); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION, + "db1", "tbl1", "p1", ExternalMetaIdMgr.nextMetaId())); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_ADD, + MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION, + "db1", "tbl1", "p2", ExternalMetaIdMgr.nextMetaId())); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_DATABASE, + "db2")); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_TABLE, + "db1", "tbl1", ExternalMetaIdMgr.nextMetaId())); + log1.addMetaIdMapping(new MetaIdMappingsLog.MetaIdMapping( + MetaIdMappingsLog.OPERATION_TYPE_DELETE, + MetaIdMappingsLog.META_OBJECT_TYPE_PARTITION, + "db1", "tbl1", "p2", ExternalMetaIdMgr.nextMetaId())); + log1.write(dos); + dos.flush(); + } catch (Throwable throwable) { + throwable.printStackTrace(); + Files.deleteIfExists(path); + Assertions.fail(); + } + + // 2. 
Read objects from file + MetaIdMappingsLog log2; + try (DataInputStream dis = new DataInputStream(Files.newInputStream(path))) { + log2 = MetaIdMappingsLog.read(dis); + Assertions.assertEquals(log1, log2); + } catch (Throwable throwable) { + throwable.printStackTrace(); + Assertions.fail(); + } finally { + Files.deleteIfExists(path); + } + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java index c9e566dc9d8..136cac6b712 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java @@ -454,7 +454,7 @@ public class MetastoreEventFactoryTest { for (int j = 0; j < 1000; j++) { events.add(producer.produceOneEvent(j)); } - List<MetastoreEvent> mergedEvents = factory.createBatchEvents(testCtl, events); + List<MetastoreEvent> mergedEvents = factory.mergeEvents(testCtl, events); for (MetastoreEvent event : events) { processEvent(validateCatalog, event);