This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b62eb1fadb Revert Upsert Metadata of a segment during Inconsistencies 
(#17324)
2b62eb1fadb is described below

commit 2b62eb1fadbad85666922f878cac9dc07c26d90f
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Wed Dec 31 18:45:00 2025 -0800

    Revert Upsert Metadata of a segment during Inconsistencies (#17324)
    
    * Revert upsert metadata for partial upserts and upserts with 
dropOutOfOrderRecord = true
    
    * Checkstyle fixes
    
    * Revert the reordering of the variables
    
    Co-authored-by: Copilot <[email protected]>
    
    * Apply suggestions from code review
    
    Co-authored-by: Copilot <[email protected]>
    
    * Remove class name in the declaration
    
    * Make methods abstract
    
    * Replace docId while reverting the metadata
    
    * Checkstyle fixes
    
    * Change the comments
    
    * Add primary key instead of segment
    
    * Clear the newly added pks
    
    * Clear keys after removing the entries in the hashset
    
    * update the comment
    
    * Typo for _previousKeyToRecordLocationMap
    
    * Checkstyle Exceptions
    
    * Add exception message
    
    * Add tests
    
    * Remove unused variable
    
    * Change the code to accommodate the newly added keys when consistent 
deletes are enabled
    
    * Remove the comments
    
    * Checkstyle fixes
    
    * Reorder the imports
    
    * fix the test and compilation error
    
    * Add in more tests and reversion logic into consistent deletes - Upserts
    
    * Update 
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Apply suggestions from code review
    
    Add suggestions from Copilot
    
    Co-authored-by: Copilot <[email protected]>
    
    * Checkstyle fixes
    
    * Reduce the number of comments
    
    * Update the pk map
    
    * Fix the compilation issue
    
    * Add segment null checks
    
    * Update the comments
    
    * Add in information to track the number of keys getting replaced for a 
segment
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../models/DummyTableUpsertMetadataManager.java    |  13 +
 .../upsert/BasePartitionUpsertMetadataManager.java | 121 +++-
 ...oncurrentMapPartitionUpsertMetadataManager.java |  87 ++-
 ...nUpsertMetadataManagerForConsistentDeletes.java |  95 +++-
 .../BasePartitionUpsertMetadataManagerTest.java    |  13 +
 ...ertMetadataManagerForConsistentDeletesTest.java | 112 ++++
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 609 ++++++++++++++++++++-
 7 files changed, 991 insertions(+), 59 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
index 42dfef78287..b7c9b733ef4 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
@@ -78,6 +78,10 @@ public class DummyTableUpsertMetadataManager extends 
BaseTableUpsertMetadataMana
       return false;
     }
 
+    @Override
+    protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
+    }
+
     @Override
     protected void removeSegment(IndexSegment segment, MutableRoaringBitmap 
validDocIds) {
     }
@@ -90,5 +94,14 @@ public class DummyTableUpsertMetadataManager extends 
BaseTableUpsertMetadataMana
     @Override
     protected void doRemoveExpiredPrimaryKeys() {
     }
+
+    @Override
+    protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment,
+        ThreadSafeMutableRoaringBitmap validDocIds, 
ThreadSafeMutableRoaringBitmap queryableDocIds) {
+    }
+
+    @Override
+    protected void eraseKeyToPreviousLocationMap() {
+    }
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index fdd4b1bb62e..1c64c6ee3b9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -289,6 +289,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
     try {
       doAddSegment((ImmutableSegmentImpl) segment);
+      eraseKeyToPreviousLocationMap();
       _trackedSegments.add(segment);
       if (_enableSnapshot) {
         _updatedSegmentsSinceLastSnapshot.add(segment);
@@ -404,6 +405,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
     try {
       doPreloadSegment((ImmutableSegmentImpl) segment);
+      eraseKeyToPreviousLocationMap();
       _trackedSegments.add(segment);
       _updatedSegmentsSinceLastSnapshot.add(segment);
     } finally {
@@ -571,6 +573,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
     try {
       doReplaceSegment(segment, oldSegment);
+      eraseKeyToPreviousLocationMap();
       if (!(segment instanceof EmptyIndexSegment)) {
         _trackedSegments.add(segment);
         if (_enableSnapshot) {
@@ -610,7 +613,8 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       replaceSegment(segment, null, null, recordInfoIterator, oldSegment);
     } catch (Exception e) {
       throw new RuntimeException(
-          String.format("Caught exception while replacing segment: %s, table: 
%s", segmentName, _tableNameWithType), e);
+          String.format("Caught exception while replacing segment: %s, table: 
%s, message: %s", segmentName,
+              _tableNameWithType, e.getMessage()), e);
     }
 
     // Update metrics
@@ -620,6 +624,8 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
+  private static final int MAX_UPSERT_REVERT_RETRIES = 3;
+
   /**
    * NOTE: We allow passing in validDocIds and queryableDocIds here so that 
the value can be easily accessed from the
    *       tests. The passed in bitmaps should always be empty.
@@ -659,37 +665,94 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
     }
     if (validDocIdsForOldSegment != null && 
!validDocIdsForOldSegment.isEmpty()) {
-      int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
-      // Add the new metric tracking here
-      if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode() 
== UpsertConfig.ConsistencyMode.NONE) {
-        // For Upsert tables when some of the records get dropped when 
dropOutOfOrderRecord is enabled, we donot
-        // store the original record location when keys are not replaced, this 
can potentially cause inconsistencies
-        // leading to some rows not getting dropped when reconsumed. This can 
be caused when a consuming segment
-        // that is consumed from a different server is replaced with the 
existing segment which consumed rows ahead
-        // of the other server
-        _logger.warn(
-            "Found {} primary keys not replaced when replacing segment: {} for 
upsert table with dropOutOfOrderRecord"
-                + " enabled with no consistency mode. This can potentially 
cause inconsistency between replicas",
-            numKeysNotReplaced, segmentName);
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
-            numKeysNotReplaced);
-      } else if (_partialUpsertHandler != null) {
-        // For partial-upsert table, because we do not restore the original 
record location when removing the primary
-        // keys not replaced, it can potentially cause inconsistency between 
replicas. This can happen when a
-        // consuming segment is replaced by a committed segment that is 
consumed from a different server with
-        // different records (some stream consumer cannot guarantee consuming 
the messages in the same order/
-        // when a segment is replaced with lesser consumed rows from the other 
server).
-        _logger.warn("Found {} primary keys not replaced when replacing 
segment: {} for partial-upsert table. This "
-            + "can potentially cause inconsistency between replicas", 
numKeysNotReplaced, segmentName);
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
-            numKeysNotReplaced);
+      checkForInconsistencies(segment, validDocIds, queryableDocIds, 
oldSegment, validDocIdsForOldSegment, segmentName);
+      removeSegment(oldSegment, validDocIdsForOldSegment);
+    }
+  }
+
+  void checkForInconsistencies(ImmutableSegment segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment,
+      MutableRoaringBitmap validDocIdsForOldSegment, String segmentName) {
+    int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+    boolean isConsumingSegmentSeal = !(oldSegment instanceof ImmutableSegment);
+    // For partial-upsert table and upsert table with dropOutOfOrder=true & 
consistencyMode = NONE, we do not store
+    // the previous record location when removing the primary keys not 
replaced, it can potentially cause inconsistency
+    // between replicas. This can happen when a consuming segment is replaced 
by a committed segment that is consumed
+    // from a different server with different records (some stream consumer 
cannot guarantee consuming the messages in
+    // the same order/ when a segment is replaced with lesser consumed rows 
from the other server).
+    if (isConsumingSegmentSeal && _context.isDropOutOfOrderRecord()
+        && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE) 
{
+      _logger.warn("Found {} primary keys not replaced when sealing consuming 
segment: {} for upsert table with "
+              + "dropOutOfOrderRecord enabled with no consistency mode. This 
can potentially cause inconsistency "
+              + "between replicas. Reverting back metadata changes and 
triggering segment replacement.",
+          numKeysNotReplaced,
+          segmentName);
+      revertSegmentUpsertMetadataWithRetry(segment, validDocIds, 
queryableDocIds, oldSegment, segmentName);
+    } else if (isConsumingSegmentSeal && _partialUpsertHandler != null) {
+      _logger.warn("Found {} primary keys not replaced when sealing consuming 
segment: {} for partial-upsert table. "
+          + "This can potentially cause inconsistency between replicas. "
+          + "Reverting metadata changes and triggering segment replacement.", 
numKeysNotReplaced, segmentName);
+      revertSegmentUpsertMetadataWithRetry(segment, validDocIds, 
queryableDocIds, oldSegment, segmentName);
+    } else {
+      _logger.warn("Found {} primary keys not replaced for the segment: {}.", 
numKeysNotReplaced, segmentName);
+    }
+  }
+
+  /**
+   * Reverts segment upsert metadata and retries addOrReplaceSegment with a 
maximum retry limit to prevent infinite
+   * recursion in case of persistent inconsistencies.
+   */
+  void revertSegmentUpsertMetadataWithRetry(ImmutableSegment segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, 
String segmentName) {
+    for (int retryCount = 0; retryCount < MAX_UPSERT_REVERT_RETRIES; 
retryCount++) {
+      revertCurrentSegmentUpsertMetadata(oldSegment, validDocIds, 
queryableDocIds);
+      MutableRoaringBitmap validDocIdsForOldSegment = 
getValidDocIdsForOldSegment(oldSegment);
+      try (UpsertUtils.RecordInfoReader recordInfoReader = new 
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
+          _comparisonColumns, _deleteRecordColumn)) {
+        Iterator<RecordInfo> latestRecordInfoIterator =
+            UpsertUtils.getRecordInfoIterator(recordInfoReader, 
segment.getSegmentMetadata().getTotalDocs());
+        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, 
queryableDocIds, latestRecordInfoIterator,
+            oldSegment, validDocIdsForOldSegment);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            String.format("Caught exception while replacing segment metadata 
during inconsistencies: %s, table: %s",
+                segmentName, _tableNameWithType), e);
+      }
+
+      validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
+      if (validDocIdsForOldSegment.isEmpty()) {
+        _logger.info("Successfully resolved inconsistency for segment: {} 
after {} retry attempt(s)", segmentName,
+            retryCount + 1);
+        return;
+      }
+
+      int numKeysStillNotReplaced = validDocIdsForOldSegment.getCardinality();
+      if (retryCount < MAX_UPSERT_REVERT_RETRIES - 1) {
+        _logger.warn("Retry {}/{}: Still found {} primary keys not replaced 
for segment: {}. Retrying...",
+            retryCount + 1, MAX_UPSERT_REVERT_RETRIES, 
numKeysStillNotReplaced, segmentName);
       } else {
-        _logger.info("Found {} primary keys not replaced when replacing 
segment: {}", numKeysNotReplaced, segmentName);
+        _logger.error("Exhausted all {} retries for segment: {}. Found {} 
primary keys still not replaced. "
+                + "Proceeding with current state which may cause 
inconsistency.", MAX_UPSERT_REVERT_RETRIES,
+            segmentName,
+            numKeysStillNotReplaced);
+        if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode() 
== UpsertConfig.ConsistencyMode.NONE) {
+          _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
+              numKeysStillNotReplaced);
+        } else if (_partialUpsertHandler != null) {
+          _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
+              numKeysStillNotReplaced);
+        }
       }
-      removeSegment(oldSegment, validDocIdsForOldSegment);
     }
   }
 
+  protected abstract void removeNewlyAddedKeys(IndexSegment oldSegment);
+
+  protected abstract void eraseKeyToPreviousLocationMap();
+
+  protected abstract void revertCurrentSegmentUpsertMetadata(IndexSegment 
oldSegment,
+      ThreadSafeMutableRoaringBitmap validDocIds, 
ThreadSafeMutableRoaringBitmap queryableDocIds);
+
   private MutableRoaringBitmap getValidDocIdsForOldSegment(IndexSegment 
oldSegment) {
     return oldSegment.getValidDocIds() != null ? 
oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
   }
@@ -699,8 +762,8 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       removeSegment(segment, 
UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
     } catch (Exception e) {
       throw new RuntimeException(
-          String.format("Caught exception while removing segment: %s, table: 
%s", segment.getSegmentName(),
-              _tableNameWithType), e);
+          String.format("Caught exception while removing segment: %s, table: 
%s, message: %s", segment.getSegmentName(),
+              _tableNameWithType, e.getMessage()), e);
     }
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index ad5058c7030..550a20f9e34 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -54,6 +55,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
   @VisibleForTesting
   final ConcurrentHashMap<Object, RecordLocation> 
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
+  private final ConcurrentHashMap<Object, RecordLocation> 
_previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
+  private final Map<Object, RecordLocation> _newlyAddedKeys = new 
ConcurrentHashMap<>();
+
   public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, 
