Jackie-Jiang commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1260446634


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -367,6 +397,16 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
+      // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).
+      // Note: We only update largestSeenComparisonValueMs when addRecord, and access the value when addSegments.
+      // We only support single comparison column for TTL-enabled upsert tables.
+      if (_largestSeenComparisonValue > 0) {
+        Number endTime =
+            (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
+        if (endTime.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {

Review Comment:
   Add some logs here



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -556,6 +657,28 @@ protected void finishOperation() {
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+   * Primary keys that has comparison value earlier than largestSeenComparisonValue - TTL value will be removed.
+   */
+  @Override
+  public void removeExpiredPrimaryKeys() {
+    if (_stopped) {
+      _logger.debug("Skip removing expired primary keys because metadata manager is already stopped");
+      return;
+    }
+    startOperation();
+    try {
+      if (_metadataTTL > 0) {
+        doRemoveExpiredPrimaryKeys(_largestSeenComparisonValue);

Review Comment:
   (minor) No need to pass in the value since it is accessible to the child class
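A minimal sketch of this suggestion. It assumes the base class keeps `_largestSeenComparisonValue` and `_metadataTTL` in `protected` fields. The hook `doRemoveExpiredPrimaryKeys()` then takes no argument, and the subclass reads the inherited field itself. The class names, the `volatile` modifier and the `Map<String, Double>` index are hypothetical simplifications, not Pinot's actual code. The map stands in for the real primary-key-to-record-location map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the base metadata manager.
abstract class BaseManagerSketch {
  // Protected, so subclasses can read it directly instead of receiving it as an argument.
  protected volatile double _largestSeenComparisonValue;
  protected final double _metadataTTL;

  BaseManagerSketch(double metadataTTL) {
    _metadataTTL = metadataTTL;
  }

  public void removeExpiredPrimaryKeys() {
    if (_metadataTTL > 0) {
      doRemoveExpiredPrimaryKeys();
    }
  }

  // No parameter: the subclass reads _largestSeenComparisonValue itself.
  protected abstract void doRemoveExpiredPrimaryKeys();
}

// Hypothetical stand-in for the concurrent-map based subclass.
class ConcurrentMapManagerSketch extends BaseManagerSketch {
  // Primary key -> comparison value of its latest record (simplified record location).
  final Map<String, Double> _primaryKeyToComparisonValue = new ConcurrentHashMap<>();

  ConcurrentMapManagerSketch(double metadataTTL) {
    super(metadataTTL);
  }

  @Override
  protected void doRemoveExpiredPrimaryKeys() {
    double threshold = _largestSeenComparisonValue - _metadataTTL;
    // Drop every key whose comparison value is strictly earlier than the threshold.
    _primaryKeyToComparisonValue.values().removeIf(value -> value < threshold);
  }
}
```

Keys whose comparison value equals the threshold are kept. This matches the strict "earlier than" wording in the Javadoc of `removeExpiredPrimaryKeys()`.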



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -675,6 +675,10 @@ public void run() {
         // Take upsert snapshot before starting consuming events
         if (_partitionUpsertMetadataManager != null) {
           _partitionUpsertMetadataManager.takeSnapshot();
+          // If upsertTTL is enabled, we will remove expired primary keys from upsertMetadata after taking snapshot.
+          if (_tableConfig.getUpsertConfig().getMetadataTTL() > 0) {
+            _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();

Review Comment:
   Similar to `takeSnapshot()`, we can perform the check inside the metadata manager



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -544,6 +584,67 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark: {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, false)) {
+        DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
+        dataOutputStream.writeDouble(watermark);
+      }

Review Comment:
   ```suggestion
         try (OutputStream outputStream = new FileOutputStream(watermarkFile, false);
           DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
           dataOutputStream.writeDouble(watermark);
         }
   ```
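A self-contained sketch of the persist/load round trip, written with the suggested try-with-resources form. The class name `WatermarkFileSketch` and the `fallback` parameter are hypothetical. The reviewed code gets the file from `getWatermarkFile()` and reads it back through `ByteBuffer`; this sketch reads it through `DataInputStream` instead.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

final class WatermarkFileSketch {
  private WatermarkFileSketch() {
  }

  // Writes the watermark as 8 big-endian bytes, overwriting any previous file content.
  static void persistWatermark(File watermarkFile, double watermark) throws IOException {
    // Both streams are declared in the resource list, so both are closed
    // (and the DataOutputStream flushed) even if writeDouble throws.
    try (OutputStream outputStream = new FileOutputStream(watermarkFile, false);
        DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
      dataOutputStream.writeDouble(watermark);
    }
  }

  // Reads the watermark back, or returns the given fallback if the file is missing or unreadable.
  static double loadWatermark(File watermarkFile, double fallback) {
    if (!watermarkFile.exists()) {
      return fallback;
    }
    try (DataInputStream dataInputStream = new DataInputStream(new FileInputStream(watermarkFile))) {
      return dataInputStream.readDouble();
    } catch (IOException e) {
      return fallback;
    }
  }
}
```

`DataOutputStream.writeDouble` writes the 8 bytes in big-endian order, which is also `ByteBuffer`'s default byte order. So `ByteBuffer.wrap(bytes).getDouble()` in `loadWatermark()` reads back the same value. When picking a fallback, note that `Double.MIN_VALUE` is the smallest positive `double` (about 4.9e-324), not the most negative one.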



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -247,6 +267,16 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
+      // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).
+      // Note: We only update largestSeenComparisonValueMs when addRecord, and access the value when addSegments.
+      // We only support single comparison column for TTL-enabled upsert tables.
+      if (_largestSeenComparisonValue > 0) {
+        Number endTime =
+            (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
+        if (endTime.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {

Review Comment:
   Add a log here indicating that the segment is skipped
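One possible shape for the requested log, sketched as a standalone helper that mirrors the skip check in the diff. The helper name, its parameters and the message text are hypothetical. The sketch uses `java.util.logging` only to stay self-contained; the manager in the diff logs through its own `_logger`. The same check appears in the `replaceSegment()` hunk, so the same log would fit there too.

```java
import java.util.logging.Logger;

final class TtlSegmentSkipSketch {
  private static final Logger LOGGER = Logger.getLogger(TtlSegmentSkipSketch.class.getName());

  private TtlSegmentSkipSketch() {
  }

  // Returns true if the segment's max comparison value is earlier than the TTL threshold,
  // logging the decision so the skip is visible in server logs.
  static boolean shouldSkipSegment(String segmentName, Number segmentEndTime, double largestSeenComparisonValue,
      double metadataTTL) {
    if (largestSeenComparisonValue <= 0) {
      return false; // Same guard as the diff: no record has been tracked yet.
    }
    double threshold = largestSeenComparisonValue - metadataTTL;
    if (segmentEndTime.doubleValue() < threshold) {
      LOGGER.info(String.format(
          "Skipping segment: %s because its max comparison value: %s is earlier than the TTL threshold: %s",
          segmentName, segmentEndTime, threshold));
      return true;
    }
    return false;
  }
}
```

Putting the segment name, its max comparison value and the computed threshold on one line shows from the logs alone why a segment was not added.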



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -234,6 +255,14 @@ protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
     Comparable newComparisonValue = recordInfo.getComparisonValue();
     _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
         (primaryKey, currentRecordLocation) -> {
+          // Update the largestSeenComparisonValueMs when add new record. If records during addSegments has a newer
+          // comparison column values than addRecords, it's a bug and should not happen.
+          if (_metadataTTL > 0) {
+            Number recordComparisonValue = (Number) recordInfo.getComparisonValue();
+            if (recordComparisonValue.doubleValue() > _largestSeenComparisonValue) {
+              _largestSeenComparisonValue = recordComparisonValue.doubleValue();
+            }
+            }

Review Comment:
   (nit)
   ```suggestion
               _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, recordComparisonValue.doubleValue());
   ```
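A minimal single-threaded sketch of the tracking logic in the `Math.max` form. `LargestSeenTrackerSketch` and `onRecord()` are hypothetical names. The comparison value is assumed to be numeric, as the `(Number)` cast in the diff already assumes.

```java
final class LargestSeenTrackerSketch {
  private final double _metadataTTL;
  private double _largestSeenComparisonValue;

  LargestSeenTrackerSketch(double metadataTTL) {
    _metadataTTL = metadataTTL;
  }

  // Called for every incoming record; the value is only tracked when TTL is enabled.
  void onRecord(Comparable<?> comparisonValue) {
    if (_metadataTTL > 0) {
      Number recordComparisonValue = (Number) comparisonValue;
      // Single Math.max call instead of the compare-then-assign if-block.
      _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, recordComparisonValue.doubleValue());
    }
  }

  double getLargestSeenComparisonValue() {
    return _largestSeenComparisonValue;
  }
}
```

For non-NaN inputs this gives the same result as the original `if` check. If a `NaN` ever reached it, `Math.max` would propagate the `NaN`, whereas the `if` check would keep the old value.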



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -556,6 +657,28 @@ protected void finishOperation() {
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+   * Primary keys that has comparison value earlier than largestSeenComparisonValue - TTL value will be removed.
+   */
+  @Override
+  public void removeExpiredPrimaryKeys() {
+    if (_stopped) {
+      _logger.debug("Skip removing expired primary keys because metadata manager is already stopped");
+      return;
+    }
+    startOperation();
+    try {
+      if (_metadataTTL > 0) {

Review Comment:
   Similar to `takeSnapshot()`, perform this check as the first step
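A simplified sketch covering both review points about the TTL check. First, `removeExpiredPrimaryKeys()` returns early when TTL is disabled, before the stopped check and `startOperation()`. Second, the consuming thread calls it unconditionally after `takeSnapshot()`. The class, its counters, `beforeConsuming()` and the empty `takeSnapshot()` stub are hypothetical stand-ins for the real metadata manager and the `LLRealtimeSegmentDataManager` call site.

```java
// Hypothetical, simplified manager illustrating where the TTL check could live.
class TtlCheckOrderSketch {
  private final double _metadataTTL;
  private volatile boolean _stopped;
  int _numStartedOperations; // counts startOperation() calls, for illustration only
  int _numRemovals; // counts doRemoveExpiredPrimaryKeys() calls

  TtlCheckOrderSketch(double metadataTTL) {
    _metadataTTL = metadataTTL;
  }

  void stop() {
    _stopped = true;
  }

  void takeSnapshot() {
    // Stub: per the review, the real takeSnapshot() performs its own check internally.
  }

  public void removeExpiredPrimaryKeys() {
    // First step: nothing to do, and no operation to register, when TTL is disabled.
    if (_metadataTTL <= 0) {
      return;
    }
    if (_stopped) {
      return;
    }
    startOperation();
    try {
      doRemoveExpiredPrimaryKeys();
    } finally {
      finishOperation();
    }
  }

  private void startOperation() {
    _numStartedOperations++;
  }

  private void finishOperation() {
    // Omitted in this sketch.
  }

  private void doRemoveExpiredPrimaryKeys() {
    _numRemovals++;
  }

  // Caller side: no config check here, mirroring how takeSnapshot() is invoked.
  static void beforeConsuming(TtlCheckOrderSketch manager) {
    manager.takeSnapshot();
    manager.removeExpiredPrimaryKeys();
  }
}
```

Checking the configuration first keeps the no-TTL path free of side effects. The counters in this sketch make that observable.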



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java:
##########
@@ -26,6 +26,7 @@ private V1Constants() {
   public static final String INDEX_MAP_FILE_NAME = "index_map";
   public static final String INDEX_FILE_NAME = "columns.psf";
   public static final String VALID_DOC_IDS_SNAPSHOT_FILE_NAME = "validdocids.bitmap.snapshot";
+  public static final String TTL_WATERMARK_TABLE_PARTITION = ".ttl.watermark.partition.";

Review Comment:
   Remove the leading `.`
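A sketch of how the per-partition watermark file name might be built once the leading `.` is dropped. The body of `getWatermarkFile()` is not part of the diff, so the directory argument and the `prefix + partitionId` layout are assumptions. On Unix-like systems a name starting with `.` is treated as hidden, so `ls` would not list such a file by default.

```java
import java.io.File;

final class WatermarkFileNameSketch {
  // Constant with the leading '.' removed, as the review suggests.
  static final String TTL_WATERMARK_TABLE_PARTITION = "ttl.watermark.partition.";

  private WatermarkFileNameSketch() {
  }

  // Hypothetical: builds the per-partition watermark file inside a table data directory.
  static File getWatermarkFile(File tableDataDir, int partitionId) {
    return new File(tableDataDir, TTL_WATERMARK_TABLE_PARTITION + partitionId);
  }
}
```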



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

