Jackie-Jiang commented on code in PR #10047:
URL: https://github.com/apache/pinot/pull/10047#discussion_r1177122398
##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -57,12 +57,18 @@ public enum Strategy {
 @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
 private boolean _enableSnapshot;
+ @JsonPropertyDescription("Whether to use TTL to reduce upsert metadata memory footprint")
+ private boolean _enableTTL;

Review Comment:
Suggest removing this. Providing `_upsertTTLConfig` implicitly indicates that TTL is enabled.

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -53,14 +56,19 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
 @VisibleForTesting
 final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+ // keep track of all segments that haven't reach stable state to persist snapshot
+ @VisibleForTesting
+ final PriorityBlockingQueue<SegmentInfo> _nonPersistedSegmentsQueue = new PriorityBlockingQueue<>();

Review Comment:
We can construct it only when TTL is enabled, and elsewhere just check whether it is `null`.

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -395,6 +399,31 @@ protected void finishOperation() {
 }
 }
+ /**
+ * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+ */
+ @Override
+ public void removeExpiredPrimaryKeys(Comparable timestamp) {
+ if (_upsertTTLConfig != null && _upsertTTLConfig.getTtlInMs() > 0) {

Review Comment:
This check is redundant because the method can only be invoked when TTL is configured. We may remove the default implementation.
Same for `persistSnapshotForStableSegment()`.

##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertTTLConfig.java:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code UpsertTTLConfig} class contains the upsert TTL (time-to-live) related configurations.
+ * Pinot upsert keeps track of all primary keys in heap, it's costly and also affects performance when table is large.
+ *
+ * If primary keys in the table have lifecycle, they won't get updated after a certain period time, then we can use the
+ * following configuration to enable upsert ttl feature. Pinot will only keeps track of alive primary keys in heap.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UpsertTTLConfig {
+
+ @JsonPropertyDescription("ttl time unit, supported time units are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS")
+ private String _ttlTimeUnit;

Review Comment:
We should be able to declare it directly as a `TimeUnit`.

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -181,6 +198,55 @@ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDoc
 }
 }
+ /**
+ * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+ *
+ * When committing consuming segment, we replace the consuming segment with an immutable segments.
+ * After replaceSegment, we iterate over recordInfoIterator to find validDocIds that are expired (out-of-TTL).
+ * Primarykey expired when the comparison time value of the record is less or equal to (segmentEndTime - TTL).
+ *
+ * @param expiredTimestamp segmentEndTime - TTLTime (converted ttl time values in millis time unit)
+ * @return void
+ */
+ @Override
+ public void doRemoveExpiredPrimaryKeys(Comparable expiredTimestamp) {

Review Comment:
Currently this works only when the comparison column is a timestamp (a time column in milliseconds).
We should add validation to the table config to enforce this.

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -267,4 +333,28 @@ public Comparable getComparisonValue() {
 return _comparisonValue;
 }
 }
+
+ @VisibleForTesting
+ static class SegmentInfo implements Comparable<SegmentInfo> {
+ private final IndexSegment _segment;
+ private final long _endtime;

Review Comment:
```suggestion
private final long _endTimeMs;
```

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -181,6 +198,55 @@ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDoc
 }
 }
+ /**
+ * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+ *
+ * When committing consuming segment, we replace the consuming segment with an immutable segments.
+ * After replaceSegment, we iterate over recordInfoIterator to find validDocIds that are expired (out-of-TTL).
+ * Primarykey expired when the comparison time value of the record is less or equal to (segmentEndTime - TTL).
+ *
+ * @param expiredTimestamp segmentEndTime - TTLTime (converted ttl time values in millis time unit)
+ * @return void
+ */
+ @Override
+ public void doRemoveExpiredPrimaryKeys(Comparable expiredTimestamp) {
+ _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
+ assert recordLocation.getComparisonValue() != null;
+ if (recordLocation.getComparisonValue().compareTo(expiredTimestamp) < 0) {
+ _primaryKeyToRecordLocationMap.remove(primaryKey);

Review Comment:
This can cause a race condition.
You want to check and remove atomically, in case the record is updated during the removal:
```suggestion
_primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
```

##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertTTLConfig.java:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code UpsertTTLConfig} class contains the upsert TTL (time-to-live) related configurations.
+ * Pinot upsert keeps track of all primary keys in heap, it's costly and also affects performance when table is large.
+ *
+ * If primary keys in the table have lifecycle, they won't get updated after a certain period time, then we can use the
+ * following configuration to enable upsert ttl feature. Pinot will only keeps track of alive primary keys in heap.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UpsertTTLConfig {
+
+ @JsonPropertyDescription("ttl time unit, supported time units are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS")
+ private String _ttlTimeUnit;
+ @JsonPropertyDescription("ttl time value")
+ private String _ttlTimeValue;

Review Comment:
Declare it as a `long`.

--
This is an automated message from the Apache Git Service.
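A minimal sketch of how the `UpsertTTLConfig` type comments and the conditional-remove suggestion on `doRemoveExpiredPrimaryKeys` could fit together, assuming simplified types. The unit is a `TimeUnit` and the value a `long` (Jackson binds enum constants by name by default, so a JSON value such as "DAYS" should map directly onto a `TimeUnit` field). Expired keys are dropped with `ConcurrentHashMap.remove(key, value)`, so an entry updated concurrently is kept. `UpsertTtlSketch`, `TtlConfig` and the trimmed-down `RecordLocation` are hypothetical names for illustration, not Pinot's classes; the comparison value is a plain `long` in epoch millis, matching the point that expiry only makes sense for a millisecond time column.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch for illustration only; these are not Pinot's actual classes.
class UpsertTtlSketch {

  // TTL config using TimeUnit and long instead of two String fields.
  static final class TtlConfig {
    private final TimeUnit _ttlTimeUnit;
    private final long _ttlTimeValue;

    TtlConfig(TimeUnit ttlTimeUnit, long ttlTimeValue) {
      _ttlTimeUnit = ttlTimeUnit;
      _ttlTimeValue = ttlTimeValue;
    }

    // No string parsing needed: TimeUnit converts the value to milliseconds directly.
    long getTtlInMs() {
      return _ttlTimeUnit.toMillis(_ttlTimeValue);
    }
  }

  // Hypothetical stand-in for RecordLocation that keeps only the comparison value in epoch millis.
  // It does not override equals(), so the conditional remove below compares object identity.
  static final class RecordLocation {
    private final long _comparisonValueMs;

    RecordLocation(long comparisonValueMs) {
      _comparisonValueMs = comparisonValueMs;
    }

    long getComparisonValueMs() {
      return _comparisonValueMs;
    }
  }

  final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();

  // Removes primary keys whose comparison value is older than expiredTimestampMs.
  void removeExpiredPrimaryKeys(long expiredTimestampMs) {
    // ConcurrentHashMap traversal never throws ConcurrentModificationException,
    // so removing entries during forEach is allowed.
    _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
      if (recordLocation.getComparisonValueMs() < expiredTimestampMs) {
        // remove(key, value) only succeeds if the key is still mapped to this RecordLocation;
        // if a concurrent upsert already replaced it with a newer location, the entry is kept.
        _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
      }
    });
  }
}
```

For example, `new UpsertTtlSketch.TtlConfig(TimeUnit.DAYS, 1).getTtlInMs()` returns 86400000. By contrast, the unconditional `remove(primaryKey)` in the quoted diff would delete whatever location the key maps to at that moment, including one written after the expiry check.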