Jackie-Jiang commented on code in PR #17324:
URL: https://github.com/apache/pinot/pull/17324#discussion_r2615975742


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java:
##########
@@ -67,6 +68,12 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
   @VisibleForTesting
   final ConcurrentHashMap<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
       _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  final ConcurrentHashMap<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
+      _previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  final Set<Object> _newlyAddedKeys = new HashSet<>();

Review Comment:
   `_newlyAddedKeys` seems to never be updated.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -669,27 +673,59 @@ public void replaceSegment(ImmutableSegment segment, 
@Nullable ThreadSafeMutable
         // of the other server
         _logger.warn(
             "Found {} primary keys not replaced when replacing segment: {} for 
upsert table with dropOutOfOrderRecord"
-                + " enabled with no consistency mode. This can potentially 
cause inconsistency between replicas",
-            numKeysNotReplaced, segmentName);
+                + " enabled with no consistency mode. This can potentially 
cause inconsistency between replicas. "
+                + "Reverting metadata changes and triggering segment 
replacement.", numKeysNotReplaced, segmentName);
         _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
             numKeysNotReplaced);
+        // Revert consuming segment pks to previous segment locations and 
perform metadata replacement again
+        revertSegmentUpsertMetadata(segment, validDocIds, queryableDocIds, 
oldSegment, segmentName,
+            validDocIdsForOldSegment);
       } else if (_partialUpsertHandler != null) {
         // For partial-upsert table, because we do not restore the original 
record location when removing the primary
         // keys not replaced, it can potentially cause inconsistency between 
replicas. This can happen when a
         // consuming segment is replaced by a committed segment that is 
consumed from a different server with
         // different records (some stream consumer cannot guarantee consuming 
the messages in the same order/
         // when a segment is replaced with lesser consumed rows from the other 
server).
         _logger.warn("Found {} primary keys not replaced when replacing 
segment: {} for partial-upsert table. This "
-            + "can potentially cause inconsistency between replicas", 
numKeysNotReplaced, segmentName);
+            + "can potentially cause inconsistency between replicas. "
+            + "Reverting metadata changes and triggering segment 
replacement.", numKeysNotReplaced, segmentName);
         _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
             numKeysNotReplaced);
+        // Revert consuming segment pks to previous segment locations and 
perform metadata replacement again
+        revertSegmentUpsertMetadata(segment, validDocIds, queryableDocIds, 
oldSegment, segmentName,
+            validDocIdsForOldSegment);
       } else {
         _logger.info("Found {} primary keys not replaced when replacing 
segment: {}", numKeysNotReplaced, segmentName);
       }
       removeSegment(oldSegment, validDocIdsForOldSegment);
     }
   }
 
+  private void revertSegmentUpsertMetadata(ImmutableSegment segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, 
String segmentName,
+      MutableRoaringBitmap validDocIdsForOldSegment) {

Review Comment:
   (minor) The `validDocIdsForOldSegment` parameter is not used; it is reassigned before it is read.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java:
##########
@@ -67,6 +68,12 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
   @VisibleForTesting
   final ConcurrentHashMap<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
       _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  final ConcurrentHashMap<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>

Review Comment:
   (minor) Make this field `private`.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -669,27 +673,59 @@ public void replaceSegment(ImmutableSegment segment, 
@Nullable ThreadSafeMutable
         // of the other server
         _logger.warn(
             "Found {} primary keys not replaced when replacing segment: {} for 
upsert table with dropOutOfOrderRecord"
-                + " enabled with no consistency mode. This can potentially 
cause inconsistency between replicas",
-            numKeysNotReplaced, segmentName);
+                + " enabled with no consistency mode. This can potentially 
cause inconsistency between replicas. "
+                + "Reverting metadata changes and triggering segment 
replacement.", numKeysNotReplaced, segmentName);
         _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
             numKeysNotReplaced);
+        // Revert consuming segment pks to previous segment locations and 
perform metadata replacement again
+        revertSegmentUpsertMetadata(segment, validDocIds, queryableDocIds, 
oldSegment, segmentName,
+            validDocIdsForOldSegment);
       } else if (_partialUpsertHandler != null) {
         // For partial-upsert table, because we do not restore the original 
record location when removing the primary
         // keys not replaced, it can potentially cause inconsistency between 
replicas. This can happen when a
         // consuming segment is replaced by a committed segment that is 
consumed from a different server with
         // different records (some stream consumer cannot guarantee consuming 
the messages in the same order/
         // when a segment is replaced with lesser consumed rows from the other 
server).
         _logger.warn("Found {} primary keys not replaced when replacing 
segment: {} for partial-upsert table. This "
-            + "can potentially cause inconsistency between replicas", 
numKeysNotReplaced, segmentName);
+            + "can potentially cause inconsistency between replicas. "
+            + "Reverting metadata changes and triggering segment 
replacement.", numKeysNotReplaced, segmentName);
         _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
             numKeysNotReplaced);
+        // Revert consuming segment pks to previous segment locations and 
perform metadata replacement again
+        revertSegmentUpsertMetadata(segment, validDocIds, queryableDocIds, 
oldSegment, segmentName,
+            validDocIdsForOldSegment);
       } else {
         _logger.info("Found {} primary keys not replaced when replacing 
segment: {}", numKeysNotReplaced, segmentName);
       }
       removeSegment(oldSegment, validDocIdsForOldSegment);
     }
   }
 
+  private void revertSegmentUpsertMetadata(ImmutableSegment segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment, 
String segmentName,
+      MutableRoaringBitmap validDocIdsForOldSegment) {
+    revertCurrentSegmentUpsertMetadata(oldSegment, validDocIds, 
queryableDocIds);
+    validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
+    try (UpsertUtils.RecordInfoReader recordInfoReader = new 
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
+        _comparisonColumns, _deleteRecordColumn)) {
+      Iterator<RecordInfo> latestRecordInfoIterator =
+          UpsertUtils.getRecordInfoIterator(recordInfoReader, 
segment.getSegmentMetadata().getTotalDocs());
+      addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, 
queryableDocIds, latestRecordInfoIterator,
+          oldSegment, validDocIdsForOldSegment);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Caught exception while replacing segment metadata 
during inconsistencies: %s, table: %s",
+              segmentName, _tableNameWithType), e);
+    }
+  }
+
+  protected abstract void removeNewlyAddedKeys(IndexSegment oldSegment);
+
+  protected abstract void eraseKeyToPreviousLocationMap();
+
+  protected abstract void revertCurrentSegmentUpsertMetadata(IndexSegment 
oldSegment,
+      ThreadSafeMutableRoaringBitmap validDocIds, 
ThreadSafeMutableRoaringBitmap queryableDocIds);

Review Comment:
   Do we need these? They seem to be unused.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to