Jackie-Jiang commented on code in PR #15673:
URL: https://github.com/apache/pinot/pull/15673#discussion_r2072252978


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1208,33 +1208,45 @@ public void ensureAllPartitionsConsuming(TableConfig 
tableConfig, List<StreamCon
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
-    HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState 
-> {
-      assert idealState != null;
-      boolean isTableEnabled = idealState.isEnabled();
-      boolean isTablePaused = isTablePaused(idealState);
-      boolean offsetsHaveToChange = offsetCriteria != null;
-      if (isTableEnabled && !isTablePaused) {
-        List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
-            offsetsHaveToChange
-                ? Collections.emptyList() // offsets from metadata are not 
valid anymore; fetch for all partitions
-                : getPartitionGroupConsumptionStatusList(idealState, 
streamConfigs);
-        // FIXME: Right now, we assume topics are sharing same offset criteria
-        OffsetCriteria originalOffsetCriteria = 
streamConfigs.get(0).getOffsetCriteria();
-        // Read the smallest offset when a new partition is detected
-        streamConfigs.stream()
-            .forEach(streamConfig -> streamConfig.setOffsetCriteria(
-                offsetsHaveToChange ? offsetCriteria : 
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
-        List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-            getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList);
-        streamConfigs.stream().forEach(streamConfig -> 
streamConfig.setOffsetCriteria(originalOffsetCriteria));
-        return ensureAllPartitionsConsuming(tableConfig, streamConfigs, 
idealState, newPartitionGroupMetadataList,
-            offsetCriteria);
-      } else {
-        LOGGER.info("Skipping LLC segments validation for table: {}, 
isTableEnabled: {}, isTablePaused: {}",
-            realtimeTableName, isTableEnabled, isTablePaused);
-        return idealState;
-      }
-    }, DEFAULT_RETRY_POLICY, true);
+    try {
+      HelixHelper.updateIdealState(_helixManager, realtimeTableName, 
idealState -> {
+        assert idealState != null;
+        boolean isTableEnabled = idealState.isEnabled();
+        boolean isTablePaused = isTablePaused(idealState);
+        boolean offsetsHaveToChange = offsetCriteria != null;
+        if (isTableEnabled && !isTablePaused) {
+          List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
+              offsetsHaveToChange
+                  ? Collections.emptyList() // offsets from metadata are not 
valid anymore; fetch for all partitions
+                  : getPartitionGroupConsumptionStatusList(idealState, 
streamConfigs);
+          // FIXME: Right now, we assume topics are sharing same offset 
criteria
+          OffsetCriteria originalOffsetCriteria = 
streamConfigs.get(0).getOffsetCriteria();
+          // Read the smallest offset when a new partition is detected
+          streamConfigs.stream()
+              .forEach(streamConfig -> streamConfig.setOffsetCriteria(
+                  offsetsHaveToChange ? offsetCriteria : 
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
+          List<PartitionGroupMetadata> newPartitionGroupMetadataList;
+          try {
+            newPartitionGroupMetadataList =
+                getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList);
+          } catch (Exception e) {
+            _controllerMetrics.addMeteredTableValue(realtimeTableName,
+                ControllerMeter.PARTITION_GROUP_METADATA_FETCH_ERROR, 1L);

Review Comment:
   This is not the only place where `PartitionGroupMetadata` is fetched. If you want to track this error, consider tracking it in `PartitionGroupMetadataFetcher` instead, so that every fetch path is covered.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1208,33 +1208,45 @@ public void ensureAllPartitionsConsuming(TableConfig 
tableConfig, List<StreamCon
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
-    HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState 
-> {
-      assert idealState != null;
-      boolean isTableEnabled = idealState.isEnabled();
-      boolean isTablePaused = isTablePaused(idealState);
-      boolean offsetsHaveToChange = offsetCriteria != null;
-      if (isTableEnabled && !isTablePaused) {
-        List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
-            offsetsHaveToChange
-                ? Collections.emptyList() // offsets from metadata are not 
valid anymore; fetch for all partitions
-                : getPartitionGroupConsumptionStatusList(idealState, 
streamConfigs);
-        // FIXME: Right now, we assume topics are sharing same offset criteria
-        OffsetCriteria originalOffsetCriteria = 
streamConfigs.get(0).getOffsetCriteria();
-        // Read the smallest offset when a new partition is detected
-        streamConfigs.stream()
-            .forEach(streamConfig -> streamConfig.setOffsetCriteria(
-                offsetsHaveToChange ? offsetCriteria : 
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
-        List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-            getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList);
-        streamConfigs.stream().forEach(streamConfig -> 
streamConfig.setOffsetCriteria(originalOffsetCriteria));
-        return ensureAllPartitionsConsuming(tableConfig, streamConfigs, 
idealState, newPartitionGroupMetadataList,
-            offsetCriteria);
-      } else {
-        LOGGER.info("Skipping LLC segments validation for table: {}, 
isTableEnabled: {}, isTablePaused: {}",
-            realtimeTableName, isTableEnabled, isTablePaused);
-        return idealState;
-      }
-    }, DEFAULT_RETRY_POLICY, true);
+    try {
+      HelixHelper.updateIdealState(_helixManager, realtimeTableName, 
idealState -> {
+        assert idealState != null;
+        boolean isTableEnabled = idealState.isEnabled();
+        boolean isTablePaused = isTablePaused(idealState);
+        boolean offsetsHaveToChange = offsetCriteria != null;
+        if (isTableEnabled && !isTablePaused) {
+          List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
+              offsetsHaveToChange
+                  ? Collections.emptyList() // offsets from metadata are not 
valid anymore; fetch for all partitions
+                  : getPartitionGroupConsumptionStatusList(idealState, 
streamConfigs);
+          // FIXME: Right now, we assume topics are sharing same offset 
criteria
+          OffsetCriteria originalOffsetCriteria = 
streamConfigs.get(0).getOffsetCriteria();
+          // Read the smallest offset when a new partition is detected
+          streamConfigs.stream()
+              .forEach(streamConfig -> streamConfig.setOffsetCriteria(
+                  offsetsHaveToChange ? offsetCriteria : 
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
+          List<PartitionGroupMetadata> newPartitionGroupMetadataList;
+          try {
+            newPartitionGroupMetadataList =
+                getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList);
+          } catch (Exception e) {
+            _controllerMetrics.addMeteredTableValue(realtimeTableName,
+                ControllerMeter.PARTITION_GROUP_METADATA_FETCH_ERROR, 1L);
+            throw e;
+          }
+          streamConfigs.stream().forEach(streamConfig -> 
streamConfig.setOffsetCriteria(originalOffsetCriteria));
+          return ensureAllPartitionsConsuming(tableConfig, streamConfigs, 
idealState, newPartitionGroupMetadataList,
+              offsetCriteria);
+        } else {
+          LOGGER.info("Skipping LLC segments validation for table: {}, 
isTableEnabled: {}, isTablePaused: {}",
+              realtimeTableName, isTableEnabled, isTablePaused);
+          return idealState;
+        }
+      }, DEFAULT_RETRY_POLICY, true);
+    } catch (Exception e) {
+      LOGGER.error("Failed to update ideal state during 
ensureAllPartitionsConsuming.", e);

Review Comment:
   We usually log an exception only once. To add more context to the exception, you can wrap it in a `RuntimeException` with a more detailed message instead of logging it here:
   ```suggestion
         throw new RuntimeException("Failed to update ideal state during 
ensureAllPartitionsConsuming", e);
   ```
   
   Would the stack trace contain enough information if we didn't wrap the exception?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to