This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 893f5f9345 [feature-wip](multi-catalog) support automatic sync hive metastore events (#15401) 893f5f9345 is described below commit 893f5f93455883d3fab681e95afc47363eeeee25 Author: zhangdong <493738...@qq.com> AuthorDate: Tue Jan 3 13:59:14 2023 +0800 [feature-wip](multi-catalog) support automatic sync hive metastore events (#15401) Poll the metastore at a given frequency for create/alter/drop events on databases, tables, and partitions. By observing such events, we can take the appropriate action (refresh/invalidate/add/remove) on the FE metadata cache so that it represents the latest information available in the metastore. We keep track of the last synced event id in each polling iteration so the next batch can be requested appropriately. --- .../java/org/apache/doris/catalog/DatabaseIf.java | 2 + .../main/java/org/apache/doris/catalog/Env.java | 7 + .../doris/catalog/external/ExternalDatabase.java | 5 + .../catalog/external/HMSExternalDatabase.java | 10 + .../main/java/org/apache/doris/common/Config.java | 18 ++ .../org/apache/doris/datasource/CatalogMgr.java | 53 +++++- .../apache/doris/datasource/ExternalCatalog.java | 3 + .../doris/datasource/HMSExternalCatalog.java | 54 ++++++ .../datasource/PooledHiveMetaStoreClient.java | 27 +++ .../datasource/hive/event/DropTableEvent.java | 89 +++++++++ .../doris/datasource/hive/event/EventFactory.java | 32 ++++ .../doris/datasource/hive/event/IgnoredEvent.java | 43 +++++ .../datasource/hive/event/MetastoreEvent.java | 203 +++++++++++++++++++++ .../hive/event/MetastoreEventFactory.java | 81 ++++++++ .../datasource/hive/event/MetastoreEventType.java | 68 +++++++ .../hive/event/MetastoreEventsProcessor.java | 151 +++++++++++++++ .../hive/event/MetastoreNotificationException.java | 37 ++++ .../event/MetastoreNotificationFetchException.java | 37 ++++ .../datasource/hive/event/MetastoreTableEvent.java | 50 +++++ .../org/apache/doris/journal/JournalEntity.java | 1 + 
.../java/org/apache/doris/persist/EditLog.java | 9 + .../org/apache/doris/persist/OperationType.java | 2 + 22 files changed, 976 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java index ffd271d84e..f0dfa53484 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java @@ -214,4 +214,6 @@ public interface DatabaseIf<T extends TableIf> { } return (OlapTable) table; } + + void dropTable(String tableName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 1d95551df8..83bde25845 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -126,6 +126,7 @@ import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.EsExternalCatalog; import org.apache.doris.datasource.ExternalMetaCacheMgr; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor; import org.apache.doris.deploy.DeployManager; import org.apache.doris.deploy.impl.AmbariDeployManager; import org.apache.doris.deploy.impl.K8sDeployManager; @@ -316,6 +317,7 @@ public class Env { private DeleteHandler deleteHandler; private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector; private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector; + private MetastoreEventsProcessor metastoreEventsProcessor; private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos private MasterDaemon txnCleaner; // To clean aborted or timeout txns @@ -549,6 +551,7 @@ public class Env { this.deleteHandler = new DeleteHandler(); this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector(); this.partitionInMemoryInfoCollector = new 
PartitionInMemoryInfoCollector(); + this.metastoreEventsProcessor = new MetastoreEventsProcessor(); this.replayedJournalId = new AtomicLong(0L); this.isElectable = false; @@ -1402,6 +1405,10 @@ public class Env { if (Config.enable_fqdn_mode) { fqdnManager.start(); } + if (Config.enable_hms_events_incremental_sync) { + metastoreEventsProcessor.start(); + } + } // start threads that should running on all FE diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java index 6ae8594c07..65c027713e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java @@ -258,4 +258,9 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>, @Override public void gsonPostProcess() throws IOException {} + + @Override + public void dropTable(String tableName) { + throw new NotImplementedException(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java index decef86caa..a1f6bcddab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java @@ -170,4 +170,14 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl idToTbl.put(tbl.getId(), tbl); tableNameToId.put(tbl.getName(), tbl.getId()); } + + @Override + public void dropTable(String tableName) { + LOG.debug("drop table [{}]", tableName); + Long tableId = tableNameToId.remove(tableName); + if (tableId == null) { + LOG.warn("drop table [{}] failed", tableName); + } + idToTbl.remove(tableId); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 2bc30113e6..93fbfc6e18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1935,5 +1935,23 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true) public static boolean enable_func_pushdown = true; + + /** + * If set to true, doris will automatically synchronize hms metadata to the cache in fe. + */ + @ConfField(masterOnly = true) + public static boolean enable_hms_events_incremental_sync = false; + + /** + * Maximum number of events to poll in each RPC. + */ + @ConfField(mutable = true, masterOnly = true) + public static int hms_events_batch_size_per_rpc = 500; + + /** + * HMS polling interval in milliseconds. + */ + @ConfField(masterOnly = true) + public static int hms_events_polling_interval_ms = 20000; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index a62562acd7..3f9a76bb7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.Resource.ReferenceType; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.cluster.ClusterNamespace; @@ -439,13 +440,17 @@ public class CatalogMgr implements Writable, GsonPostProcessable { * Refresh the catalog meta and write the meta log. 
*/ public void refreshCatalog(RefreshCatalogStmt stmt) throws UserException { + CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName()); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + stmt.getCatalogName()); + } + CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt); + refreshCatalog(log); + } + + public void refreshCatalog(CatalogLog log) { writeLock(); try { - CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName()); - if (catalog == null) { - throw new DdlException("No catalog found with name: " + stmt.getCatalogName()); - } - CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt); replayRefreshCatalog(log); Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_REFRESH_CATALOG, log); } finally { @@ -481,7 +486,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { /** * Reply for refresh catalog event. */ - public void replayRefreshCatalog(CatalogLog log) throws DdlException { + public void replayRefreshCatalog(CatalogLog log) { writeLock(); try { unprotectedRefreshCatalog(log.getCatalogId(), log.isInvalidCache()); @@ -554,6 +559,42 @@ public class CatalogMgr implements Writable, GsonPostProcessable { .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName()); } + public void dropExternalTable(String dbName, String tableName, String catalogName) throws DdlException { + CatalogIf catalog = nameToCatalog.get(catalogName); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + catalogName); + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support drop ExternalCatalog Tables"); + } + DatabaseIf db = catalog.getDbNullable(dbName); + if (db == null) { + throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); + } + + TableIf table = db.getTableNullable(tableName); + if (table == null) { + throw new DdlException("Table " + tableName + " 
does not exist in db " + dbName); + } + ExternalObjectLog log = new ExternalObjectLog(); + log.setCatalogId(catalog.getId()); + log.setDbId(db.getId()); + log.setTableId(table.getId()); + replayDropExternalTable(log); + Env.getCurrentEnv().getEditLog().logDropExternalTable(log); + } + + public void replayDropExternalTable(ExternalObjectLog log) { + LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), log.getDbId(), + log.getTableId()); + ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId()); + ExternalDatabase db = catalog.getDbForReplay(log.getDbId()); + ExternalTable table = db.getTableForReplay(log.getTableId()); + db.dropTable(table.getName()); + Env.getCurrentEnv().getExtMetaCacheMgr() + .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName()); + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 2d68d28fee..e730fb6bc0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -295,6 +295,9 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr db.setTableExtCatalog(this); } objectCreated = false; + if (this instanceof HMSExternalCatalog) { + ((HMSExternalCatalog) this).setLastSyncedEventId(-1L); + } } public void addDatabaseForTest(ExternalDatabase db) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index e2a5f2f3a8..ec94867fc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -25,14 +25,20 @@ import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.HMSExternalDatabase; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.List; @@ -42,8 +48,12 @@ import java.util.Map; * External catalog for hive metastore compatible data sources. */ public class HMSExternalCatalog extends ExternalCatalog { + private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class); + private static final int MAX_CLIENT_POOL_SIZE = 8; protected PooledHiveMetaStoreClient client; + // Record the latest synced event id when processing hive events + private long lastSyncedEventId; /** * Default constructor for HMSExternalCatalog. 
@@ -170,4 +180,48 @@ public class HMSExternalCatalog extends ExternalCatalog { } return tmpSchema; } + + public void setLastSyncedEventId(long lastSyncedEventId) { + this.lastSyncedEventId = lastSyncedEventId; + } + + public NotificationEventResponse getNextEventResponse(HMSExternalCatalog hmsExternalCatalog) + throws MetastoreNotificationFetchException { + makeSureInitialized(); + if (lastSyncedEventId < 0) { + lastSyncedEventId = getCurrentEventId(); + refreshCatalog(hmsExternalCatalog); + LOG.info( + "First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId," + + "lastSyncedEventId is [{}]", + hmsExternalCatalog.getName(), lastSyncedEventId); + return null; + } + + long currentEventId = getCurrentEventId(); + LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {},lastSyncedEventId is {}", + hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId); + if (currentEventId == lastSyncedEventId) { + LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName()); + return null; + } + return client.getNextNotification(lastSyncedEventId, Config.hms_events_batch_size_per_rpc, null); + } + + private void refreshCatalog(HMSExternalCatalog hmsExternalCatalog) { + CatalogLog log = new CatalogLog(); + log.setCatalogId(hmsExternalCatalog.getId()); + log.setInvalidCache(true); + Env.getCurrentEnv().getCatalogMgr().refreshCatalog(log); + } + + private long getCurrentEventId() { + makeSureInitialized(); + CurrentNotificationEventId currentNotificationEventId = client.getCurrentNotificationEventId(); + if (currentNotificationEventId == null) { + LOG.warn("Get currentNotificationEventId is null"); + return -1; + } + return currentNotificationEventId.getEventId(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java index 05e3d9d15c..008253c450 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java @@ -19,6 +19,7 @@ package org.apache.doris.datasource; import org.apache.doris.catalog.HMSResource; import org.apache.doris.common.Config; +import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient; import com.google.common.base.Preconditions; @@ -29,8 +30,10 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.logging.log4j.LogManager; @@ -145,6 +148,30 @@ public class PooledHiveMetaStoreClient { } } + public CurrentNotificationEventId getCurrentNotificationEventId() { + try (CachedClient client = getClient()) { + return client.client.getCurrentNotificationEventId(); + } catch (Exception e) { + LOG.warn("Failed to fetch current notification event id", e); + throw new MetastoreNotificationFetchException( + "Failed to get current notification event id. 
msg: " + e.getMessage()); + } + } + + public NotificationEventResponse getNextNotification(long lastEventId, + int maxEvents, + IMetaStoreClient.NotificationFilter filter) + throws MetastoreNotificationFetchException { + try (CachedClient client = getClient()) { + return client.client.getNextNotification(lastEventId, maxEvents, filter); + } catch (Exception e) { + LOG.warn("Failed to get next notification based on last event id {}", lastEventId, e); + throw new MetastoreNotificationFetchException( + "Failed to get next notification based on last event id: " + lastEventId + ". msg: " + e + .getMessage()); + } + } + private class CachedClient implements AutoCloseable { private final IMetaStoreClient client; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java new file mode 100644 index 0000000000..8647e47b78 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +package org.apache.doris.datasource.hive.event; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +/** + * MetastoreEvent for DROP_TABLE event type + */ +public class DropTableEvent extends MetastoreTableEvent { + private static final Logger LOG = LogManager.getLogger(DropTableEvent.class); + private final String dbName; + private final String tableName; + + private DropTableEvent(NotificationEvent event, + String catalogName) { + super(event, catalogName); + Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(getEventType())); + JSONDropTableMessage dropTableMessage = + (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer() + .getDropTableMessage(event.getMessage()); + try { + dbName = dropTableMessage.getDB(); + tableName = dropTableMessage.getTable(); + } catch (Exception e) { + throw new MetastoreNotificationException(debugString( + "Could not parse event message. 
" + + "Check if %s is set to true in metastore configuration", + MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e); + } + } + + public static List<MetastoreEvent> getEvents(NotificationEvent event, + String catalogName) { + return Lists.newArrayList(new DropTableEvent(event, catalogName)); + } + + @Override + protected boolean existInCache() { + return true; + } + + @Override + protected boolean canBeSkipped() { + return false; + } + + protected boolean isSupported() { + return true; + } + + @Override + protected void process() throws MetastoreNotificationException { + try { + LOG.info("DropTable event process,catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, + tableName); + Env.getCurrentEnv().getCatalogMgr().dropExternalTable(dbName, tableName, catalogName); + } catch (DdlException e) { + LOG.warn("DropExternalTable failed,dbName:[{}],tableName:[{}],catalogName:[{}].", dbName, tableName, + catalogName, e); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/EventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/EventFactory.java new file mode 100644 index 0000000000..333687e2ab --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/EventFactory.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +package org.apache.doris.datasource.hive.event; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +import java.util.List; + +/** + * Factory interface to generate a {@link MetastoreEvent} from a {@link NotificationEvent} object. + */ +public interface EventFactory { + + List<MetastoreEvent> transferNotificationEventToMetastoreEvents(NotificationEvent hmsEvent, + String catalogName) throws MetastoreNotificationException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java new file mode 100644 index 0000000000..4d2dc1a178 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + + +package org.apache.doris.datasource.hive.event; + +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +import java.util.List; + +/** + * An event type which is ignored. Useful for unsupported metastore event types + */ +public class IgnoredEvent extends MetastoreEvent { + protected IgnoredEvent(NotificationEvent event, String catalogName) { + super(event, catalogName); + } + + private static List<MetastoreEvent> getEvents(NotificationEvent event, + String catalogName) { + return Lists.newArrayList(new IgnoredEvent(event, catalogName)); + } + + @Override + public void process() { + debugLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java new file mode 100644 index 0000000000..5cc4594457 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + + +package org.apache.doris.datasource.hive.event; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Abstract base class for all MetastoreEvents. A MetastoreEvent is an object used to + * process a NotificationEvent received from metastore. + */ +public abstract class MetastoreEvent { + private static final Logger LOG = LogManager.getLogger(MetastoreEvent.class); + // String.format compatible string to prepend event id and type + private static final String STR_FORMAT_EVENT_ID_TYPE = "EventId: %d EventType: %s "; + + // logger format compatible string to prepend to a log formatted message + private static final String LOG_FORMAT_EVENT_ID_TYPE = "EventId: {} EventType: {} "; + + // the notification received from metastore which is processed by this + protected final NotificationEvent event; + + // dbName from the event + protected final String dbName; + + // tblName from the event + protected final String tblName; + + // eventId of the event. 
Used instead of calling getter on event everytime + private final long eventId; + + // eventType from the NotificationEvent + private final MetastoreEventType eventType; + + // Actual notificationEvent object received from Metastore + protected final NotificationEvent metastoreNotificationEvent; + + protected final String catalogName; + + protected MetastoreEvent(NotificationEvent event, String catalogName) { + this.event = event; + this.dbName = event.getDbName(); + this.tblName = event.getTableName(); + this.eventId = event.getEventId(); + this.eventType = MetastoreEventType.from(event.getEventType()); + this.metastoreNotificationEvent = event; + this.catalogName = catalogName; + } + + public long getEventId() { + return eventId; + } + + public MetastoreEventType getEventType() { + return eventType; + } + + public String getDbName() { + return dbName; + } + + public String getTblName() { + return tblName; + } + + /** + * Checks if the given event can be batched into this event. Default behavior is + * to return false which can be overridden by a sub-class. + * The current version is relatively simple to process batch events, so all that need to be processed are true. + * + * @param event The event under consideration to be batched into this event. + * @return false if event cannot be batched into this event; otherwise true. + */ + protected boolean canBeBatched(MetastoreEvent event) { + return false; + } + + /** + * Adds the given event into the batch of events represented by this event. Default + * implementation is to return null. Sub-classes must override this method to + * implement batching. + * + * @param event The event which needs to be added to the batch. + * @return The batch event which represents all the events batched into this event + * until now including the given event. 
+ */ + protected MetastoreEvent addToBatchEvents(MetastoreEvent event) { + return null; + } + + + protected boolean existInCache() throws MetastoreNotificationException { + return false; + } + + /** + * Returns the number of events represented by this event. For most events this is 1. + * In case of batch events this could be more than 1. + */ + protected int getNumberOfEvents() { + return 1; + } + + /** + * Certain events like ALTER_TABLE or ALTER_PARTITION implement logic to ignore + * some events because they do not affect query results. + * + * @return true if this event can be skipped. + */ + protected boolean canBeSkipped() { + return false; + } + + /** + * Whether the current version of FE supports processing of some events, some events are reserved, + * and may be processed later version. + */ + protected boolean isSupported() { + return false; + } + + /** + * Process the information available in the NotificationEvent. + */ + protected abstract void process() throws MetastoreNotificationException; + + /** + * Helper method to get debug string with helpful event information prepended to the + * message. This can be used to generate helpful exception messages + * + * @param msgFormatString String value to be used in String.format() for the given message + * @param args args to the <code>String.format()</code> for the given msgFormatString + */ + protected String debugString(String msgFormatString, Object... 
args) { + String formatString = STR_FORMAT_EVENT_ID_TYPE + msgFormatString; + Object[] formatArgs = getLogFormatArgs(args); + return String.format(formatString, formatArgs); + } + + /** + * Helper method to generate the format args after prepending the event id and type + */ + private Object[] getLogFormatArgs(Object[] args) { + Object[] formatArgs = new Object[args.length + 2]; + formatArgs[0] = getEventId(); + formatArgs[1] = getEventType(); + int i = 2; + for (Object arg : args) { + formatArgs[i] = arg; + i++; + } + return formatArgs; + } + + /** + * Logs at info level the given log formatted string and its args. The log formatted + * string should have {} pair at the appropriate location in the string for each arg + * value provided. This method prepends the event id and event type before logging the + * message. No-op if the log level is not at INFO + */ + protected void infoLog(String logFormattedStr, Object... args) { + if (!LOG.isInfoEnabled()) { + return; + } + String formatString = LOG_FORMAT_EVENT_ID_TYPE + logFormattedStr; + Object[] formatArgs = getLogFormatArgs(args); + LOG.info(formatString, formatArgs); + } + + /** + * Similar to infoLog excepts logs at debug level + */ + protected void debugLog(String logFormattedStr, Object... 
args) { + if (!LOG.isDebugEnabled()) { + return; + } + String formatString = LOG_FORMAT_EVENT_ID_TYPE + logFormattedStr; + Object[] formatArgs = getLogFormatArgs(args); + LOG.debug(formatString, formatArgs); + } + + @Override + public String toString() { + return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java new file mode 100644 index 0000000000..2719158c8e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +package org.apache.doris.datasource.hive.event; + +import org.apache.doris.datasource.HMSExternalCatalog; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Factory class to create various MetastoreEvents. + */ +public class MetastoreEventFactory implements EventFactory { + private static final Logger LOG = LogManager.getLogger(MetastoreEventFactory.class); + + @Override + public List<MetastoreEvent> transferNotificationEventToMetastoreEvents(NotificationEvent event, + String catalogName) { + Preconditions.checkNotNull(event.getEventType()); + MetastoreEventType metastoreEventType = MetastoreEventType.from(event.getEventType()); + switch (metastoreEventType) { + case DROP_TABLE: + return DropTableEvent.getEvents(event, catalogName); + default: + // ignore all the unknown events by creating an IgnoredEvent + return Lists.newArrayList(new IgnoredEvent(event, catalogName)); + } + } + + List<MetastoreEvent> getMetastoreEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) { + List<MetastoreEvent> metastoreEvents = Lists.newArrayList(); + + for (NotificationEvent event : events) { + metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, hmsExternalCatalog.getName())); + } + + List<MetastoreEvent> tobeProcessEvents = metastoreEvents.stream() + .filter(MetastoreEvent::isSupported) + .collect(Collectors.toList()); + + if (tobeProcessEvents.isEmpty()) { + LOG.info("The metastore events to process is empty on catalog {}", hmsExternalCatalog.getName()); + return Collections.emptyList(); + } + + return createBatchEvents(tobeProcessEvents); + } + + /** + * Create batch event tasks according to HivePartitionName to facilitate subsequent parallel
processing. + * For ADD_PARTITION and DROP_PARTITION, we directly override any events before that partition. + * For a partition, it is meaningless to process any events before the drop partition. + */ + List<MetastoreEvent> createBatchEvents(List<MetastoreEvent> events) { + // now do nothing + return events; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventType.java new file mode 100644 index 0000000000..31dce29366 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventType.java @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +package org.apache.doris.datasource.hive.event; + +/** + * Currently we only support handling some events. 
+ */ +public enum MetastoreEventType { + CREATE_TABLE("CREATE_TABLE"), + DROP_TABLE("DROP_TABLE"), + ALTER_TABLE("ALTER_TABLE"), + CREATE_DATABASE("CREATE_DATABASE"), + DROP_DATABASE("DROP_DATABASE"), + ALTER_DATABASE("ALTER_DATABASE"), + ADD_PARTITION("ADD_PARTITION"), + ALTER_PARTITION("ALTER_PARTITION"), + ALTER_PARTITIONS("ALTER_PARTITIONS"), + DROP_PARTITION("DROP_PARTITION"), + INSERT("INSERT"), + INSERT_PARTITIONS("INSERT_PARTITIONS"), + ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"), + COMMIT_TXN("COMMIT_TXN"), + ABORT_TXN("ABORT_TXN"), + OTHER("OTHER"); + + private final String eventType; + + MetastoreEventType(String msEventType) { + this.eventType = msEventType; + } + + @Override + public String toString() { + return eventType; + } + + /** + * Returns the MetastoreEventType from a given string value of event from Metastore's + * NotificationEvent.eventType. If none of the supported MetastoreEventTypes match, + * return OTHER + * + * @param eventType EventType value from the {@link org.apache.hadoop.hive.metastore.api.NotificationEvent} + */ + public static MetastoreEventType from(String eventType) { + for (MetastoreEventType metastoreEventType : values()) { + if (metastoreEventType.eventType.equalsIgnoreCase(eventType)) { + return metastoreEventType; + } + } + return OTHER; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java new file mode 100644 index 0000000000..1ff3bd98b2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +package org.apache.doris.datasource.hive.event; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.HMSExternalCatalog; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageDeserializer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; + +/** + * A metastore event is an instance of the class + * {@link NotificationEvent}. Metastore can be + * configured to work with Listeners which are called on various DDL operations like + * create/alter/drop operations on database, table, partition etc. Each event has a unique + * incremental id and the generated events can be fetched from Metastore to get + * incremental updates to the metadata stored in Hive metastore using the public API + * <code>get_next_notification</code>. These events could be generated by external + * Metastore clients like Apache Hive or Apache Spark configured to talk with the same metastore.
+ * <p> + * This class is used to poll metastore for such events at a given frequency. By observing + * such events, we can take appropriate action on the {@link org.apache.doris.datasource.hive.HiveMetaStoreCache} + * (refresh/invalidate/add/remove) so that it represents the latest information + * available in metastore. We keep track of the last synced event id in each polling + * iteration so the next batch can be requested appropriately. The current batch size is + * constant and set to {@link org.apache.doris.common.Config#hms_events_batch_size_per_rpc}. + */ +public class MetastoreEventsProcessor extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(MetastoreEventsProcessor.class); + public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY = + "hive.metastore.notifications.add.thrift.objects"; + + // for deserializing from JSON strings from metastore event + private static final MessageDeserializer MESSAGE_DESERIALIZER = new JSONMessageDeserializer(); + + + // event factory which is used to get or create MetastoreEvents + private final MetastoreEventFactory metastoreEventFactory; + + private boolean isRunning; + + public MetastoreEventsProcessor() { + super(MetastoreEventsProcessor.class.getName(), Config.hms_events_polling_interval_ms); + this.metastoreEventFactory = new MetastoreEventFactory(); + this.isRunning = false; + } + + /** + * Fetch the next batch of NotificationEvents from metastore.
The default batch size is + * <code>{@link Config#hms_events_batch_size_per_rpc}</code> + */ + private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog hmsExternalCatalog) { + LOG.debug("Start to pull events on catalog [{}]", hmsExternalCatalog.getName()); + NotificationEventResponse response = hmsExternalCatalog.getNextEventResponse(hmsExternalCatalog); + + if (response == null) { + return Collections.emptyList(); + } + return response.getEvents(); + } + + private void doExecute(List<MetastoreEvent> events, HMSExternalCatalog hmsExternalCatalog) { + for (MetastoreEvent event : events) { + try { + event.process(); + } catch (Exception e) { + hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1); + throw e; + } + } + } + + /** + * Process the given list of notification events. Useful for tests which provide a list of events + */ + private void processEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) { + //transfer + List<MetastoreEvent> metastoreEvents = metastoreEventFactory.getMetastoreEvents(events, hmsExternalCatalog); + doExecute(metastoreEvents, hmsExternalCatalog); + hmsExternalCatalog.setLastSyncedEventId(events.get(events.size() - 1).getEventId()); + } + + @Override + protected void runAfterCatalogReady() { + if (isRunning) { + LOG.warn("Last task not finished,ignore current task."); + return; + } + isRunning = true; + try { + realRun(); + } catch (Exception ex) { + LOG.warn("Task failed", ex); + } + isRunning = false; + } + + private void realRun() { + List<Long> catalogIds = Env.getCurrentEnv().getCatalogMgr().getCatalogIds(); + for (Long catalogId : catalogIds) { + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); + if (catalog instanceof HMSExternalCatalog) { + HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog; + List<NotificationEvent> events = Collections.emptyList(); + try { + events = getNextHMSEvents(hmsExternalCatalog); + if (!events.isEmpty()) { + 
LOG.info("Events size are {} on catalog [{}]", events.size(), + hmsExternalCatalog.getName()); + processEvents(events, hmsExternalCatalog); + } + } catch (MetastoreNotificationFetchException e) { + LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e); + } catch (Exception ex) { + LOG.warn("Failed to process hive metastore [{}] events .", + hmsExternalCatalog.getName(), ex); + } + } + } + } + + public static MessageDeserializer getMessageDeserializer() { + return MESSAGE_DESERIALIZER; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationException.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationException.java new file mode 100644 index 0000000000..2bd5c4c40c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationException.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +package org.apache.doris.datasource.hive.event; + +/** + * Utility exception class to be thrown for errors during event processing + */ +public class MetastoreNotificationException extends RuntimeException { + + public MetastoreNotificationException(String msg, Throwable cause) { + super(msg, cause); + } + + public MetastoreNotificationException(String msg) { + super(msg); + } + + public MetastoreNotificationException(Exception e) { + super(e); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationFetchException.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationFetchException.java new file mode 100644 index 0000000000..487165eeca --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationFetchException.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +package org.apache.doris.datasource.hive.event; + +/** + * Utility exception class to be thrown for errors while fetching events from the metastore + */ +public class MetastoreNotificationFetchException extends MetastoreNotificationException { + + public MetastoreNotificationFetchException(String msg, Throwable cause) { + super(msg, cause); + } + + public MetastoreNotificationFetchException(String msg) { + super(msg); + } + + public MetastoreNotificationFetchException(Exception e) { + super(e); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java new file mode 100644 index 0000000000..70f56bdbb0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ + +package org.apache.doris.datasource.hive.event; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +import java.util.List; + +/** + * Base class for all the table events + */ +public abstract class MetastoreTableEvent extends MetastoreEvent { + + + protected MetastoreTableEvent(NotificationEvent event, String catalogName) { + super(event, catalogName); + Preconditions.checkNotNull(dbName, "Database name cannot be null"); + Preconditions.checkNotNull(tblName, "Table name cannot be null"); + } + + /** + * List of parameters that are set by Hive for tables/partitions and that can be + * ignored when determining whether an alter table/partition event is a trivial one. + */ + private static final List<String> PARAMETERS_TO_IGNORE = + new ImmutableList.Builder<String>() + .add("transient_lastDdlTime") + .add("numFilesErasureCoded") + .add("numFiles") + .add("comment") + .build(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 96ae663015..fb8fbb5b99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -706,6 +706,7 @@ public class JournalEntity implements Writable { break; } case OperationType.OP_REFRESH_EXTERNAL_DB: + case OperationType.OP_DROP_EXTERNAL_TABLE: case OperationType.OP_REFRESH_EXTERNAL_TABLE: { data = ExternalObjectLog.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 217c0c117a..9254583294 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -949,6 +949,11 @@ public class EditLog {
env.getCatalogMgr().replayRefreshExternalTable(log); break; } + case OperationType.OP_DROP_EXTERNAL_TABLE: { + final ExternalObjectLog log = (ExternalObjectLog) journal.getData(); + env.getCatalogMgr().replayDropExternalTable(log); + break; + } case OperationType.OP_INIT_EXTERNAL_TABLE: { // Do nothing. break; @@ -1624,6 +1629,10 @@ public class EditLog { logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log); } + public void logDropExternalTable(ExternalObjectLog log) { + logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log); + } + public Journal getJournal() { return this.journal; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 6204fc1836..c5889a8001 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -255,6 +255,8 @@ public class OperationType { public static final short OP_DROP_MTMV_TASK = 341; public static final short OP_ALTER_MTMV_TASK = 342; + public static final short OP_DROP_EXTERNAL_TABLE = 350; + public static final short OP_ALTER_USER = 400; /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org