rajagopr commented on code in PR #15257:
URL: https://github.com/apache/pinot/pull/15257#discussion_r1994358106


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java:
##########
@@ -165,6 +183,85 @@ private void manageRetentionForRealtimeTable(String realtimeTableName, Retention
     }
   }
 
+  @VisibleForTesting
+  void manageRetentionForHybridTable(TableConfig realtimeTableConfig) {

Review Comment:
   The notion of a time boundary would apply to a hybrid table, be it created by Pinot or manually created by the user. Given this, it would be safe to delete segments from the REALTIME table that are older than the offline table's time boundary.
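
A minimal, hypothetical sketch of the deletion rule described above: a COMPLETED realtime segment whose end time falls strictly before the offline time boundary is eligible for deletion. `SegmentInfo` and its fields are illustrative stand-ins, not Pinot's actual metadata classes.

```java
import java.util.ArrayList;
import java.util.List;

public class RealtimeRetentionSketch {
  // Illustrative stand-in for a realtime segment's ZK metadata
  public record SegmentInfo(String name, long endTimeMs, boolean completed) { }

  public static List<String> segmentsOlderThanBoundary(List<SegmentInfo> segments, long timeBoundaryMs) {
    List<String> toDelete = new ArrayList<>();
    for (SegmentInfo segment : segments) {
      // Consuming (not yet COMPLETED) segments are never deleted
      if (!segment.completed()) {
        continue;
      }
      // Only segments strictly older than the time boundary qualify
      if (segment.endTimeMs() < timeBoundaryMs) {
        toDelete.add(segment.name());
      }
    }
    return toDelete;
  }
}
```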



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java:
##########
@@ -165,6 +183,85 @@ private void manageRetentionForRealtimeTable(String realtimeTableName, Retention
     }
   }
 
+  @VisibleForTesting
+  void manageRetentionForHybridTable(TableConfig realtimeTableConfig) {
+    List<String> segmentsToDelete = new ArrayList<>();
+    String realtimeTableName = realtimeTableConfig.getTableName();
+    String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+    try {
+      TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(offlineTableName);
+      ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+      Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, offlineTableName);
+      if (schema == null) {
+        throw new RuntimeException("Failed to get schema for table: " + offlineTableName);
+      }
+      String timeColumn = null;
+      SegmentsValidationAndRetentionConfig validationConfig = offlineTableConfig.getValidationConfig();
+      if (validationConfig != null) {
+        timeColumn = validationConfig.getTimeColumnName();
+      }
+      if (StringUtils.isEmpty(timeColumn)) {
+        throw new RuntimeException("TimeColumn is null or empty for table: " + offlineTableName);
+      }
+      DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
+      if (dateTimeSpec == null) {
+        throw new RuntimeException(String.format("Failed to get DateTimeFieldSpec for time column: %s of table: %s",
+            timeColumn, offlineTableName));
+      }
+      DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec();
+      if (timeFormatSpec.getColumnUnit() == null) {
+        throw new RuntimeException(String.format("Time unit must be configured in the field spec for time column: %s of"
+            + " table: %s", timeColumn, offlineTableName));
+      }
+      // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use
+      // (maxEndTime - 1 DAY)
+      boolean isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(
+          IngestionConfigUtils.getBatchSegmentIngestionFrequency(realtimeTableConfig))
+          && timeFormatSpec.getColumnUnit() != TimeUnit.DAYS;

Review Comment:
   Did you mean that overlapping data should not be present? If so, it's fine for overlapping data to exist in both the REALTIME and OFFLINE tables. The time boundary takes care of querying the appropriate table.
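
An illustrative sketch (not the actual broker code) of why the overlap is harmless: at query time, rows at or before the time boundary are served from the OFFLINE table and rows after it from the REALTIME table, so overlapping rows are never read twice. The method and table-name suffixes here are simplifications for illustration.

```java
public class TimeBoundaryRoutingSketch {
  // Decide which physical table serves a row of a hybrid table,
  // given the row's time-column value and the current time boundary.
  public static String tableForRow(String rawTableName, long rowTimeMs, long timeBoundaryMs) {
    return rowTimeMs <= timeBoundaryMs ? rawTableName + "_OFFLINE" : rawTableName + "_REALTIME";
  }
}
```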



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java:
##########
@@ -165,6 +183,85 @@ private void manageRetentionForRealtimeTable(String realtimeTableName, Retention
     }
   }
 
+  @VisibleForTesting
+  void manageRetentionForHybridTable(TableConfig realtimeTableConfig) {
+    List<String> segmentsToDelete = new ArrayList<>();
+    String realtimeTableName = realtimeTableConfig.getTableName();
+    String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+    try {
+      TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(offlineTableName);
+      ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+      Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, offlineTableName);
+      if (schema == null) {
+        throw new RuntimeException("Failed to get schema for table: " + offlineTableName);
+      }
+      String timeColumn = null;
+      SegmentsValidationAndRetentionConfig validationConfig = offlineTableConfig.getValidationConfig();
+      if (validationConfig != null) {
+        timeColumn = validationConfig.getTimeColumnName();
+      }
+      if (StringUtils.isEmpty(timeColumn)) {
+        throw new RuntimeException("TimeColumn is null or empty for table: " + offlineTableName);
+      }
+      DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
+      if (dateTimeSpec == null) {
+        throw new RuntimeException(String.format("Failed to get DateTimeFieldSpec for time column: %s of table: %s",
+            timeColumn, offlineTableName));
+      }
+      DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec();
+      if (timeFormatSpec.getColumnUnit() == null) {
+        throw new RuntimeException(String.format("Time unit must be configured in the field spec for time column: %s of"
+            + " table: %s", timeColumn, offlineTableName));
+      }
+      // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use
+      // (maxEndTime - 1 DAY)
+      boolean isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(
+          IngestionConfigUtils.getBatchSegmentIngestionFrequency(realtimeTableConfig))
+          && timeFormatSpec.getColumnUnit() != TimeUnit.DAYS;
+      long timeOffsetMs = isHourlyTable ? TimeUnit.HOURS.toMillis(1) : TimeUnit.DAYS.toMillis(1);
+      long offlineSegmentsMaxEndTimeMs = -1;
+      // Determine the max end time of all offline segments
+      for (SegmentZKMetadata segmentZKMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+        // skip empty segments
+        if (segmentZKMetadata.getTotalDocs() == 0) {
+          LOGGER.warn("Skipping empty segment: {} from table: {}", segmentZKMetadata.getSegmentName(),
+              offlineTableName);
+          continue;
+        }
+        offlineSegmentsMaxEndTimeMs = Math.max(offlineSegmentsMaxEndTimeMs, segmentZKMetadata.getEndTimeMs());
+      }
+
+      if (offlineSegmentsMaxEndTimeMs == -1) {
+        throw new RuntimeException("Failed to determine max end time of segments for table: " + offlineTableName);
+      }
+
+      // realtime table segments older than the time boundary will be deleted
+      long timeBoundaryMs = offlineSegmentsMaxEndTimeMs - timeOffsetMs;
+
+      // Iterate over all COMPLETED segments of the REALTIME table and check if they are eligible for deletion.
+      for (SegmentZKMetadata segmentZKMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName)) {
+        // The segment should be in COMPLETED state
+        if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS
+            || segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING) {
+          continue;
+        }
+        // The segment should be older than the cutoff time
+        if (segmentZKMetadata.getEndTimeMs() < timeBoundaryMs) {
+          segmentsToDelete.add(segmentZKMetadata.getSegmentName());
+        }
+      }
+      if (!segmentsToDelete.isEmpty()) {

Review Comment:
   I'm removing this line, as this info is explicitly printed within the `_pinotHelixResourceManager.deleteSegments(...)` method.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java:
##########
@@ -165,6 +183,85 @@ private void manageRetentionForRealtimeTable(String realtimeTableName, Retention
     }
   }
 
+  @VisibleForTesting
+  void manageRetentionForHybridTable(TableConfig realtimeTableConfig) {
+    List<String> segmentsToDelete = new ArrayList<>();
+    String realtimeTableName = realtimeTableConfig.getTableName();
+    String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+    try {
+      TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(offlineTableName);
+      ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+      Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, offlineTableName);
+      if (schema == null) {
+        throw new RuntimeException("Failed to get schema for table: " + offlineTableName);
+      }
+      String timeColumn = null;
+      SegmentsValidationAndRetentionConfig validationConfig = offlineTableConfig.getValidationConfig();
+      if (validationConfig != null) {
+        timeColumn = validationConfig.getTimeColumnName();
+      }
+      if (StringUtils.isEmpty(timeColumn)) {
+        throw new RuntimeException("TimeColumn is null or empty for table: " + offlineTableName);
+      }
+      DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
+      if (dateTimeSpec == null) {
+        throw new RuntimeException(String.format("Failed to get DateTimeFieldSpec for time column: %s of table: %s",
+            timeColumn, offlineTableName));
+      }
+      DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec();
+      if (timeFormatSpec.getColumnUnit() == null) {
+        throw new RuntimeException(String.format("Time unit must be configured in the field spec for time column: %s of"
+            + " table: %s", timeColumn, offlineTableName));
+      }
+      // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use
+      // (maxEndTime - 1 DAY)
+      boolean isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(
+          IngestionConfigUtils.getBatchSegmentIngestionFrequency(realtimeTableConfig))

Review Comment:
   I had the same thought initially. However, it's only a small portion (the time offset) of the `TimeBoundaryManager` logic that we are using here, and that logic is the same. For queries, the time boundary is either taken from an explicitly set time boundary or calculated with the same logic as here. I chose to avoid making changes that would affect the query path.
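
A standalone sketch of the offset rule the patch borrows from `TimeBoundaryManager`: an HOURLY-push table whose time column's unit is not DAYS uses (maxEndTime - 1 HOUR) as the boundary; everything else uses (maxEndTime - 1 DAY). The class and method names here are illustrative, not part of Pinot.

```java
import java.util.concurrent.TimeUnit;

public class TimeBoundaryOffsetSketch {
  // Compute the time boundary from the max end time of all offline segments.
  // isHourlyTable mirrors the check in the patch: HOURLY push frequency
  // combined with a time-column unit other than DAYS.
  public static long computeTimeBoundaryMs(long offlineSegmentsMaxEndTimeMs, boolean isHourlyTable) {
    long timeOffsetMs = isHourlyTable ? TimeUnit.HOURS.toMillis(1) : TimeUnit.DAYS.toMillis(1);
    return offlineSegmentsMaxEndTimeMs - timeOffsetMs;
  }
}
```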



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

