This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b206ccd076 Log IS update failures in ensureAllPartitionsConsuming (#15673)
b206ccd076 is described below

commit b206ccd076a5590c54dc607ee4b1286b0045c2b1
Author: NOOB <43700604+noob-se...@users.noreply.github.com>
AuthorDate: Wed May 7 05:23:54 2025 +0530

    Log IS update failures in ensureAllPartitionsConsuming (#15673)
---
 .../pinot/common/metrics/ControllerMeter.java      |  4 +-
 .../common/utils/helix/IdealStateGroupCommit.java  |  3 +-
 .../helix/core/PinotTableIdealStateBuilder.java    |  8 ++-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 58 ++++++++++++----------
 4 files changed, 43 insertions(+), 30 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 5f934561ad..45ae4c76db 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -73,7 +73,9 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
   // Total Bytes read from deep store
   DEEP_STORE_READ_BYTES_COMPLETED("deepStoreReadBytesCompleted", true),
   // Total Bytes written to deep store
-  DEEP_STORE_WRITE_BYTES_COMPLETED("deepStoreWriteBytesCompleted", true);
+  DEEP_STORE_WRITE_BYTES_COMPLETED("deepStoreWriteBytesCompleted", true),
+  // Tracks failures encountered while fetching partition group metadata
+  PARTITION_GROUP_METADATA_FETCH_ERROR("failures", true);
 
   private final String _brokerMeterName;
   private final String _unit;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
index f7e7981a1a..0b55fd9faf 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
@@ -105,7 +105,8 @@ public class IdealStateGroupCommit {
    * @param helixManager helixManager with the ability to pull from the 
current data\
    * @param resourceName the resource name to be updated
    * @param updater the idealState updater to be applied
-   * @return IdealState if the update is successful, null if not
+   * @return IdealState if the update is successful, exception if the update 
fails and null if interrupted while
+   * committing change
    */
   public IdealState commit(HelixManager helixManager, String resourceName, 
Function<IdealState, IdealState> updater,
       RetryPolicy retryPolicy, boolean noChangeOk) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 244f7853d8..8bc9ea442f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -21,6 +21,8 @@ package org.apache.pinot.controller.helix.core;
 import java.util.List;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
@@ -96,9 +98,13 @@ public class PinotTableIdealStateBuilder {
       return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
     } catch (Exception e) {
       Exception fetcherException = 
partitionGroupMetadataFetcher.getException();
+      String tableNameWithType = streamConfigs.get(0).getTableNameWithType();
       LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of 
table: {}",
           streamConfigs.stream().map(streamConfig -> 
streamConfig.getTopicName()).reduce((a, b) -> a + "," + b),
-          streamConfigs.get(0).getTableNameWithType(), fetcherException);
+          tableNameWithType, fetcherException);
+      ControllerMetrics controllerMetrics = ControllerMetrics.get();
+      controllerMetrics.addMeteredTableValue(tableNameWithType, 
ControllerMeter.PARTITION_GROUP_METADATA_FETCH_ERROR,
+          1L);
       throw new RuntimeException(fetcherException);
     }
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 765b25852a..d2445f3234 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1208,33 +1208,37 @@ public class PinotLLCRealtimeSegmentManager {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
-    HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState 
-> {
-      assert idealState != null;
-      boolean isTableEnabled = idealState.isEnabled();
-      boolean isTablePaused = isTablePaused(idealState);
-      boolean offsetsHaveToChange = offsetCriteria != null;
-      if (isTableEnabled && !isTablePaused) {
-        List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
-            offsetsHaveToChange
-                ? Collections.emptyList() // offsets from metadata are not 
valid anymore; fetch for all partitions
-                : getPartitionGroupConsumptionStatusList(idealState, 
streamConfigs);
-        // FIXME: Right now, we assume topics are sharing same offset criteria
-        OffsetCriteria originalOffsetCriteria = 
streamConfigs.get(0).getOffsetCriteria();
-        // Read the smallest offset when a new partition is detected
-        streamConfigs.stream()
-            .forEach(streamConfig -> streamConfig.setOffsetCriteria(
-                offsetsHaveToChange ? offsetCriteria : 
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
-        List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-            getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList);
-        streamConfigs.stream().forEach(streamConfig -> 
streamConfig.setOffsetCriteria(originalOffsetCriteria));
-        return ensureAllPartitionsConsuming(tableConfig, streamConfigs, 
idealState, newPartitionGroupMetadataList,
-            offsetCriteria);
-      } else {
-        LOGGER.info("Skipping LLC segments validation for table: {}, 
isTableEnabled: {}, isTablePaused: {}",
-            realtimeTableName, isTableEnabled, isTablePaused);
-        return idealState;
-      }
-    }, DEFAULT_RETRY_POLICY, true);
+    try {
+      HelixHelper.updateIdealState(_helixManager, realtimeTableName, 
idealState -> {
+        assert idealState != null;
+        boolean isTableEnabled = idealState.isEnabled();
+        boolean isTablePaused = isTablePaused(idealState);
+        boolean offsetsHaveToChange = offsetCriteria != null;
+        if (isTableEnabled && !isTablePaused) {
+          List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
+              offsetsHaveToChange
+                  ? Collections.emptyList() // offsets from metadata are not 
valid anymore; fetch for all partitions
+                  : getPartitionGroupConsumptionStatusList(idealState, 
streamConfigs);
+          // FIXME: Right now, we assume topics are sharing same offset 
criteria
+          OffsetCriteria originalOffsetCriteria = 
streamConfigs.get(0).getOffsetCriteria();
+          // Read the smallest offset when a new partition is detected
+          streamConfigs.stream()
+              .forEach(streamConfig -> streamConfig.setOffsetCriteria(
+                  offsetsHaveToChange ? offsetCriteria : 
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
+          List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+              getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList);
+          streamConfigs.stream().forEach(streamConfig -> 
streamConfig.setOffsetCriteria(originalOffsetCriteria));
+          return ensureAllPartitionsConsuming(tableConfig, streamConfigs, 
idealState, newPartitionGroupMetadataList,
+              offsetCriteria);
+        } else {
+          LOGGER.info("Skipping LLC segments validation for table: {}, 
isTableEnabled: {}, isTablePaused: {}",
+              realtimeTableName, isTableEnabled, isTablePaused);
+          return idealState;
+        }
+      }, DEFAULT_RETRY_POLICY, true);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to update ideal state during 
ensureAllPartitionsConsuming.", e);
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to