int partitionId, UpsertContext context) {
     super(tableNameWithType, partitionId, context);
   }
@@ -89,7 +93,12 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
               if (currentSegment == segment) {
                 if (comparisonResult >= 0) {
                   replaceDocId(segment, validDocIds, queryableDocIds, 
currentDocId, newDocId, recordInfo);
-                  return new RecordLocation(segment, newDocId, 
newComparisonValue);
+                  RecordLocation newRecordLocation = new 
RecordLocation(segment, newDocId, newComparisonValue);
+                  // Track the record location of the newly added keys
+                  if (_newlyAddedKeys.containsKey(primaryKey)) {
+                    _newlyAddedKeys.put(primaryKey, newRecordLocation);
+                  }
+                  return newRecordLocation;
                 } else {
                   return currentRecordLocation;
                 }
@@ -139,6 +148,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
                   currentSegmentName, getAuthoritativeCreationTime(segment),
                   getAuthoritativeCreationTime(currentSegment)))) {
                 replaceDocId(segment, validDocIds, queryableDocIds, 
currentSegment, currentDocId, newDocId, recordInfo);
+                if (currentSegment != segment) {
+                  _previousKeyToRecordLocationMap.put(primaryKey, 
currentRecordLocation);
+                }
                 return new RecordLocation(segment, newDocId, 
newComparisonValue);
               } else {
                 return currentRecordLocation;
@@ -146,7 +158,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
             } else {
               // New primary key
               addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
-              return new RecordLocation(segment, newDocId, newComparisonValue);
+              RecordLocation newRecordLocation = new RecordLocation(segment, 
newDocId, newComparisonValue);
+              _newlyAddedKeys.put(primaryKey, newRecordLocation);
+              return newRecordLocation;
             }
           });
     }
@@ -245,6 +259,57 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
     }
   }
 
+  @Override
+  protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment, 
ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds) {
+    // Revert to previous locations present in other segment
+    // Replace the valid doc id to that segment location
+    _logger.info("Reverting Upsert metadata for {} keys for the segment: {}", 
_previousKeyToRecordLocationMap.size(),
+        oldSegment.getSegmentName());
+    for (Map.Entry<Object, RecordLocation> obj : 
_previousKeyToRecordLocationMap.entrySet()) {
+      IndexSegment prevSegment = obj.getValue().getSegment();
+      if (prevSegment != null) {
+        try (UpsertUtils.RecordInfoReader recordInfoReader = new 
UpsertUtils.RecordInfoReader(prevSegment,
+            _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) {
+          int newDocId = obj.getValue().getDocId();
+          int currentDocId = 
_primaryKeyToRecordLocationMap.get(obj.getKey()).getDocId();
+          RecordInfo recordInfo = recordInfoReader.getRecordInfo(newDocId);
+          replaceDocId(prevSegment, prevSegment.getValidDocIds(), 
prevSegment.getQueryableDocIds(), oldSegment,
+              currentDocId, newDocId, recordInfo);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        _primaryKeyToRecordLocationMap.put(obj.getKey(), obj.getValue());
+      } else {
+        _primaryKeyToRecordLocationMap.remove(obj.getKey());
+      }
+    }
+    _logger.info("Reverted Upsert metadata of {} keys to previous segment 
locations for the segment: {}",
+        _previousKeyToRecordLocationMap.size(), oldSegment.getSegmentName());
+    removeNewlyAddedKeys(oldSegment);
+  }
+
+  @Override
+  protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
+    // Remove the newly added keys in the metadata map and in the valid doc ids
+    int removedKeys = 0;
+    for (Map.Entry<Object, RecordLocation> entry : _newlyAddedKeys.entrySet()) 
{
+      if (entry.getValue().getSegment() == oldSegment) {
+        _primaryKeyToRecordLocationMap.remove(entry.getKey());
+        removeDocId(oldSegment, entry.getValue().getDocId());
+        removedKeys++;
+      }
+    }
+    _logger.info("Removed newly added {} keys for the segment: {} out of : 
{}", removedKeys,
+        oldSegment.getSegmentName(), _previousKeyToRecordLocationMap.size());
+  }
+
+  @Override
+  protected void eraseKeyToPreviousLocationMap() {
+    _previousKeyToRecordLocationMap.clear();
+    _newlyAddedKeys.clear();
+  }
+
   @Override
   protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) 
{
     AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false);
