This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b206ccd076 Log IS update failures in ensureAllPartitionsConsuming (#15673) b206ccd076 is described below commit b206ccd076a5590c54dc607ee4b1286b0045c2b1 Author: NOOB <43700604+noob-se...@users.noreply.github.com> AuthorDate: Wed May 7 05:23:54 2025 +0530 Log IS update failures in ensureAllPartitionsConsuming (#15673) --- .../pinot/common/metrics/ControllerMeter.java | 4 +- .../common/utils/helix/IdealStateGroupCommit.java | 3 +- .../helix/core/PinotTableIdealStateBuilder.java | 8 ++- .../realtime/PinotLLCRealtimeSegmentManager.java | 58 ++++++++++++---------- 4 files changed, 43 insertions(+), 30 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java index 5f934561ad..45ae4c76db 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java @@ -73,7 +73,9 @@ public enum ControllerMeter implements AbstractMetrics.Meter { // Total Bytes read from deep store DEEP_STORE_READ_BYTES_COMPLETED("deepStoreReadBytesCompleted", true), // Total Bytes written to deep store - DEEP_STORE_WRITE_BYTES_COMPLETED("deepStoreWriteBytesCompleted", true); + DEEP_STORE_WRITE_BYTES_COMPLETED("deepStoreWriteBytesCompleted", true), + // Tracks failures encountered while fetching partition group metadata + PARTITION_GROUP_METADATA_FETCH_ERROR("failures", true); private final String _brokerMeterName; private final String _unit; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java index f7e7981a1a..0b55fd9faf 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java @@ -105,7 +105,8 @@ public class IdealStateGroupCommit { * @param helixManager helixManager with the ability to pull from the current data\ * @param resourceName the resource name to be updated * @param updater the idealState updater to be applied - * @return IdealState if the update is successful, null if not + * @return IdealState if the update is successful, exception if the update fails and null if interrupted while + * committing change */ public IdealState commit(HelixManager helixManager, String resourceName, Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, boolean noChangeOk) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 244f7853d8..8bc9ea442f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -21,6 +21,8 @@ package org.apache.pinot.controller.helix.core; import java.util.List; import org.apache.helix.model.IdealState; import org.apache.helix.model.builder.CustomModeISBuilder; +import org.apache.pinot.common.metrics.ControllerMeter; +import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; @@ -96,9 +98,13 @@ public class PinotTableIdealStateBuilder { return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); } catch (Exception e) { Exception fetcherException = partitionGroupMetadataFetcher.getException(); + String tableNameWithType = streamConfigs.get(0).getTableNameWithType(); LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfigs.stream().map(streamConfig -> streamConfig.getTopicName()).reduce((a, b) -> a + "," + b), - streamConfigs.get(0).getTableNameWithType(), fetcherException); + tableNameWithType, fetcherException); + ControllerMetrics controllerMetrics = ControllerMetrics.get(); + controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.PARTITION_GROUP_METADATA_FETCH_ERROR, + 1L); throw new RuntimeException(fetcherException); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 765b25852a..d2445f3234 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1208,33 +1208,37 @@ public class PinotLLCRealtimeSegmentManager { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); - HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { - assert idealState != null; - boolean isTableEnabled = idealState.isEnabled(); - boolean isTablePaused = isTablePaused(idealState); - boolean offsetsHaveToChange = offsetCriteria != null; - if (isTableEnabled && !isTablePaused) { - List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = - offsetsHaveToChange - ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions - : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); - // FIXME: Right now, we assume topics are sharing same offset criteria - OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria(); - // Read the smallest offset when a new partition is detected - streamConfigs.stream() - .forEach(streamConfig -> streamConfig.setOffsetCriteria( - offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA)); - List<PartitionGroupMetadata> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); - streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria)); - return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList, - offsetCriteria); - } else { - LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", - realtimeTableName, isTableEnabled, isTablePaused); - return idealState; - } - }, DEFAULT_RETRY_POLICY, true); + try { + HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { + assert idealState != null; + boolean isTableEnabled = idealState.isEnabled(); + boolean isTablePaused = isTablePaused(idealState); + boolean offsetsHaveToChange = offsetCriteria != null; + if (isTableEnabled && !isTablePaused) { + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = + offsetsHaveToChange + ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions + : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); + // FIXME: Right now, we assume topics are sharing same offset criteria + OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria(); + // Read the smallest offset when a new partition is detected + streamConfigs.stream() + .forEach(streamConfig -> streamConfig.setOffsetCriteria( + offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA)); + List<PartitionGroupMetadata> newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList); + streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria)); + return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList, + offsetCriteria); + } else { + LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", + realtimeTableName, isTableEnabled, isTablePaused); + return idealState; + } + }, DEFAULT_RETRY_POLICY, true); + } catch (Exception e) { + throw new RuntimeException("Failed to update ideal state during ensureAllPartitionsConsuming.", e); + } } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org