Jackie-Jiang commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1256897188


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - 
startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} 
partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, 
skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from 
upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", 
watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, 
false)) {
+        long longBits = Double.doubleToLongBits(watermark);
+        outputStream.write(Longs.toByteArray(longBits));
+      }
+      _logger.info("Persisted watermark {} to file for table: {} partition_id: 
{}", watermark,
+          _tableNameWithType, _partitionId);
+    } catch (Exception e) {
+      _logger.warn("Caught exception while persisting watermark file: {}, 
skipping",
+          watermarkFile);
+    }
+  }
+
+  /**
+   * Note: Watermark file need to be deleted when upsert TTL is disabled in 
the upsertConfig.
+   * */
+  protected void deleteWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.info("Deleted watermarkFile for table: {} partition_id: {}", 
_tableNameWithType, _partitionId);
+          return;
+        }
+      } catch (Exception e) {
+        _logger.warn("Caught exception while deleting watermarkFile for table: 
{} partition_id: {}", _tableNameWithType,
+            _partitionId);
+      }
+    }
+  }
+
+  protected File getWatermarkFile() {
+    return new File(_tableIndexDir, _tableNameWithType + 
V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId);

Review Comment:
   Let's not prefix it with the table name, since it is already under the table dir.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -76,23 +85,34 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   protected final AtomicInteger _numPendingOperations = new AtomicInteger(1);
 
   protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
+
+  // Used to maintain the largestSeenComparisonValue to avoid handling 
out-of-ttl segments/records.
+  // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to 
compute expired segments.
+  protected volatile double _largestSeenComparisonValue;

Review Comment:
   (minor) Move this after `_numOutOfOrderEvents`. We want to keep variables 
that serve the same purpose together.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -60,7 +67,9 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   protected final String _deleteRecordColumn;
   protected final HashFunction _hashFunction;
   protected final PartialUpsertHandler _partialUpsertHandler;
+  protected final double _upsertTTLInComparisonTimeUnit;

Review Comment:
   (minor) Move `_upsertTTLInComparisonTimeUnit` after `_enableSnapshot` to 
keep the variables for the same feature together



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -220,6 +258,13 @@ protected void doAddRecord(MutableSegment segment, 
RecordInfo recordInfo) {
     Comparable newComparisonValue = recordInfo.getComparisonValue();
     
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
 _hashFunction),
         (primaryKey, currentRecordLocation) -> {
+          // Update the largestSeenComparisonValueMs when add new record. If 
records during addSegments has a newer
+          // comparison column values than addRecords, it's a bug and should 
not happen.
+          if (_upsertTTLInComparisonTimeUnit > 0) {
+            if 
(recordInfo.getComparisonValue().compareTo(_largestSeenComparisonValue) > 0) {
+              _largestSeenComparisonValue = (double) 
recordInfo.getComparisonValue();

Review Comment:
   The cast can throw a `ClassCastException` when the comparison value is not a `Double`
(e.g. a `Long` comparison column). Please add a test with a non-double comparison column.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -74,6 +75,16 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, 
Iterator<RecordInfo> recordInfoIterator,
       @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap 
validDocIdsForOldSegment) {
     String segmentName = segment.getSegmentName();
+    // Skip adding segments that has segment EndTime in the comparison cols 
earlier than (largestSeenTimestamp - TTL).
+    // Note: We only update largestSeenComparisonValueMs when addRecord, and 
access the value when addSegments.
+    // We only support single comparison column for TTL-enabled upsert tables.
+    if (_largestSeenComparisonValue > 0) {
+      Comparable endTime =

Review Comment:
   (MAJOR) This won't work if the end time is not of double type. You want to convert it 
via `((Number) endTime).doubleValue()` and then compare.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - 
startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} 
partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, 
skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from 
upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", 
watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, 
false)) {

Review Comment:
   (minor) You may use `DataOutputStream` to write the `double` directly via `writeDouble()`.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - 
startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} 
partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, 
skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from 
upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", 
watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, 
false)) {
+        long longBits = Double.doubleToLongBits(watermark);
+        outputStream.write(Longs.toByteArray(longBits));
+      }
+      _logger.info("Persisted watermark {} to file for table: {} partition_id: 
{}", watermark,
+          _tableNameWithType, _partitionId);
+    } catch (Exception e) {
+      _logger.warn("Caught exception while persisting watermark file: {}, 
skipping",
+          watermarkFile);
+    }
+  }
+
+  /**
+   * Note: Watermark file need to be deleted when upsert TTL is disabled in 
the upsertConfig.
+   * */
+  protected void deleteWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.info("Deleted watermarkFile for table: {} partition_id: {}", 
_tableNameWithType, _partitionId);

Review Comment:
   This is incorrect: this branch runs only when `deleteQuietly()` returns `false`, i.e. the 