@@ -263,19 +328,27 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
         (primaryKey, currentRecordLocation) -> {
           if (currentRecordLocation != null) {
             // Existing primary key
-
+            IndexSegment currentSegment = currentRecordLocation.getSegment();
             // Update the record location when the new comparison value is 
greater than or equal to the current value.
             // Update the record location when there is a tie to keep the 
newer record.
             if 
(newComparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) 
{
-              IndexSegment currentSegment = currentRecordLocation.getSegment();
               int currentDocId = currentRecordLocation.getDocId();
+              RecordLocation newRecordLocation = new RecordLocation(segment, 
newDocId, newComparisonValue);
               if (segment == currentSegment) {
                 replaceDocId(segment, validDocIds, queryableDocIds, 
currentDocId, newDocId, recordInfo);
+                // Track the record location of the newly added keys
+                if (_newlyAddedKeys.containsKey(primaryKey)) {
+                  _newlyAddedKeys.put(primaryKey, newRecordLocation);
+                }
               } else {
+                _previousKeyToRecordLocationMap.put(primaryKey, 
currentRecordLocation);
                 replaceDocId(segment, validDocIds, queryableDocIds, 
currentSegment, currentDocId, newDocId, recordInfo);
               }
-              return new RecordLocation(segment, newDocId, newComparisonValue);
+              return newRecordLocation;
             } else {
+              if (segment != currentSegment) {
+                _previousKeyToRecordLocationMap.put(primaryKey, 
currentRecordLocation);
+              }
               // Out-of-order record
               
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), 
recordInfo.getComparisonValue());
               isOutOfOrderRecord.set(true);
@@ -284,7 +357,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
           } else {
             // New primary key
             addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
-            return new RecordLocation(segment, newDocId, newComparisonValue);
+            RecordLocation newRecordLocation = new RecordLocation(segment, 
newDocId, newComparisonValue);
+            _newlyAddedKeys.put(primaryKey, newRecordLocation);
+            return newRecordLocation;
           }
         });
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 9af3ec6c234..beb63cfa797 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -67,6 +68,14 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
   @VisibleForTesting
   final ConcurrentHashMap<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
       _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  private final ConcurrentHashMap<Object,
+      
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
+      _previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  private final Map<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
+      _newlyAddedKeys = new ConcurrentHashMap<>();
+
   // Used to initialize a reference to previous row for merging in partial 
upsert
   private final LazyRow _reusePreviousRow = new LazyRow();
   private final Map<String, Object> _reuseMergeResultHolder = new HashMap<>();
@@ -115,8 +124,13 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
               if (currentSegment == segment) {
                 if (comparisonResult >= 0) {
                   replaceDocId(segment, validDocIds, queryableDocIds, 
currentDocId, newDocId, recordInfo);
-                  return new 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation(segment,
+                  RecordLocation newRecordLocation = new 
RecordLocation(segment,
                       newDocId, newComparisonValue, 
currentDistinctSegmentCount);
+                  // Track the record location of the newly added keys
+                  if (_newlyAddedKeys.containsKey(primaryKey)) {
+                    _newlyAddedKeys.put(primaryKey, newRecordLocation);
+                  }
+                  return newRecordLocation;
                 } else {
                   return currentRecordLocation;
                 }
@@ -178,7 +192,9 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
             } else {
               // New primary key
               addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
-              return new RecordLocation(segment, newDocId, newComparisonValue, 
1);
+              RecordLocation newRecordLocation = new RecordLocation(segment, 
newDocId, newComparisonValue, 1);
+              _newlyAddedKeys.put(primaryKey, newRecordLocation);
+              return newRecordLocation;
             }
           });
     }
@@ -241,16 +257,9 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
         addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, 
queryableDocIds, recordInfoIterator,
             oldSegment, validDocIdsForOldSegment);
       }
-      if (validDocIdsForOldSegment != null && 
!validDocIdsForOldSegment.isEmpty() && _partialUpsertHandler != null) {
-        int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
-        // For partial-upsert table, because we do not restore the original 
record location when removing the primary
-        // keys not replaced, it can potentially cause inconsistency between 
replicas. This can happen when a
-        // consuming segment is replaced by a committed segment that is 
consumed from a different server with
-        // different records (some stream consumer cannot guarantee consuming 
the messages in the same order).
-        _logger.warn("Found {} primary keys not replaced when replacing 
segment: {} for partial-upsert table. This "
-            + "can potentially cause inconsistency between replicas", 
numKeysNotReplaced, segmentName);
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
-            numKeysNotReplaced);
+      if (validDocIdsForOldSegment != null && 
!validDocIdsForOldSegment.isEmpty()) {
+        checkForInconsistencies(segment, validDocIds, queryableDocIds, 
oldSegment, validDocIdsForOldSegment,
+            segmentName);
       }
       // we want to always remove a segment in case of 
enableDeletedKeysCompactionConsistency = true
       // this is to account for the removal of primary-key in the 
to-be-removed segment and reduce
@@ -261,6 +270,20 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
     }
   }
 
+  @Override
+  protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
+    int removedKeys = 0;
+    for (Map.Entry<Object, RecordLocation> entry : _newlyAddedKeys.entrySet()) 
{
+      if (entry.getValue().getSegment() == oldSegment) {
+        _primaryKeyToRecordLocationMap.remove(entry.getKey());
+        removeDocId(oldSegment, entry.getValue().getDocId());
+        removedKeys++;
+      }
+    }
+    _logger.info("Removed newly added {} keys for the segment: {} out of : 
{}", removedKeys,
+        oldSegment.getSegmentName(), _previousKeyToRecordLocationMap.size());
+  }
+
   @Override
   protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> 
primaryKeyIterator) {
     // We need to decrease the distinctSegmentCount for each unique primary 
key in this deleting segment by 1
@@ -345,6 +368,42 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
     }
   }
 
+  /**
+   * Restores the key locations captured in {@code _previousKeyToRecordLocationMap}, then removes the keys tracked as
+   * newly added for {@code oldSegment}.
+   *
+   * <p>For every captured key whose previous location has a segment, the doc id is moved back to that segment and the
+   * previous location is written back into {@code _primaryKeyToRecordLocationMap}. Keys whose previous location has
+   * no segment are removed from the map.
+   *
+   * @param oldSegment segment whose upsert metadata is being reverted
+   * @param validDocIds not read by this implementation; the bitmaps of each previous segment are used instead
+   * @param queryableDocIds not read by this implementation; the bitmaps of each previous segment are used instead
+   * @throws RuntimeException if closing the record reader of a previous segment fails with an {@link IOException}
+   */
+  @Override
+  protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment, ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds) {
+    _logger.info("Reverting Upsert metadata for {} keys", _previousKeyToRecordLocationMap.size());
+    // Revert to previous locations present in other segment
+    for (Map.Entry<Object, RecordLocation> entry : _previousKeyToRecordLocationMap.entrySet()) {
+      Object primaryKey = entry.getKey();
+      RecordLocation previousLocation = entry.getValue();
+      IndexSegment prevSegment = previousLocation.getSegment();
+      if (prevSegment == null) {
+        _primaryKeyToRecordLocationMap.remove(primaryKey);
+        continue;
+      }
+      int previousDocId = previousLocation.getDocId();
+      try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(prevSegment,
+          _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) {
+        RecordInfo recordInfo = recordInfoReader.getRecordInfo(previousDocId);
+        RecordLocation currentLocation = _primaryKeyToRecordLocationMap.get(primaryKey);
+        if (currentLocation != null) {
+          // Update valid docId to the other segment location
+          replaceDocId(prevSegment, prevSegment.getValidDocIds(), prevSegment.getQueryableDocIds(), oldSegment,
+              currentLocation.getDocId(), previousDocId, recordInfo);
+        } else {
+          // The key is no longer in the map, so there is no current doc id to invalidate; only re-validate the
+          // previous doc id instead of failing the whole revert with a NullPointerException
+          addDocId(prevSegment, prevSegment.getValidDocIds(), prevSegment.getQueryableDocIds(), previousDocId,
+              recordInfo);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Caught exception while reverting upsert metadata to segment: " + prevSegment.getSegmentName(), e);
+      }
+      _primaryKeyToRecordLocationMap.put(primaryKey, previousLocation);
+    }
+    _logger.info("Reverted Upsert metadata of {} keys to previous segment locations for the segment: {}",
+        _previousKeyToRecordLocationMap.size(), oldSegment.getSegmentName());
+    // For the newly added keys into the segment, remove the pk and valid doc id
+    removeNewlyAddedKeys(oldSegment);
+  }
+
+  /**
+   * Clears both tracking maps used for reverting: the newly added keys and the previous key locations.
+   */
+  @Override
+  protected void eraseKeyToPreviousLocationMap() {
+    _newlyAddedKeys.clear();
+    _previousKeyToRecordLocationMap.clear();
+  }
+
   @Override
   protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) 
{
     AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false);
