Copilot commented on code in PR #17789:
URL: https://github.com/apache/pinot/pull/17789#discussion_r2901243043
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -784,9 +791,135 @@ public List<SegmentContext>
getSegmentContexts(List<IndexSegment> selectedSegmen
Map<String, String> queryOptions) {
List<SegmentContext> segmentContexts = new
ArrayList<>(selectedSegments.size());
selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s)));
+ if (isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(queryOptions)) {
+ _tableUpsertMetadataManager.setSegmentContexts(segmentContexts,
queryOptions);
+ }
return segmentContexts;
}
+ @Override
+ public boolean isUpsertEnabled() {
+ return _tableUpsertMetadataManager != null;
+ }
+
+ @VisibleForTesting
+ @Override
+ public TableUpsertMetadataManager getTableUpsertMetadataManager() {
+ return _tableUpsertMetadataManager;
+ }
+
+ @Override
+ public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
+ if (isUpsertEnabled()) {
+ return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
+ }
+ return Collections.emptyMap();
+ }
+
+ protected void handleUpsert(ImmutableSegment immutableSegment, @Nullable
SegmentZKMetadata zkMetadata) {
+ String segmentName = immutableSegment.getSegmentName();
+ _logger.info("Adding immutable segment: {} with upsert enabled",
segmentName);
+
+ setZkOperationTimeIfAvailable(immutableSegment, zkMetadata);
+ Integer partitionId;
+ if (TableNameBuilder.isRealtimeTableResource(_tableNameWithType)) {
+ partitionId = SegmentUtils.getSegmentPartitionId(segmentName,
_tableNameWithType, _helixManager, null);
+ } else {
+ if (zkMetadata == null) {
+ zkMetadata =
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
_tableNameWithType,
+ segmentName);
+ }
+ Preconditions.checkState(zkMetadata != null,
+ "Failed to find segment ZK metadata for segment: %s of table: %s",
segmentName, _tableNameWithType);
+ partitionId = SegmentUtils.getSegmentPartitionId(zkMetadata, null);
+ }
Review Comment:
In handleUpsert() for OFFLINE tables, when `zkMetadata` is null you fetch it
from ZK later, but `setZkOperationTimeIfAvailable()` has already run (and
returned early). This means `SegmentMetadataImpl` never gets
`zkCreationTime`/`zkPushTime` populated in that path, which can break offline
upsert tie-breaking (especially when using push/creation time as the fallback
comparison) and can make segment replacement non-deterministic across replicas.
Consider moving the `setZkOperationTimeIfAvailable()` call to after the ZK
metadata is guaranteed to be available, or call it again immediately after
fetching `zkMetadata` in the OFFLINE branch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]