Copilot commented on code in PR #17789:
URL: https://github.com/apache/pinot/pull/17789#discussion_r2901243043


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -784,9 +791,135 @@ public List<SegmentContext> getSegmentContexts(List<IndexSegment> selectedSegmen
       Map<String, String> queryOptions) {
     List<SegmentContext> segmentContexts = new ArrayList<>(selectedSegments.size());
     selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s)));
+    if (isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(queryOptions)) {
+      _tableUpsertMetadataManager.setSegmentContexts(segmentContexts, queryOptions);
+    }
     return segmentContexts;
   }
 
+  @Override
+  public boolean isUpsertEnabled() {
+    return _tableUpsertMetadataManager != null;
+  }
+
+  @VisibleForTesting
+  @Override
+  public TableUpsertMetadataManager getTableUpsertMetadataManager() {
+    return _tableUpsertMetadataManager;
+  }
+
+  @Override
+  public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
+    if (isUpsertEnabled()) {
+      return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
+    }
+    return Collections.emptyMap();
+  }
+
+  protected void handleUpsert(ImmutableSegment immutableSegment, @Nullable SegmentZKMetadata zkMetadata) {
+    String segmentName = immutableSegment.getSegmentName();
+    _logger.info("Adding immutable segment: {} with upsert enabled", segmentName);
+
+    setZkOperationTimeIfAvailable(immutableSegment, zkMetadata);
+    Integer partitionId;
+    if (TableNameBuilder.isRealtimeTableResource(_tableNameWithType)) {
+      partitionId = SegmentUtils.getSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, null);
+    } else {
+      if (zkMetadata == null) {
+        zkMetadata = ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType,
+            segmentName);
+      }
+      Preconditions.checkState(zkMetadata != null,
+          "Failed to find segment ZK metadata for segment: %s of table: %s", segmentName, _tableNameWithType);
+      partitionId = SegmentUtils.getSegmentPartitionId(zkMetadata, null);
+    }

Review Comment:
   In `handleUpsert()`, `setZkOperationTimeIfAvailable()` runs before the 
OFFLINE branch fetches a missing `zkMetadata` from ZK. When `zkMetadata` is 
passed in as null, that call has already returned early, so 
`SegmentMetadataImpl` never gets `zkCreationTime`/`zkPushTime` populated on 
that path. This can break offline upsert tie-breaking (especially when 
push/creation time is the fallback comparison) and can make segment 
replacement non-deterministic across replicas. Consider moving the 
`setZkOperationTimeIfAvailable()` call to after the ZK metadata is guaranteed 
to be available, or calling it again immediately after fetching `zkMetadata` 
in the OFFLINE branch.
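
   The suggested ordering could look roughly like the sketch below. It is a 
self-contained illustration, not the actual Pinot code: `ZkMetadata`, 
`Segment`, the `Map`-backed store, and `handleUpsertOffline` are hypothetical 
stand-ins, and `setZkOperationTimeIfAvailable` only mirrors the early-return 
behavior described above. The point is that the ZK times are applied after the 
metadata has been resolved, so a null argument no longer leaves them unset.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-ins only; none of these types are the real Pinot classes.
final class HandleUpsertOrderingSketch {

  /** Hypothetical stand-in for the segment's ZK metadata. */
  static final class ZkMetadata {
    final long creationTime;
    final long pushTime;

    ZkMetadata(long creationTime, long pushTime) {
      this.creationTime = creationTime;
      this.pushTime = pushTime;
    }
  }

  /** Hypothetical stand-in for a segment whose metadata carries ZK times. */
  static final class Segment {
    final String name;
    long zkCreationTime = Long.MIN_VALUE; // sentinel for "not populated"
    long zkPushTime = Long.MIN_VALUE;     // sentinel for "not populated"

    Segment(String name) {
      this.name = name;
    }
  }

  // Mirrors the behavior described in the review: a no-op when zkMetadata is null.
  static void setZkOperationTimeIfAvailable(Segment segment, ZkMetadata zkMetadata) {
    if (zkMetadata == null) {
      return;
    }
    segment.zkCreationTime = zkMetadata.creationTime;
    segment.zkPushTime = zkMetadata.pushTime;
  }

  // OFFLINE path with the suggested ordering: resolve the metadata first,
  // then populate the ZK times. The store is an assumed in-memory substitute
  // for the ZK lookup.
  static void handleUpsertOffline(Segment segment, ZkMetadata zkMetadata, Map<String, ZkMetadata> store) {
    if (zkMetadata == null) {
      zkMetadata = store.get(segment.name);
    }
    if (zkMetadata == null) {
      throw new IllegalStateException("Failed to find segment ZK metadata for segment: " + segment.name);
    }
    setZkOperationTimeIfAvailable(segment, zkMetadata);
  }

  public static void main(String[] args) {
    Map<String, ZkMetadata> store = new HashMap<>();
    store.put("seg_0", new ZkMetadata(100L, 200L));

    Segment segment = new Segment("seg_0");
    handleUpsertOffline(segment, null, store); // caller passes null metadata
    System.out.println("zkCreationTime=" + segment.zkCreationTime + ", zkPushTime=" + segment.zkPushTime);
  }
}
```

   Calling the setter a second time inside the OFFLINE branch, as the comment 
also proposes, would have the same effect; a single call placed after the 
lookup just avoids the redundant early-return invocation.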



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