@@ -370,9 +429,15 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
               int currentDocId = currentRecordLocation.getDocId();
               if (segment == currentSegment) {
                 replaceDocId(segment, validDocIds, queryableDocIds, 
currentDocId, newDocId, recordInfo);
-                return new RecordLocation(segment, newDocId, 
newComparisonValue,
+                RecordLocation newRecordLocation = new RecordLocation(segment, 
newDocId, newComparisonValue,
                     currentRecordLocation.getDistinctSegmentCount());
+                // Track the record location of the newly added keys
+                if (_newlyAddedKeys.containsKey(primaryKey)) {
+                  _newlyAddedKeys.put(primaryKey, newRecordLocation);
+                }
+                return newRecordLocation;
               } else {
+                _previousKeyToRecordLocationMap.put(primaryKey, 
currentRecordLocation);
                 replaceDocId(segment, validDocIds, queryableDocIds, 
currentSegment, currentDocId, newDocId, recordInfo);
                 return new RecordLocation(segment, newDocId, 
newComparisonValue,
                     
RecordLocation.incrementSegmentCount(currentRecordLocation.getDistinctSegmentCount()));
@@ -393,7 +458,9 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
           } else {
             // New primary key
             addDocId(segment, validDocIds, queryableDocIds, newDocId, 
recordInfo);
-            return new RecordLocation(segment, newDocId, newComparisonValue, 
1);
+            RecordLocation newRecordLocation = new RecordLocation(segment, 
newDocId, newComparisonValue, 1);
+            _newlyAddedKeys.put(primaryKey, newRecordLocation);
+            return newRecordLocation;
           }
         });
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index a18e4904317..c07d3e41cd1 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -1050,6 +1050,10 @@ public class BasePartitionUpsertMetadataManagerTest {
       return false;
     }
 
+    @Override
+    protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
+    }
+
     @Override
     protected void removeSegment(IndexSegment segment, MutableRoaringBitmap 
validDocIds) {
     }
@@ -1062,5 +1066,14 @@ public class BasePartitionUpsertMetadataManagerTest {
     @Override
     protected void doRemoveExpiredPrimaryKeys() {
     }
+
+    @Override
+    protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment,
+        ThreadSafeMutableRoaringBitmap validDocIds, 
ThreadSafeMutableRoaringBitmap queryableDocIds) {
+    }
+
+    @Override
+    protected void eraseKeyToPreviousLocationMap() {
+    }
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
index 4c420665bf9..350154fc852 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
@@ -50,6 +50,7 @@ import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
@@ -166,6 +167,37 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     return segment;
   }
 
+  /**
+   * Creates a mocked {@link MutableSegment} whose every column is served by one INT forward index.
+   *
+   * @param sequenceNumber sequence number used to build the segment name
+   * @param validDocIds bitmap returned by {@code getValidDocIds()}
+   * @param queryableDocIds bitmap returned by {@code getQueryableDocIds()}; tests may pass {@code null}
+   * @param primaryKeys value returned for each doc id; doc ids beyond the array (or a {@code null} array) read back
+   *                    as the doc id itself, and the array length is reported as the total doc count
+   * @return the mocked segment
+   */
+  private static MutableSegment mockMutableSegmentWithDataSource(int sequenceNumber,
+      ThreadSafeMutableRoaringBitmap validDocIds, ThreadSafeMutableRoaringBitmap queryableDocIds,
+      int[] primaryKeys) {
+    MutableSegment segment = mock(MutableSegment.class);
+    when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
+    when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
+    when(segment.getValidDocIds()).thenReturn(validDocIds);
+
+    DataSource dataSource = mock(DataSource.class);
+    ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
+    when(forwardIndex.isSingleValue()).thenReturn(true);
+    when(forwardIndex.getStoredType()).thenReturn(FieldSpec.DataType.INT);
+    when(forwardIndex.getInt(anyInt(), any())).thenAnswer(invocation -> {
+      int docId = invocation.getArgument(0);
+      if (primaryKeys != null && docId < primaryKeys.length) {
+        return primaryKeys[docId];
+      }
+      return docId;
+    });
+    when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+
+    // One stub covers every column name, including the primary key column, so no per-column stub is needed
+    when(segment.getDataSource(anyString())).thenReturn(dataSource);
+
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys != null ? primaryKeys.length : 0);
+    when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+
+    return segment;
+  }
+
   private static String getSegmentName(int sequenceNumber) {
     return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber, 
System.currentTimeMillis()).toString();
   }
@@ -1229,4 +1261,84 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     assertEquals(BytesUtils.toHexString(((ByteArray) 
HashUtils.hashPrimaryKey(pk, HashFunction.MURMUR3)).getBytes()),
         "37fab5ef0ea39711feabcdc623cb8a4e");
   }
+
+  /**
+   * Replaces a consuming (mutable) segment holding keys 10, 20 and 30 with a committed segment holding only keys 10
+   * and 20, and expects two entries to remain in the primary key map afterwards.
+   */
+  @Test
+  public void testRevertOnlyAppliesForConsumingSegmentSeal()
+      throws IOException {
+    UpsertContext upsertContext =
+        _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0, upsertContext);
+    try {
+      int[] mutablePrimaryKeys = new int[]{10, 20, 30};
+      ThreadSafeMutableRoaringBitmap validDocIdsMutable = new ThreadSafeMutableRoaringBitmap();
+      MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1, validDocIdsMutable, null, mutablePrimaryKeys);
+
+      upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(10), 0, 1000, false));
+      upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(20), 1, 2000, false));
+      upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(30), 2, 3000, false));
+
+      Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> recordLocationMap =
+          upsertMetadataManager._primaryKeyToRecordLocationMap;
+      assertEquals(recordLocationMap.size(), 3);
+      assertEquals(validDocIdsMutable.getMutableRoaringBitmap().getCardinality(), 3);
+
+      int numRecords = 2;
+      int[] primaryKeys = new int[]{10, 20};
+      int[] timestamps = new int[]{1500, 2500};
+      ThreadSafeMutableRoaringBitmap validDocIdsImmutable = new ThreadSafeMutableRoaringBitmap();
+      List<PrimaryKey> primaryKeysList = getPrimaryKeyList(numRecords, primaryKeys);
+      ImmutableSegmentImpl immutableSegment = mockImmutableSegment(1, validDocIdsImmutable, null, primaryKeysList);
+
+      upsertMetadataManager.replaceSegment(immutableSegment, validDocIdsImmutable, null,
+          getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), mutableSegment);
+
+      assertEquals(recordLocationMap.size(), 2);
+    } finally {
+      // Release the manager even when an assertion above fails
+      upsertMetadataManager.stop();
+      upsertMetadataManager.close();
+    }
+  }
+
+  /**
+   * Replaces an immutable segment holding keys 10, 20 and 30 with another immutable segment holding only key 10,
+   * and expects the replacement to finish within one second and leave a single entry in the primary key map.
+   */
+  @Test
+  public void testNoRevertForImmutableSegmentReplacement()
+      throws IOException {
+    UpsertContext upsertContext =
+        _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+    ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME, 0, upsertContext);
+    try {
+      int numRecords1 = 3;
+      int[] primaryKeys1 = new int[]{10, 20, 30};
+      int[] timestamps1 = new int[]{1000, 2000, 3000};
+      ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+      List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+      ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, null, primaryKeysList1);
+
+      upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+          getRecordInfoList(numRecords1, primaryKeys1, timestamps1, null).iterator());
+      Map<Object, ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> recordLocationMap =
+          upsertMetadataManager._primaryKeyToRecordLocationMap;
+      assertEquals(recordLocationMap.size(), 3);
+
+      int numRecords2 = 1;
+      int[] primaryKeys2 = new int[]{10};
+      int[] timestamps2 = new int[]{1500};
+      ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+      List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+      ImmutableSegmentImpl segment2 = mockImmutableSegment(1, validDocIds2, null, primaryKeysList2);
+
+      // Use the monotonic clock: wall-clock time can jump and distort an elapsed-time measurement
+      long startNanos = System.nanoTime();
+      upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
+          getRecordInfoList(numRecords2, primaryKeys2, timestamps2, null).iterator(), segment1);
+      long duration = (System.nanoTime() - startNanos) / 1_000_000L;
+
+      assertTrue(duration < 1000, "Immutable-to-immutable replacement should complete quickly, took: " + duration + "ms");
+
+      assertEquals(recordLocationMap.size(), 1);
+    } finally {
+      // Release the manager even when an assertion above fails
+      upsertMetadataManager.stop();
+      upsertMetadataManager.close();
+    }
+  }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 684e9ba9c77..9e3beaf4bbb 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -38,6 +38,9 @@ import 
org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import 
org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;
 import org.apache.pinot.segment.local.utils.HashUtils;
