This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 91289be930 Fix upsert replace (#9132) 91289be930 is described below commit 91289be930754dee74776e804ce388d005b06e3b Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Jul 29 13:29:44 2022 -0700 Fix upsert replace (#9132) - Fix upsert replace to - Correctly replace record location without removing the valid docs from the replaced segment - Correctly track and remove the remaining valid docs from the replaced segment - Track replaced segment so that the docs do not need to be removed again - Use segment lock to prevent adding/replacing/removing the same segment at the same time, which can cause a race condition - Track an unexpected case where the primary keys show up in the wrong segment using meter `UPSERT_KEYS_IN_WRONG_SEGMENT` --- .../apache/pinot/common/metrics/ServerMeter.java | 3 +- .../manager/realtime/RealtimeTableDataManager.java | 7 +- .../upsert/PartitionUpsertMetadataManager.java | 191 ++++++++++++++++----- .../pinot/segment/local/utils}/SegmentLocks.java | 2 +- .../upsert/PartitionUpsertMetadataManagerTest.java | 93 ++++------ .../starter/helix/HelixInstanceDataManager.java | 1 + .../SegmentOnlineOfflineStateModelFactory.java | 1 + 7 files changed, 188 insertions(+), 110 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 22bb82abd4..c0d173fc76 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -41,8 +41,9 @@ public enum ServerMeter implements AbstractMetrics.Meter { REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false), REALTIME_PARTITION_MISMATCH("mismatch", false), REALTIME_DEDUP_DROPPED("rows", false), + UPSERT_KEYS_IN_WRONG_SEGMENT("rows", false), PARTIAL_UPSERT_OUT_OF_ORDER("rows", 
false), - PARTIAL_UPSERT_ROWS_NOT_REPLACED("rows", false), + PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false), ROWS_WITH_ERRORS("rows", false), LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true), LLC_CONTROLLER_RESPONSE_COMMIT("messages", true), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 654133af11..0a09ff4087 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -430,12 +430,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType); } else { IndexSegment oldSegment = oldSegmentManager.getSegment(); - partitionUpsertMetadataManager.addSegment(immutableSegment); - // TODO: Fix the following issue about replacing segment in upsert metadata - // - We cannot directly invalidate the docs in the replaced segment because query might still running against it - // - We should track the valid docs in the replaced segment separately. Currently the docs won't be invalidate - // in the replaced segment due to the reason above, and will cause wrong logs/metrics emitted. - // partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment); + partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment); _logger.info("Replaced {} segment: {} of upsert-enabled table: {}", oldSegment instanceof ImmutableSegment ? 
"immutable" : "mutable", segmentName, _tableNameWithType); releaseSegment(oldSegmentManager); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java index 39455c569e..d1042fe5c1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java @@ -23,9 +23,12 @@ import com.google.common.base.Preconditions; import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.ServerGauge; @@ -36,6 +39,7 @@ import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.local.utils.HashUtils; import org.apache.pinot.segment.local.utils.RecordInfo; +import org.apache.pinot.segment.local.utils.SegmentLocks; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.MutableSegment; @@ -92,6 +96,9 @@ public class PartitionUpsertMetadataManager { @VisibleForTesting final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>(); + @VisibleForTesting + final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet(); + // Reused for reading previous record during partial upsert private final GenericRow 
_reuse = new GenericRow(); @@ -122,6 +129,12 @@ public class PartitionUpsertMetadataManager { * Initializes the upsert metadata for the given immutable segment. */ public void addSegment(ImmutableSegment segment) { + addSegment(segment, null, null); + } + + @VisibleForTesting + void addSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds, + @Nullable Iterator<RecordInfo> recordInfoIterator) { String segmentName = segment.getSegmentName(); _logger.info("Adding segment: {}, current primary key count: {}", segmentName, _primaryKeyToRecordLocationMap.size()); @@ -131,10 +144,22 @@ public class PartitionUpsertMetadataManager { return; } - Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl, - "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName, - _tableNameWithType); - addSegment((ImmutableSegmentImpl) segment, new ThreadSafeMutableRoaringBitmap(), getRecordInfoIterator(segment)); + Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName); + segmentLock.lock(); + try { + Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl, + "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName, + _tableNameWithType); + if (validDocIds == null) { + validDocIds = new ThreadSafeMutableRoaringBitmap(); + } + if (recordInfoIterator == null) { + recordInfoIterator = getRecordInfoIterator(segment); + } + addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, null, null); + } finally { + segmentLock.unlock(); + } // Update metrics int numPrimaryKeys = _primaryKeyToRecordLocationMap.size(); @@ -180,11 +205,13 @@ public class PartitionUpsertMetadataManager { } } - @VisibleForTesting - void addSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds, - Iterator<RecordInfo> recordInfoIterator) { + private void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutableRoaringBitmap validDocIds, + Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment, + @Nullable MutableRoaringBitmap validDocIdsForOldSegment) { String segmentName = segment.getSegmentName(); segment.enableUpsert(this, validDocIds); + + AtomicInteger numKeysInWrongSegment = new AtomicInteger(); while (recordInfoIterator.hasNext()) { RecordInfo recordInfo = recordInfoIterator.next(); _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), @@ -198,7 +225,7 @@ public class PartitionUpsertMetadataManager { // The current record is in the same segment // Update the record location when there is a tie to keep the newer record. Note that the record info // iterator will return records with incremental doc ids. - if (segment == currentSegment) { + if (currentSegment == segment) { if (comparisonResult >= 0) { validDocIds.replace(currentRecordLocation.getDocId(), recordInfo.getDocId()); return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue()); @@ -211,9 +238,25 @@ public class PartitionUpsertMetadataManager { // This could happen when committing a consuming segment, or reloading a completed segment. In this // case, we want to update the record location when there is a tie because the record locations should // point to the new added segment instead of the old segment being replaced. Also, do not update the valid - // doc ids for the old segment because it has not been replaced yet. + // doc ids for the old segment because it has not been replaced yet. We pass in an optional valid doc ids + // snapshot for the old segment, which can be updated and used to track the docs not replaced yet. 
+ if (currentSegment == oldSegment) { + if (comparisonResult >= 0) { + validDocIds.add(recordInfo.getDocId()); + if (validDocIdsForOldSegment != null) { + validDocIdsForOldSegment.remove(currentRecordLocation.getDocId()); + } + return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue()); + } else { + return currentRecordLocation; + } + } + + // This should not happen because the previously replaced segment should have all keys removed. We still + // handle it here, and also track the number of keys not properly replaced previously. String currentSegmentName = currentSegment.getSegmentName(); - if (segmentName.equals(currentSegmentName)) { + if (currentSegmentName.equals(segmentName)) { + numKeysInWrongSegment.getAndIncrement(); if (comparisonResult >= 0) { validDocIds.add(recordInfo.getDocId()); return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue()); @@ -243,6 +286,11 @@ public class PartitionUpsertMetadataManager { } }); } + int numKeys = numKeysInWrongSegment.get(); + if (numKeys > 0) { + _logger.warn("Found {} primary keys in the wrong segment when adding segment: {}", numKeys, segmentName); + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_IN_WRONG_SEGMENT, numKeys); + } } /** @@ -285,36 +333,73 @@ public class PartitionUpsertMetadataManager { /** * Replaces the upsert metadata for the old segment with the new immutable segment. 
*/ - public void replaceSegment(ImmutableSegment newSegment, IndexSegment oldSegment) { - String segmentName = newSegment.getSegmentName(); + public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) { + replaceSegment(segment, null, null, oldSegment); + } + + @VisibleForTesting + void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds, + @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) { + String segmentName = segment.getSegmentName(); Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()), "Cannot replace segment with different name for table: {}, old segment: {}, new segment: {}", _tableNameWithType, oldSegment.getSegmentName(), segmentName); - _logger.info("Replacing {} segment: {}", oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", - segmentName); - - addSegment(newSegment); - - MutableRoaringBitmap validDocIds = - oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null; - if (validDocIds != null && !validDocIds.isEmpty()) { - int numDocsNotReplaced = validDocIds.getCardinality(); - if (_partialUpsertHandler != null) { - // For partial-upsert table, because we do not restore the original record location when removing the primary - // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a consuming - // segment is replaced by a committed segment that is consumed from a different server with different records - // (some stream consumer cannot guarantee consuming the messages in the same order). - _logger.error("{} primary keys not replaced when replacing segment: {} for partial-upsert table. 
This can " - + "potentially cause inconsistency between replicas", numDocsNotReplaced, segmentName); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_ROWS_NOT_REPLACED, - numDocsNotReplaced); + _logger.info("Replacing {} segment: {}, current primary key count: {}", + oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, + _primaryKeyToRecordLocationMap.size()); + + Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName); + segmentLock.lock(); + try { + MutableRoaringBitmap validDocIdsForOldSegment = + oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null; + if (segment instanceof EmptyIndexSegment) { + _logger.info("Skip adding empty segment: {}", segmentName); } else { - _logger.info("{} primary keys not replaced when replacing segment: {}", numDocsNotReplaced, segmentName); + Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl, + "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName, + _tableNameWithType); + if (validDocIds == null) { + validDocIds = new ThreadSafeMutableRoaringBitmap(); + } + if (recordInfoIterator == null) { + recordInfoIterator = getRecordInfoIterator(segment); + } + addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, oldSegment, + validDocIdsForOldSegment); } - removeSegment(oldSegment); + + if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) { + int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality(); + if (_partialUpsertHandler != null) { + // For partial-upsert table, because we do not restore the original record location when removing the primary + // keys not replaced, it can potentially cause inconsistency between replicas. 
This can happen when a + // consuming segment is replaced by a committed segment that is consumed from a different server with + // different records (some stream consumer cannot guarantee consuming the messages in the same order). + _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This " + + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName); + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED, + numKeysNotReplaced); + } else { + _logger.info("Found {} primary keys not replaced when replacing segment: {}", numKeysNotReplaced, + segmentName); + } + removeSegment(oldSegment, validDocIdsForOldSegment); + } + } finally { + segmentLock.unlock(); + } + + if (!(oldSegment instanceof EmptyIndexSegment)) { + _replacedSegments.add(oldSegment); } - _logger.info("Finished replacing segment: {}", segmentName); + // Update metrics + int numPrimaryKeys = _primaryKeyToRecordLocationMap.size(); + _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, + numPrimaryKeys); + + _logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys); } /** @@ -326,14 +411,37 @@ public class PartitionUpsertMetadataManager { segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _primaryKeyToRecordLocationMap.size()); - MutableRoaringBitmap validDocIds = - segment.getValidDocIds() != null ? 
segment.getValidDocIds().getMutableRoaringBitmap() : null; - if (validDocIds == null || validDocIds.isEmpty()) { - _logger.info("Skip removing segment without valid docs: {}", segmentName); + if (_replacedSegments.remove(segment)) { + _logger.info("Skip removing replaced segment: {}", segmentName); return; } - _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName); + Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName); + segmentLock.lock(); + try { + MutableRoaringBitmap validDocIds = + segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null; + if (validDocIds == null || validDocIds.isEmpty()) { + _logger.info("Skip removing segment without valid docs: {}", segmentName); + return; + } + + _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName); + removeSegment(segment, validDocIds); + } finally { + segmentLock.unlock(); + } + + // Update metrics + int numPrimaryKeys = _primaryKeyToRecordLocationMap.size(); + _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, + numPrimaryKeys); + + _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys); + } + + private void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) { + assert !validDocIds.isEmpty(); PrimaryKey primaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]); PeekableIntIterator iterator = validDocIds.getIntIterator(); while (iterator.hasNext()) { @@ -347,13 +455,6 @@ public class PartitionUpsertMetadataManager { return recordLocation; }); } - - // Update metrics - int numPrimaryKeys = _primaryKeyToRecordLocationMap.size(); - _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, - numPrimaryKeys); - - _logger.info("Finished removing segment: {}, current primary key 
count: {}", segmentName, numPrimaryKeys); } /** diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentLocks.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java similarity index 96% rename from pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentLocks.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java index 0a14d05a7b..abbd2c906c 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentLocks.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.server.starter.helix; +package org.apache.pinot.segment.local.utils; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java index 639f6321e4..aa1392d6df 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java @@ -45,8 +45,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertTrue; public class PartitionUpsertMetadataManagerTest { @@ -54,13 +54,13 @@ public class PartitionUpsertMetadataManagerTest { private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); @Test - public void testAddSegment() { - verifyAddSegment(HashFunction.NONE); - verifyAddSegment(HashFunction.MD5); - verifyAddSegment(HashFunction.MURMUR3); + public void testAddReplaceRemoveSegment() { + verifyAddReplaceRemoveSegment(HashFunction.NONE); + verifyAddReplaceRemoveSegment(HashFunction.MD5); + verifyAddReplaceRemoveSegment(HashFunction.MURMUR3); } - private void verifyAddSegment(HashFunction hashFunction) { + private void verifyAddReplaceRemoveSegment(HashFunction hashFunction) { PartitionUpsertMetadataManager upsertMetadataManager = new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), "timeCol", hashFunction, null, mock(ServerMetrics.class)); @@ -76,6 +76,7 @@ public class PartitionUpsertMetadataManagerTest { List<RecordInfo> recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps); upsertMetadataManager.addSegment(segment1, validDocIds1, recordInfoList1.iterator()); // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100} + assertEquals(recordLocationMap.size(), 3); checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction); @@ -91,6 +92,7 @@ public class PartitionUpsertMetadataManagerTest { getRecordInfoList(numRecords, primaryKeys, timestamps).iterator()); // segment1: 1 -> {4, 120} // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} + assertEquals(recordLocationMap.size(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -99,9 +101,11 @@ public class PartitionUpsertMetadataManagerTest { assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); // Add an empty 
segment - upsertMetadataManager.addSegment(new EmptyIndexSegment(mock(SegmentMetadataImpl.class))); + EmptyIndexSegment emptySegment = mockEmptySegment(3); + upsertMetadataManager.addSegment(emptySegment); // segment1: 1 -> {4, 120} // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} + assertEquals(recordLocationMap.size(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -112,10 +116,11 @@ public class PartitionUpsertMetadataManagerTest { // Replace (reload) the first segment ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap(); ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, primaryKeys1); - upsertMetadataManager.addSegment(newSegment1, newValidDocIds1, recordInfoList1.iterator()); - // original segment1: 1 -> {4, 120} + upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, recordInfoList1.iterator(), segment1); + // original segment1: 1 -> {4, 120} (not in the map) // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} + assertEquals(recordLocationMap.size(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); @@ -123,34 +128,42 @@ public class PartitionUpsertMetadataManagerTest { assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); - assertSame(recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(1), hashFunction)).getSegment(), - newSegment1); + assertEquals(upsertMetadataManager._replacedSegments, 
Collections.singleton(segment1)); // Remove the original segment1 upsertMetadataManager.removeSegment(segment1); // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} + assertEquals(recordLocationMap.size(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); - assertSame(recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(1), hashFunction)).getSegment(), - newSegment1); + assertTrue(upsertMetadataManager._replacedSegments.isEmpty()); - // Remove an empty segment - upsertMetadataManager.removeSegment(new EmptyIndexSegment(mock(SegmentMetadataImpl.class))); + // Remove the empty segment + upsertMetadataManager.removeSegment(emptySegment); // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} + assertEquals(recordLocationMap.size(), 4); checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); - assertSame(recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(1), hashFunction)).getSegment(), - newSegment1); + + // Remove segment2 + upsertMetadataManager.removeSegment(segment2); + // segment2: 0 
-> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map) + // new segment1: 1 -> {4, 120} + assertEquals(recordLocationMap.size(), 1); + checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); + assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); } private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) { @@ -179,6 +192,12 @@ public class PartitionUpsertMetadataManagerTest { return segment; } + private static EmptyIndexSegment mockEmptySegment(int sequenceNumber) { + SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class); + when(segmentMetadata.getName()).thenReturn(getSegmentName(sequenceNumber)); + return new EmptyIndexSegment(segmentMetadata); + } + private static MutableSegment mockMutableSegment(int sequenceNumber, ThreadSafeMutableRoaringBitmap validDocIds) { MutableSegment segment = mock(MutableSegment.class); when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber)); @@ -272,46 +291,6 @@ public class PartitionUpsertMetadataManagerTest { assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3}); } - @Test - public void testRemoveSegment() { - verifyRemoveSegment(HashFunction.NONE); - verifyRemoveSegment(HashFunction.MD5); - verifyRemoveSegment(HashFunction.MURMUR3); - } - - private void verifyRemoveSegment(HashFunction hashFunction) { - PartitionUpsertMetadataManager upsertMetadataManager = - new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), "timeCol", - hashFunction, null, mock(ServerMetrics.class)); - Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; - - // Add 2 segments - // segment1: 0 -> {0, 100}, 1 -> {1, 100} - // segment2: 2 -> {0, 100}, 3 -> {0, 100} - int numRecords = 2; - int[] primaryKeys = new int[]{0, 1}; - int[] 
timestamps = new int[]{100, 100}; - ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap(); - ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, getPrimaryKeyList(numRecords, primaryKeys)); - upsertMetadataManager.addSegment(segment1, validDocIds1, - getRecordInfoList(numRecords, primaryKeys, timestamps).iterator()); - - primaryKeys = new int[]{2, 3}; - ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap(); - ImmutableSegmentImpl segment2 = mockImmutableSegment(2, validDocIds2, getPrimaryKeyList(numRecords, primaryKeys)); - upsertMetadataManager.addSegment(segment2, validDocIds2, - getRecordInfoList(numRecords, primaryKeys, timestamps).iterator()); - - // Remove the first segment - upsertMetadataManager.removeSegment(segment1); - // segment2: 2 -> {0, 100}, 3 -> {0, 100} - assertNull(recordLocationMap.get(makePrimaryKey(0))); - assertNull(recordLocationMap.get(makePrimaryKey(1))); - checkRecordLocation(recordLocationMap, 2, segment2, 0, 100, hashFunction); - checkRecordLocation(recordLocationMap, 3, segment2, 1, 100, hashFunction); - assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); - } - @Test public void testHashPrimaryKey() { PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"}); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 9904b95cb0..132fe44fe9 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -59,6 +59,7 @@ import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig; import 
org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.utils.SegmentLocks; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.SegmentMetadata; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index d2e4c30ca6..9e541a29f2 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -38,6 +38,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.utils.SegmentLocks; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org