This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2b62eb1fadb Revert Upsert Metadata of a segment during Inconsistencies (#17324)
2b62eb1fadb is described below
commit 2b62eb1fadbad85666922f878cac9dc07c26d90f
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Wed Dec 31 18:45:00 2025 -0800
Revert Upsert Metadata of a segment during Inconsistencies (#17324)
* Revert upsert metadata for partial upserts and for upserts with outOfOrder = true
* Checkstyle fixes
* Revert the reordering of the variables
Co-authored-by: Copilot <[email protected]>
* Apply suggestions from code review
Co-authored-by: Copilot <[email protected]>
* Remove class name in the declaration
* Make methods abstract
* Replace docId while reverting the metadata
* Checkstyle fixes
* Change the comments
* Add primary key instead of segment
* Clear the newly added pks
* Clear keys after removing the entries in the hashset
* update the comment
* Fix typo in _previousKeyToRecordLocationMap
* Checkstyle Exceptions
* Add exception message
* Add tests
* Remove unused variable
* Change the code to accommodate the newly added keys when consistent deletes are enabled
* Remove the comments
* Checkstyle fixes
* Reorder the imports
* fix the test and compilation error
* Add more tests and reversion logic for consistent-deletes upserts
* Update pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
Co-authored-by: Copilot <[email protected]>
* Apply suggestions from code review
Add suggestions from Copilot
Co-authored-by: Copilot <[email protected]>
* Checkstyle fixes
* Reduce the number of comments
* Update the pk map
* Fix the compilation issue
* Add segment null checks
* Update the comments
* Add information to track the number of keys getting replaced for a segment
---------
Co-authored-by: Copilot <[email protected]>
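For readers skimming the patch below, here is a minimal, self-contained sketch of the revert-and-retry flow this commit adds (bounded by MAX_UPSERT_REVERT_RETRIES = 3 in BasePartitionUpsertMetadataManager). It is illustrative only and not part of the applied patch: it models the idea with plain Java collections, and the class, field, and method names are simplified stand-ins rather than the actual Pinot API.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class RevertRetrySketch {
      static final int MAX_UPSERT_REVERT_RETRIES = 3;

      // primary key -> name of the segment currently holding the valid record
      final Map<Integer, String> primaryKeyToSegment = new HashMap<>();
      // keys whose previous location was overwritten while the old segment was being replaced
      final Map<Integer, String> previousKeyToSegment = new HashMap<>();
      // keys first introduced by the old (consuming) segment
      final Set<Integer> newlyAddedKeys = new HashSet<>();

      void replaceWithRetry(Set<Integer> keysInNewSegment, String oldSegment, String newSegment) {
        for (int attempt = 0; attempt < MAX_UPSERT_REVERT_RETRIES; attempt++) {
          // 1. Restore keys to the segment locations they had before the replacement started.
          previousKeyToSegment.forEach(primaryKeyToSegment::put);
          // 2. Drop keys that only ever existed in the old segment.
          for (Integer key : newlyAddedKeys) {
            primaryKeyToSegment.remove(key, oldSegment);
          }
          // 3. Replay the new segment's records on top of the restored state.
          for (Integer key : keysInNewSegment) {
            primaryKeyToSegment.put(key, newSegment);
          }
          // 4. If no key still points at the old segment, the inconsistency is resolved.
          if (!primaryKeyToSegment.containsValue(oldSegment)) {
            return;
          }
        }
        // Retries exhausted: the real patch emits an inconsistency metric and proceeds.
      }
    }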
---
.../models/DummyTableUpsertMetadataManager.java | 13 +
.../upsert/BasePartitionUpsertMetadataManager.java | 121 +++-
...oncurrentMapPartitionUpsertMetadataManager.java | 87 ++-
...nUpsertMetadataManagerForConsistentDeletes.java | 95 +++-
.../BasePartitionUpsertMetadataManagerTest.java | 13 +
...ertMetadataManagerForConsistentDeletesTest.java | 112 ++++
...rrentMapPartitionUpsertMetadataManagerTest.java | 609 ++++++++++++++++++++-
7 files changed, 991 insertions(+), 59 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
index 42dfef78287..b7c9b733ef4 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
@@ -78,6 +78,10 @@ public class DummyTableUpsertMetadataManager extends
BaseTableUpsertMetadataMana
return false;
}
+ @Override
+ protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
+ }
+
@Override
protected void removeSegment(IndexSegment segment, MutableRoaringBitmap
validDocIds) {
}
@@ -90,5 +94,14 @@ public class DummyTableUpsertMetadataManager extends
BaseTableUpsertMetadataMana
@Override
protected void doRemoveExpiredPrimaryKeys() {
}
+
+ @Override
+ protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment,
+ ThreadSafeMutableRoaringBitmap validDocIds,
ThreadSafeMutableRoaringBitmap queryableDocIds) {
+ }
+
+ @Override
+ protected void eraseKeyToPreviousLocationMap() {
+ }
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index fdd4b1bb62e..1c64c6ee3b9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -289,6 +289,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
try {
doAddSegment((ImmutableSegmentImpl) segment);
+ eraseKeyToPreviousLocationMap();
_trackedSegments.add(segment);
if (_enableSnapshot) {
_updatedSegmentsSinceLastSnapshot.add(segment);
@@ -404,6 +405,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
try {
doPreloadSegment((ImmutableSegmentImpl) segment);
+ eraseKeyToPreviousLocationMap();
_trackedSegments.add(segment);
_updatedSegmentsSinceLastSnapshot.add(segment);
} finally {
@@ -571,6 +573,7 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
}
try {
doReplaceSegment(segment, oldSegment);
+ eraseKeyToPreviousLocationMap();
if (!(segment instanceof EmptyIndexSegment)) {
_trackedSegments.add(segment);
if (_enableSnapshot) {
@@ -610,7 +613,8 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
replaceSegment(segment, null, null, recordInfoIterator, oldSegment);
} catch (Exception e) {
throw new RuntimeException(
- String.format("Caught exception while replacing segment: %s, table:
%s", segmentName, _tableNameWithType), e);
+ String.format("Caught exception while replacing segment: %s, table:
%s, message: %s", segmentName,
+ _tableNameWithType, e.getMessage()), e);
}
// Update metrics
@@ -620,6 +624,8 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
+ private static final int MAX_UPSERT_REVERT_RETRIES = 3;
+
/**
* NOTE: We allow passing in validDocIds and queryableDocIds here so that
the value can be easily accessed from the
* tests. The passed in bitmaps should always be empty.
@@ -659,37 +665,94 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
}
if (validDocIdsForOldSegment != null &&
!validDocIdsForOldSegment.isEmpty()) {
- int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
- // Add the new metric tracking here
- if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode()
== UpsertConfig.ConsistencyMode.NONE) {
- // For Upsert tables when some of the records get dropped when
dropOutOfOrderRecord is enabled, we donot
- // store the original record location when keys are not replaced, this
can potentially cause inconsistencies
- // leading to some rows not getting dropped when reconsumed. This can
be caused when a consuming segment
- // that is consumed from a different server is replaced with the
existing segment which consumed rows ahead
- // of the other server
- _logger.warn(
- "Found {} primary keys not replaced when replacing segment: {} for
upsert table with dropOutOfOrderRecord"
- + " enabled with no consistency mode. This can potentially
cause inconsistency between replicas",
- numKeysNotReplaced, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
- numKeysNotReplaced);
- } else if (_partialUpsertHandler != null) {
- // For partial-upsert table, because we do not restore the original
record location when removing the primary
- // keys not replaced, it can potentially cause inconsistency between
replicas. This can happen when a
- // consuming segment is replaced by a committed segment that is
consumed from a different server with
- // different records (some stream consumer cannot guarantee consuming
the messages in the same order/
- // when a segment is replaced with lesser consumed rows from the other
server).
- _logger.warn("Found {} primary keys not replaced when replacing
segment: {} for partial-upsert table. This "
- + "can potentially cause inconsistency between replicas",
numKeysNotReplaced, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
- numKeysNotReplaced);
+ checkForInconsistencies(segment, validDocIds, queryableDocIds,
oldSegment, validDocIdsForOldSegment, segmentName);
+ removeSegment(oldSegment, validDocIdsForOldSegment);
+ }
+ }
+
+ void checkForInconsistencies(ImmutableSegment segment,
ThreadSafeMutableRoaringBitmap validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment,
+ MutableRoaringBitmap validDocIdsForOldSegment, String segmentName) {
+ int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+ boolean isConsumingSegmentSeal = !(oldSegment instanceof ImmutableSegment);
+ // For partial-upsert tables, and for upsert tables with dropOutOfOrder=true and consistencyMode=NONE, we do not
+ // store the previous record location when removing the primary keys that were not replaced, which can potentially
+ // cause inconsistency between replicas. This can happen when a consuming segment is replaced by a committed
+ // segment consumed from a different server with different records (some stream consumers cannot guarantee
+ // consuming the messages in the same order, or a segment may be replaced with fewer consumed rows from the other
+ // server).
+ if (isConsumingSegmentSeal && _context.isDropOutOfOrderRecord()
+     && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE) {
+ _logger.warn("Found {} primary keys not replaced when sealing consuming
segment: {} for upsert table with "
+ + "dropOutOfOrderRecord enabled with no consistency mode. This
can potentially cause inconsistency "
+ + "between replicas. Reverting back metadata changes and
triggering segment replacement.",
+ numKeysNotReplaced,
+ segmentName);
+ revertSegmentUpsertMetadataWithRetry(segment, validDocIds,
queryableDocIds, oldSegment, segmentName);
+ } else if (isConsumingSegmentSeal && _partialUpsertHandler != null) {
+ _logger.warn("Found {} primary keys not replaced when sealing consuming
segment: {} for partial-upsert table. "
+ + "This can potentially cause inconsistency between replicas. "
+ + "Reverting metadata changes and triggering segment replacement.",
numKeysNotReplaced, segmentName);
+ revertSegmentUpsertMetadataWithRetry(segment, validDocIds,
queryableDocIds, oldSegment, segmentName);
+ } else {
+ _logger.warn("Found {} primary keys not replaced for the segment: {}.",
numKeysNotReplaced, segmentName);
+ }
+ }
+
+ /**
+ * Reverts segment upsert metadata and retries addOrReplaceSegment with a
maximum retry limit to prevent infinite
+ * recursion in case of persistent inconsistencies.
+ */
+ void revertSegmentUpsertMetadataWithRetry(ImmutableSegment segment,
ThreadSafeMutableRoaringBitmap validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds, IndexSegment oldSegment,
String segmentName) {
+ for (int retryCount = 0; retryCount < MAX_UPSERT_REVERT_RETRIES;
retryCount++) {
+ revertCurrentSegmentUpsertMetadata(oldSegment, validDocIds,
queryableDocIds);
+ MutableRoaringBitmap validDocIdsForOldSegment =
getValidDocIdsForOldSegment(oldSegment);
+ try (UpsertUtils.RecordInfoReader recordInfoReader = new
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
+ _comparisonColumns, _deleteRecordColumn)) {
+ Iterator<RecordInfo> latestRecordInfoIterator =
+ UpsertUtils.getRecordInfoIterator(recordInfoReader,
segment.getSegmentMetadata().getTotalDocs());
+ addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds,
queryableDocIds, latestRecordInfoIterator,
+ oldSegment, validDocIdsForOldSegment);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Caught exception while replacing segment metadata
during inconsistencies: %s, table: %s",
+ segmentName, _tableNameWithType), e);
+ }
+
+ validDocIdsForOldSegment = getValidDocIdsForOldSegment(oldSegment);
+ if (validDocIdsForOldSegment.isEmpty()) {
+ _logger.info("Successfully resolved inconsistency for segment: {}
after {} retry attempt(s)", segmentName,
+ retryCount + 1);
+ return;
+ }
+
+ int numKeysStillNotReplaced = validDocIdsForOldSegment.getCardinality();
+ if (retryCount < MAX_UPSERT_REVERT_RETRIES - 1) {
+ _logger.warn("Retry {}/{}: Still found {} primary keys not replaced
for segment: {}. Retrying...",
+ retryCount + 1, MAX_UPSERT_REVERT_RETRIES,
numKeysStillNotReplaced, segmentName);
} else {
- _logger.info("Found {} primary keys not replaced when replacing
segment: {}", numKeysNotReplaced, segmentName);
+ _logger.error("Exhausted all {} retries for segment: {}. Found {}
primary keys still not replaced. "
+ + "Proceeding with current state which may cause
inconsistency.", MAX_UPSERT_REVERT_RETRIES,
+ segmentName,
+ numKeysStillNotReplaced);
+ if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode()
== UpsertConfig.ConsistencyMode.NONE) {
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
+ numKeysStillNotReplaced);
+ } else if (_partialUpsertHandler != null) {
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
+ numKeysStillNotReplaced);
+ }
}
- removeSegment(oldSegment, validDocIdsForOldSegment);
}
}
+ protected abstract void removeNewlyAddedKeys(IndexSegment oldSegment);
+
+ protected abstract void eraseKeyToPreviousLocationMap();
+
+ protected abstract void revertCurrentSegmentUpsertMetadata(IndexSegment
oldSegment,
+ ThreadSafeMutableRoaringBitmap validDocIds,
ThreadSafeMutableRoaringBitmap queryableDocIds);
+
private MutableRoaringBitmap getValidDocIdsForOldSegment(IndexSegment
oldSegment) {
return oldSegment.getValidDocIds() != null ?
oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
}
@@ -699,8 +762,8 @@ public abstract class BasePartitionUpsertMetadataManager
implements PartitionUps
removeSegment(segment,
UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, validDocIds));
} catch (Exception e) {
throw new RuntimeException(
- String.format("Caught exception while removing segment: %s, table:
%s", segment.getSegmentName(),
- _tableNameWithType), e);
+ String.format("Caught exception while removing segment: %s, table:
%s, message: %s", segment.getSegmentName(),
+ _tableNameWithType, e.getMessage()), e);
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index ad5058c7030..550a20f9e34 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;
import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -54,6 +55,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
@VisibleForTesting
final ConcurrentHashMap<Object, RecordLocation>
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Object, RecordLocation>
_previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
+ private final Map<Object, RecordLocation> _newlyAddedKeys = new
ConcurrentHashMap<>();
+
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType,
int partitionId, UpsertContext context) {
super(tableNameWithType, partitionId, context);
}
@@ -89,7 +93,12 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
if (currentSegment == segment) {
if (comparisonResult >= 0) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentDocId, newDocId, recordInfo);
- return new RecordLocation(segment, newDocId,
newComparisonValue);
+ RecordLocation newRecordLocation = new
RecordLocation(segment, newDocId, newComparisonValue);
+ // If this key was newly added by this segment, update its tracked location
+ if (_newlyAddedKeys.containsKey(primaryKey)) {
+ _newlyAddedKeys.put(primaryKey, newRecordLocation);
+ }
+ return newRecordLocation;
} else {
return currentRecordLocation;
}
@@ -139,6 +148,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
currentSegmentName, getAuthoritativeCreationTime(segment),
getAuthoritativeCreationTime(currentSegment)))) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentSegment, currentDocId, newDocId, recordInfo);
+ if (currentSegment != segment) {
+ _previousKeyToRecordLocationMap.put(primaryKey,
currentRecordLocation);
+ }
return new RecordLocation(segment, newDocId,
newComparisonValue);
} else {
return currentRecordLocation;
@@ -146,7 +158,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
} else {
// New primary key
addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
- return new RecordLocation(segment, newDocId, newComparisonValue);
+ RecordLocation newRecordLocation = new RecordLocation(segment,
newDocId, newComparisonValue);
+ _newlyAddedKeys.put(primaryKey, newRecordLocation);
+ return newRecordLocation;
}
});
}
@@ -245,6 +259,57 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
}
}
+ @Override
+ protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment,
ThreadSafeMutableRoaringBitmap validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds) {
+ // Revert each key to its previous location in another segment and move the valid doc id back to that location
+ _logger.info("Reverting upsert metadata for {} keys for segment: {}", _previousKeyToRecordLocationMap.size(),
+     oldSegment.getSegmentName());
+ for (Map.Entry<Object, RecordLocation> obj :
_previousKeyToRecordLocationMap.entrySet()) {
+ IndexSegment prevSegment = obj.getValue().getSegment();
+ if (prevSegment != null) {
+ try (UpsertUtils.RecordInfoReader recordInfoReader = new
UpsertUtils.RecordInfoReader(prevSegment,
+ _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) {
+ int newDocId = obj.getValue().getDocId();
+ int currentDocId =
_primaryKeyToRecordLocationMap.get(obj.getKey()).getDocId();
+ RecordInfo recordInfo = recordInfoReader.getRecordInfo(newDocId);
+ replaceDocId(prevSegment, prevSegment.getValidDocIds(),
prevSegment.getQueryableDocIds(), oldSegment,
+ currentDocId, newDocId, recordInfo);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ _primaryKeyToRecordLocationMap.put(obj.getKey(), obj.getValue());
+ } else {
+ _primaryKeyToRecordLocationMap.remove(obj.getKey());
+ }
+ }
+ _logger.info("Reverted Upsert metadata of {} keys to previous segment
locations for the segment: {}",
+ _previousKeyToRecordLocationMap.size(), oldSegment.getSegmentName());
+ removeNewlyAddedKeys(oldSegment);
+ }
+
+ @Override
+ protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
+ // Remove the newly added keys in the metadata map and in the valid doc ids
+ int removedKeys = 0;
+ for (Map.Entry<Object, RecordLocation> entry : _newlyAddedKeys.entrySet())
{
+ if (entry.getValue().getSegment() == oldSegment) {
+ _primaryKeyToRecordLocationMap.remove(entry.getKey());
+ removeDocId(oldSegment, entry.getValue().getDocId());
+ removedKeys++;
+ }
+ }
+ _logger.info("Removed newly added {} keys for the segment: {} out of :
{}", removedKeys,
+ oldSegment.getSegmentName(), _previousKeyToRecordLocationMap.size());
+ }
+
+ @Override
+ protected void eraseKeyToPreviousLocationMap() {
+ _previousKeyToRecordLocationMap.clear();
+ _newlyAddedKeys.clear();
+ }
+
@Override
protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo)
{
AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false);
@@ -263,19 +328,27 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
// Existing primary key
-
+ IndexSegment currentSegment = currentRecordLocation.getSegment();
// Update the record location when the new comparison value is
greater than or equal to the current value.
// Update the record location when there is a tie to keep the
newer record.
if
(newComparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0)
{
- IndexSegment currentSegment = currentRecordLocation.getSegment();
int currentDocId = currentRecordLocation.getDocId();
+ RecordLocation newRecordLocation = new RecordLocation(segment,
newDocId, newComparisonValue);
if (segment == currentSegment) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentDocId, newDocId, recordInfo);
+ // If this key was newly added by this segment, update its tracked location
+ if (_newlyAddedKeys.containsKey(primaryKey)) {
+ _newlyAddedKeys.put(primaryKey, newRecordLocation);
+ }
} else {
+ _previousKeyToRecordLocationMap.put(primaryKey,
currentRecordLocation);
replaceDocId(segment, validDocIds, queryableDocIds,
currentSegment, currentDocId, newDocId, recordInfo);
}
- return new RecordLocation(segment, newDocId, newComparisonValue);
+ return newRecordLocation;
} else {
+ if (segment != currentSegment) {
+ _previousKeyToRecordLocationMap.put(primaryKey,
currentRecordLocation);
+ }
// Out-of-order record
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(),
recordInfo.getComparisonValue());
isOutOfOrderRecord.set(true);
@@ -284,7 +357,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
} else {
// New primary key
addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
- return new RecordLocation(segment, newDocId, newComparisonValue);
+ RecordLocation newRecordLocation = new RecordLocation(segment,
newDocId, newComparisonValue);
+ _newlyAddedKeys.put(primaryKey, newRecordLocation);
+ return newRecordLocation;
}
});
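As a quick orientation before the next file, the sketch below shows how the two tracking maps introduced above (_previousKeyToRecordLocationMap and _newlyAddedKeys) are populated while records are upserted. It is illustrative only and not part of the patch; the types and names are simplified stand-ins for Pinot's RecordLocation and segment objects.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class KeyTrackingSketch {
      private final Map<Integer, String> primaryKeyToSegment = new HashMap<>();
      private final Map<Integer, String> previousKeyToSegment = new HashMap<>();
      private final Set<Integer> newlyAddedKeys = new HashSet<>();

      void onUpsert(int primaryKey, String newSegment) {
        String currentSegment = primaryKeyToSegment.get(primaryKey);
        if (currentSegment == null) {
          // Brand-new key: remember it so it can be dropped if the segment swap is reverted.
          newlyAddedKeys.add(primaryKey);
        } else if (!currentSegment.equals(newSegment)) {
          // Key moved from another segment: remember its previous location so it can be restored.
          previousKeyToSegment.put(primaryKey, currentSegment);
        }
        primaryKeyToSegment.put(primaryKey, newSegment);
      }
    }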
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 9af3ec6c234..beb63cfa797 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.upsert;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -67,6 +68,14 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
@VisibleForTesting
final ConcurrentHashMap<Object,
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+ private final ConcurrentHashMap<Object,
+
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
+ _previousKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+ private final Map<Object,
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
+ _newlyAddedKeys = new ConcurrentHashMap<>();
+
// Used to initialize a reference to previous row for merging in partial
upsert
private final LazyRow _reusePreviousRow = new LazyRow();
private final Map<String, Object> _reuseMergeResultHolder = new HashMap<>();
@@ -115,8 +124,13 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
if (currentSegment == segment) {
if (comparisonResult >= 0) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentDocId, newDocId, recordInfo);
- return new
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation(segment,
+ RecordLocation newRecordLocation = new
RecordLocation(segment,
newDocId, newComparisonValue,
currentDistinctSegmentCount);
+ // If this key was newly added by this segment, update its tracked location
+ if (_newlyAddedKeys.containsKey(primaryKey)) {
+ _newlyAddedKeys.put(primaryKey, newRecordLocation);
+ }
+ return newRecordLocation;
} else {
return currentRecordLocation;
}
@@ -178,7 +192,9 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
} else {
// New primary key
addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
- return new RecordLocation(segment, newDocId, newComparisonValue,
1);
+ RecordLocation newRecordLocation = new RecordLocation(segment,
newDocId, newComparisonValue, 1);
+ _newlyAddedKeys.put(primaryKey, newRecordLocation);
+ return newRecordLocation;
}
});
}
@@ -241,16 +257,9 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds,
queryableDocIds, recordInfoIterator,
oldSegment, validDocIdsForOldSegment);
}
- if (validDocIdsForOldSegment != null &&
!validDocIdsForOldSegment.isEmpty() && _partialUpsertHandler != null) {
- int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
- // For partial-upsert table, because we do not restore the original
record location when removing the primary
- // keys not replaced, it can potentially cause inconsistency between
replicas. This can happen when a
- // consuming segment is replaced by a committed segment that is
consumed from a different server with
- // different records (some stream consumer cannot guarantee consuming
the messages in the same order).
- _logger.warn("Found {} primary keys not replaced when replacing
segment: {} for partial-upsert table. This "
- + "can potentially cause inconsistency between replicas",
numKeysNotReplaced, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
- numKeysNotReplaced);
+ if (validDocIdsForOldSegment != null &&
!validDocIdsForOldSegment.isEmpty()) {
+ checkForInconsistencies(segment, validDocIds, queryableDocIds,
oldSegment, validDocIdsForOldSegment,
+ segmentName);
}
// we want to always remove a segment in case of
enableDeletedKeysCompactionConsistency = true
// this is to account for the removal of primary-key in the
to-be-removed segment and reduce
@@ -261,6 +270,20 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
}
}
+ @Override
+ protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
+ int removedKeys = 0;
+ for (Map.Entry<Object, RecordLocation> entry : _newlyAddedKeys.entrySet())
{
+ if (entry.getValue().getSegment() == oldSegment) {
+ _primaryKeyToRecordLocationMap.remove(entry.getKey());
+ removeDocId(oldSegment, entry.getValue().getDocId());
+ removedKeys++;
+ }
+ }
+ _logger.info("Removed newly added {} keys for the segment: {} out of :
{}", removedKeys,
+ oldSegment.getSegmentName(), _previousKeyToRecordLocationMap.size());
+ }
+
@Override
protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey>
primaryKeyIterator) {
// We need to decrease the distinctSegmentCount for each unique primary
key in this deleting segment by 1
@@ -345,6 +368,42 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
}
}
+ @Override
+ protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment,
ThreadSafeMutableRoaringBitmap validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds) {
+ _logger.info("Reverting Upsert metadata for {} keys",
_previousKeyToRecordLocationMap.size());
+ // Revert to previous locations present in other segment
+ for (Map.Entry<Object, RecordLocation> obj :
_previousKeyToRecordLocationMap.entrySet()) {
+ IndexSegment prevSegment = obj.getValue().getSegment();
+ if (prevSegment != null) {
+ try (UpsertUtils.RecordInfoReader recordInfoReader = new
UpsertUtils.RecordInfoReader(prevSegment,
+ _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) {
+ int newDocId = obj.getValue().getDocId();
+ int currentDocId =
_primaryKeyToRecordLocationMap.get(obj.getKey()).getDocId();
+ RecordInfo recordInfo = recordInfoReader.getRecordInfo(newDocId);
+ // Update valid docId to the other segment location
+ replaceDocId(prevSegment, prevSegment.getValidDocIds(),
prevSegment.getQueryableDocIds(), oldSegment,
+ currentDocId, newDocId, recordInfo);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ _primaryKeyToRecordLocationMap.put(obj.getKey(), obj.getValue());
+ } else {
+ _primaryKeyToRecordLocationMap.remove(obj.getKey());
+ }
+ }
+ _logger.info("Reverted Upsert metadata of {} keys to previous segment
locations for the segment: {}",
+ _previousKeyToRecordLocationMap.size(), oldSegment.getSegmentName());
+ // For keys newly added by this segment, remove the primary key entry and the valid doc id
+ removeNewlyAddedKeys(oldSegment);
+ }
+
+ @Override
+ protected void eraseKeyToPreviousLocationMap() {
+ _previousKeyToRecordLocationMap.clear();
+ _newlyAddedKeys.clear();
+ }
+
@Override
protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo)
{
AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false);
@@ -370,9 +429,15 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
int currentDocId = currentRecordLocation.getDocId();
if (segment == currentSegment) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentDocId, newDocId, recordInfo);
- return new RecordLocation(segment, newDocId,
newComparisonValue,
+ RecordLocation newRecordLocation = new RecordLocation(segment,
newDocId, newComparisonValue,
currentRecordLocation.getDistinctSegmentCount());
+ // If this key was newly added by this segment, update its tracked location
+ if (_newlyAddedKeys.containsKey(primaryKey)) {
+ _newlyAddedKeys.put(primaryKey, newRecordLocation);
+ }
+ return newRecordLocation;
} else {
+ _previousKeyToRecordLocationMap.put(primaryKey,
currentRecordLocation);
replaceDocId(segment, validDocIds, queryableDocIds,
currentSegment, currentDocId, newDocId, recordInfo);
return new RecordLocation(segment, newDocId,
newComparisonValue,
RecordLocation.incrementSegmentCount(currentRecordLocation.getDistinctSegmentCount()));
@@ -393,7 +458,9 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
} else {
// New primary key
addDocId(segment, validDocIds, queryableDocIds, newDocId,
recordInfo);
- return new RecordLocation(segment, newDocId, newComparisonValue,
1);
+ RecordLocation newRecordLocation = new RecordLocation(segment,
newDocId, newComparisonValue, 1);
+ _newlyAddedKeys.put(primaryKey, newRecordLocation);
+ return newRecordLocation;
}
});
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index a18e4904317..c07d3e41cd1 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -1050,6 +1050,10 @@ public class BasePartitionUpsertMetadataManagerTest {
return false;
}
+ @Override
+ protected void removeNewlyAddedKeys(IndexSegment oldSegment) {
+ }
+
@Override
protected void removeSegment(IndexSegment segment, MutableRoaringBitmap
validDocIds) {
}
@@ -1062,5 +1066,14 @@ public class BasePartitionUpsertMetadataManagerTest {
@Override
protected void doRemoveExpiredPrimaryKeys() {
}
+
+ @Override
+ protected void revertCurrentSegmentUpsertMetadata(IndexSegment oldSegment,
+ ThreadSafeMutableRoaringBitmap validDocIds,
ThreadSafeMutableRoaringBitmap queryableDocIds) {
+ }
+
+ @Override
+ protected void eraseKeyToPreviousLocationMap() {
+ }
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
index 4c420665bf9..350154fc852 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
@@ -50,6 +50,7 @@ import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.PrimaryKey;
@@ -166,6 +167,37 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
return segment;
}
+ private static MutableSegment mockMutableSegmentWithDataSource(int
sequenceNumber,
+ ThreadSafeMutableRoaringBitmap validDocIds,
ThreadSafeMutableRoaringBitmap queryableDocIds,
+ int[] primaryKeys) {
+ MutableSegment segment = mock(MutableSegment.class);
+ when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
+ when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
+ when(segment.getValidDocIds()).thenReturn(validDocIds);
+
+ DataSource dataSource = mock(DataSource.class);
+ ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
+ when(forwardIndex.isSingleValue()).thenReturn(true);
+ when(forwardIndex.getStoredType()).thenReturn(FieldSpec.DataType.INT);
+ when(forwardIndex.getInt(anyInt(), any())).thenAnswer(invocation -> {
+ int docId = invocation.getArgument(0);
+ if (primaryKeys != null && docId < primaryKeys.length) {
+ return primaryKeys[docId];
+ }
+ return docId;
+ });
+ when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+
+ when(segment.getDataSource(anyString())).thenReturn(dataSource);
+
when(segment.getDataSource(PRIMARY_KEY_COLUMNS.get(0))).thenReturn(dataSource);
+
+ SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+ when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys != null ?
primaryKeys.length : 0);
+ when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+
+ return segment;
+ }
+
private static String getSegmentName(int sequenceNumber) {
return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber,
System.currentTimeMillis()).toString();
}
@@ -1229,4 +1261,84 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
assertEquals(BytesUtils.toHexString(((ByteArray)
HashUtils.hashPrimaryKey(pk, HashFunction.MURMUR3)).getBytes()),
"37fab5ef0ea39711feabcdc623cb8a4e");
}
+
+ @Test
+ public void testRevertOnlyAppliesForConsumingSegmentSeal()
+ throws IOException {
+ UpsertContext upsertContext =
+
_contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+ ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
upsertMetadataManager =
+ new
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME,
0, upsertContext);
+
+ int[] mutablePrimaryKeys = new int[]{10, 20, 30};
+ ThreadSafeMutableRoaringBitmap validDocIdsMutable = new
ThreadSafeMutableRoaringBitmap();
+ MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1,
validDocIdsMutable, null, mutablePrimaryKeys);
+
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(10), 0, 1000, false));
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(20), 1, 2000, false));
+ upsertMetadataManager.addRecord(mutableSegment, new
RecordInfo(makePrimaryKey(30), 2, 3000, false));
+
+ Map<Object,
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
recordLocationMap =
+ upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+
assertEquals(validDocIdsMutable.getMutableRoaringBitmap().getCardinality(), 3);
+
+ int numRecords = 2;
+ int[] primaryKeys = new int[]{10, 20};
+ int[] timestamps = new int[]{1500, 2500};
+ ThreadSafeMutableRoaringBitmap validDocIdsImmutable = new
ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeysList = getPrimaryKeyList(numRecords,
primaryKeys);
+ ImmutableSegmentImpl immutableSegment = mockImmutableSegment(1,
validDocIdsImmutable, null, primaryKeysList);
+
+ upsertMetadataManager.replaceSegment(immutableSegment,
validDocIdsImmutable, null,
+ getRecordInfoList(numRecords, primaryKeys, timestamps,
null).iterator(), mutableSegment);
+
+ assertEquals(recordLocationMap.size(), 2);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testNoRevertForImmutableSegmentReplacement()
+ throws IOException {
+ UpsertContext upsertContext =
+
_contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+ ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
upsertMetadataManager =
+ new
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME,
0, upsertContext);
+
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{10, 20, 30};
+ int[] timestamps1 = new int[]{1000, 2000, 3000};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new
ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1,
primaryKeys1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1,
null, primaryKeysList1);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+ getRecordInfoList(numRecords1, primaryKeys1, timestamps1,
null).iterator());
+ Map<Object,
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation>
recordLocationMap =
+ upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+
+ int numRecords2 = 1;
+ int[] primaryKeys2 = new int[]{10};
+ int[] timestamps2 = new int[]{1500};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new
ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2,
primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegment(1, validDocIds2,
null, primaryKeysList2);
+
+ long startTime = System.currentTimeMillis();
+ upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
+ getRecordInfoList(numRecords2, primaryKeys2, timestamps2,
null).iterator(), segment1);
+ long duration = System.currentTimeMillis() - startTime;
+
+ assertTrue(duration < 1000, "Immutable-to-immutable replacement should
complete quickly, took: " + duration + "ms");
+
+ assertEquals(recordLocationMap.size(), 1);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 684e9ba9c77..9e3beaf4bbb 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -38,6 +38,9 @@ import
org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import
org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;
import org.apache.pinot.segment.local.utils.HashUtils;
@@ -46,17 +49,25 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.mockito.MockedConstruction;
@@ -69,6 +80,7 @@ import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.when;
@@ -83,18 +95,38 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
private static final String DELETE_RECORD_COLUMN = "deleteCol";
private static final File INDEX_DIR =
new File(FileUtils.getTempDirectory(),
"ConcurrentMapPartitionUpsertMetadataManagerTest");
+ private static final File SEGMENT_DIR = new File(INDEX_DIR, "segments");
+
+ private static final int MOCK_FALLBACK_BASE_OFFSET = 1000;
+
+ // Schema and TableConfig for creating real segments
+ private static final Schema SEGMENT_SCHEMA = new Schema.SchemaBuilder()
+ .addSingleValueDimension(PRIMARY_KEY_COLUMNS.get(0),
FieldSpec.DataType.INT)
+ .addMetric(COMPARISON_COLUMNS.get(0), FieldSpec.DataType.INT)
+ .setPrimaryKeyColumns(PRIMARY_KEY_COLUMNS)
+ .build();
+ private static final TableConfig SEGMENT_TABLE_CONFIG = new
TableConfigBuilder(TableType.REALTIME)
+ .setTableName(RAW_TABLE_NAME)
+ .build();
private UpsertContext.Builder _contextBuilder;
+ private int _segmentCounter = 0;
@BeforeClass
public void setUp()
throws IOException {
FileUtils.forceMkdir(INDEX_DIR);
+ FileUtils.forceMkdir(SEGMENT_DIR);
ServerMetrics.register(mock(ServerMetrics.class));
}
@BeforeMethod
- public void setUpContextBuilder() {
+ public void setUpContextBuilder()
+ throws IOException {
+ // Clean up segment directory between tests
+ FileUtils.cleanDirectory(SEGMENT_DIR);
+ _segmentCounter = 0;
+
TableDataManager tableDataManager = mock(TableDataManager.class);
when(tableDataManager.getTableDataDir()).thenReturn(INDEX_DIR);
_contextBuilder = new UpsertContext.Builder()
@@ -784,6 +816,17 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
return recordInfoList;
}
+ // Helper method for new reversion tests that need Integer comparison values
+ private List<RecordInfo> getRecordInfoListWithIntegerComparison(int
numRecords, int[] primaryKeys, int[] timestamps,
+ @Nullable boolean[] deleteRecordFlags) {
+ List<RecordInfo> recordInfoList = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i,
timestamps[i],
+ deleteRecordFlags != null && deleteRecordFlags[i]));
+ }
+ return recordInfoList;
+ }
+
/**
* Get recordInfo from validDocIdsSnapshot (enabledSnapshot = True).
*/
@@ -808,26 +851,127 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
List<PrimaryKey> primaryKeys) {
+ return mockImmutableSegmentWithTimestamps(sequenceNumber, validDocIds,
queryableDocIds, primaryKeys, null);
+ }
+
+ private static ImmutableSegmentImpl mockImmutableSegmentWithTimestamps(int
sequenceNumber,
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
+ List<PrimaryKey> primaryKeys, @Nullable int[] timestamps) {
ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
when(segment.getValidDocIds()).thenReturn(validDocIds);
when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
- DataSource dataSource = mock(DataSource.class);
- when(segment.getDataSource(anyString())).thenReturn(dataSource);
- ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
- when(forwardIndex.isSingleValue()).thenReturn(true);
- when(forwardIndex.getStoredType()).thenReturn(DataType.INT);
- when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
- invocation ->
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
- when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+
+ // Enhanced mocking for RecordInfoReader to work properly
+ // Mock primary key column data source
+ DataSource primaryKeyDataSource = mock(DataSource.class);
+ ForwardIndexReader primaryKeyForwardIndex = mock(ForwardIndexReader.class);
+ when(primaryKeyForwardIndex.isSingleValue()).thenReturn(true);
+ when(primaryKeyForwardIndex.getStoredType()).thenReturn(DataType.INT);
+ when(primaryKeyForwardIndex.createContext()).thenReturn(null);
+ when(primaryKeyForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation
-> {
+ int docId = invocation.getArgument(0);
+ if (primaryKeys != null && docId < primaryKeys.size()) {
+ return (Integer) primaryKeys.get(docId).getValues()[0];
+ }
+ return MOCK_FALLBACK_BASE_OFFSET + docId;
+ });
+
when(primaryKeyDataSource.getForwardIndex()).thenReturn(primaryKeyForwardIndex);
+
+ // Mock comparison column data source
+ DataSource comparisonDataSource = mock(DataSource.class);
+ ForwardIndexReader comparisonForwardIndex = mock(ForwardIndexReader.class);
+ when(comparisonForwardIndex.isSingleValue()).thenReturn(true);
+ when(comparisonForwardIndex.getStoredType()).thenReturn(DataType.INT);
+ when(comparisonForwardIndex.createContext()).thenReturn(null);
+ when(comparisonForwardIndex.getInt(anyInt(), any())).thenAnswer(invocation
-> {
+ int docId = invocation.getArgument(0);
+ // Return actual timestamp values if provided, otherwise default values
+ if (timestamps != null && docId < timestamps.length) {
+ return timestamps[docId];
+ }
+ return MOCK_FALLBACK_BASE_OFFSET + (docId * 100);
+ });
+
when(comparisonDataSource.getForwardIndex()).thenReturn(comparisonForwardIndex);
+
+ // Set up data source mapping - IMPORTANT: anyString() must be registered
FIRST,
+ // then specific matchers override it (Mockito uses last matching stub)
+ when(segment.getDataSource(anyString())).thenReturn(primaryKeyDataSource);
// Default fallback first
+
when(segment.getDataSource(eq(PRIMARY_KEY_COLUMNS.get(0)))).thenReturn(primaryKeyDataSource);
+
when(segment.getDataSource(eq(COMPARISON_COLUMNS.get(0)))).thenReturn(comparisonDataSource);
+
+ // Mock segment metadata with proper total docs and column metadata
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
long creationTimeMs = System.currentTimeMillis();
when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
when(segmentMetadata.getZkCreationTime()).thenReturn(creationTimeMs);
+ when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys != null ?
primaryKeys.size() : 0);
+
+ // Mock column metadata for primary key and comparison columns
+ TreeMap<String, ColumnMetadata> columnMetadataMap = new TreeMap<>();
+ ColumnMetadata primaryKeyColumnMetadata = mock(ColumnMetadata.class);
+ when(primaryKeyColumnMetadata.getFieldSpec()).thenReturn(
+ new DimensionFieldSpec(PRIMARY_KEY_COLUMNS.get(0), DataType.INT,
true));
+ ColumnMetadata comparisonColumnMetadata = mock(ColumnMetadata.class);
+ when(comparisonColumnMetadata.getFieldSpec()).thenReturn(
+ new DimensionFieldSpec(COMPARISON_COLUMNS.get(0), DataType.INT, true));
+ columnMetadataMap.put(PRIMARY_KEY_COLUMNS.get(0),
primaryKeyColumnMetadata);
+ columnMetadataMap.put(COMPARISON_COLUMNS.get(0), comparisonColumnMetadata);
+ when(segmentMetadata.getColumnMetadataMap()).thenReturn(columnMetadataMap);
+
when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
return segment;
}
+ /**
+ * Creates a real ImmutableSegment with actual data on disk.
+ * This avoids the complexity of mocking data sources for RecordInfoReader.
+ *
+ * @param primaryKeys array of primary key values
+ * @param timestamps array of timestamp/comparison values
+ * @param validDocIds bitmap to track valid doc IDs (will be populated)
+ * @return a real ImmutableSegmentImpl that can be read by RecordInfoReader
+ */
+ private ImmutableSegmentImpl createRealSegment(int[] primaryKeys, int[]
timestamps,
+ ThreadSafeMutableRoaringBitmap validDocIds)
+ throws Exception {
+ return createRealSegment("segment_" + (_segmentCounter++), primaryKeys,
timestamps, validDocIds);
+ }
+
+ private ImmutableSegmentImpl createRealSegment(String segmentName, int[]
primaryKeys, int[] timestamps,
+ ThreadSafeMutableRoaringBitmap validDocIds)
+ throws Exception {
+ File segmentOutputDir = new File(SEGMENT_DIR, segmentName);
+
+ // Create rows with primary key and timestamp data
+ List<GenericRow> rows = new ArrayList<>();
+ for (int i = 0; i < primaryKeys.length; i++) {
+ GenericRow row = new GenericRow();
+ row.putValue(PRIMARY_KEY_COLUMNS.get(0), primaryKeys[i]);
+ row.putValue(COMPARISON_COLUMNS.get(0), timestamps[i]);
+ rows.add(row);
+ validDocIds.add(i);
+ }
+
+ // Configure segment generation
+ SegmentGeneratorConfig config = new
SegmentGeneratorConfig(SEGMENT_TABLE_CONFIG, SEGMENT_SCHEMA);
+ config.setOutDir(segmentOutputDir.getAbsolutePath());
+ config.setSegmentName(segmentName);
+ config.setTableName(RAW_TABLE_NAME);
+
+ // Build the segment
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ driver.init(config, new GenericRowRecordReader(rows));
+ driver.build();
+
+ // Load and return the segment
+ File segmentDir = new File(segmentOutputDir, segmentName);
+ ImmutableSegmentImpl segment =
+ (ImmutableSegmentImpl) ImmutableSegmentLoader.load(segmentDir,
ReadMode.mmap);
+
+ return segment;
+ }
+
private static ImmutableSegmentImpl mockUploadedImmutableSegment(String
suffix,
ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
List<PrimaryKey> primaryKeys, Long creationTimeMs) {
@@ -897,6 +1041,33 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
return segment;
}
+ private static MutableSegment mockMutableSegmentWithDataSource(int
sequenceNumber,
+ ThreadSafeMutableRoaringBitmap validDocIds,
ThreadSafeMutableRoaringBitmap queryableDocIds,
+ int[] primaryKeys) {
+ MutableSegment segment = mock(MutableSegment.class);
+ when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
+ when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
+ when(segment.getValidDocIds()).thenReturn(validDocIds);
+
+ DataSource dataSource = mock(DataSource.class);
+ ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
+ when(forwardIndex.isSingleValue()).thenReturn(true);
+ when(forwardIndex.getStoredType()).thenReturn(DataType.INT);
+ when(forwardIndex.getInt(anyInt(), any())).thenAnswer(invocation -> {
+ int docId = invocation.getArgument(0);
+ if (primaryKeys != null && docId < primaryKeys.length) {
+ return primaryKeys[docId];
+ }
+ return docId;
+ });
+ when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+
+ when(segment.getDataSource(anyString())).thenReturn(dataSource);
+
when(segment.getDataSource(PRIMARY_KEY_COLUMNS.get(0))).thenReturn(dataSource);
+
+ return segment;
+ }
+
private static String getSegmentName(int sequenceNumber) {
return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber,
System.currentTimeMillis()).toString();
}
@@ -916,7 +1087,15 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
assertNotNull(recordLocation);
assertSame(recordLocation.getSegment(), segment);
assertEquals(recordLocation.getDocId(), docId);
- assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value,
comparisonValue);
+ // Handle both IntWrapper and Integer comparison values
+ Object actualComparisonValue = recordLocation.getComparisonValue();
+ if (actualComparisonValue instanceof IntWrapper) {
+ assertEquals(((IntWrapper) actualComparisonValue)._value,
comparisonValue);
+ } else if (actualComparisonValue instanceof Integer) {
+ assertEquals(((Integer) actualComparisonValue).intValue(),
comparisonValue);
+ } else {
+ fail("Unexpected comparison value type: " +
actualComparisonValue.getClass());
+ }
}
@Test
@@ -1724,4 +1903,414 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
return _value;
}
}
+
+ // Tests for upsert metadata reversion functionality
+ @Test
+ public void testPartialUpsertSameDocsReplacement() throws IOException {
+ // Test partial upserts with old and new segments having same number of docs
+ // This test verifies that when all keys are present, no reversion occurs
+ PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+ UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{1, 2, 3};
+ int[] timestamps1 = new int[]{100, 200, 300};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+ primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 = getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+ timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 1, segment1, 0, 100, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 1, 200, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment1, 2, 300, HashFunction.NONE);
+
+ // Create new segment with same 3 records but updated timestamps
+ int numRecords2 = 3;
+ int[] primaryKeys2 = new int[]{1, 2, 3};
+ int[] timestamps2 = new int[]{150, 250, 350};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+ primaryKeysList2, timestamps2);
+ List<RecordInfo> recordInfoList2 = getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2,
+ timestamps2, null);
+
+ // Replace segment - should trigger reversion logic but no reversion needed since all keys are present
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ // Verify replacement - all records should be in new segment
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 250, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment2, 2, 350, HashFunction.NONE);
+
+ // New segment should have all docs valid
+ assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 3);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testPartialUpsertOldSegmentTriggerReversion() throws IOException {
+ // Test partial upserts with consuming (mutable) segment being sealed - revert should be triggered
+ // Note: Revert logic only applies when sealing a consuming segment, not for immutable segment replacement
+ PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+ UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ // Create a mutable (consuming) segment with 4 records - use mockMutableSegmentWithDataSource
+ // to support removeSegment which needs to read primary keys
+ int[] mutablePrimaryKeys = new int[]{1, 2, 3, 4};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1, validDocIds1, null, mutablePrimaryKeys);
+
+ // Add records to the mutable segment
+ upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(1), 0, 100, false));
+ upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(2), 1, 200, false));
+ upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(3), 2, 300, false));
+ upsertMetadataManager.addRecord(mutableSegment, new RecordInfo(makePrimaryKey(4), 3, 400, false));
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 4);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().getCardinality(), 4);
+
+ int numRecords2 = 2;
+ int[] primaryKeys2 = new int[]{1, 3};
+ int[] timestamps2 = new int[]{150, 350};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+ primaryKeysList2, timestamps2);
+
+ // Replace mutable with immutable (consuming segment seal) - revert SHOULD be triggered
+ upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
+ getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2, timestamps2, null).iterator(),
+ mutableSegment);
+
+ assertEquals(recordLocationMap.size(), 2);
+ assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+ checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment2, 1, 350, HashFunction.NONE);
+
+ // Mutable segment's validDocIds should be empty after removal
+ assertEquals(validDocIds1.getMutableRoaringBitmap().getCardinality(), 0);
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testPartialUpsertOldSegmentLesserDocs() throws IOException {
+ // Test partial upserts with old segment having fewer docs than new segment
+ PartialUpsertHandler mockPartialUpsertHandler = mock(PartialUpsertHandler.class);
+ UpsertContext upsertContext = _contextBuilder.setPartialUpsertHandler(mockPartialUpsertHandler)
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ int numRecords1 = 2;
+ int[] primaryKeys1 = new int[]{1, 2};
+ int[] timestamps1 = new int[]{100, 200};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegmentWithTimestamps(1, validDocIds1, null, primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 =
+ getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1, timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 2);
+
+ int numRecords2 = 4;
+ int[] primaryKeys2 = new int[]{1, 2, 3, 4};
+ int[] timestamps2 = new int[]{150, 250, 300, 400};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+ primaryKeysList2, timestamps2);
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ // Verify state after replacement - all records should be in new segment
+ assertEquals(recordLocationMap.size(), 4);
+ checkRecordLocation(recordLocationMap, 1, segment2, 0, 150, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 250, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment2, 2, 300, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 4, segment2, 3, 400, HashFunction.NONE);
+
+ // New segment should have all docs valid
+ assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 4);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testFullUpsertConsistencyNoneSameDocs() throws IOException {
+ // Test full upserts with consistency=NONE and same number of docs
+ UpsertContext upsertContext = _contextBuilder
+ .setConsistencyMode(UpsertConfig.ConsistencyMode.NONE)
+ .setDropOutOfOrderRecord(true)
+ .build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME,
0, upsertContext);
+
+ // Create old segment with 3 records
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{10, 20, 30};
+ int[] timestamps1 = new int[]{1000, 2000, 3000};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+ primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 = getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1,
+ timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+
+ int numRecords2 = 3;
+ int[] primaryKeys2 = new int[]{10, 20, 30};
+ int[] timestamps2 = new int[]{1500, 2500, 3500};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+ primaryKeysList2, timestamps2);
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 20, segment2, 1, 2500, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 30, segment2, 2, 3500, HashFunction.NONE);
+ assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 3);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testFullUpsertConsistencyNoneOldSegmentMoreDocs()
+ throws Exception {
+ // Test full upserts with consistency=NONE where old segment has more docs
+ // Using real segments instead of mocks to avoid complex data source mocking
+ UpsertContext upsertContext =
+ _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ String segmentName = "test_segment";
+
+ // Create first real segment with 3 records
+ int[] primaryKeys1 = new int[]{10, 30, 40};
+ int[] timestamps1 = new int[]{1500, 3500, 4000};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment1 = createRealSegment(segmentName, primaryKeys1, timestamps1, validDocIds1);
+
+ upsertMetadataManager.addSegment(segment1);
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+
+ // Create second real segment with 2 records (subset of first)
+ int[] primaryKeys2 = new int[]{10, 30};
+ int[] timestamps2 = new int[]{1500, 3500};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment2 = createRealSegment(segmentName, primaryKeys2, timestamps2, validDocIds2);
+
+ // Replace segment - RecordInfoReader will read real data from segment2
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ assertEquals(recordLocationMap.size(), 2);
+ checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 30, segment2, 1, 3500, HashFunction.NONE);
+ assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+
+ // Clean up real segments
+ segment1.destroy();
+ segment2.destroy();
+ }
+
+ @Test
+ public void testFullUpsertRegularConsistencyMode()
+ throws IOException {
+ // Test full upserts with regular consistency mode (not NONE) - no reversion should occur
+ UpsertContext upsertContext = _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.SNAPSHOT).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{100, 200, 300};
+ int[] timestamps1 = new int[]{10000, 20000, 30000};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegmentWithTimestamps(1, validDocIds1, null, primaryKeysList1, timestamps1);
+ List<RecordInfo> recordInfoList1 =
+ getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1, timestamps1, null);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+ int numRecords2 = 1;
+ int[] primaryKeys2 = new int[]{100};
+ int[] timestamps2 = new int[]{15000};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+ ImmutableSegmentImpl segment2 =
+ mockImmutableSegmentWithTimestamps(1, validDocIds2, null, primaryKeysList2, timestamps2);
+
+ upsertMetadataManager.replaceSegment(segment2, segment1);
+
+ assertEquals(recordLocationMap.size(), 1);
+ checkRecordLocation(recordLocationMap, 100, segment2, 0, 15000, HashFunction.NONE);
+
+ assertEquals(segment1.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 2);
+ assertEquals(segment2.getValidDocIds().getMutableRoaringBitmap().getCardinality(), 1);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testRevertOnlyAppliesForConsumingSegmentSeal()
+ throws IOException {
+ UpsertContext upsertContext =
+ _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ int[] mutablePrimaryKeys = new int[]{10, 20, 30};
+ ThreadSafeMutableRoaringBitmap validDocIdsMutable = new ThreadSafeMutableRoaringBitmap();
+ MutableSegment mutableSegment = mockMutableSegmentWithDataSource(1, validDocIdsMutable, null, mutablePrimaryKeys);
+
+ upsertMetadataManager.addRecord(mutableSegment,
+ new RecordInfo(makePrimaryKey(10), 0, Integer.valueOf(1000), false));
+ upsertMetadataManager.addRecord(mutableSegment,
+ new RecordInfo(makePrimaryKey(20), 1, Integer.valueOf(2000), false));
+ upsertMetadataManager.addRecord(mutableSegment,
+ new RecordInfo(makePrimaryKey(30), 2, Integer.valueOf(3000), false));
+
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+ assertEquals(validDocIdsMutable.getMutableRoaringBitmap().getCardinality(), 3);
+
+ int numRecords = 2;
+ int[] primaryKeys = new int[]{10, 20};
+ int[] timestamps = new int[]{1500, 2500};
+ ThreadSafeMutableRoaringBitmap validDocIdsImmutable = new ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeysList = getPrimaryKeyList(numRecords, primaryKeys);
+ ImmutableSegmentImpl immutableSegment = mockImmutableSegmentWithTimestamps(1, validDocIdsImmutable, null,
+ primaryKeysList, timestamps);
+
+ // This should trigger the revert logic since old segment is mutable
+ upsertMetadataManager.replaceSegment(immutableSegment, validDocIdsImmutable, null,
+ getRecordInfoListWithIntegerComparison(numRecords, primaryKeys, timestamps, null).iterator(), mutableSegment);
+
+ // After replacement, the records from immutable segment should be present
+ assertEquals(recordLocationMap.size(), 2);
+ checkRecordLocation(recordLocationMap, 10, immutableSegment, 0, 1500, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 20, immutableSegment, 1, 2500, HashFunction.NONE);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
+
+ @Test
+ public void testNoRevertForImmutableSegmentReplacement()
+ throws IOException {
+ // Test that revert logic is NOT applied when replacing immutable segment with another immutable segment
+ UpsertContext upsertContext =
+ _contextBuilder.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE).setDropOutOfOrderRecord(true).build();
+
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, upsertContext);
+
+ // Create first immutable segment with 3 records
+ int numRecords1 = 3;
+ int[] primaryKeys1 = new int[]{10, 20, 30};
+ int[] timestamps1 = new int[]{1000, 2000, 3000};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords1; i++) {
+ validDocIds1.add(i);
+ }
+ List<PrimaryKey> primaryKeysList1 = getPrimaryKeyList(numRecords1, primaryKeys1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegmentWithTimestamps(1, validDocIds1, null,
+ primaryKeysList1, timestamps1);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+ getRecordInfoListWithIntegerComparison(numRecords1, primaryKeys1, timestamps1, null).iterator());
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ assertEquals(recordLocationMap.size(), 3);
+
+ int numRecords2 = 1;
+ int[] primaryKeys2 = new int[]{10};
+ int[] timestamps2 = new int[]{1500};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ for (int i = 0; i < numRecords2; i++) {
+ validDocIds2.add(i);
+ }
+ List<PrimaryKey> primaryKeysList2 = getPrimaryKeyList(numRecords2, primaryKeys2);
+ ImmutableSegmentImpl segment2 = mockImmutableSegmentWithTimestamps(1, validDocIds2, null,
+ primaryKeysList2, timestamps2);
+
+ long startTime = System.currentTimeMillis();
+ upsertMetadataManager.replaceSegment(segment2, validDocIds2, null,
+ getRecordInfoListWithIntegerComparison(numRecords2, primaryKeys2, timestamps2, null).iterator(), segment1);
+ long duration = System.currentTimeMillis() - startTime;
+
+ assertTrue(duration < 1000, "Immutable-to-immutable replacement should complete quickly, took: " + duration + "ms");
+
+ assertEquals(recordLocationMap.size(), 1);
+ checkRecordLocation(recordLocationMap, 10, segment2, 0, 1500, HashFunction.NONE);
+
+ upsertMetadataManager.stop();
+ upsertMetadataManager.close();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]