@@ -46,17 +49,25 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.mockito.MockedConstruction;
@@ -69,6 +80,7 @@ import org.testng.annotations.Test;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.when;
@@ -83,18 +95,38 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
   private static final String DELETE_RECORD_COLUMN = "deleteCol";
   private static final File INDEX_DIR =
       new File(FileUtils.getTempDirectory(), 
"ConcurrentMapPartitionUpsertMetadataManagerTest");
+  private static final File SEGMENT_DIR = new File(INDEX_DIR, "segments");
+
+  private static final int MOCK_FALLBACK_BASE_OFFSET = 1000;
+
+  // Schema and TableConfig for creating real segments
+  private static final Schema SEGMENT_SCHEMA = new Schema.SchemaBuilder()
+      .addSingleValueDimension(PRIMARY_KEY_COLUMNS.get(0), 
FieldSpec.DataType.INT)
+      .addMetric(COMPARISON_COLUMNS.get(0), FieldSpec.DataType.INT)
+      .setPrimaryKeyColumns(PRIMARY_KEY_COLUMNS)
+      .build();
+  private static final TableConfig SEGMENT_TABLE_CONFIG = new 
TableConfigBuilder(TableType.REALTIME)
+      .setTableName(RAW_TABLE_NAME)
+      .build();
 
   private UpsertContext.Builder _contextBuilder;
+  private int _segmentCounter = 0;
 
   @BeforeClass
   public void setUp()
       throws IOException {
     FileUtils.forceMkdir(INDEX_DIR);
+    FileUtils.forceMkdir(SEGMENT_DIR);
     ServerMetrics.register(mock(ServerMetrics.class));
   }
 
   @BeforeMethod
-  public void setUpContextBuilder() {
+  public void setUpContextBuilder()
+      throws IOException {
+    // Clean up segment directory between tests
+    FileUtils.cleanDirectory(SEGMENT_DIR);
+    _segmentCounter = 0;
+
     TableDataManager tableDataManager = mock(TableDataManager.class);
     when(tableDataManager.getTableDataDir()).thenReturn(INDEX_DIR);
     _contextBuilder = new UpsertContext.Builder()
@@ -784,6 +816,17 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     return recordInfoList;
   }
 
+  // Builds one RecordInfo per record for the reversion tests: doc id is the record index, the comparison value is
+  // the int timestamp, and the delete flag is false when no flag array is supplied
+  private List<RecordInfo> getRecordInfoListWithIntegerComparison(int numRecords, int[] primaryKeys, int[] timestamps,
+      @Nullable boolean[] deleteRecordFlags) {
+    List<RecordInfo> result = new ArrayList<>();
+    int docId = 0;
+    while (docId < numRecords) {
+      result.add(new RecordInfo(makePrimaryKey(primaryKeys[docId]), docId, timestamps[docId],
+          deleteRecordFlags != null && deleteRecordFlags[docId]));
+      docId++;
+    }
+    return result;
+  }
+
   /**
    * Get recordInfo from validDocIdsSnapshot (enabledSnapshot = True).
    */
@@ -808,26 +851,127 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
   private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
       ThreadSafeMutableRoaringBitmap validDocIds, @Nullable 
ThreadSafeMutableRoaringBitmap queryableDocIds,
       List<PrimaryKey> primaryKeys) {
+    return mockImmutableSegmentWithTimestamps(sequenceNumber, validDocIds, 
queryableDocIds, primaryKeys, null);
+  }
+
+  private static ImmutableSegmentImpl mockImmutableSegmentWithTimestamps(int 
sequenceNumber,
+      ThreadSafeMutableRoaringBitmap validDocIds, @Nullable 
ThreadSafeMutableRoaringBitmap queryableDocIds,
+      List<PrimaryKey> primaryKeys, @Nullable int[] timestamps) {
     ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
     when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
     when(segment.getValidDocIds()).thenReturn(validDocIds);
     when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
-    DataSource dataSource = mock(DataSource.class);
-    when(segment.getDataSource(anyString())).thenReturn(dataSource);
-    ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
-    when(forwardIndex.isSingleValue()).thenReturn(true);
-    when(forwardIndex.getStoredType()).thenReturn(DataType.INT);
-    when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
-        invocation -> 
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
-    when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+
+    // Enhanced mocking for RecordInfoReader to work properly
+    // Mock primary key column data source
+    DataSource primaryKeyDataSource = mock(DataSource.class);
+    ForwardIndexReader primaryKeyForwardIndex = mock(ForwardIndexReader.class);
+    when(primaryKeyForwardIndex.isSingleValue()).thenReturn(true);
+    when(primaryKeyForwardIndex.getStoredType()).thenReturn(DataType.INT);
+    when(primaryKeyForwardIndex.createContext()).thenReturn(null);
+    when(primaryKeyForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation 
-> {
+      int docId = invocation.getArgument(0);
+      if (primaryKeys != null && docId < primaryKeys.size()) {
+        return (Integer) primaryKeys.get(docId).getValues()[0];
+      }
+      return MOCK_FALLBACK_BASE_OFFSET + docId;
+    });
+    
when(primaryKeyDataSource.getForwardIndex()).thenReturn(primaryKeyForwardIndex);
+
+    // Mock comparison column data source
+    DataSource comparisonDataSource = mock(DataSource.class);
+    ForwardIndexReader comparisonForwardIndex = mock(ForwardIndexReader.class);
+    when(comparisonForwardIndex.isSingleValue()).thenReturn(true);
+    when(comparisonForwardIndex.getStoredType()).thenReturn(DataType.INT);
+    when(comparisonForwardIndex.createContext()).thenReturn(null);
+    when(comparisonForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation 
-> {
+      int docId = invocation.getArgument(0);
+      // Return actual timestamp values if provided, otherwise default values
+      if (timestamps != null && docId < timestamps.length) {
+        return timestamps[docId];
+      }
+      return MOCK_FALLBACK_BASE_OFFSET + (docId * 100);
+    });
+    
when(comparisonDataSource.getForwardIndex()).thenReturn(comparisonForwardIndex);
+
+    // Set up data source mapping - IMPORTANT: anyString() must be registered 
FIRST,
+    // then specific matchers override it (Mockito uses last matching stub)
+    when(segment.getDataSource(anyString())).thenReturn(primaryKeyDataSource); 
// Default fallback first
+    
when(segment.getDataSource(eq(PRIMARY_KEY_COLUMNS.get(0)))).thenReturn(primaryKeyDataSource);
+    
when(segment.getDataSource(eq(COMPARISON_COLUMNS.get(0)))).thenReturn(comparisonDataSource);
+
+    // Mock segment metadata with proper total docs and column metadata
     SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
     long creationTimeMs = System.currentTimeMillis();
     when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
     when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs);
+    when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys != null ? 
primaryKeys.size() : 0);
+
+    // Mock column metadata for primary key and comparison columns
+    TreeMap<String, ColumnMetadata> columnMetadataMap = new TreeMap<>();
+    ColumnMetadata primaryKeyColumnMetadata = mock(ColumnMetadata.class);
+    when(primaryKeyColumnMetadata.getFieldSpec()).thenReturn(
+        new DimensionFieldSpec(PRIMARY_KEY_COLUMNS.get(0), DataType.INT, 
true));
+    ColumnMetadata comparisonColumnMetadata = mock(ColumnMetadata.class);
+    when(comparisonColumnMetadata.getFieldSpec()).thenReturn(
+        new DimensionFieldSpec(COMPARISON_COLUMNS.get(0), DataType.INT, true));
+    columnMetadataMap.put(PRIMARY_KEY_COLUMNS.get(0), 
primaryKeyColumnMetadata);
+    columnMetadataMap.put(COMPARISON_COLUMNS.get(0), comparisonColumnMetadata);
+    when(segmentMetadata.getColumnMetadataMap()).thenReturn(columnMetadataMap);
+
     when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
     return segment;
   }
 
