noob-se7en commented on code in PR #15673: URL: https://github.com/apache/pinot/pull/15673#discussion_r2072695083
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1208,33 +1208,45 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamCon Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); - HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { - assert idealState != null; - boolean isTableEnabled = idealState.isEnabled(); - boolean isTablePaused = isTablePaused(idealState); - boolean offsetsHaveToChange = offsetCriteria != null; - if (isTableEnabled && !isTablePaused) { - List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = - offsetsHaveToChange - ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions - : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); - // FIXME: Right now, we assume topics are sharing same offset criteria - OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria(); - // Read the smallest offset when a new partition is detected - streamConfigs.stream() - .forEach(streamConfig -> streamConfig.setOffsetCriteria( - offsetsHaveToChange ? 
offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA)); - List<PartitionGroupMetadata> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); - streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria)); - return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList, - offsetCriteria); - } else { - LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", - realtimeTableName, isTableEnabled, isTablePaused); - return idealState; - } - }, DEFAULT_RETRY_POLICY, true); + try { + HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { + assert idealState != null; + boolean isTableEnabled = idealState.isEnabled(); + boolean isTablePaused = isTablePaused(idealState); + boolean offsetsHaveToChange = offsetCriteria != null; + if (isTableEnabled && !isTablePaused) { + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + offsetsHaveToChange + ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions + : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); + // FIXME: Right now, we assume topics are sharing same offset criteria + OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria(); + // Read the smallest offset when a new partition is detected + streamConfigs.stream() + .forEach(streamConfig -> streamConfig.setOffsetCriteria( + offsetsHaveToChange ? 
offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA)); + List<PartitionGroupMetadata> newPartitionGroupMetadataList; + try { + newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); + } catch (Exception e) { + _controllerMetrics.addMeteredTableValue(realtimeTableName, + ControllerMeter.PARTITION_GROUP_METADATA_FETCH_ERROR, 1L); Review Comment: Then I am not sure whether it should be tracked under controller metrics, since `PartitionGroupMetadataFetcher` is in the SPI module. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org