This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f75d584b33 Do not return UNHEALTHY status when Temporary 
ConcurrentModificationException from Kafka (#16741)
8f75d584b33 is described below

commit 8f75d584b335fe2a24e0402159016af93201c59c
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Fri Sep 5 10:09:48 2025 -0700

    Do not return UNHEALTHY status when Temporary 
ConcurrentModificationException from Kafka (#16741)
    
    * Add Concurrent Modification Exception so ingestion status returned is not 
UNHEALTHY when the Kafka consumer returns 
java.util.ConcurrentModificationException
    
    * Change the error message
    
    * Update TablesResource.java
    
    * Remove the exception message
    
    * change the log message
    
    ---------
    
    Co-authored-by: Xiang Fu <[email protected]>
---
 .../java/org/apache/pinot/server/api/resources/TablesResource.java   | 5 +++++
 1 file changed, 5 insertions(+)

diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index a2a2e691141..f3e3f4405ba 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -34,6 +34,7 @@ import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -1089,6 +1090,10 @@ public class TablesResource {
                   recordsLagMap, availabilityLagMsMap)));
         }
       }
+    } catch (ConcurrentModificationException e) {
+      LOGGER.warn("Multi-threaded access is unsafe for KafkaConsumer, caught 
exception when fetching stream offset",
+          e);
+      return segmentConsumerInfoList;
     } catch (Exception e) {
       throw new WebApplicationException("Caught exception when getting 
consumer info for table: " + realtimeTableName);
     } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to