file was NOT deleted. We should log a warning that the file cannot be deleted (same as line 523).



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -84,6 +95,13 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
       
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
 _hashFunction),
           (primaryKey, currentRecordLocation) -> {
             if (currentRecordLocation != null) {
+              // Skip the records that has comparisonValue timestamp earlier 
than (largestSeenTimestamp - TTL).

Review Comment:
   I don't follow this logic. Do we need this check?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -60,6 +60,8 @@ public enum Strategy {
   @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata 
recovery")
   private boolean _enableSnapshot;
 
+  private double _upsertTTLInComparisonTimeUnit;

Review Comment:
   This does not necessarily have to be a time value, though. How about naming it 
`_metadataTTL` and also adding a description?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -76,23 +85,34 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   protected final AtomicInteger _numPendingOperations = new AtomicInteger(1);
 
   protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
+
+  // Used to maintain the largestSeenComparisonValue to avoid handling 
out-of-ttl segments/records.
+  // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to 
compute expired segments.
+  protected volatile double _largestSeenComparisonValue;
   protected int _numOutOfOrderEvents = 0;
 
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, 
@Nullable String deleteRecordColumn,
-      HashFunction hashFunction, @Nullable PartialUpsertHandler 
partialUpsertHandler, boolean enableSnapshot,
-      ServerMetrics serverMetrics) {
+      HashFunction hashFunction, @Nullable PartialUpsertHandler 
partialUpsertHandler,
+      double upsertTTLInComparisonTimeUnit, boolean enableSnapshot, File 
tableIndexDir, ServerMetrics serverMetrics) {

Review Comment:
   (minor) Move `upsertTTLInComparisonTimeUnit` after `enableSnapshot`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -74,6 +75,16 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, 
Iterator<RecordInfo> recordInfoIterator,
       @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap 
validDocIdsForOldSegment) {
     String segmentName = segment.getSegmentName();
+    // Skip adding segments that has segment EndTime in the comparison cols 
earlier than (largestSeenTimestamp - TTL).
+    // Note: We only update largestSeenComparisonValueMs when addRecord, and 
access the value when addSegments.
+    // We only support single comparison column for TTL-enabled upsert tables.
+    if (_largestSeenComparisonValue > 0) {

Review Comment:
   Can we move this logic into the base manager?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - 
startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} 
partition_id: {}", watermark, _tableNameWithType,

Review Comment:
   (minor)
   ```suggestion
           _logger.info("Loaded watermark: {} from file for table: {} 
partition_id: {}", watermark, _tableNameWithType,
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - 
startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} 
partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, 
skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from 
upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", 
watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, 
false)) {
+        long longBits = Double.doubleToLongBits(watermark);
+        outputStream.write(Longs.toByteArray(longBits));
+      }
+      _logger.info("Persisted watermark {} to file for table: {} partition_id: 
{}", watermark,
+          _tableNameWithType, _partitionId);
+    } catch (Exception e) {
+      _logger.warn("Caught exception while persisting watermark file: {}, 
skipping",
+          watermarkFile);
+    }
+  }
+
+  /**
+   * Note: Watermark file need to be deleted when upsert TTL is disabled in 
the upsertConfig.
+   * */
+  protected void deleteWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {

Review Comment:
   (minor) This try-catch is not needed, since `FileUtils.deleteQuietly()` never throws.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -595,6 +595,39 @@ && 
isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
       }
     }
     validateAggregateMetricsForUpsertConfig(tableConfig);
+    validateTTLForUpsertConfig(tableConfig, schema);
+  }
+
+  /**
+   * Validates whether the comparison columns is compatible with Upsert TTL 
feature.
+   * Validation fails when one of the comparison columns is not a numeric 
value.
+   */
+  @VisibleForTesting
+  static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema 
schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE
+        || tableConfig.getUpsertConfig().getUpsertTTLInComparisonTimeUnit() == 
0) {
+      return;
+    }
+
+    // comparison columns should hold timestamp values in numeric values
+    List<String> comparisonColumns = 
tableConfig.getUpsertConfig().getComparisonColumns();
+    if (comparisonColumns != null && !comparisonColumns.isEmpty()) {
+
+      // currently we only support 1 comparison column since we need to fetch 
endTime in comparisonValue time unit from
+      // columnMetadata. If we have multiple comparison columns, we can only 
use the first comparison column as filter.
+      Preconditions.checkState(comparisonColumns.size() <= 1,
+          String.format("Currently upsert TTL only support 1 comparison 
columns."));
+
+      for (String column : comparisonColumns) {

Review Comment:
   (minor) We can just check `comparisonColumns.get(0)`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -595,6 +595,39 @@ && 
isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
       }
     }
     validateAggregateMetricsForUpsertConfig(tableConfig);
+    validateTTLForUpsertConfig(tableConfig, schema);
+  }
+
+  /**
+   * Validates whether the comparison columns is compatible with Upsert TTL 
feature.
+   * Validation fails when one of the comparison columns is not a numeric 
value.
+   */
+  @VisibleForTesting
+  static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema 
schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE
+        || tableConfig.getUpsertConfig().getUpsertTTLInComparisonTimeUnit() == 
0) {
+      return;
+    }
+
+    // comparison columns should hold timestamp values in numeric values
+    List<String> comparisonColumns = 
tableConfig.getUpsertConfig().getComparisonColumns();
+    if (comparisonColumns != null && !comparisonColumns.isEmpty()) {
+
+      // currently we only support 1 comparison column since we need to fetch 
endTime in comparisonValue time unit from
+      // columnMetadata. If we have multiple comparison columns, we can only 
use the first comparison column as filter.
+      Preconditions.checkState(comparisonColumns.size() <= 1,

Review Comment:
   (minor)
   ```suggestion
         Preconditions.checkState(comparisonColumns.size() == 1,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to