Jackie-Jiang commented on code in PR #10915: URL: https://github.com/apache/pinot/pull/10915#discussion_r1260446634
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java: ########## @@ -367,6 +397,16 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName); segmentLock.lock(); try { + // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL). + // Note: We only update largestSeenComparisonValueMs when addRecord, and access the value when addSegments. + // We only support single comparison column for TTL-enabled upsert tables. + if (_largestSeenComparisonValue > 0) { + Number endTime = + (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue(); + if (endTime.doubleValue() < _largestSeenComparisonValue - _metadataTTL) { Review Comment: Add some logs here ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java: ########## @@ -556,6 +657,28 @@ protected void finishOperation() { } } + /** + * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes. + * Primary keys that has comparison value earlier than largestSeenComparisonValue - TTL value will be removed. 
+ */ + @Override + public void removeExpiredPrimaryKeys() { + if (_stopped) { + _logger.debug("Skip removing expired primary keys because metadata manager is already stopped"); + return; + } + startOperation(); + try { + if (_metadataTTL > 0) { + doRemoveExpiredPrimaryKeys(_largestSeenComparisonValue); Review Comment: (minor) No need to pass in the value since it is accessible to the child class ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java: ########## @@ -675,6 +675,10 @@ public void run() { // Take upsert snapshot before starting consuming events if (_partitionUpsertMetadataManager != null) { _partitionUpsertMetadataManager.takeSnapshot(); + // If upsertTTL is enabled, we will remove expired primary keys from upsertMetadata after taking snapshot. + if (_tableConfig.getUpsertConfig().getMetadataTTL() > 0) { + _partitionUpsertMetadataManager.removeExpiredPrimaryKeys(); Review Comment: Similar to `takeSnapshot()`, we can perform the check inside the metadata manager ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java: ########## @@ -544,6 +584,67 @@ protected void doTakeSnapshot() { numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs); } + /** + * Note: Load watermark when the server is started/restarted. 
+ * */ + protected double loadWatermark() { + File watermarkFile = getWatermarkFile(); + if (watermarkFile.exists()) { + try { + byte[] bytes = FileUtils.readFileToByteArray(watermarkFile); + double watermark = ByteBuffer.wrap(bytes).getDouble(); + _logger.info("Loaded watermark: {} from file for table: {} partition_id: {}", watermark, _tableNameWithType, + _partitionId); + return watermark; + } catch (Exception e) { + _logger.warn("Caught exception while loading watermark file: {}, skipping", + watermarkFile); + } + } + return Double.MIN_VALUE; + } + + /** + * Note: Persist watermark when the expired primary keys are cleanup from upsertMetadata. + * */ + protected void persistWatermark(double watermark) { + File watermarkFile = getWatermarkFile(); + try { + if (watermarkFile.exists()) { + if (!FileUtils.deleteQuietly(watermarkFile)) { + _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile); + return; + } + } + try (OutputStream outputStream = new FileOutputStream(watermarkFile, false)) { + DataOutputStream dataOutputStream = new DataOutputStream(outputStream); + dataOutputStream.writeDouble(watermark); + } Review Comment: ```suggestion try (OutputStream outputStream = new FileOutputStream(watermarkFile, false); DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) { dataOutputStream.writeDouble(watermark); } ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java: ########## @@ -247,6 +267,16 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName); segmentLock.lock(); try { + // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL). + // Note: We only update largestSeenComparisonValueMs when addRecord, and access the value when addSegments. 
+ // We only support single comparison column for TTL-enabled upsert tables. + if (_largestSeenComparisonValue > 0) { + Number endTime = + (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue(); + if (endTime.doubleValue() < _largestSeenComparisonValue - _metadataTTL) { Review Comment: Add some log here indicating the segment is skipped ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java: ########## @@ -234,6 +255,14 @@ protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) { Comparable newComparisonValue = recordInfo.getComparisonValue(); _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (primaryKey, currentRecordLocation) -> { + // Update the largestSeenComparisonValueMs when add new record. If records during addSegments has a newer + // comparison column values than addRecords, it's a bug and should not happen. + if (_metadataTTL > 0) { + Number recordComparisonValue = (Number) recordInfo.getComparisonValue(); + if (recordComparisonValue.doubleValue() > _largestSeenComparisonValue) { + _largestSeenComparisonValue = recordComparisonValue.doubleValue(); + } Review Comment: (nit) ```suggestion _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, recordComparisonValue.doubleValue()); ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java: ########## @@ -556,6 +657,28 @@ protected void finishOperation() { } } + /** + * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes. + * Primary keys that has comparison value earlier than largestSeenComparisonValue - TTL value will be removed. 
+ */ + @Override + public void removeExpiredPrimaryKeys() { + if (_stopped) { + _logger.debug("Skip removing expired primary keys because metadata manager is already stopped"); + return; + } + startOperation(); + try { + if (_metadataTTL > 0) { Review Comment: Similar to `takeSnapshot()`, perform this check as the first step ########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java: ########## @@ -26,6 +26,7 @@ private V1Constants() { public static final String INDEX_MAP_FILE_NAME = "index_map"; public static final String INDEX_FILE_NAME = "columns.psf"; public static final String VALID_DOC_IDS_SNAPSHOT_FILE_NAME = "validdocids.bitmap.snapshot"; + public static final String TTL_WATERMARK_TABLE_PARTITION = ".ttl.watermark.partition."; Review Comment: Remove the leading `.` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org