Jackie-Jiang commented on code in PR #15673: URL: https://github.com/apache/pinot/pull/15673#discussion_r2072252978
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1208,33 +1208,45 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamCon Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); - HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { - assert idealState != null; - boolean isTableEnabled = idealState.isEnabled(); - boolean isTablePaused = isTablePaused(idealState); - boolean offsetsHaveToChange = offsetCriteria != null; - if (isTableEnabled && !isTablePaused) { - List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = - offsetsHaveToChange - ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions - : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); - // FIXME: Right now, we assume topics are sharing same offset criteria - OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria(); - // Read the smallest offset when a new partition is detected - streamConfigs.stream() - .forEach(streamConfig -> streamConfig.setOffsetCriteria( - offsetsHaveToChange ? 
offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA)); - List<PartitionGroupMetadata> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); - streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria)); - return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList, - offsetCriteria); - } else { - LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", - realtimeTableName, isTableEnabled, isTablePaused); - return idealState; - } - }, DEFAULT_RETRY_POLICY, true); + try { + HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { + assert idealState != null; + boolean isTableEnabled = idealState.isEnabled(); + boolean isTablePaused = isTablePaused(idealState); + boolean offsetsHaveToChange = offsetCriteria != null; + if (isTableEnabled && !isTablePaused) { + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + offsetsHaveToChange + ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions + : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); + // FIXME: Right now, we assume topics are sharing same offset criteria + OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria(); + // Read the smallest offset when a new partition is detected + streamConfigs.stream() + .forEach(streamConfig -> streamConfig.setOffsetCriteria( + offsetsHaveToChange ? 
offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA)); + List<PartitionGroupMetadata> newPartitionGroupMetadataList; + try { + newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); + } catch (Exception e) { + _controllerMetrics.addMeteredTableValue(realtimeTableName, + ControllerMeter.PARTITION_GROUP_METADATA_FETCH_ERROR, 1L); Review Comment: This is not the only place where `PartitionGroupMetadata` is fetched. If you want to track this error, consider tracking it in `PartitionGroupMetadataFetcher` instead. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1208,33 +1208,45 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamCon Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); - HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { - assert idealState != null; - boolean isTableEnabled = idealState.isEnabled(); - boolean isTablePaused = isTablePaused(idealState); - boolean offsetsHaveToChange = offsetCriteria != null; - if (isTableEnabled && !isTablePaused) { - List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = - offsetsHaveToChange - ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions - : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); - // FIXME: Right now, we assume topics are sharing same offset criteria - OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria(); - // Read the smallest offset when a new partition is detected - streamConfigs.stream() - .forEach(streamConfig -> streamConfig.setOffsetCriteria( - offsetsHaveToChange ? 
offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA)); - List<PartitionGroupMetadata> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); - streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria)); - return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList, - offsetCriteria); - } else { - LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", - realtimeTableName, isTableEnabled, isTablePaused); - return idealState; - } - }, DEFAULT_RETRY_POLICY, true); + try { + HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { + assert idealState != null; + boolean isTableEnabled = idealState.isEnabled(); + boolean isTablePaused = isTablePaused(idealState); + boolean offsetsHaveToChange = offsetCriteria != null; + if (isTableEnabled && !isTablePaused) { + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + offsetsHaveToChange + ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions + : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); + // FIXME: Right now, we assume topics are sharing same offset criteria + OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria(); + // Read the smallest offset when a new partition is detected + streamConfigs.stream() + .forEach(streamConfig -> streamConfig.setOffsetCriteria( + offsetsHaveToChange ? 
offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA)); + List<PartitionGroupMetadata> newPartitionGroupMetadataList; + try { + newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); + } catch (Exception e) { + _controllerMetrics.addMeteredTableValue(realtimeTableName, + ControllerMeter.PARTITION_GROUP_METADATA_FETCH_ERROR, 1L); + throw e; + } + streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria)); + return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList, + offsetCriteria); + } else { + LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", + realtimeTableName, isTableEnabled, isTablePaused); + return idealState; + } + }, DEFAULT_RETRY_POLICY, true); + } catch (Exception e) { + LOGGER.error("Failed to update ideal state during ensureAllPartitionsConsuming.", e); Review Comment: We usually log an exception only once. To get more context on the exception, you can wrap it in a `RuntimeException` with more details: ```suggestion throw new RuntimeException("Failed to update ideal state during ensureAllPartitionsConsuming", e); ``` Does the stack trace have enough info if we don't wrap the exception? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org