This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new edb1301123 add metric for time retention failing due to end time (#13879)
edb1301123 is described below

commit edb13011230228393350cddca54e4ac4c4b7e08b
Author: Johan Adami <4760722+jadam...@users.noreply.github.com>
AuthorDate: Fri Sep 6 18:43:50 2024 -0400

    add metric for time retention failing due to end time (#13879)
---
 .../pinot/common/metrics/ControllerGauge.java      |  5 +++
 .../controller/helix/SegmentStatusChecker.java     | 26 +++++++++++++++
 .../controller/helix/SegmentStatusCheckerTest.java | 37 ++++++++++++++++++++++
 3 files changed, 68 insertions(+)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 82c4e666e1..d052a75485 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -35,6 +35,11 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   PERCENT_OF_REPLICAS("percent", false),
 
   SEGMENTS_IN_ERROR_STATE("segments", false),
+  // Segment start and end times are stored in milliseconds.
+  // An invalid start/end time means the broker time pruner will not work correctly.
+  // An invalid end time means time retention will not happen for that segment.
+  SEGMENTS_WITH_INVALID_START_TIME("segments", false),
+  SEGMENTS_WITH_INVALID_END_TIME("segments", false),
 
   // Percentage of segments with at least one online replica in external view 
as compared to total number of segments in
   // ideal state
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index a9a2484752..c9a48022c0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -55,6 +55,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -282,6 +283,8 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
     List<String> offlineSegments = new ArrayList<>();
     // Segments with fewer replicas online (ONLINE/CONSUMING) in external view 
than in ideal state
     List<String> partialOnlineSegments = new ArrayList<>();
+    List<String> segmentsInvalidStartTime = new ArrayList<>();
+    List<String> segmentsInvalidEndTime = new ArrayList<>();
     for (String segment : segments) {
       int numISReplicas = 0;
       for (Map.Entry<String, String> entry : 
idealState.getInstanceStateMap(segment).entrySet()) {
@@ -318,6 +321,15 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
         continue;
       }
 
+      if (segmentZKMetadata.getStatus() != Status.IN_PROGRESS) {
+        if 
(!TimeUtils.timeValueInValidRange(segmentZKMetadata.getStartTimeMs())) {
+          segmentsInvalidStartTime.add(segment);
+        }
+        if 
(!TimeUtils.timeValueInValidRange(segmentZKMetadata.getEndTimeMs())) {
+          segmentsInvalidEndTime.add(segment);
+        }
+      }
+
       int numEVReplicas = 0;
       if (externalView != null) {
         Map<String, String> stateMap = externalView.getStateMap(segment);
@@ -378,6 +390,16 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
       LOGGER.warn("Table {} has {} segments with fewer replicas than the 
replication factor: {}", tableNameWithType,
           numPartialOnlineSegments, logSegments(partialOnlineSegments));
     }
+    int numInvalidStartTime = segmentsInvalidStartTime.size();
+    if (numInvalidStartTime > 0) {
+      LOGGER.warn("Table {} has {} segments with invalid start time: {}", 
tableNameWithType, numInvalidStartTime,
+          logSegments(segmentsInvalidStartTime));
+    }
+    int numInvalidEndTime = segmentsInvalidEndTime.size();
+    if (numInvalidEndTime > 0) {
+      LOGGER.warn("Table {} has {} segments with invalid end time: {}", 
tableNameWithType, numInvalidEndTime,
+          logSegments(segmentsInvalidEndTime));
+    }
 
     // Synchronization provided by Controller Gauge to make sure that only one 
thread updates the gauge
     _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas);
@@ -391,6 +413,10 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
         numPartialOnlineSegments);
     _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.TABLE_COMPRESSED_SIZE,
         tableCompressedSize);
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME,
+        numInvalidStartTime);
+    _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME,
+        numInvalidEndTime);
 
     if (tableType == TableType.REALTIME && tableConfig != null) {
       StreamConfig streamConfig =
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index 1edd2176e6..5f2ae7ea32 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -51,6 +51,7 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.Test;
@@ -700,4 +701,40 @@ public class SegmentStatusCheckerTest {
     String jsonString = JsonUtils.objectToPrettyString(segmentStatusInfoList);
     assertEquals(jsonString, json);
   }
+
+  // An OFFLINE segment whose start time is below TimeUtils.VALID_MIN_TIME_MILLIS and whose end time is above
+  // TimeUtils.VALID_MAX_TIME_MILLIS must be counted once in each of the invalid start/end time table gauges.
+  @Test
+  public void testInvalidSegmentStartEndTime() {
+    IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+    idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
+    idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
+    idealState.setPartitionState("myTable_0", "pinot3", "ONLINE");
+    idealState.setReplicas("3");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+
+    ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+    externalView.setState("myTable_0", "pinot1", "ONLINE");
+    externalView.setState("myTable_0", "pinot2", "ONLINE");
+    externalView.setState("myTable_0", "pinot3", "ONLINE");
+
+    SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 11111L);
+    when(segmentZKMetadata.getStartTimeMs()).thenReturn(TimeUtils.VALID_MIN_TIME_MILLIS - 1);
+    when(segmentZKMetadata.getEndTimeMs()).thenReturn(TimeUtils.VALID_MAX_TIME_MILLIS + 1);
+
+    PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
+    when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+    when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+    when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView);
+    when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), anyString())).thenReturn(segmentZKMetadata);
+
+    ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
+    when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+    runSegmentStatusChecker(resourceManager, 0);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, OFFLINE_TABLE_NAME,
+        ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME), 1);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, OFFLINE_TABLE_NAME,
+        ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME), 1);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to