swaminathanmanish commented on code in PR #15257:
URL: https://github.com/apache/pinot/pull/15257#discussion_r1994032068

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java:
##########
@@ -165,6 +183,85 @@ private void manageRetentionForRealtimeTable(String realtimeTableName, Retention
     }
   }
+  @VisibleForTesting
+  void manageRetentionForHybridTable(TableConfig realtimeTableConfig) {

Review Comment:
   Will this time boundary logic work for user-managed hybrid tables as well?


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java:
##########
@@ -165,6 +183,85 @@ private void manageRetentionForRealtimeTable(String realtimeTableName, Retention
     }
   }
+  @VisibleForTesting
+  void manageRetentionForHybridTable(TableConfig realtimeTableConfig) {
+    List<String> segmentsToDelete = new ArrayList<>();
+    String realtimeTableName = realtimeTableConfig.getTableName();
+    String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+    try {
+      TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(offlineTableName);
+      ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+      Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, offlineTableName);
+      if (schema == null) {
+        throw new RuntimeException("Failed to get schema for table: " + offlineTableName);
+      }
+      String timeColumn = null;
+      SegmentsValidationAndRetentionConfig validationConfig = offlineTableConfig.getValidationConfig();
+      if (validationConfig != null) {
+        timeColumn = validationConfig.getTimeColumnName();
+      }
+      if (StringUtils.isEmpty(timeColumn)) {
+        throw new RuntimeException("TimeColumn is null or empty for table: " + offlineTableName);
+      }
+      DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
+      if (dateTimeSpec == null) {
+        throw new RuntimeException(String.format("Failed to get DateTimeFieldSpec for time column: %s of table: %s",
+            timeColumn, offlineTableName));
+      }
+      DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec();
+      if (timeFormatSpec.getColumnUnit() == null) {
+        throw new RuntimeException(String.format("Time unit must be configured in the field spec for time column: %s of"
+            + " table: %s", timeColumn, offlineTableName));
+      }
+      // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use
+      // (maxEndTime - 1 DAY)
+      boolean isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(
+          IngestionConfigUtils.getBatchSegmentIngestionFrequency(realtimeTableConfig))
+          && timeFormatSpec.getColumnUnit() != TimeUnit.DAYS;
+      long timeOffsetMs = isHourlyTable ? TimeUnit.HOURS.toMillis(1) : TimeUnit.DAYS.toMillis(1);
+      long offlineSegmentsMaxEndTimeMs = -1;
+      // Determine the max end time of all offline segments
+      for (SegmentZKMetadata segmentZKMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+        // skip empty segments
+        if (segmentZKMetadata.getTotalDocs() == 0) {
+          LOGGER.warn("Skipping empty segment: {} from table: {}", segmentZKMetadata.getSegmentName(),
+              offlineTableName);
+          continue;
+        }
+        offlineSegmentsMaxEndTimeMs = Math.max(offlineSegmentsMaxEndTimeMs, segmentZKMetadata.getEndTimeMs());
+      }
+
+      if (offlineSegmentsMaxEndTimeMs == -1) {
+        throw new RuntimeException("Failed to determine max end time of segments for table: " + offlineTableName);
+      }
+
+      // realtime table segments older than the time boundary will be deleted
+      long timeBoundaryMs = offlineSegmentsMaxEndTimeMs - timeOffsetMs;
+
+      // Iterate over all COMPLETED segments of the REALTIME table and check if they are eligible for deletion.
+      for (SegmentZKMetadata segmentZKMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName)) {
+        // The segment should be in COMPLETED state
+        if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS
+            || segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING) {
+          continue;
+        }
+        // The segment should be older than the cutoff time
+        if (segmentZKMetadata.getEndTimeMs() < timeBoundaryMs) {
+          segmentsToDelete.add(segmentZKMetadata.getSegmentName());
+        }
+      }
+      if (!segmentsToDelete.isEmpty()) {

Review Comment:
   Please move the log.info above the empty check so that we know how many segments there are (empty or not).
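
A minimal sketch of the reordering this comment asks for; the log message wording is an assumption, and the body of the empty check (the actual deletion call) is not shown in this hunk:

```java
// Sketch only: log the candidate count unconditionally, then act on the non-empty case.
LOGGER.info("Found {} realtime segments older than time boundary: {} for hybrid table: {}",
    segmentsToDelete.size(), timeBoundaryMs, realtimeTableName);
if (!segmentsToDelete.isEmpty()) {
  // ... existing deletion logic from the PR (not shown in this hunk) ...
}
```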

##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java:
##########
@@ -165,6 +183,85 @@ private void manageRetentionForRealtimeTable(String realtimeTableName, Retention
     }
   }
+  @VisibleForTesting
+  void manageRetentionForHybridTable(TableConfig realtimeTableConfig) {
+    List<String> segmentsToDelete = new ArrayList<>();
+    String realtimeTableName = realtimeTableConfig.getTableName();
+    String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+    try {
+      TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(offlineTableName);
+      ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+      Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, offlineTableName);
+      if (schema == null) {
+        throw new RuntimeException("Failed to get schema for table: " + offlineTableName);
+      }
+      String timeColumn = null;
+      SegmentsValidationAndRetentionConfig validationConfig = offlineTableConfig.getValidationConfig();
+      if (validationConfig != null) {
+        timeColumn = validationConfig.getTimeColumnName();
+      }
+      if (StringUtils.isEmpty(timeColumn)) {
+        throw new RuntimeException("TimeColumn is null or empty for table: " + offlineTableName);
+      }
+      DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
+      if (dateTimeSpec == null) {
+        throw new RuntimeException(String.format("Failed to get DateTimeFieldSpec for time column: %s of table: %s",
+            timeColumn, offlineTableName));
+      }
+      DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec();
+      if (timeFormatSpec.getColumnUnit() == null) {
+        throw new RuntimeException(String.format("Time unit must be configured in the field spec for time column: %s of"
+            + " table: %s", timeColumn, offlineTableName));
+      }
+      // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use
+      // (maxEndTime - 1 DAY)
+      boolean isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(
+          IngestionConfigUtils.getBatchSegmentIngestionFrequency(realtimeTableConfig))
+          && timeFormatSpec.getColumnUnit() != TimeUnit.DAYS;

Review Comment:
   Curious, do we have any validation to ensure offline & realtime table time ranges do not overlap, since hybrid tables can also be directly created & managed by users without RTO-like tasks?
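
As a concrete reference point for where that boundary falls, here is a self-contained sketch of the offset arithmetic from the hunk above, using made-up timestamps (the class name and values are illustrative only). Realtime segments ending before this boundary are the ones the new logic would delete, regardless of how the offline side was populated:

```java
import java.util.concurrent.TimeUnit;

// Illustrative only: mirrors the (maxEndTime - 1 HOUR / 1 DAY) offset logic in the diff above.
public class TimeBoundaryExample {
  public static void main(String[] args) {
    long offlineSegmentsMaxEndTimeMs = 1710050400000L;  // 2024-03-10T06:00:00Z (made-up value)
    boolean isHourlyTable = true;                        // HOURLY push frequency, non-DAYS time unit
    long timeOffsetMs = isHourlyTable ? TimeUnit.HOURS.toMillis(1) : TimeUnit.DAYS.toMillis(1);
    long timeBoundaryMs = offlineSegmentsMaxEndTimeMs - timeOffsetMs;
    // Completed realtime segments with endTimeMs < timeBoundaryMs (here 2024-03-10T05:00:00Z)
    // would be candidates for deletion.
    System.out.println("Time boundary (ms): " + timeBoundaryMs);  // 1710046800000
  }
}
```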
+ " table: %s", timeColumn, offlineTableName)); + } + // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use + // (maxEndTime - 1 DAY) + boolean isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase( + IngestionConfigUtils.getBatchSegmentIngestionFrequency(realtimeTableConfig)) + && timeFormatSpec.getColumnUnit() != TimeUnit.DAYS; Review Comment: Curious, do we have any validation to ensure offline & realtime table time ranges do not overlap, since hybrid tables can also be directly created & managed by user without RTO like tasks. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java: ########## @@ -165,6 +183,85 @@ private void manageRetentionForRealtimeTable(String realtimeTableName, Retention } } + @VisibleForTesting + void manageRetentionForHybridTable(TableConfig realtimeTableConfig) { + List<String> segmentsToDelete = new ArrayList<>(); + String realtimeTableName = realtimeTableConfig.getTableName(); + String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); + String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); + try { + TableConfig offlineTableConfig = _pinotHelixResourceManager.getOfflineTableConfig(offlineTableName); + ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore(); + Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, offlineTableName); + if (schema == null) { + throw new RuntimeException("Failed to get schema for table: " + offlineTableName); + } + String timeColumn = null; + SegmentsValidationAndRetentionConfig validationConfig = offlineTableConfig.getValidationConfig(); + if (validationConfig != null) { + timeColumn = validationConfig.getTimeColumnName(); + } + if (StringUtils.isEmpty(timeColumn)) { + throw new RuntimeException("TimeColumn is null or empty for table: " + offlineTableName); + } + DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn); + if (dateTimeSpec == null) { + throw new RuntimeException(String.format("Failed to get DateTimeFieldSpec for time column: %s of table: %s", + timeColumn, offlineTableName)); + } + DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec(); + if (timeFormatSpec.getColumnUnit() == null) { + throw new RuntimeException(String.format("Time unit must be configured in the field spec for time column: %s of" + + " table: %s", timeColumn, offlineTableName)); + } + // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use + // (maxEndTime - 1 DAY) + boolean isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase( + IngestionConfigUtils.getBatchSegmentIngestionFrequency(realtimeTableConfig)) Review Comment: Is this the same logic used by broker as well for hybrid tables? Can we share the time boundary logic -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org