rohityadav1993 commented on code in PR #18486:
URL: https://github.com/apache/pinot/pull/18486#discussion_r3239738514


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/RetentionUtils.java:
##########
@@ -54,18 +61,40 @@ private RetentionUtils() {
    */
   public static boolean isPurgeable(String tableNameWithType, 
SegmentZKMetadata segmentZKMetadata, long retentionMs,
       long currentTimeMs, boolean useCreationTimeFallback) {
-    String segmentName = segmentZKMetadata.getSegmentName();
-    long endTimeMs = segmentZKMetadata.getEndTimeMs();
+    return isPurgeableInternal(segmentZKMetadata.getEndTimeMs(), 
segmentZKMetadata.getCreationTime(), retentionMs,
+        currentTimeMs, useCreationTimeFallback, tableNameWithType, 
segmentZKMetadata.getSegmentName());
+  }
+
+  /**
+   * Whether segment file metadata is past the retention window, using end 
time from
+   * {@link #segmentMetadataEndTimeMillis(SegmentMetadata)} and {@link 
SegmentMetadata#getIndexCreationTime()} as the
+   * optional fallback clock. Invalid-end paths emit the same style of 
debug/warn lines as the ZK overload, using
+   * {@link SegmentMetadata#getName()} for the segment in log messages.
+   * <ul>
+   *   <li>If end time is valid: purgeable when {@code currentTimeMs - 
endTimeMs > retentionMs}.</li>
+   *   <li>If end time is invalid and {@code useCreationTimeFallback} is true 
and index creation time is valid:
+   *       purgeable when {@code currentTimeMs - creationTimeMs > 
retentionMs}.</li>
+   *   <li>Otherwise: not purgeable (fail-open).</li>
+   * </ul>
+   */
+  public static boolean isPurgeable(String tableNameWithType, SegmentMetadata 
segmentMetadata, long retentionMs,
+      long currentTimeMs, boolean useCreationTimeFallback) {
+    return isPurgeableInternal(segmentMetadataEndTimeMillis(segmentMetadata), 
segmentMetadata.getIndexCreationTime(),
+        retentionMs, currentTimeMs, useCreationTimeFallback, 
tableNameWithType, segmentMetadata.getName());
+  }
+
+  private static boolean isPurgeableInternal(long endTimeMs, long 
creationTimeMs, long retentionMs,
+      long currentTimeMs, boolean useCreationTimeFallback, String 
tableNameWithType,
+      String segmentName) {
     if (TimeUtils.timeValueInValidRange(endTimeMs)) {
       return currentTimeMs - endTimeMs > retentionMs;
     }
+    if (useCreationTimeFallback && 
TimeUtils.timeValueInValidRange(creationTimeMs)) {

Review Comment:
   The creationTime fallback may be of no value for offline tables. If a valid 
endTime was not computed for the data, we fall back to the time the segment was 
created. In many cases creationTimeMs will be close to now(), so 
`currentTimeMs - creationTimeMs > retentionMs` will almost always be false in 
the fallback path.
   
   
   For minion tasks such as upsertCompaction, the creationTime is preserved 
from the original segments, so it may be very old, and those segments would get 
rejected here. It looks like similar handling was added previously on the minion 
side to avoid uploading segments near the retention boundary: 
https://github.com/apache/pinot/pull/18285
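
   To make the two cases concrete, here is a minimal sketch of the fallback 
comparison only. The class name, the 30-day retention, and the timestamps are 
illustrative assumptions, not values from the PR; the comparison itself mirrors 
the `currentTimeMs - creationTimeMs > retentionMs` check quoted above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone class; only the comparison mirrors the diff.
final class CreationTimeFallbackSketch {

  // Fallback branch: purgeable when the creation time is older than the retention window.
  static boolean isPurgeableByCreationTime(long currentTimeMs, long creationTimeMs, long retentionMs) {
    return currentTimeMs - creationTimeMs > retentionMs;
  }

  public static void main(String[] args) {
    long retentionMs = TimeUnit.DAYS.toMillis(30);   // assumed table retention
    long nowMs = 1_700_000_000_000L;                 // fixed "now" so the example is deterministic

    // Freshly built offline segment: creation time is minutes before upload.
    long freshCreationMs = nowMs - TimeUnit.MINUTES.toMillis(5);
    // Compacted segment: creation time carried over from a 90-day-old original.
    long preservedCreationMs = nowMs - TimeUnit.DAYS.toMillis(90);

    System.out.println(isPurgeableByCreationTime(nowMs, freshCreationMs, retentionMs));     // false
    System.out.println(isPurgeableByCreationTime(nowMs, preservedCreationMs, retentionMs)); // true
  }
}
```

   The first call shows why the fallback rarely fires for newly built segments; 
the second shows how a preserved creation time can cause a rejection.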



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/RetentionUtils.java:
##########
@@ -74,4 +103,46 @@ public static boolean isPurgeable(String tableNameWithType, 
SegmentZKMetadata se
     }
     return false;
   }
+
+  /**
+   * Parses {@link 
SegmentsValidationAndRetentionConfig#getRetentionTimeUnit()} and
+   * {@link SegmentsValidationAndRetentionConfig#getRetentionTimeValue()} into 
table data retention duration in
+   * milliseconds (same interpretation as controller time-based retention).
+   *
+   * @return millis, or empty if unset or not parseable
+   */
+  public static OptionalLong parseTableDataRetentionMillis(
+      @Nullable SegmentsValidationAndRetentionConfig validationConfig) {
+    if (validationConfig == null) {
+      return OptionalLong.empty();
+    }
+    String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
+    String retentionTimeValue = validationConfig.getRetentionTimeValue();
+    if (StringUtils.isEmpty(retentionTimeUnit) || 
StringUtils.isEmpty(retentionTimeValue)) {
+      return OptionalLong.empty();
+    }
+    try {
+      return OptionalLong.of(TimeUnit.valueOf(retentionTimeUnit.toUpperCase())
+          .toMillis(Long.parseLong(retentionTimeValue)));
+    } catch (Exception e) {

Review Comment:
   This exception is silently ignored.
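
   One possible shape for this, as a sketch rather than a drop-in patch: catch 
the narrower `IllegalArgumentException` and log a warning before returning 
empty. The class name, the `String` parameters, and the use of 
`java.util.logging` are assumptions made to keep the example self-contained; 
the real method takes a `SegmentsValidationAndRetentionConfig` and would use 
the project's own logger.

```java
import java.util.Locale;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-alone class illustrating "log, then return empty".
final class RetentionParseSketch {
  private static final Logger LOGGER = Logger.getLogger(RetentionParseSketch.class.getName());

  static OptionalLong parseRetentionMillis(String tableNameWithType, String unit, String value) {
    if (unit == null || unit.isEmpty() || value == null || value.isEmpty()) {
      return OptionalLong.empty();
    }
    try {
      TimeUnit timeUnit = TimeUnit.valueOf(unit.toUpperCase(Locale.ROOT));
      return OptionalLong.of(timeUnit.toMillis(Long.parseLong(value)));
    } catch (IllegalArgumentException e) {
      // Covers an unknown unit (IllegalArgumentException from valueOf) and a
      // non-numeric value (NumberFormatException, a subclass of it).
      LOGGER.log(Level.WARNING,
          "Invalid retention config for table " + tableNameWithType + ": unit=" + unit + ", value=" + value, e);
      return OptionalLong.empty();
    }
  }

  public static void main(String[] args) {
    System.out.println(parseRetentionMillis("myTable_OFFLINE", "days", "7"));        // OptionalLong[604800000]
    System.out.println(parseRetentionMillis("myTable_OFFLINE", "fortnights", "7"));  // OptionalLong.empty
  }
}
```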



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/RetentionUtils.java:
##########
@@ -74,4 +103,46 @@ public static boolean isPurgeable(String tableNameWithType, 
SegmentZKMetadata se
     }
     return false;
   }
+
+  /**
+   * Parses {@link 
SegmentsValidationAndRetentionConfig#getRetentionTimeUnit()} and
+   * {@link SegmentsValidationAndRetentionConfig#getRetentionTimeValue()} into 
table data retention duration in
+   * milliseconds (same interpretation as controller time-based retention).
+   *
+   * @return millis, or empty if unset or not parseable
+   */
+  public static OptionalLong parseTableDataRetentionMillis(
+      @Nullable SegmentsValidationAndRetentionConfig validationConfig) {
+    if (validationConfig == null) {
+      return OptionalLong.empty();
+    }
+    String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
+    String retentionTimeValue = validationConfig.getRetentionTimeValue();
+    if (StringUtils.isEmpty(retentionTimeUnit) || 
StringUtils.isEmpty(retentionTimeValue)) {
+      return OptionalLong.empty();
+    }
+    try {
+      return OptionalLong.of(TimeUnit.valueOf(retentionTimeUnit.toUpperCase())
+          .toMillis(Long.parseLong(retentionTimeValue)));
+    } catch (Exception e) {
+      return OptionalLong.empty();
+    }
+  }
+
+  /**
+   * End timestamp in epoch millis for retention comparison from segment file 
metadata, aligned with segment impl time
+   * interval when valid.
+   */
+  static long segmentMetadataEndTimeMillis(SegmentMetadata segmentMetadata) {

Review Comment:
   nit: the method name does not imply `get` behaviour; consider a 
`get`-prefixed name.



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/RetentionUtils.java:
##########
@@ -74,4 +103,46 @@ public static boolean isPurgeable(String tableNameWithType, 
SegmentZKMetadata se
     }
     return false;
   }
+
+  /**
+   * Parses {@link 
SegmentsValidationAndRetentionConfig#getRetentionTimeUnit()} and
+   * {@link SegmentsValidationAndRetentionConfig#getRetentionTimeValue()} into 
table data retention duration in
+   * milliseconds (same interpretation as controller time-based retention).
+   *
+   * @return millis, or empty if unset or not parseable
+   */
+  public static OptionalLong parseTableDataRetentionMillis(
+      @Nullable SegmentsValidationAndRetentionConfig validationConfig) {
+    if (validationConfig == null) {
+      return OptionalLong.empty();
+    }
+    String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
+    String retentionTimeValue = validationConfig.getRetentionTimeValue();
+    if (StringUtils.isEmpty(retentionTimeUnit) || 
StringUtils.isEmpty(retentionTimeValue)) {
+      return OptionalLong.empty();
+    }
+    try {
+      return OptionalLong.of(TimeUnit.valueOf(retentionTimeUnit.toUpperCase())
+          .toMillis(Long.parseLong(retentionTimeValue)));
+    } catch (Exception e) {
+      return OptionalLong.empty();
+    }
+  }
+
+  /**
+   * End timestamp in epoch millis for retention comparison from segment file 
metadata, aligned with segment impl time
+   * interval when valid.
+   */
+  static long segmentMetadataEndTimeMillis(SegmentMetadata segmentMetadata) {
+    Interval interval = segmentMetadata.getTimeInterval();
+    if (interval != null && TimeUtils.isValidTimeInterval(interval)) {
+      return interval.getEndMillis();
+    }
+    TimeUnit timeUnit = segmentMetadata.getTimeUnit();

Review Comment:
   Why do we need this fallback? Generally, the interval and end time are set 
together:
   
https://github.com/apache/pinot/blob/21c88f716f376bb7a0238e0feeeff12b088f55f7/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java#L181



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java:
##########
@@ -57,6 +63,56 @@ public static void validateTimeInterval(SegmentMetadata 
segmentMetadata, TableCo
     }
   }
 
+  /**
+   * When {@code controller.segment.upload.rejectOutOfRetention.enabled} is 
true on the controller, rejects the upload
+   * if segment end time (or index creation time when {@code 
controller.retentionManager.enableCreationTimeFallback} is
+   * enabled and end time is invalid) is past the table's {@code 
retentionTimeValue}/{@code retentionTimeUnit} window,
+   * using the same boundary as the controller retention manager.
+   *
+   * <p>Controller wiring: {@link 
org.apache.pinot.controller.api.resources.PinotSegmentUploadDownloadRestletResource}
+   * invokes this for single-segment upload only. METADATA batch upload 
({@code POST /segments/batchUpload}) and
+   * reingested-segment upload do not call it.
+   *
+   * <p>For <b>OFFLINE</b> tables, when batch segment ingestion type is not 
<b>APPEND</b>, this method returns without
+   * evaluating retention (aligned with time retention for completed 
segments). <b>REALTIME</b> tables always proceed
+   * to the retention comparison when the flag and parseable retention are set.
+   */
+  public static void rejectUploadIfOutOfRetention(SegmentMetadata 
segmentMetadata, TableConfig tableConfig,

Review Comment:
   Let's add a metric for when we reject a segment.
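
   As a rough illustration of the idea, assuming a per-table counter: the 
sketch below uses a plain in-memory map as a stand-in for the controller's 
metrics registry. Every name in it (`UploadRejectionMetricsSketch`, 
`recordRejection`, `rejectedCount`, `rejectIfOutOfRetention`) is hypothetical; 
the actual change would emit through the controller's existing metrics classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a metrics registry: one rejection counter per table.
final class UploadRejectionMetricsSketch {
  private static final Map<String, LongAdder> REJECTED_BY_TABLE = new ConcurrentHashMap<>();

  static void recordRejection(String tableNameWithType) {
    REJECTED_BY_TABLE.computeIfAbsent(tableNameWithType, k -> new LongAdder()).increment();
  }

  static long rejectedCount(String tableNameWithType) {
    LongAdder adder = REJECTED_BY_TABLE.get(tableNameWithType);
    return adder == null ? 0L : adder.sum();
  }

  // Simplified rejection check: bump the counter before throwing so the
  // rejection is visible in metrics even if the caller swallows the exception.
  static void rejectIfOutOfRetention(String tableNameWithType, long endTimeMs, long currentTimeMs,
      long retentionMs) {
    if (currentTimeMs - endTimeMs > retentionMs) {
      recordRejection(tableNameWithType);
      throw new IllegalStateException("Segment end time is past retention for table: " + tableNameWithType);
    }
  }

  public static void main(String[] args) {
    try {
      rejectIfOutOfRetention("myTable_OFFLINE", 0L, 10_000L, 1_000L);
    } catch (IllegalStateException e) {
      System.out.println(rejectedCount("myTable_OFFLINE")); // 1
    }
  }
}
```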



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java:
##########
@@ -57,6 +63,56 @@ public static void validateTimeInterval(SegmentMetadata 
segmentMetadata, TableCo
     }
   }
 
+  /**
+   * When {@code controller.segment.upload.rejectOutOfRetention.enabled} is 
true on the controller, rejects the upload
+   * if segment end time (or index creation time when {@code 
controller.retentionManager.enableCreationTimeFallback} is
+   * enabled and end time is invalid) is past the table's {@code 
retentionTimeValue}/{@code retentionTimeUnit} window,
+   * using the same boundary as the controller retention manager.
+   *
+   * <p>Controller wiring: {@link 
org.apache.pinot.controller.api.resources.PinotSegmentUploadDownloadRestletResource}
+   * invokes this for single-segment upload only. METADATA batch upload 
({@code POST /segments/batchUpload}) and
+   * reingested-segment upload do not call it.
+   *
+   * <p>For <b>OFFLINE</b> tables, when batch segment ingestion type is not 
<b>APPEND</b>, this method returns without
+   * evaluating retention (aligned with time retention for completed 
segments). <b>REALTIME</b> tables always proceed
+   * to the retention comparison when the flag and parseable retention are set.
+   */
+  public static void rejectUploadIfOutOfRetention(SegmentMetadata 
segmentMetadata, TableConfig tableConfig,
+      long currentTimeMs, boolean controllerRejectOutOfRetentionEnabled,
+      boolean retentionCreationTimeFallbackEnabled) {
+    if (!controllerRejectOutOfRetentionEnabled) {
+      return;
+    }
+    SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
+    if (validationConfig == null) {
+      return;
+    }
+    if (tableConfig.getTableType() == TableType.OFFLINE

Review Comment:
   Let's reuse the logic from 
https://github.com/apache/pinot/blob/21c88f716f376bb7a0238e0feeeff12b088f55f7/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java#L138
   
   We can move it to the RetentionUtils class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to