This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f05a7ffd1aea27ec910f46e342ce475e89d29b3f
Author: Xiangyu Wang <dut.xian...@gmail.com>
AuthorDate: Wed Jul 5 16:27:01 2023 +0800

    [Fix](multi-catalog) Fallback to refresh catalog when hms events are 
missing (#21333)
    
    Fix #20227: its implementation has some problems and cannot catch the 
event-missing exception.
---
 .../org/apache/doris/datasource/HMSExternalCatalog.java   | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 9d4fc479a9..1f9f7c59c0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -32,6 +32,7 @@ import 
org.apache.doris.datasource.property.constants.HMSProperties;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -203,9 +204,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
     public NotificationEventResponse getNextEventResponse(HMSExternalCatalog 
hmsExternalCatalog)
             throws MetastoreNotificationFetchException {
         makeSureInitialized();
+        long currentEventId = getCurrentEventId();
         if (lastSyncedEventId < 0) {
-            lastSyncedEventId = getCurrentEventId();
             refreshCatalog(hmsExternalCatalog);
+            // call getCurrentEventId() and save the event id before refreshing the 
catalog to avoid missing events,
+            // but set lastSyncedEventId to currentEventId only if there are 
no problems when refreshing the catalog
+            lastSyncedEventId = currentEventId;
             LOG.info(
                     "First pulling events on catalog [{}],refreshCatalog and 
init lastSyncedEventId,"
                             + "lastSyncedEventId is [{}]",
@@ -213,7 +217,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
             return null;
         }
 
-        long currentEventId = getCurrentEventId();
         LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is 
{},lastSyncedEventId is {}",
                 hmsExternalCatalog.getName(), currentEventId, 
lastSyncedEventId);
         if (currentEventId == lastSyncedEventId) {
@@ -223,11 +226,13 @@ public class HMSExternalCatalog extends ExternalCatalog {
 
         try {
             return client.getNextNotification(lastSyncedEventId, 
Config.hms_events_batch_size_per_rpc, null);
-        } catch (IllegalStateException e) {
+        } catch (MetastoreNotificationFetchException e) {
             // Need a fallback to handle this because this error state can not 
be recovered until restarting FE
-            if 
(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE.equals(e.getMessage())) {
-                lastSyncedEventId = getCurrentEventId();
+            if (StringUtils.isNotEmpty(e.getMessage())
+                        && 
e.getMessage().contains(HiveMetaStoreClient.REPL_EVENTS_MISSING_IN_METASTORE)) {
                 refreshCatalog(hmsExternalCatalog);
+                // set lastSyncedEventId to currentEventId after the catalog 
has been refreshed successfully
+                lastSyncedEventId = currentEventId;
                 LOG.warn("Notification events are missing, maybe an event can 
not be handled "
                         + "or processing rate is too low, fallback to refresh 
the catalog");
                 return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to