Copilot commented on code in PR #17789:
URL: https://github.com/apache/pinot/pull/17789#discussion_r2901164423


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -784,9 +790,122 @@ public List<SegmentContext> 
getSegmentContexts(List<IndexSegment> selectedSegmen
       Map<String, String> queryOptions) {
     List<SegmentContext> segmentContexts = new 
ArrayList<>(selectedSegments.size());
     selectedSegments.forEach(s -> segmentContexts.add(new SegmentContext(s)));
+    if (isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(queryOptions)) {
+      _tableUpsertMetadataManager.setSegmentContexts(segmentContexts, 
queryOptions);
+    }
     return segmentContexts;
   }
 
+  @Override
+  public boolean isUpsertEnabled() {
+    return _tableUpsertMetadataManager != null;
+  }
+
+  @VisibleForTesting
+  @Override
+  public TableUpsertMetadataManager getTableUpsertMetadataManager() {
+    return _tableUpsertMetadataManager;
+  }
+
+  @Override
+  public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
+    if (isUpsertEnabled()) {
+      return _tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
+    }
+    return Collections.emptyMap();
+  }
+
+  protected void handleUpsert(ImmutableSegment immutableSegment, @Nullable 
SegmentZKMetadata zkMetadata) {
+    String segmentName = immutableSegment.getSegmentName();
+    _logger.info("Adding immutable segment: {} with upsert enabled", 
segmentName);
+
+    setZkOperationTimeIfAvailable(immutableSegment, zkMetadata);
+
+    Integer partitionId = zkMetadata != null
+        ? SegmentUtils.getSegmentPartitionId(zkMetadata, null)
+        : SegmentUtils.getSegmentPartitionId(segmentName, _tableNameWithType, 
_helixManager, null);
+    Preconditions.checkNotNull(partitionId,
+        "Failed to get partition id for segment: %s (upsert-enabled table: 
%s). "
+            + "Segment must follow a naming convention that encodes partition 
id (e.g. LLCSegmentName, "
+            + "UploadedRealtimeSegmentName), or have partition metadata 
configured via SegmentPartitionConfig.",
+        segmentName, _tableNameWithType);
+    PartitionUpsertMetadataManager partitionUpsertMetadataManager =
+        _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
+
+    _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.DOCUMENT_COUNT,
+        immutableSegment.getSegmentMetadata().getTotalDocs());
+    _serverMetrics.addValueToTableGauge(_tableNameWithType, 
ServerGauge.SEGMENT_COUNT, 1L);
+    ImmutableSegmentDataManager newSegmentManager = new 
ImmutableSegmentDataManager(immutableSegment);
+    if (partitionUpsertMetadataManager.isPreloading()) {
+      // Register segment after it is preloaded and has initialized its 
validDocIds. The order of preloading and
+      // registering segment doesn't matter much as preloading happens before 
the table partition is ready for queries.
+      partitionUpsertMetadataManager.preloadSegment(immutableSegment);
+      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
+      _logger.info("Preloaded immutable segment: {} with upsert enabled", 
segmentName);
+      return;
+    }
+    SegmentDataManager oldSegmentManager = 
_segmentDataManagerMap.get(segmentName);
+    if (oldSegmentManager == null) {
+      // When adding a new segment, register it *before* 
partitionUpsertMetadataManager.addSegment() fully initializes
+      // the validDocIds bitmap. This lets queries access the new segment 
immediately while the bitmap is being built.
+      // Without early registration, docs in existing segments that get 
invalidated by this new segment would become
+      // invisible to queries until addSegment() completes.
+      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
+      partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName);
+      partitionUpsertMetadataManager.addSegment(immutableSegment);
+      _logger.info("Added new immutable segment: {} with upsert enabled", 
segmentName);
+    } else {
+      replaceUpsertSegment(segmentName, oldSegmentManager, newSegmentManager, 
partitionUpsertMetadataManager);
+    }
+  }
+
+  protected void replaceUpsertSegment(String segmentName, SegmentDataManager 
oldSegmentManager,
+      ImmutableSegmentDataManager newSegmentManager, 
PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+    IndexSegment oldSegment = oldSegmentManager.getSegment();
+    ImmutableSegment immutableSegment = newSegmentManager.getSegment();
+    UpsertConfig.ConsistencyMode consistencyMode = 
_tableUpsertMetadataManager.getContext().getConsistencyMode();
+    if (consistencyMode == UpsertConfig.ConsistencyMode.NONE) {
+      // When replacing a segment, register the new segment *after* 
replaceSegment() finishes filling its validDocIds
+      // bitmap. Otherwise queries lose access to valid docs in the old 
segment before the new bitmap is ready.
+      partitionUpsertMetadataManager.replaceSegment(immutableSegment, 
oldSegment);
+      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
+    } else {
+      // For consistency modes, keep both old and new segments visible to 
queries during replacement so that queries
+      // can see the new updates in the new segment while the old segment's 
validDocIds are still being updated.
+      // Register the new segment to the upsert metadata manager before making 
it visible to queries so the upsert
+      // view is updated before any query can access it.
+      SegmentDataManager duoSegmentDataManager = new 
DuoSegmentDataManager(newSegmentManager, oldSegmentManager);
+      registerSegment(segmentName, duoSegmentDataManager, 
partitionUpsertMetadataManager);
+      partitionUpsertMetadataManager.replaceSegment(immutableSegment, 
oldSegment);
+      registerSegment(segmentName, newSegmentManager, 
partitionUpsertMetadataManager);
+    }
+    _logger.info("Replaced {} segment: {} with upsert enabled and consistency 
mode: {}",
+        oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName, consistencyMode);
+    oldSegmentManager.offload();
+    releaseSegment(oldSegmentManager);
+  }
+
+  protected void registerSegment(String segmentName, SegmentDataManager 
segmentDataManager,
+      @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) 
{
+    if (partitionUpsertMetadataManager != null) {
+      
partitionUpsertMetadataManager.trackSegmentForUpsertView(segmentDataManager.getSegment());
+    }
+    registerSegment(segmentName, segmentDataManager);
+  }
+
+  protected void setZkOperationTimeIfAvailable(ImmutableSegment segment, 
@Nullable SegmentZKMetadata zkMetadata) {
+    if (zkMetadata != null && zkMetadata.getCreationTime() > 0) {
+      SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+      if (segmentMetadata instanceof SegmentMetadataImpl) {
+        SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl) 
segmentMetadata;
+        segmentMetadataImpl.setZkCreationTime(zkMetadata.getCreationTime());
+        segmentMetadataImpl.setZkPushTime(zkMetadata.getPushTime());
+        _logger.info("Set ZK creation time: {}, push time: {} for segment: {} 
in upsert table",
+            zkMetadata.getCreationTime(), zkMetadata.getPushTime(), 
zkMetadata.getSegmentName());

Review Comment:
   `setZkOperationTimeIfAvailable()` only sets `_zkPushTime` when 
`zkMetadata.getCreationTime() > 0`. For uploaded segments, it’s possible to 
have a valid `pushTime` even when `creationTime` is unset/invalid (the codebase 
already treats pushTime as the authoritative cluster creation time in 
`SegmentUtils.getSegmentCreationTimeMs`). With the current guard, 
offline-upsert tie-breaking can fall back to the local index creation time, 
which may differ across replicas, breaking consistency. Consider setting the ZK 
times independently whenever either `pushTime` or `creationTime` is available 
(e.g., set pushTime when `getPushTime() > 0` or `getPushTime() != 
Long.MIN_VALUE`, and set creationTime when `getCreationTime() > 0`).
   ```suggestion
       if (zkMetadata == null) {
         return;
       }
       SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
       if (segmentMetadata instanceof SegmentMetadataImpl) {
         SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl) 
segmentMetadata;
         boolean zkTimeSet = false;
         long creationTime = zkMetadata.getCreationTime();
         if (creationTime > 0) {
           segmentMetadataImpl.setZkCreationTime(creationTime);
           zkTimeSet = true;
         }
         long pushTime = zkMetadata.getPushTime();
         if (pushTime > 0) {
           segmentMetadataImpl.setZkPushTime(pushTime);
           zkTimeSet = true;
         }
         if (zkTimeSet) {
           _logger.info("Set ZK creation time: {}, push time: {} for segment: 
{} in upsert table",
               creationTime, pushTime, zkMetadata.getSegmentName());
   ```



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java:
##########
@@ -81,6 +81,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   private long _dataCrc = Long.MIN_VALUE;
   private long _creationTime = Long.MIN_VALUE;
   private long _zkCreationTime = Long.MIN_VALUE;  // ZooKeeper creation time 
for upsert consistency
+  private long _zkPushTime = Long.MIN_VALUE; // Zookeeper push time for upsert 
consistency

Review Comment:
   Minor capitalization inconsistency in the field comments: the 
`_zkCreationTime` comment uses “ZooKeeper”, but the `_zkPushTime` comment uses 
“Zookeeper”. Consider standardizing on “ZooKeeper” to match the rest of the 
codebase and the surrounding comment.
   ```suggestion
     private long _zkPushTime = Long.MIN_VALUE; // ZooKeeper push time for 
upsert consistency
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -786,9 +786,19 @@ static void validateUpsertAndDedupConfig(TableConfig 
tableConfig, Schema schema)
     // check both upsert and dedup are not enabled simultaneously
     Preconditions.checkState(!(isUpsertEnabled && isDedupEnabled),
         "A table can have either Upsert or Dedup enabled, but not both");
-    // check table type is realtime
-    Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
-        "Upsert/Dedup table is for realtime table only.");
+    if (tableConfig.getTableType() == TableType.OFFLINE) {
+      Preconditions.checkState(isUpsertEnabled && !isDedupEnabled,
+          "Dedup is not supported for OFFLINE table. Only upsert is supported 
for OFFLINE table");
+      // Offline upsert tables require segment partition config so that 
segments are assigned to servers
+      // based on partition, ensuring all segments of a partition land on the 
same server for correct dedup.
+      IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+      SegmentPartitionConfig segmentPartitionConfig =
+          indexingConfig != null ? indexingConfig.getSegmentPartitionConfig() 
: null;
+      Preconditions.checkState(
+          segmentPartitionConfig != null && 
MapUtils.isNotEmpty(segmentPartitionConfig.getColumnPartitionMap()),
+          "Offline upsert table must have segment partition config to ensure 
correct partition-based "
+              + "segment assignment. Configure segmentPartitionConfig in the 
indexingConfig.");

Review Comment:
   For OFFLINE upsert, validation currently only checks that 
`segmentPartitionConfig.columnPartitionMap` is non-empty. However, 
`BaseTableDataManager.handleUpsert()` resolves the partition id via 
`SegmentUtils.getSegmentPartitionId(zkMetadata, null)`, which returns `null` 
unless either (a) the segment name encodes a partition id or (b) the ZK 
partition metadata contains exactly one partition column (because 
`partitionColumn` is passed as `null`). This means an OFFLINE upsert table with 
multiple partition columns configured will pass validation but fail at runtime 
when loading/adding segments. Consider tightening OFFLINE-upsert validation to 
ensure the partition id is determinable (e.g., require exactly one partition 
column in `segmentPartitionConfig`, or otherwise validate that the partition 
column used for routing/replica-group is uniquely defined and can be used to 
resolve the partition id).
   ```suggestion
             segmentPartitionConfig != null
                 && 
MapUtils.isNotEmpty(segmentPartitionConfig.getColumnPartitionMap())
                 && segmentPartitionConfig.getColumnPartitionMap().size() == 1,
             "Offline upsert table must have segment partition config with 
exactly one partition column to ensure "
                 + "deterministic partition-based segment assignment. Configure 
a single-column "
                 + "segmentPartitionConfig in the indexingConfig.");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to