Jackie-Jiang commented on code in PR #10047:
URL: https://github.com/apache/pinot/pull/10047#discussion_r1177122398


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -57,12 +57,18 @@ public enum Strategy {
   @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata 
recovery")
   private boolean _enableSnapshot;
 
+  @JsonPropertyDescription("Whether to use TTL to reduce upsert metadata 
memory footprint")
+  private boolean _enableTTL;

Review Comment:
   Suggest removing this. Providing `_upsertTTLConfig` implicitly indicates 
that TTL is enabled.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -53,14 +56,19 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
   @VisibleForTesting
   final ConcurrentHashMap<Object, RecordLocation> 
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
+  // keep track of all segments that haven't reach stable state to persist 
snapshot
+  @VisibleForTesting
+  final PriorityBlockingQueue<SegmentInfo> _nonPersistedSegmentsQueue = new 
PriorityBlockingQueue<>();

Review Comment:
   We can construct it only when TTL is enabled, and just check whether it is 
`null` in the places that use it.
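A minimal sketch of what this could look like, not the PR's actual code. The class name, the `ttlEnabled` constructor flag, and the use of a plain `Long` end time in place of `SegmentInfo` are assumptions made to keep the example self-contained.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical stand-in for the metadata manager; only the queue handling is shown.
class NullableQueueSketch {
  // Null when TTL is disabled, so no queue is allocated for tables that do not use TTL.
  private final PriorityBlockingQueue<Long> _nonPersistedSegmentsQueue;

  NullableQueueSketch(boolean ttlEnabled) {
    _nonPersistedSegmentsQueue = ttlEnabled ? new PriorityBlockingQueue<>() : null;
  }

  // Tracks a segment end time; returns false (and does nothing) when TTL is disabled.
  boolean trackSegment(long endTimeMs) {
    if (_nonPersistedSegmentsQueue == null) {
      return false;
    }
    _nonPersistedSegmentsQueue.offer(endTimeMs);
    return true;
  }

  // Number of tracked segments; always 0 when TTL is disabled.
  int pendingCount() {
    return _nonPersistedSegmentsQueue == null ? 0 : _nonPersistedSegmentsQueue.size();
  }
}
```

The null check doubles as the "is TTL enabled" test, so callers do not need a separate flag.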



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -395,6 +399,31 @@ protected void finishOperation() {
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired 
keys from the primary key indexes.
+   */
+  @Override
+  public void removeExpiredPrimaryKeys(Comparable timestamp) {
+    if (_upsertTTLConfig != null && _upsertTTLConfig.getTtlInMs() > 0) {

Review Comment:
   This check is redundant because the method can only be invoked when TTL is 
configured. We may remove the default implementation. Same for 
`persistSnapshotForStableSegment()`



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertTTLConfig.java:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code UpsertTTLConfig} class contains the upsert TTL (time-to-live) 
related configurations.
+ * Pinot upsert keeps track of all primary keys in heap, it's costly and also 
affects performance when table is large.
+ *
+ * If primary keys in the table have lifecycle, they won't get updated after a 
certain period time, then we can use the
+ * following configuration to enable upsert ttl feature. Pinot will only keeps 
track of alive primary keys in heap.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UpsertTTLConfig {
+
+  @JsonPropertyDescription("ttl time unit, supported time units are DAYS, 
HOURS, MINUTES, SECONDS, MILLISECONDS")
+  private String _ttlTimeUnit;

Review Comment:
   We should be able to directly declare it as `TimeUnit` type



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -181,6 +198,55 @@ protected void removeSegment(IndexSegment segment, 
MutableRoaringBitmap validDoc
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired 
keys from the primary key indexes.
+   *
+   * When committing consuming segment, we replace the consuming segment with 
an immutable segments.
+   * After replaceSegment, we iterate over recordInfoIterator to find 
validDocIds that are expired (out-of-TTL).
+   * Primarykey expired when the comparison time value of the record is less 
or equal to (segmentEndTime - TTL).
+   *
+   * @param expiredTimestamp segmentEndTime - TTLTime (converted ttl time 
values in millis time unit)
+   * @return void
+   */
+  @Override
+  public void doRemoveExpiredPrimaryKeys(Comparable expiredTimestamp) {

Review Comment:
   Currently this works only when the comparison column is a timestamp (a time 
column in milliseconds). We should add validation for this to the table config.
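A rough sketch of the kind of validation meant here. Everything in it is hypothetical: the `DataType` enum, the `validate` signature, and the specific rules (comparison column must be the time column and must be stored as `LONG` or `TIMESTAMP`) are assumptions for illustration, not existing Pinot APIs.

```java
import java.util.Optional;

// Hypothetical validator; none of these names come from the Pinot codebase.
class TtlValidationSketch {
  enum DataType { INT, LONG, STRING, TIMESTAMP }

  // Returns an error message when the TTL setup is invalid, or empty when it is acceptable.
  static Optional<String> validate(long ttlInMs, String comparisonColumn, String timeColumn,
      DataType comparisonColumnType) {
    if (ttlInMs <= 0) {
      // TTL not configured: nothing to validate.
      return Optional.empty();
    }
    if (!comparisonColumn.equals(timeColumn)) {
      return Optional.of("Upsert TTL requires the comparison column to be the time column");
    }
    if (comparisonColumnType != DataType.LONG && comparisonColumnType != DataType.TIMESTAMP) {
      return Optional.of("Upsert TTL requires a comparison column holding epoch milliseconds");
    }
    return Optional.empty();
  }
}
```

A data type check alone cannot prove the values are in milliseconds, so the real validation would likely also need to inspect the time column's format.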



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -267,4 +333,28 @@ public Comparable getComparisonValue() {
       return _comparisonValue;
     }
   }
+
+  @VisibleForTesting
+  static class SegmentInfo implements Comparable<SegmentInfo> {
+    private final IndexSegment _segment;
+    private final long _endtime;

Review Comment:
   ```suggestion
       private final long _endTimeMs;
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -181,6 +198,55 @@ protected void removeSegment(IndexSegment segment, 
MutableRoaringBitmap validDoc
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired 
keys from the primary key indexes.
+   *
+   * When committing consuming segment, we replace the consuming segment with 
an immutable segments.
+   * After replaceSegment, we iterate over recordInfoIterator to find 
validDocIds that are expired (out-of-TTL).
+   * Primarykey expired when the comparison time value of the record is less 
or equal to (segmentEndTime - TTL).
+   *
+   * @param expiredTimestamp segmentEndTime - TTLTime (converted ttl time 
values in millis time unit)
+   * @return void
+   */
+  @Override
+  public void doRemoveExpiredPrimaryKeys(Comparable expiredTimestamp) {
+    _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
+      assert recordLocation.getComparisonValue() != null;
+      if (recordLocation.getComparisonValue().compareTo(expiredTimestamp) < 0) 
{
+        _primaryKeyToRecordLocationMap.remove(primaryKey);

Review Comment:
   This can cause a race condition. You want to check and remove atomically in 
case the record is updated during the removal.
   ```suggestion
           _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
   ```
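To illustrate why the two-argument `remove` matters, here is a small self-contained sketch. `Location` is a hypothetical stand-in for `RecordLocation`, and the concurrent update is simulated sequentially with a plain `put`. `ConcurrentHashMap.remove(key, value)` removes the entry only if the key is still mapped to that value.

```java
import java.util.concurrent.ConcurrentHashMap;

class ConditionalRemoveSketch {
  // Hypothetical stand-in for RecordLocation; no equals() override, so matching is by identity.
  static final class Location {
    final long comparisonValue;

    Location(long comparisonValue) {
      this.comparisonValue = comparisonValue;
    }
  }

  // Removes entries whose comparison value is below the threshold; returns how many were removed.
  static int removeExpired(ConcurrentHashMap<String, Location> map, long expiredTimestamp) {
    int[] removed = {0};
    map.forEach((key, location) -> {
      // Remove only if the key still maps to the location we just inspected.
      if (location.comparisonValue < expiredTimestamp && map.remove(key, location)) {
        removed[0]++;
      }
    });
    return removed[0];
  }

  // Simulates an upsert landing between the expiry check and the removal.
  static boolean staleRemoveIsRejected() {
    ConcurrentHashMap<String, Location> map = new ConcurrentHashMap<>();
    Location stale = new Location(10);
    map.put("pk", stale);
    map.put("pk", new Location(100)); // the "concurrent" update replaces the stale location
    boolean removed = map.remove("pk", stale); // fails because the mapping has changed
    return !removed && map.containsKey("pk");
  }
}
```

With the single-argument `remove(primaryKey)`, the fresh record written by the update would have been dropped along with the expired one.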



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertTTLConfig.java:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code UpsertTTLConfig} class contains the upsert TTL (time-to-live) 
related configurations.
+ * Pinot upsert keeps track of all primary keys in heap, it's costly and also 
affects performance when table is large.
+ *
+ * If primary keys in the table have lifecycle, they won't get updated after a 
certain period time, then we can use the
+ * following configuration to enable upsert ttl feature. Pinot will only keeps 
track of alive primary keys in heap.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UpsertTTLConfig {
+
+  @JsonPropertyDescription("ttl time unit, supported time units are DAYS, 
HOURS, MINUTES, SECONDS, MILLISECONDS")
+  private String _ttlTimeUnit;
+  @JsonPropertyDescription("ttl time value")
+  private String _ttlTimeValue;

Review Comment:
   Declare it as `long` type
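Taken together with the `TimeUnit` comment on `_ttlTimeUnit`, the typed fields could look roughly like the sketch below. It leaves out the Jackson annotations to stay self-contained, and the class name and constructor are assumptions; `getTtlInMs()` is the accessor already used elsewhere in this PR.

```java
import java.util.concurrent.TimeUnit;

// Simplified stand-in for UpsertTTLConfig with typed fields instead of Strings.
class UpsertTtlConfigSketch {
  private final TimeUnit _ttlTimeUnit;
  private final long _ttlTimeValue;

  UpsertTtlConfigSketch(TimeUnit ttlTimeUnit, long ttlTimeValue) {
    _ttlTimeUnit = ttlTimeUnit;
    _ttlTimeValue = ttlTimeValue;
  }

  // No string parsing is needed: TimeUnit performs the conversion directly.
  long getTtlInMs() {
    return _ttlTimeUnit.toMillis(_ttlTimeValue);
  }
}
```

Typed fields also let invalid units or non-numeric values fail at deserialization time rather than later, when the TTL is first used.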



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

