This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2c57462d3b Avoid upsert metadata access after the manager is closed (#10251) 2c57462d3b is described below commit 2c57462d3bd3e5b2e15283e45f263adb773f1dc8 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Feb 9 10:29:38 2023 -0800 Avoid upsert metadata access after the manager is closed (#10251) --- .../core/data/manager/BaseTableDataManager.java | 8 + .../realtime/LLRealtimeSegmentDataManager.java | 5 + .../manager/realtime/RealtimeTableDataManager.java | 12 +- .../local/data/manager/TableDataManager.java | 2 + .../upsert/BasePartitionUpsertMetadataManager.java | 161 ++++++++++++++++++--- ...oncurrentMapPartitionUpsertMetadataManager.java | 10 +- .../ConcurrentMapTableUpsertMetadataManager.java | 12 +- .../upsert/PartitionUpsertMetadataManager.java | 5 + .../local/upsert/TableUpsertMetadataManager.java | 5 + ...rrentMapPartitionUpsertMetadataManagerTest.java | 43 +++++- 10 files changed, 223 insertions(+), 40 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 6ce2c3e451..7a3a32f903 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -107,6 +107,8 @@ public abstract class BaseTableDataManager implements TableDataManager { // Cache used for identifying segments which could not be acquired since they were recently deleted. protected Cache<String, String> _recentlyDeletedSegments; + protected volatile boolean _shutDown; + @Override public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId, ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager, @@ -182,12 +184,18 @@ public abstract class BaseTableDataManager implements TableDataManager { @Override public void shutDown() { _logger.info("Shutting down table data manager for table: {}", _tableNameWithType); + _shutDown = true; doShutdown(); _logger.info("Shut down table data manager for table: {}", _tableNameWithType); } protected abstract void doShutdown(); + @Override + public boolean isShutDown() { + return _shutDown; + } + /** * {@inheritDoc} * <p>If one segment already exists with the same name, replaces it with the new one. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 6e43a77202..4d82f6953e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -873,6 +873,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { closeStreamConsumers(); + // Do not allow building segment when table data manager is already shut down + if (_realtimeTableDataManager.isShutDown()) { + _segmentLogger.warn("Table data manager is already shut down"); + return null; + } try { final long startTimeMillis = now(); if (_segBuildSemaphore != null) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 9315770943..96adea8ee6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -246,14 +246,20 @@ public class RealtimeTableDataManager extends BaseTableDataManager { @Override protected void doShutdown() { if (_tableUpsertMetadataManager != null) { + // Stop the upsert metadata manager first to prevent removing metadata when destroying segments + _tableUpsertMetadataManager.stop(); + for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) { + segmentDataManager.destroy(); + } try { _tableUpsertMetadataManager.close(); } catch (IOException e) { _logger.warn("Cannot close upsert metadata manager properly for table: {}", _tableNameWithType, e); } - } - for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) { - segmentDataManager.destroy(); + } else { + for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) { + segmentDataManager.destroy(); + } } if (_leaseExtender != null) { _leaseExtender.shutDown(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index 121e47f6f6..ae53ce3383 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -62,6 +62,8 @@ public interface TableDataManager { */ void shutDown(); + boolean isShutDown(); + /** * Adds a loaded immutable segment into the table. */ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 6bc9e94067..361052fdc2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -37,8 +38,10 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp import org.apache.pinot.segment.local.utils.SegmentLocks; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.MutableSegment; import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.data.readers.GenericRow; import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +64,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps @VisibleForTesting public final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet(); - protected volatile boolean _closed = false; + protected volatile boolean _stopped = false; + // Initialize with 1 pending operation to indicate the metadata manager can take more operations + protected final AtomicInteger _numPendingOperations = new AtomicInteger(1); protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE; protected int _numOutOfOrderEvents = 0; @@ -87,6 +92,19 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps @Override public void addSegment(ImmutableSegment segment) { + if (_stopped) { + _logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName()); + return; + } + startOperation(); + try { + doAddSegment(segment); + } finally { + finishOperation(); + } + } + + protected void doAddSegment(ImmutableSegment segment) { String segmentName = segment.getSegmentName(); _logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys()); @@ -162,8 +180,38 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment); + @Override + public void addRecord(MutableSegment segment, RecordInfo recordInfo) { + if (_stopped) { + _logger.debug("Skip adding record to segment: {} because metadata manager is already stopped", + segment.getSegmentName()); + return; + } + startOperation(); + try { + doAddRecord(segment, recordInfo); + } finally { + finishOperation(); + } + } + + protected abstract void doAddRecord(MutableSegment segment, RecordInfo recordInfo); + @Override public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) { + if (_stopped) { + _logger.info("Skip replacing segment: {} because metadata manager is already stopped", segment.getSegmentName()); + return; + } + startOperation(); + try { + doReplaceSegment(segment, oldSegment); + } finally { + finishOperation(); + } + } + + protected void doReplaceSegment(ImmutableSegment segment, IndexSegment oldSegment) { String segmentName = segment.getSegmentName(); Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()), "Cannot replace segment with different name for table: {}, old segment: {}, new segment: {}", @@ -250,34 +298,43 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps @Override public void removeSegment(IndexSegment segment) { String segmentName = segment.getSegmentName(); - _logger.info("Removing {} segment: {}, current primary key count: {}", - segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys()); - if (_replacedSegments.remove(segment)) { _logger.info("Skip removing replaced segment: {}", segmentName); return; } + // Allow persisting valid doc ids snapshot after metadata manager is stopped + MutableRoaringBitmap validDocIds = + segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null; + if (_enableSnapshot && segment instanceof ImmutableSegmentImpl) { + ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(validDocIds); + } + if (validDocIds == null || validDocIds.isEmpty()) { + _logger.info("Skip removing segment without valid docs: {}", segmentName); + return; + } + if (_stopped) { + _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName); + return; + } + startOperation(); + try { + doRemoveSegment(segment); + } finally { + finishOperation(); + } + } + + protected void doRemoveSegment(IndexSegment segment) { + String segmentName = segment.getSegmentName(); + _logger.info("Removing {} segment: {}, current primary key count: {}", + segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys()); Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName); segmentLock.lock(); try { - MutableRoaringBitmap validDocIds = - segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null; - - if (_enableSnapshot && segment instanceof ImmutableSegmentImpl && validDocIds != null) { - ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(validDocIds); - } - - if (_closed) { - _logger.info("Skip removing segment: {} because metadata manager is already closed", segment); - return; - } - - if (validDocIds == null || validDocIds.isEmpty()) { - _logger.info("Skip removing segment without valid docs: {}", segmentName); - return; - } - + assert segment.getValidDocIds() != null; + MutableRoaringBitmap validDocIds = segment.getValidDocIds().getMutableRoaringBitmap(); + assert validDocIds != null; _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName); removeSegment(segment, validDocIds); } finally { @@ -292,6 +349,26 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys); } + @Override + public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) { + // Directly return the record when partial-upsert is not enabled + if (_partialUpsertHandler == null) { + return record; + } + if (_stopped) { + _logger.debug("Skip updating record because metadata manager is already stopped"); + return record; + } + startOperation(); + try { + return doUpdateRecord(record, recordInfo); + } finally { + finishOperation(); + } + } + + protected abstract GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo); + protected void handleOutOfOrderEvent(Object currentComparisonValue, Object recordComparisonValue) { _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L); _numOutOfOrderEvents++; @@ -305,10 +382,48 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps } } + protected void startOperation() { + _numPendingOperations.getAndIncrement(); + } + + protected void finishOperation() { + if (_numPendingOperations.decrementAndGet() == 0) { + synchronized (_numPendingOperations) { + _numPendingOperations.notifyAll(); + } + } + } + + @Override + public void stop() { + _stopped = true; + int numPendingOperations = _numPendingOperations.decrementAndGet(); + _logger.info("Stopped the metadata manager with {} pending operations, current primary key count: {}", + numPendingOperations, getNumPrimaryKeys()); + } + @Override public void close() throws IOException { - _logger.info("Closing the metadata manager, current primary key count: {}", getNumPrimaryKeys()); - _closed = true; + Preconditions.checkState(_stopped, "Must stop the metadata manager before closing it"); + _logger.info("Closing the metadata manager"); + synchronized (_numPendingOperations) { + int numPendingOperations; + while ((numPendingOperations = _numPendingOperations.get()) != 0) { + _logger.info("Waiting for {} pending operations to finish", numPendingOperations); + try { + _numPendingOperations.wait(); + } catch (InterruptedException e) { + throw new RuntimeException( + String.format("Interrupted while waiting for %d pending operations to finish", numPendingOperations), e); + } + } + } + doClose(); + _logger.info("Closed the metadata manager"); + } + + protected void doClose() + throws IOException { } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index e11c1899f3..b1687afdf8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -182,7 +182,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp } @Override - public void addRecord(MutableSegment segment, RecordInfo recordInfo) { + protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) { ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds()); _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (primaryKey, currentRecordLocation) -> { @@ -217,12 +217,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp } @Override - public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) { - // Directly return the record when partial-upsert is not enabled - if (_partialUpsertHandler == null) { - return record; - } - + protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) { + assert _partialUpsertHandler != null; AtomicReference<GenericRow> previousRecordReference = new AtomicReference<>(); RecordLocation currentRecordLocation = _primaryKeyToRecordLocationMap.computeIfPresent( HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (pk, recordLocation) -> { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java index 3f830bb3a7..67d6fe773b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java @@ -39,12 +39,18 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta _comparisonColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot, _serverMetrics)); } + @Override + public void stop() { + for (ConcurrentMapPartitionUpsertMetadataManager metadataManager : _partitionMetadataManagerMap.values()) { + metadataManager.stop(); + } + } + @Override public void close() throws IOException { - for (ConcurrentMapPartitionUpsertMetadataManager partitionUpsertMetadataManager - : _partitionMetadataManagerMap.values()) { - partitionUpsertMetadataManager.close(); + for (ConcurrentMapPartitionUpsertMetadataManager metadataManager : _partitionMetadataManagerMap.values()) { + metadataManager.close(); } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java index ef5ec7c414..dd07143fd3 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java @@ -83,4 +83,9 @@ public interface PartitionUpsertMetadataManager extends Closeable { * Returns the merged record when partial-upsert is enabled. */ GenericRow updateRecord(GenericRow record, RecordInfo recordInfo); + + /** + * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted. + */ + void stop(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java index de46879712..52007f16fe 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java @@ -38,4 +38,9 @@ public interface TableUpsertMetadataManager extends Closeable { PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId); UpsertConfig.Mode getUpsertMode(); + + /** + * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted. + */ + void stop(); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index a0f18aaf14..6b4bf34368 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.upsert; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -60,7 +61,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); @Test - public void testAddReplaceRemoveSegment() { + public void testAddReplaceRemoveSegment() + throws IOException { verifyAddReplaceRemoveSegment(HashFunction.NONE, false); verifyAddReplaceRemoveSegment(HashFunction.MD5, false); verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, false); @@ -69,7 +71,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, true); } - private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot) { + private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot) + throws IOException { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), "timeCol", hashFunction, null, false, mock(ServerMetrics.class)); @@ -196,6 +199,19 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); + + // Stop the metadata manager + upsertMetadataManager.stop(); + + // Remove new segment1, should be no-op + upsertMetadataManager.removeSegment(newSegment1); + // new segment1: 1 -> {4, 120} + assertEquals(recordLocationMap.size(), 1); + checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); + assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); + + // Close the metadata manager + upsertMetadataManager.close(); } private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) { @@ -274,13 +290,15 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { } @Test - public void testAddRecord() { + public void testAddRecord() + throws IOException { verifyAddRecord(HashFunction.NONE); verifyAddRecord(HashFunction.MD5); verifyAddRecord(HashFunction.MURMUR3); } - private void verifyAddRecord(HashFunction hashFunction) { + private void verifyAddRecord(HashFunction hashFunction) + throws IOException { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), "timeCol", hashFunction, null, false, mock(ServerMetrics.class)); @@ -339,6 +357,23 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3}); + + // Stop the metadata manager + upsertMetadataManager.stop(); + + // Add record should be no-op + upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, new IntWrapper(120))); + // segment1: 1 -> {1, 120} + // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100} + checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1}); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3}); + + // Close the metadata manager + upsertMetadataManager.close(); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org