This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new edb1301123 add metric for time retention failing due to end time (#13879) edb1301123 is described below commit edb13011230228393350cddca54e4ac4c4b7e08b Author: Johan Adami <4760722+jadam...@users.noreply.github.com> AuthorDate: Fri Sep 6 18:43:50 2024 -0400 add metric for time retention failing due to end time (#13879) --- .../pinot/common/metrics/ControllerGauge.java | 5 +++ .../controller/helix/SegmentStatusChecker.java | 26 +++++++++++++++ .../controller/helix/SegmentStatusCheckerTest.java | 37 ++++++++++++++++++++++ 3 files changed, 68 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 82c4e666e1..d052a75485 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -35,6 +35,11 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { PERCENT_OF_REPLICAS("percent", false), SEGMENTS_IN_ERROR_STATE("segments", false), + // Segment start and end times are stored in milliseconds. + // An invalid start/end time means the broker time pruner will not work correctly. + // An invalid end time means time retention will not happen for that segment.
+ SEGMENTS_WITH_INVALID_START_TIME("segments", false), + SEGMENTS_WITH_INVALID_END_TIME("segments", false), // Percentage of segments with at least one online replica in external view as compared to total number of segments in // ideal state diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index a9a2484752..c9a48022c0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -55,6 +55,7 @@ import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -282,6 +283,8 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh List<String> offlineSegments = new ArrayList<>(); // Segments with fewer replicas online (ONLINE/CONSUMING) in external view than in ideal state List<String> partialOnlineSegments = new ArrayList<>(); + List<String> segmentsInvalidStartTime = new ArrayList<>(); + List<String> segmentsInvalidEndTime = new ArrayList<>(); for (String segment : segments) { int numISReplicas = 0; for (Map.Entry<String, String> entry : idealState.getInstanceStateMap(segment).entrySet()) { @@ -318,6 +321,15 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh continue; } + if (segmentZKMetadata.getStatus() != Status.IN_PROGRESS) { + if (!TimeUtils.timeValueInValidRange(segmentZKMetadata.getStartTimeMs())) { + segmentsInvalidStartTime.add(segment); + } + if 
(!TimeUtils.timeValueInValidRange(segmentZKMetadata.getEndTimeMs())) { + segmentsInvalidEndTime.add(segment); + } + } + int numEVReplicas = 0; if (externalView != null) { Map<String, String> stateMap = externalView.getStateMap(segment); @@ -378,6 +390,16 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh LOGGER.warn("Table {} has {} segments with fewer replicas than the replication factor: {}", tableNameWithType, numPartialOnlineSegments, logSegments(partialOnlineSegments)); } + int numInvalidStartTime = segmentsInvalidStartTime.size(); + if (numInvalidStartTime > 0) { + LOGGER.warn("Table {} has {} segments with invalid start time: {}", tableNameWithType, numInvalidStartTime, + logSegments(segmentsInvalidStartTime)); + } + int numInvalidEndTime = segmentsInvalidEndTime.size(); + if (numInvalidEndTime > 0) { + LOGGER.warn("Table {} has {} segments with invalid end time: {}", tableNameWithType, numInvalidEndTime, + logSegments(segmentsInvalidEndTime)); + } // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas); @@ -391,6 +413,10 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh numPartialOnlineSegments); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE, tableCompressedSize); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME, + numInvalidStartTime); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME, + numInvalidEndTime); if (tableType == TableType.REALTIME && tableConfig != null) { StreamConfig streamConfig = diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index 1edd2176e6..5f2ae7ea32 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -51,6 +51,7 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.annotations.Test; @@ -700,4 +701,40 @@ public class SegmentStatusCheckerTest { String jsonString = JsonUtils.objectToPrettyString(segmentStatusInfoList); assertEquals(jsonString, json); } + + @Test + public void testInvalidSegmentStartEndTime() { + IdealState idealState = new IdealState(OFFLINE_TABLE_NAME); + idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); + idealState.setPartitionState("myTable_0", "pinot2", "ONLINE"); + idealState.setPartitionState("myTable_0", "pinot3", "ONLINE"); + idealState.setReplicas("3"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME); + externalView.setState("myTable_0", "pinot1", "ONLINE"); + externalView.setState("myTable_0", "pinot2", "ONLINE"); + externalView.setState("myTable_0", "pinot3", "ONLINE"); + + ZNRecord znRecord = new ZNRecord("myTable_0"); + znRecord.setLongField(CommonConstants.Segment.START_TIME, TimeUtils.VALID_MIN_TIME_MILLIS - 1); + znRecord.setLongField(CommonConstants.Segment.END_TIME, TimeUtils.VALID_MAX_TIME_MILLIS + 1); + SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 11111L); + 
when(segmentZKMetadata.getStartTimeMs()).thenReturn(TimeUtils.VALID_MIN_TIME_MILLIS - 1); + when(segmentZKMetadata.getEndTimeMs()).thenReturn(TimeUtils.VALID_MAX_TIME_MILLIS + 1); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); + when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), anyString())).thenReturn(segmentZKMetadata); + + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + + runSegmentStatusChecker(resourceManager, 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME), 1); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME), 1); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org