+  /**
+   * Builds and loads a real on-disk immutable segment under an auto-generated name
+   * ({@code "segment_"} followed by the current segment counter, which is then incremented).
+   * Using a real segment sidesteps mocking the data sources that RecordInfoReader reads.
+   *
+   * @param primaryKeys primary key value of each row
+   * @param timestamps timestamp/comparison value of each row
+   * @param validDocIds bitmap that receives the doc id of every generated row
+   * @return the loaded segment
+   */
+  private ImmutableSegmentImpl createRealSegment(int[] primaryKeys, int[] timestamps,
+      ThreadSafeMutableRoaringBitmap validDocIds)
+      throws Exception {
+    String generatedName = "segment_" + _segmentCounter;
+    _segmentCounter++;
+    return createRealSegment(generatedName, primaryKeys, timestamps, validDocIds);
+  }
+
+  /**
+   * Generates a segment named {@code segmentName} from the given rows under {@code SEGMENT_DIR} and loads it.
+   *
+   * @param segmentName name of the segment, also used as its output directory name
+   * @param primaryKeys primary key value of each row
+   * @param timestamps timestamp/comparison value of each row
+   * @param validDocIds bitmap that receives the doc id of every generated row
+   * @return the loaded segment
+   */
+  private ImmutableSegmentImpl createRealSegment(String segmentName, int[] primaryKeys, int[] timestamps,
+      ThreadSafeMutableRoaringBitmap validDocIds)
+      throws Exception {
+    // One row per primary key; the row index doubles as the doc id marked valid
+    List<GenericRow> rows = new ArrayList<>();
+    int docId = 0;
+    for (int primaryKey : primaryKeys) {
+      GenericRow row = new GenericRow();
+      row.putValue(PRIMARY_KEY_COLUMNS.get(0), primaryKey);
+      row.putValue(COMPARISON_COLUMNS.get(0), timestamps[docId]);
+      rows.add(row);
+      validDocIds.add(docId);
+      docId++;
+    }
+
+    // Segment generation settings
+    File segmentOutputDir = new File(SEGMENT_DIR, segmentName);
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(SEGMENT_TABLE_CONFIG, SEGMENT_SCHEMA);
+    config.setOutDir(segmentOutputDir.getAbsolutePath());
+    config.setSegmentName(segmentName);
+    config.setTableName(RAW_TABLE_NAME);
+
+    // Write the segment to disk
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(config, new GenericRowRecordReader(rows));
+    driver.build();
+
+    // The driver output is read back from a directory named after the segment inside the output directory
+    File segmentDir = new File(segmentOutputDir, segmentName);
+    return (ImmutableSegmentImpl) ImmutableSegmentLoader.load(segmentDir, ReadMode.mmap);
+  }
+
   private static ImmutableSegmentImpl mockUploadedImmutableSegment(String 
suffix,
       ThreadSafeMutableRoaringBitmap validDocIds, @Nullable 
ThreadSafeMutableRoaringBitmap queryableDocIds,
       List<PrimaryKey> primaryKeys, Long creationTimeMs) {
@@ -897,6 +1041,33 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     return segment;
   }
 
+  /**
+   * Creates a mocked {@link MutableSegment} whose every column is served by one INT forward index.
+   *
+   * @param sequenceNumber sequence number used to build the segment name
+   * @param validDocIds bitmap returned by {@code getValidDocIds()}
+   * @param queryableDocIds bitmap returned by {@code getQueryableDocIds()}
+   * @param primaryKeys value returned for each doc id; doc ids beyond the array (or a {@code null} array) read back
+   *                    as the doc id itself
+   * @return the mocked segment
+   */
+  private static MutableSegment mockMutableSegmentWithDataSource(int sequenceNumber,
+      ThreadSafeMutableRoaringBitmap validDocIds, ThreadSafeMutableRoaringBitmap queryableDocIds,
+      int[] primaryKeys) {
+    MutableSegment segment = mock(MutableSegment.class);
+    when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
+    when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
+    when(segment.getValidDocIds()).thenReturn(validDocIds);
+
+    DataSource dataSource = mock(DataSource.class);
+    ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
+    when(forwardIndex.isSingleValue()).thenReturn(true);
+    when(forwardIndex.getStoredType()).thenReturn(DataType.INT);
+    when(forwardIndex.getInt(anyInt(), any())).thenAnswer(invocation -> {
+      int docId = invocation.getArgument(0);
+      if (primaryKeys != null && docId < primaryKeys.length) {
+        return primaryKeys[docId];
+      }
+      return docId;
+    });
+    when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+
+    // One stub covers every column name, including the primary key column, so no per-column stub is needed
+    when(segment.getDataSource(anyString())).thenReturn(dataSource);
+
+    return segment;
+  }
+
   private static String getSegmentName(int sequenceNumber) {
     return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber, 
System.currentTimeMillis()).toString();
   }
@@ -916,7 +1087,15 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertNotNull(recordLocation);
     assertSame(recordLocation.getSegment(), segment);
     assertEquals(recordLocation.getDocId(), docId);
-    assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value, 
comparisonValue);
+    // Handle both IntWrapper and Integer comparison values
+    Object actualComparisonValue = recordLocation.getComparisonValue();
+    if (actualComparisonValue instanceof IntWrapper) {
+      assertEquals(((IntWrapper) actualComparisonValue)._value, 
comparisonValue);
+    } else if (actualComparisonValue instanceof Integer) {
+      assertEquals(((Integer) actualComparisonValue).intValue(), 
comparisonValue);
+    } else {
+      fail("Unexpected comparison value type: " + 
actualComparisonValue.getClass());
+    }
   }
 
   @Test
@@ -1724,4 +1903,414 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
       return _value;
     }
   }
