Jackie-Jiang commented on code in PR #13992:
URL: https://github.com/apache/pinot/pull/13992#discussion_r1761577304

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -1200,12 +1219,45 @@ public void trackSegmentForUpsertView(IndexSegment segment) {
     if (_upsertViewManager != null) {
       _upsertViewManager.trackSegment(segment);
     }
+    if (segment instanceof MutableSegment) {
+      trackNewlyAddedSegment(segment);
+    }
   }

   @Override
   public void untrackSegmentForUpsertView(IndexSegment segment) {
     if (_upsertViewManager != null) {
       _upsertViewManager.untrackSegment(segment);
     }
+    if (segment instanceof MutableSegment) {
+      untrackNewlyAddedSegment(segment);
+    }
+  }
+
+  @VisibleForTesting
+  void trackNewlyAddedSegment(IndexSegment segment) {
+    if (_newSegmentTrackingTimeMs > 0) {
+      _newlyAddedSegments.put(segment.getSegmentName(), -1L);
+    }
+  }
+
+  @VisibleForTesting
+  void untrackNewlyAddedSegment(IndexSegment segment) {
+    if (_newSegmentTrackingTimeMs > 0) {
+      _newlyAddedSegments.put(segment.getSegmentName(), System.currentTimeMillis() + _newSegmentTrackingTimeMs);
+    }
+  }
+
+  public Set<String> getNewlyAddedSegments() {
+    if (_newSegmentTrackingTimeMs > 0) {
+      // Untrack stale segments at query time. The overhead should be limited as the tracking map should be very small.
+      long nowMs = System.currentTimeMillis();
+      if (_logger.isDebugEnabled()) {
+        _logger.debug("Cleaning stale segments from tracking map: {} with nowMs: {}", _newlyAddedSegments, nowMs);
+      }
+      _newlyAddedSegments.entrySet().removeIf(e -> e.getValue() > 0 && e.getValue() < nowMs);

Review Comment:
   (minor) Can we use `values()` to remove entries?
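Reading the suggestion as `java.util.Map#values()` (the map's live value view), the stale-entry cleanup in `getNewlyAddedSegments()` could filter on the values directly. This is a minimal standalone sketch: `ValuesRemoveIfSketch` and `removeExpired` are hypothetical names, and the map layout (segment name to expiry timestamp, with `-1` meaning "no expiry yet") is inferred from the `put` calls in the diff.

```java
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

public class ValuesRemoveIfSketch {
  // Hypothetical helper: drops every entry whose expiry timestamp is positive and already in the past.
  // Entries holding the -1 sentinel (tracked, no expiry yet) are kept.
  static void removeExpired(Map<String, Long> expiryBySegment, long nowMs) {
    // values() is a live view backed by the map: removing a value removes its whole mapping,
    // so there is no need to go through entrySet() and call e.getValue() twice.
    expiryBySegment.values().removeIf(expiryMs -> expiryMs > 0 && expiryMs < nowMs);
  }

  public static void main(String[] args) {
    Map<String, Long> tracked = new ConcurrentHashMap<>();
    tracked.put("seg__0", -1L);    // still tracked, no expiry
    tracked.put("seg__1", 1_000L); // expires before nowMs
    tracked.put("seg__2", 5_000L); // expires after nowMs
    removeExpired(tracked, 2_000L);
    System.out.println(new TreeSet<>(tracked.keySet())); // [seg__0, seg__2]
  }
}
```

The behavior matches the `entrySet()` version; the gain is mainly readability.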

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -153,11 +164,17 @@ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int parti
     _metadataTTL = context.getMetadataTTL();
     _deletedKeysTTL = context.getDeletedKeysTTL();
     _tableIndexDir = context.getTableIndexDir();
+    long trackingTimeMs = context.getNewSegmentTrackingTimeMs();
     UpsertConfig.ConsistencyMode cmode = context.getConsistencyMode();
     if (cmode == UpsertConfig.ConsistencyMode.SYNC || cmode == UpsertConfig.ConsistencyMode.SNAPSHOT) {
       _upsertViewManager = new UpsertViewManager(cmode, context);
+      // For consistency mode, we have to track newly added segments, so use default tracking time to enable the
+      // tracking of newly added segments if it's not enabled explicitly.
+      _newSegmentTrackingTimeMs =
+          trackingTimeMs > 0 ? trackingTimeMs : UpsertViewManager.DEFAULT_NEW_SEGMENT_TRACKING_TIME_MS;

Review Comment:
   Consider banning this config in table validation, instead of ignoring the config that explicitly turns off this feature.

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -1200,12 +1219,45 @@ public void trackSegmentForUpsertView(IndexSegment segment) {
     if (_upsertViewManager != null) {
       _upsertViewManager.trackSegment(segment);
     }
+    if (segment instanceof MutableSegment) {
+      trackNewlyAddedSegment(segment);
+    }
   }

   @Override
   public void untrackSegmentForUpsertView(IndexSegment segment) {
     if (_upsertViewManager != null) {
       _upsertViewManager.untrackSegment(segment);
     }
+    if (segment instanceof MutableSegment) {
+      untrackNewlyAddedSegment(segment);
+    }
+  }
+
+  @VisibleForTesting
+  void trackNewlyAddedSegment(IndexSegment segment) {
+    if (_newSegmentTrackingTimeMs > 0) {
+      _newlyAddedSegments.put(segment.getSegmentName(), -1L);
+    }
+  }
+
+  @VisibleForTesting
+  void untrackNewlyAddedSegment(IndexSegment segment) {

Review Comment:
   Why do we need to differentiate track/untrack? It is quite confusing that the segments are actually still tracked after calling untrack. Some javadoc would be helpful.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
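A rough sketch of the validation-time alternative to the constructor fallback: reject the conflicting combination when the table config is validated, and only fall back to a default when the value is unset. All names here (`UpsertTrackingValidationSketch`, `UpsertSettings`, `validate`, `resolveTrackingTimeMs`) are hypothetical, the default is a placeholder rather than the real `UpsertViewManager.DEFAULT_NEW_SEGMENT_TRACKING_TIME_MS`, and using `null` to mean "not set" is an assumption about how an unset value could be told apart from an explicit `0`.

```java
import java.util.Objects;

public class UpsertTrackingValidationSketch {
  enum ConsistencyMode { NONE, SYNC, SNAPSHOT }

  // Placeholder value for illustration only; not Pinot's actual default.
  static final long DEFAULT_TRACKING_TIME_MS = 10_000L;

  // Hypothetical config holder; a null tracking time means "not set by the user".
  static final class UpsertSettings {
    final ConsistencyMode consistencyMode;
    final Long newSegmentTrackingTimeMs;

    UpsertSettings(ConsistencyMode consistencyMode, Long newSegmentTrackingTimeMs) {
      this.consistencyMode = Objects.requireNonNull(consistencyMode);
      this.newSegmentTrackingTimeMs = newSegmentTrackingTimeMs;
    }
  }

  // SYNC and SNAPSHOT modes rely on newly-added-segment tracking.
  static boolean needsTracking(ConsistencyMode mode) {
    return mode == ConsistencyMode.SYNC || mode == ConsistencyMode.SNAPSHOT;
  }

  // Fails fast when a table explicitly disables tracking but its consistency mode needs it.
  static void validate(UpsertSettings s) {
    if (needsTracking(s.consistencyMode) && s.newSegmentTrackingTimeMs != null
        && s.newSegmentTrackingTimeMs <= 0) {
      throw new IllegalStateException(
          "newSegmentTrackingTimeMs must be positive when consistencyMode is " + s.consistencyMode);
    }
  }

  // After validation, only an unset value falls back to the default; explicit values are used as-is.
  static long resolveTrackingTimeMs(UpsertSettings s) {
    if (s.newSegmentTrackingTimeMs != null) {
      return s.newSegmentTrackingTimeMs;
    }
    return needsTracking(s.consistencyMode) ? DEFAULT_TRACKING_TIME_MS : 0L;
  }

  public static void main(String[] args) {
    UpsertSettings unset = new UpsertSettings(ConsistencyMode.SYNC, null);
    validate(unset);
    System.out.println(resolveTrackingTimeMs(unset)); // 10000
    try {
      validate(new UpsertSettings(ConsistencyMode.SNAPSHOT, 0L));
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With this split, the metadata manager would no longer need to override a user-provided value; the conflict would surface when the table config is created or updated.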
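To illustrate the kind of javadoc the reviewer asks for, here is a hypothetical restatement of the tracking logic from the diff with the lifecycle spelled out: `track` adds the segment with the `-1` sentinel (no expiry), `untrack` stamps an expiry of now plus the tracking time, and the read path drops entries whose expiry has passed. `NewSegmentTrackerSketch`, the `String` segment names standing in for `IndexSegment`, and the injected `LongSupplier` clock are assumptions made to keep the example self-contained and testable.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Hypothetical sketch of newly-added-segment tracking, documented so the track/untrack lifecycle is explicit. */
public class NewSegmentTrackerSketch {
  /** Sentinel value: the segment is tracked and has no expiry yet. */
  private static final long NO_EXPIRY = -1L;

  private final Map<String, Long> newlyAddedSegments = new ConcurrentHashMap<>();
  private final long trackingTimeMs;
  private final LongSupplier clockMs;

  NewSegmentTrackerSketch(long trackingTimeMs, LongSupplier clockMs) {
    this.trackingTimeMs = trackingTimeMs;
    this.clockMs = clockMs;
  }

  /**
   * Starts tracking a newly added mutable segment. The segment stays in the newly-added set
   * indefinitely, until {@link #untrackNewlyAddedSegment(String)} starts its expiry countdown.
   * No-op when tracking is disabled ({@code trackingTimeMs <= 0}).
   */
  void trackNewlyAddedSegment(String segmentName) {
    if (trackingTimeMs > 0) {
      newlyAddedSegments.put(segmentName, NO_EXPIRY);
    }
  }

  /**
   * Despite the name, does not remove the segment right away: it stamps an expiry of
   * now + trackingTimeMs, so the segment is still reported as newly added until that time passes.
   * No-op when tracking is disabled.
   */
  void untrackNewlyAddedSegment(String segmentName) {
    if (trackingTimeMs > 0) {
      newlyAddedSegments.put(segmentName, clockMs.getAsLong() + trackingTimeMs);
    }
  }

  /** Returns a sorted copy of the tracked segments, first dropping entries whose expiry has passed. */
  Set<String> getNewlyAddedSegments() {
    long nowMs = clockMs.getAsLong();
    // Entries still at NO_EXPIRY (-1) are never dropped here.
    newlyAddedSegments.values().removeIf(expiryMs -> expiryMs > 0 && expiryMs < nowMs);
    return new TreeSet<>(newlyAddedSegments.keySet());
  }

  public static void main(String[] args) {
    long[] now = {0L};
    NewSegmentTrackerSketch tracker = new NewSegmentTrackerSketch(1_000L, () -> now[0]);
    tracker.trackNewlyAddedSegment("seg__0");
    now[0] = 5_000L;
    System.out.println(tracker.getNewlyAddedSegments()); // [seg__0]
    tracker.untrackNewlyAddedSegment("seg__0");           // expiry set to 6000
    now[0] = 5_500L;
    System.out.println(tracker.getNewlyAddedSegments()); // [seg__0]
    now[0] = 6_001L;
    System.out.println(tracker.getNewlyAddedSegments()); // []
  }
}
```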