This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f05a7ffd1aea27ec910f46e342ce475e89d29b3f Author: Xiangyu Wang <dut.xian...@gmail.com> AuthorDate: Wed Jul 5 16:27:01 2023 +0800 [Fix](multi-catalog) Fallback to refresh catalog when hms events are missing (#21333) Fix #20227, the implementation has some problems and can not catch event-missing-exception. --- .../org/apache/doris/datasource/HMSExternalCatalog.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index 9d4fc479a9..1f9f7c59c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -32,6 +32,7 @@ import org.apache.doris.datasource.property.constants.HMSProperties; import com.google.common.base.Strings; import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -203,9 +204,12 @@ public class HMSExternalCatalog extends ExternalCatalog { public NotificationEventResponse getNextEventResponse(HMSExternalCatalog hmsExternalCatalog) throws MetastoreNotificationFetchException { makeSureInitialized(); + long currentEventId = getCurrentEventId(); if (lastSyncedEventId < 0) { - lastSyncedEventId = getCurrentEventId(); refreshCatalog(hmsExternalCatalog); + // invoke getCurrentEventId() and save the event id before refresh catalog to avoid missing events + // but set lastSyncedEventId to currentEventId only if there is not any problems when refreshing catalog + lastSyncedEventId = currentEventId; LOG.info( "First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId," + "lastSyncedEventId is [{}]", @@ -213,7 +217,6 @@ public class HMSExternalCatalog extends ExternalCatalog { return null; } - long currentEventId = getCurrentEventId(); LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {},lastSyncedEventId is {}", hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId); if (currentEventId == lastSyncedEventId) { @@ -223,11 +226,13 @@ public class HMSExternalCatalog extends ExternalCatalog { try { return client.getNextNotification(lastSyncedEventId, Config.hms_events_batch_size_per_rpc, null); - } catch (IllegalStateException e) { + } catch (MetastoreNotificationFetchException e) { // Need a fallback to handle this because this error state can not be recovered until restarting FE - if (HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE.equals(e.getMessage())) { - lastSyncedEventId = getCurrentEventId(); + if (StringUtils.isNotEmpty(e.getMessage()) + && e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) { refreshCatalog(hmsExternalCatalog); + // set lastSyncedEventId to currentEventId after refresh catalog successfully + lastSyncedEventId = currentEventId; LOG.warn("Notification events are missing, maybe an event can not be handled " + "or processing rate is too low, fallback to refresh the catalog"); return null; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org