+
+  // Tests for upsert metadata reversion functionality
+  @Test
+  public void testPartialUpsertSameDocsReplacement() throws IOException {
+    // Partial upsert with consistency mode NONE: an immutable segment is replaced by another immutable segment
+    // that carries the same three primary keys with newer comparison values. Every key of the old segment is
+    // also present in the new one, so no reversion is expected.
+    PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+    UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    // Old segment: keys 1, 2, 3 with every doc marked valid
+    int numRecords1 = 3;
+    int[] primaryKeys1 = new int[]{1, 2, 3};
+    int[] timestamps1 = new int[]{100, 200, 300};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords1; i++) {
+      validDocIds1.add(i);
+    }
+    List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+    ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+        primaryKeysList1, timestamps1);
+    List<RecordInfo> recordInfoList1 = getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+        timestamps1, null);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 1, segment1, 0, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 1, 200, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment1, 2, 300, HashFunction.NONE);
+
+    // Create new segment with same 3 records but updated timestamps
+    int numRecords2 = 3;
+    int[] primaryKeys2 = new int[]{1, 2, 3};
+    int[] timestamps2 = new int[]{150, 250, 350};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords2; i++) {
+      validDocIds2.add(i);
+    }
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+        primaryKeysList2, timestamps2);
+
+    // Replace segment through the two-argument overload, which is not handed a record-info iterator by the test.
+    // No reversion is needed since all keys are present in the new segment.
+    upsertMetadataManager.replaceSegment(segment2, segment1);
+
+    // Verify replacement - all records should be in new segment
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 250, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment2, 2, 350, HashFunction.NONE);
+
+    // New segment should have all docs valid
+    assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 3);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testPartialUpsertOldSegmentTriggerReversion()
+      throws IOException {
+    // Partial upsert, consistency mode NONE: a consuming (mutable) segment holding keys 1-4 is sealed into an
+    // immutable segment that only contains keys 1 and 3. This is the consuming-segment-seal path, which is where
+    // the revert logic is expected to apply (as opposed to immutable-to-immutable replacement).
+    UpsertContext context = _contextBuilder.setPartialUpsertHandler(mock(PartialUpsertHandler.class))
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+    ConcurrentMapPartitionUpsertMetadataManager manager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, context);
+
+    // mockMutableSegmentWithDataSource is used so that removeSegment can read the primary keys back
+    int[] consumingKeys = {1, 2, 3, 4};
+    int[] consumingTimestamps = {100, 200, 300, 400};
+    ThreadSafeMutableRoaringBitmap consumingValidDocIds = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment consumingSegment = mockMutableSegmentWithDataSource(1, consumingValidDocIds, null, consumingKeys);
+
+    // Ingest one record per key; the doc id equals the position of the key in the arrays above
+    for (int docId = 0; docId < consumingKeys.length; docId++) {
+      manager.addRecord(consumingSegment,
+          new RecordInfo(makePrimaryKey(consumingKeys[docId]), docId, consumingTimestamps[docId], false));
+    }
+
+    Map<Object, RecordLocation> keyToLocation = manager._primaryKeyToRecordLocationMap;
+    assertEquals(keyToLocation.size(), 4);
+    assertEquals(consumingValidDocIds.getMutableRoaringBitmap().getCardinality(), 4);
+
+    // Sealed segment: only keys 1 and 3, starting from an empty valid-doc bitmap
+    int[] sealedKeys = {1, 3};
+    int[] sealedTimestamps = {150, 350};
+    int numSealedRecords = 2;
+    ThreadSafeMutableRoaringBitmap sealedValidDocIds = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl sealedSegment = mockImmutableSegmentWithTimestamps(1, sealedValidDocIds, null,
+        getPrimaryKeyList(numSealedRecords, sealedKeys), sealedTimestamps);
+    List<RecordInfo> sealedRecordInfos =
+        getRecordInfoListWithIntegerComparison(numSealedRecords, sealedKeys, sealedTimestamps, null);
+
+    // Replace mutable with immutable (consuming segment seal)
+    manager.replaceSegment(sealedSegment, sealedValidDocIds, null, sealedRecordInfos.iterator(), consumingSegment);
+
+    // Only the sealed segment's keys remain, both pointing at the sealed segment
+    assertEquals(keyToLocation.size(), 2);
+    assertEquals(sealedSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+    checkRecordLocation(keyToLocation, 1, sealedSegment, 0, 150, HashFunction.NONE);
+    checkRecordLocation(keyToLocation, 3, sealedSegment, 1, 350, HashFunction.NONE);
+
+    // The consuming segment must have no valid docs left after removal
+    assertEquals(consumingValidDocIds.getMutableRoaringBitmap().getCardinality(), 0);
+
+    manager.stop();
+    manager.close();
+  }
+
+  @Test
+  public void testPartialUpsertOldSegmentLesserDocs()
+      throws IOException {
+    // Partial upsert, consistency mode NONE: the replacing segment carries both keys of the old segment plus two
+    // additional keys, so after the replacement all four keys must point at the new segment.
+    UpsertContext context = _contextBuilder.setPartialUpsertHandler(mock(PartialUpsertHandler.class))
+        .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+    ConcurrentMapPartitionUpsertMetadataManager manager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, context);
+
+    // Old segment: keys {1, 2}, every doc valid
+    int[] oldKeys = {1, 2};
+    int[] oldTimestamps = {100, 200};
+    int numOldRecords = 2;
+    ThreadSafeMutableRoaringBitmap oldValidDocIds = new ThreadSafeMutableRoaringBitmap();
+    for (int docId = 0; docId < numOldRecords; docId++) {
+      oldValidDocIds.add(docId);
+    }
+    ImmutableSegmentImpl oldSegment = mockImmutableSegmentWithTimestamps(1, oldValidDocIds, null,
+        getPrimaryKeyList(numOldRecords, oldKeys), oldTimestamps);
+    manager.addSegment(oldSegment, oldValidDocIds, null,
+        getRecordInfoListWithIntegerComparison(numOldRecords, oldKeys, oldTimestamps, null).iterator());
+
+    Map<Object, RecordLocation> keyToLocation = manager._primaryKeyToRecordLocationMap;
+    assertEquals(keyToLocation.size(), 2);
+
+    // New segment: keys {1, 2, 3, 4}, every doc valid
+    int[] newKeys = {1, 2, 3, 4};
+    int[] newTimestamps = {150, 250, 300, 400};
+    int numNewRecords = 4;
+    ThreadSafeMutableRoaringBitmap newValidDocIds = new ThreadSafeMutableRoaringBitmap();
+    for (int docId = 0; docId < numNewRecords; docId++) {
+      newValidDocIds.add(docId);
+    }
+    ImmutableSegmentImpl newSegment = mockImmutableSegmentWithTimestamps(1, newValidDocIds, null,
+        getPrimaryKeyList(numNewRecords, newKeys), newTimestamps);
+
+    manager.replaceSegment(newSegment, oldSegment);
+
+    // Every key, including the two that did not exist before, is now located in the new segment
+    assertEquals(keyToLocation.size(), 4);
+    checkRecordLocation(keyToLocation, 1, newSegment, 0, 150, HashFunction.NONE);
+    checkRecordLocation(keyToLocation, 2, newSegment, 1, 250, HashFunction.NONE);
+    checkRecordLocation(keyToLocation, 3, newSegment, 2, 300, HashFunction.NONE);
+    checkRecordLocation(keyToLocation, 4, newSegment, 3, 400, HashFunction.NONE);
+
+    // All four docs of the new segment stay valid
+    assertEquals(newSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 4);
+
+    manager.stop();
+    manager.close();
+  }
+
+  @Test
+  public void testFullUpsertConsistencyNoneSameDocs()
+      throws IOException {
+    // Full upsert (no partial-upsert handler is set here), consistency mode NONE, dropOutOfOrderRecord enabled:
+    // old and new segment hold the same three keys, the new one with larger comparison values.
+    UpsertContext context =
+        _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+    ConcurrentMapPartitionUpsertMetadataManager manager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, context);
+
+    // Old segment: keys {10, 20, 30}, every doc valid
+    int[] oldKeys = {10, 20, 30};
+    int[] oldTimestamps = {1000, 2000, 3000};
+    int numOldRecords = 3;
+    ThreadSafeMutableRoaringBitmap oldValidDocIds = new ThreadSafeMutableRoaringBitmap();
+    for (int docId = 0; docId < numOldRecords; docId++) {
+      oldValidDocIds.add(docId);
+    }
+    ImmutableSegmentImpl oldSegment = mockImmutableSegmentWithTimestamps(1, oldValidDocIds, null,
+        getPrimaryKeyList(numOldRecords, oldKeys), oldTimestamps);
+    manager.addSegment(oldSegment, oldValidDocIds, null,
+        getRecordInfoListWithIntegerComparison(numOldRecords, oldKeys, oldTimestamps, null).iterator());
+
+    Map<Object, RecordLocation> keyToLocation = manager._primaryKeyToRecordLocationMap;
+    assertEquals(keyToLocation.size(), 3);
+
+    // New segment: same keys, newer timestamps, every doc valid
+    int[] newKeys = {10, 20, 30};
+    int[] newTimestamps = {1500, 2500, 3500};
+    int numNewRecords = 3;
+    ThreadSafeMutableRoaringBitmap newValidDocIds = new ThreadSafeMutableRoaringBitmap();
+    for (int docId = 0; docId < numNewRecords; docId++) {
+      newValidDocIds.add(docId);
+    }
+    ImmutableSegmentImpl newSegment = mockImmutableSegmentWithTimestamps(1, newValidDocIds, null,
+        getPrimaryKeyList(numNewRecords, newKeys), newTimestamps);
+
+    manager.replaceSegment(newSegment, oldSegment);
+
+    // All three keys moved over to the new segment and kept their doc ids
+    assertEquals(keyToLocation.size(), 3);
+    checkRecordLocation(keyToLocation, 10, newSegment, 0, 1500, HashFunction.NONE);
+    checkRecordLocation(keyToLocation, 20, newSegment, 1, 2500, HashFunction.NONE);
+    checkRecordLocation(keyToLocation, 30, newSegment, 2, 3500, HashFunction.NONE);
+    assertEquals(newSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 3);
+
+    manager.stop();
+    manager.close();
+  }
+
+  @Test
+  public void testFullUpsertConsistencyNoneOldSegmentMoreDocs()
+      throws Exception {
+    // Test full upserts with consistency=NONE where old segment has more docs
+    // Using real segments instead of mocks to avoid complex data source mocking
+    UpsertContext upsertContext =
+        _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    String segmentName = "test_segment";
+
+    // Declared outside the try block so that the finally block can release whatever was created, even when an
+    // assertion fails halfway through the test
+    ImmutableSegmentImpl segment1 = null;
+    ImmutableSegmentImpl segment2 = null;
+    try {
+      // Create first real segment with 3 records
+      int[] primaryKeys1 = new int[]{10, 30, 40};
+      int[] timestamps1 = new int[]{1500, 3500, 4000};
+      ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+      segment1 = createRealSegment(segmentName, primaryKeys1, timestamps1, validDocIds1);
+
+      upsertMetadataManager.addSegment(segment1);
+
+      Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+      assertEquals(recordLocationMap.size(), 3);
+
+      // Create second real segment with 2 records (subset of first)
+      int[] primaryKeys2 = new int[]{10, 30};
+      int[] timestamps2 = new int[]{1500, 3500};
+      ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+      segment2 = createRealSegment(segmentName, primaryKeys2, timestamps2, validDocIds2);
+
+      // Replace segment - RecordInfoReader will read real data from segment2
+      upsertMetadataManager.replaceSegment(segment2, segment1);
+
+      // Key 40 only existed in the old segment, so only keys 10 and 30 remain and both point at segment2
+      assertEquals(recordLocationMap.size(), 2);
+      checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
+      checkRecordLocation(recordLocationMap, 30, segment2, 1, 3500, HashFunction.NONE);
+      assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+    } finally {
+      try {
+        upsertMetadataManager.stop();
+        upsertMetadataManager.close();
+      } finally {
+        // Clean up real segments (same order as before: manager first, then the segments)
+        if (segment1 != null) {
+          segment1.destroy();
+        }
+        if (segment2 != null) {
+          segment2.destroy();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testFullUpsertRegularConsistencyMode()
+      throws IOException {
+    // Full upsert with a consistency mode other than NONE (SNAPSHOT is used here): a three-key segment is replaced
+    // by a segment that only contains key 100. No reversion is expected in this mode.
+    UpsertContext context = _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.SNAPSHOT).build();
+    ConcurrentMapPartitionUpsertMetadataManager manager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, context);
+
+    // Old segment: keys {100, 200, 300}, every doc valid
+    int[] oldKeys = {100, 200, 300};
+    int[] oldTimestamps = {10000, 20000, 30000};
+    int numOldRecords = 3;
+    ThreadSafeMutableRoaringBitmap oldValidDocIds = new ThreadSafeMutableRoaringBitmap();
+    for (int docId = 0; docId < numOldRecords; docId++) {
+      oldValidDocIds.add(docId);
+    }
+    ImmutableSegmentImpl oldSegment = mockImmutableSegmentWithTimestamps(1, oldValidDocIds, null,
+        getPrimaryKeyList(numOldRecords, oldKeys), oldTimestamps);
+    manager.addSegment(oldSegment, oldValidDocIds, null,
+        getRecordInfoListWithIntegerComparison(numOldRecords, oldKeys, oldTimestamps, null).iterator());
+
+    Map<Object, RecordLocation> keyToLocation = manager._primaryKeyToRecordLocationMap;
+    assertEquals(keyToLocation.size(), 3);
+
+    // New segment: the single key 100 with a newer timestamp, its only doc valid
+    int[] newKeys = {100};
+    int[] newTimestamps = {15000};
+    int numNewRecords = 1;
+    ThreadSafeMutableRoaringBitmap newValidDocIds = new ThreadSafeMutableRoaringBitmap();
+    for (int docId = 0; docId < numNewRecords; docId++) {
+      newValidDocIds.add(docId);
+    }
+    ImmutableSegmentImpl newSegment = mockImmutableSegmentWithTimestamps(1, newValidDocIds, null,
+        getPrimaryKeyList(numNewRecords, newKeys), newTimestamps);
+
+    manager.replaceSegment(newSegment, oldSegment);
+
+    // Only key 100 is tracked afterwards and it lives in the new segment
+    assertEquals(keyToLocation.size(), 1);
+    checkRecordLocation(keyToLocation, 100, newSegment, 0, 15000, HashFunction.NONE);
+
+    // Expected bitmap state: two docs still flagged valid on the old segment, one on the new segment
+    assertEquals(oldSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+    assertEquals(newSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 1);
+
+    manager.stop();
+    manager.close();
+  }
+
+  @Test
+  public void testRevertOnlyAppliesForConsumingSegmentSeal()
+      throws IOException {
+    // Full upsert, consistency mode NONE, dropOutOfOrderRecord enabled: a consuming (mutable) segment with keys
+    // {10, 20, 30} is sealed into an immutable segment that only contains keys {10, 20}.
+    UpsertContext context =
+        _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+    ConcurrentMapPartitionUpsertMetadataManager manager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, context);
+
+    int[] consumingKeys = {10, 20, 30};
+    int[] consumingTimestamps = {1000, 2000, 3000};
+    ThreadSafeMutableRoaringBitmap consumingValidDocIds = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment consumingSegment = mockMutableSegmentWithDataSource(1, consumingValidDocIds, null, consumingKeys);
+
+    // Ingest one record per key; the comparison value is boxed to an Integer
+    for (int docId = 0; docId < consumingKeys.length; docId++) {
+      manager.addRecord(consumingSegment,
+          new RecordInfo(makePrimaryKey(consumingKeys[docId]), docId, consumingTimestamps[docId], false));
+    }
+
+    Map<Object, RecordLocation> keyToLocation = manager._primaryKeyToRecordLocationMap;
+    assertEquals(keyToLocation.size(), 3);
+    assertEquals(consumingValidDocIds.getMutableRoaringBitmap().getCardinality(), 3);
+
+    // Sealed segment: keys {10, 20} only, starting from an empty valid-doc bitmap
+    int[] sealedKeys = {10, 20};
+    int[] sealedTimestamps = {1500, 2500};
+    int numSealedRecords = 2;
+    ThreadSafeMutableRoaringBitmap sealedValidDocIds = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl sealedSegment = mockImmutableSegmentWithTimestamps(1, sealedValidDocIds, null,
+        getPrimaryKeyList(numSealedRecords, sealedKeys), sealedTimestamps);
+    List<RecordInfo> sealedRecordInfos =
+        getRecordInfoListWithIntegerComparison(numSealedRecords, sealedKeys, sealedTimestamps, null);
+
+    // The old segment is mutable, so this replacement should go through the revert logic
+    manager.replaceSegment(sealedSegment, sealedValidDocIds, null, sealedRecordInfos.iterator(), consumingSegment);
+
+    // Only the records of the sealed segment remain
+    assertEquals(keyToLocation.size(), 2);
+    checkRecordLocation(keyToLocation, 10, sealedSegment, 0, 1500, HashFunction.NONE);
+    checkRecordLocation(keyToLocation, 20, sealedSegment, 1, 2500, HashFunction.NONE);
+
+    manager.stop();
+    manager.close();
+  }
+
+  @Test
+  public void testNoRevertForImmutableSegmentReplacement()
+      throws IOException {
+    // Test that replacing an immutable segment with another immutable segment (full upsert, consistency=NONE,
+    // dropOutOfOrderRecord=true) ends in the expected state: only the key of the new segment stays tracked.
+    // The revert logic is meant for consuming-segment seals, not for this path.
+    UpsertContext upsertContext =
+        _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+    // Create first immutable segment with 3 records
+    int numRecords1 = 3;
+    int[] primaryKeys1 = new int[]{10, 20, 30};
+    int[] timestamps1 = new int[]{1000, 2000, 3000};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords1; i++) {
+      validDocIds1.add(i);
+    }
+    List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+    ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+        primaryKeysList1, timestamps1);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+        getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1, timestamps1, null).iterator());
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    assertEquals(recordLocationMap.size(), 3);
+
+    // Create second immutable segment with a single record for key 10
+    int numRecords2 = 1;
+    int[] primaryKeys2 = new int[]{10};
+    int[] timestamps2 = new int[]{1500};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    for (int i = 0; i < numRecords2; i++) {
+      validDocIds2.add(i);
+    }
+    List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+    ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+        primaryKeysList2, timestamps2);
+
+    // Deliberately no wall-clock assertion here: elapsed time depends on machine load and says nothing about
+    // whether the revert path ran, so the test only checks the resulting metadata.
+    upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
+        getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2, timestamps2, null).iterator(), segment1);
+
+    assertEquals(recordLocationMap.size(), 1);
+    checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
+
+    upsertMetadataManager.stop();
+    upsertMetadataManager.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to