This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 3c92b32711  Take upsert snapshot when creating new consuming segment (#10928)
3c92b32711 is described below

commit 3c92b327114c4534f354e8a8f2755197a6439fd5
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Jun 22 11:59:10 2023 -0700

    Take upsert snapshot when creating new consuming segment (#10928)
---
 .../realtime/LLRealtimeSegmentDataManager.java      |  31 ++--
 .../immutable/ImmutableSegmentImpl.java             |   9 +-
 .../upsert/BasePartitionUpsertMetadataManager.java  | 158 +++++++++++++++------
 .../upsert/PartitionUpsertMetadataManager.java      |   6 +
 .../ImmutableSegmentImplUpsertSnapshotTest.java     | 149 -------------------
 ...rrentMapPartitionUpsertMetadataManagerTest.java  |  17 ++-
 6 files changed, 161 insertions(+), 209 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0e1a70bfc2..99f3c6d802 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -232,6 +232,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // consuming.
   private final AtomicBoolean _acquiredConsumerSemaphore;
   private final ServerMetrics _serverMetrics;
+  private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final BooleanSupplier _isReadyToConsumeData;
   private final MutableSegmentImpl _realtimeSegment;
   private volatile StreamPartitionMsgOffset _currentOffset;
@@ -400,13 +401,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       // anymore. Remove the file if it exists.
       removeSegmentFile();
 
-      if (!_isReadyToConsumeData.getAsBoolean()) {
-        do {
-          //noinspection BusyWait
-          Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
-        } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
-      }
-
       _numRowsErrored = 0;
       final long idlePipeSleepTimeMillis = 100;
       final long idleTimeoutMillis = _partitionLevelStreamConfig.getIdleTimeoutMillis();
@@ -662,6 +656,27 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     long catchUpTimeMillis = 0L;
     _startTimeMs = now();
     try {
+      if (!_isReadyToConsumeData.getAsBoolean()) {
+        do {
+          //noinspection BusyWait
+          Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+        } while (!_shouldStop && !_isReadyToConsumeData.getAsBoolean());
+      }
+
+      // TODO:
+      //   When reaching here, the current consuming segment has already acquired the consumer semaphore, but there is
+      //   no guarantee that the previous consuming segment is already persisted (replaced with immutable segment). It
+      //   can potentially cause the following problems:
+      //   1. The snapshot for the previous consuming segment might not be taken since it is not persisted yet
+      //   2. If the previous consuming segment is dropped but immutable segment is not downloaded and replaced yet,
+      //      it might cause inconsistency (especially for partial upsert because events are not consumed in sequence)
+      //   To address this problem, we should consider releasing the consumer semaphore after the consuming segment is
+      //   persisted.
+
+      // Take upsert snapshot before starting consuming events
+      if (_partitionUpsertMetadataManager != null) {
+        _partitionUpsertMetadataManager.takeSnapshot();
+      }
+
       while (!_state.isFinal()) {
         if (_state.shouldConsume()) {
           consumeLoop(); // Consume until we reached the end criteria, or we are stopped.
@@ -863,7 +878,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     if (_partitionMetadataProvider == null) {
       createPartitionMetadataProvider("Get Partition Lag State");
     }
-    ;
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
 
@@ -1309,6 +1323,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _indexLoadingConfig = indexLoadingConfig;
     _schema = schema;
     _serverMetrics = serverMetrics;
+    _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
     _isReadyToConsumeData = isReadyToConsumeData;
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 1e00f85077..66875eb726 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -123,7 +123,7 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
     return null;
   }
 
-  public void persistValidDocIdsSnapshot(MutableRoaringBitmap validDocIds) {
+  public void persistValidDocIdsSnapshot() {
     File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile();
     try {
       if (validDocIdsSnapshotFile.exists()) {
@@ -132,14 +132,15 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
           return;
         }
       }
+      MutableRoaringBitmap validDocIdsSnapshot = _validDocIds.getMutableRoaringBitmap();
       try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(validDocIdsSnapshotFile))) {
-        validDocIds.serialize(dataOutputStream);
+        validDocIdsSnapshot.serialize(dataOutputStream);
       }
       LOGGER.info("Persisted valid doc ids for segment: {} with: {} valid docs", getSegmentName(),
-          validDocIds.getCardinality());
+          validDocIdsSnapshot.getCardinality());
     } catch (Exception e) {
       LOGGER.warn("Caught exception while persisting valid doc ids to snapshot file: {}, skipping",
-          validDocIdsSnapshotFile);
+          validDocIdsSnapshotFile, e);
     }
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 6556ffce8a..1501cd28e6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerGauge;
@@ -61,8 +63,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final ServerMetrics _serverMetrics;
   protected final Logger _logger;
 
-  @VisibleForTesting
-  public final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
+  // Tracks all the segments managed by this manager (excluding EmptySegment)
+  protected final Set<IndexSegment> _trackedSegments = ConcurrentHashMap.newKeySet();
+
+  // NOTE: We do not persist snapshot on the first consuming segment because most segments might not be loaded yet
+  protected volatile boolean _gotFirstConsumingSegment = false;
+  protected final ReadWriteLock _snapshotLock;
 
   protected volatile boolean _stopped = false;
   // Initialize with 1 pending operation to indicate the metadata manager can take more operations
@@ -81,6 +87,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     _hashFunction = hashFunction;
     _partialUpsertHandler = partialUpsertHandler;
     _enableSnapshot = enableSnapshot;
+    _snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
     _serverMetrics = serverMetrics;
     _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
   }
@@ -92,44 +99,51 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
   @Override
   public void addSegment(ImmutableSegment segment) {
+    String segmentName = segment.getSegmentName();
     if (_stopped) {
       _logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName());
       return;
     }
+
+    if (segment instanceof EmptyIndexSegment) {
+      _logger.info("Skip adding empty segment: {}", segmentName);
+      return;
+    }
+    Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+        "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
+        _tableNameWithType);
+
+    if (_enableSnapshot) {
+      _snapshotLock.readLock().lock();
+    }
     startOperation();
     try {
-      doAddSegment(segment);
+      doAddSegment((ImmutableSegmentImpl) segment);
+      _trackedSegments.add(segment);
     } finally {
       finishOperation();
+      if (_enableSnapshot) {
+        _snapshotLock.readLock().unlock();
+      }
     }
   }
 
-  protected void doAddSegment(ImmutableSegment segment) {
+  protected void doAddSegment(ImmutableSegmentImpl segment) {
     String segmentName = segment.getSegmentName();
     _logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
+    long startTimeMs = System.currentTimeMillis();
 
-    if (segment instanceof EmptyIndexSegment) {
-      _logger.info("Skip adding empty segment: {}", segmentName);
-      return;
-    }
-
-    Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
-        "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
-        _tableNameWithType);
-
-    ImmutableSegmentImpl immutableSegmentImpl = (ImmutableSegmentImpl) segment;
     MutableRoaringBitmap validDocIds;
     if (_enableSnapshot) {
-      validDocIds = immutableSegmentImpl.loadValidDocIdsFromSnapshot();
+      validDocIds = segment.loadValidDocIdsFromSnapshot();
       if (validDocIds != null && validDocIds.isEmpty()) {
         _logger.info("Skip adding segment: {} without valid doc, current primary key count: {}",
            segment.getSegmentName(), getNumPrimaryKeys());
-        immutableSegmentImpl.enableUpsert(this, new ThreadSafeMutableRoaringBitmap());
+        segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap());
        return;
       }
     } else {
       validDocIds = null;
-      immutableSegmentImpl.deleteValidDocIdsSnapshot();
+      segment.deleteValidDocIdsSnapshot();
     }
 
     try (UpsertUtils.RecordInfoReader recordInfoReader = UpsertUtils.makeRecordReader(segment, _primaryKeyColumns,
@@ -141,7 +155,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         recordInfoIterator =
             UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
       }
-      addSegment(immutableSegmentImpl, null, recordInfoIterator);
+      addSegment(segment, null, recordInfoIterator);
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while adding segment: %s, table: %s", segmentName, _tableNameWithType), e);
@@ -152,7 +166,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         numPrimaryKeys);
 
-    _logger.info("Finished adding segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+    _logger.info("Finished adding segment: {} in {}ms, current primary key count: {}", segmentName,
+        System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
   /**
@@ -188,9 +203,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
           segment.getSegmentName());
       return;
     }
+
+    // NOTE: We don't acquire snapshot read lock here because snapshot is always taken before a new consuming segment
+    //       starts consuming, so it won't overlap with this method
+    _gotFirstConsumingSegment = true;
     startOperation();
     try {
       doAddRecord(segment, recordInfo);
+      _trackedSegments.add(segment);
     } finally {
       finishOperation();
     }
@@ -204,11 +224,22 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       _logger.info("Skip replacing segment: {} because metadata manager is already stopped",
           segment.getSegmentName());
       return;
     }
+
+    if (_enableSnapshot) {
+      _snapshotLock.readLock().lock();
+    }
     startOperation();
     try {
       doReplaceSegment(segment, oldSegment);
+      if (!(segment instanceof EmptyIndexSegment)) {
+        _trackedSegments.add(segment);
+      }
+      _trackedSegments.remove(oldSegment);
     } finally {
       finishOperation();
+      if (_enableSnapshot) {
+        _snapshotLock.readLock().lock();
+      }
     }
   }
 
@@ -219,6 +250,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         _tableNameWithType, oldSegment.getSegmentName(), segmentName);
     _logger.info("Replacing {} segment: {}, current primary key count: {}",
         oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
+    long startTimeMs = System.currentTimeMillis();
 
     if (segment instanceof EmptyIndexSegment) {
       _logger.info("Skip adding empty segment: {}", segmentName);
@@ -241,7 +273,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         numPrimaryKeys);
 
-    _logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+    _logger.info("Finished replacing segment: {} in {}ms, current primary key count: {}", segmentName,
+        System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
   /**
@@ -288,10 +321,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     } finally {
       segmentLock.unlock();
     }
-
-    if (!(oldSegment instanceof EmptyIndexSegment)) {
-      _replacedSegments.add(oldSegment);
-    }
   }
 
   protected abstract void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds);
@@ -299,29 +328,27 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   @Override
   public void removeSegment(IndexSegment segment) {
     String segmentName = segment.getSegmentName();
-    if (_replacedSegments.remove(segment)) {
-      _logger.info("Skip removing replaced segment: {}", segmentName);
+    if (_stopped) {
+      _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
       return;
     }
-    // Allow persisting valid doc ids snapshot after metadata manager is stopped
-    MutableRoaringBitmap validDocIds =
-        segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
-    if (_enableSnapshot && segment instanceof ImmutableSegmentImpl) {
-      ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(validDocIds);
-    }
-    if (validDocIds == null || validDocIds.isEmpty()) {
-      _logger.info("Skip removing segment without valid docs: {}", segmentName);
+
+    if (!_trackedSegments.contains(segment)) {
+      _logger.info("Skip removing untracked (replaced or empty) segment: {}", segmentName);
       return;
     }
-    if (_stopped) {
-      _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
-      return;
+
+    if (_enableSnapshot) {
+      _snapshotLock.readLock().lock();
     }
     startOperation();
     try {
       doRemoveSegment(segment);
+      _trackedSegments.remove(segment);
     } finally {
       finishOperation();
+      if (_enableSnapshot) {
+        _snapshotLock.readLock().unlock();
+      }
     }
   }
 
@@ -329,13 +356,18 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     String segmentName = segment.getSegmentName();
     _logger.info("Removing {} segment: {}, current primary key count: {}",
         segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
+    long startTimeMs = System.currentTimeMillis();
+
+    MutableRoaringBitmap validDocIds =
+        segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
+    if (validDocIds == null || validDocIds.isEmpty()) {
+      _logger.info("Skip removing segment without valid docs: {}", segmentName);
+      return;
+    }
 
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
-      assert segment.getValidDocIds() != null;
-      MutableRoaringBitmap validDocIds = segment.getValidDocIds().getMutableRoaringBitmap();
-      assert validDocIds != null;
       _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
       removeSegment(segment, validDocIds);
     } finally {
@@ -347,7 +379,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         numPrimaryKeys);
 
-    _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+    _logger.info("Finished removing segment: {} in {}ms, current primary key count: {}", segmentName,
+        System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
   @Override
@@ -360,6 +393,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       _logger.debug("Skip updating record because metadata manager is already stopped");
       return record;
     }
+
     startOperation();
     try {
       return doUpdateRecord(record, recordInfo);
@@ -385,6 +419,48 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
+  @Override
+  public void takeSnapshot() {
+    if (!_enableSnapshot) {
+      return;
+    }
+    if (_stopped) {
+      _logger.info("Skip taking snapshot because metadata manager is already stopped");
+      return;
+    }
+    if (!_gotFirstConsumingSegment) {
+      _logger.info("Skip taking snapshot before getting the first consuming segment");
+      return;
+    }
+
+    _snapshotLock.writeLock().lock();
+    startOperation();
+    try {
+      doTakeSnapshot();
+    } finally {
+      finishOperation();
+      _snapshotLock.writeLock().unlock();
+    }
+  }
+
+  // TODO: Consider optimizing it by tracking and persisting only the changed snapshot
+  protected void doTakeSnapshot() {
+    int numTrackedSegments = _trackedSegments.size();
+    _logger.info("Taking snapshot for {} segments", numTrackedSegments);
+    long startTimeMs = System.currentTimeMillis();
+
+    int numImmutableSegments = 0;
+    for (IndexSegment segment : _trackedSegments) {
+      if (segment instanceof ImmutableSegmentImpl) {
+        ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot();
+        numImmutableSegments++;
+      }
+    }
+
+    _logger.info("Finished taking snapshot for {} immutable segments (out of {} total segments) in {}ms",
+        numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
+  }
+
   protected void startOperation() {
     _numPendingOperations.getAndIncrement();
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index dd07143fd3..55cd8497cb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -84,6 +84,12 @@ public interface PartitionUpsertMetadataManager extends Closeable {
    */
  GenericRow updateRecord(GenericRow record, RecordInfo recordInfo);
 
+  /**
+   * Takes snapshot for all the tracked immutable segments when snapshot is enabled. This method should be invoked
+   * before a new consuming segment starts consuming.
+   */
+  void takeSnapshot();
+
   /**
    * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
    */
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
deleted file mode 100644
index 4bb049cc2c..0000000000
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.indexsegment.immutable;
-
-import java.io.File;
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager;
-import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
-import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
-import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
-import org.apache.pinot.spi.config.table.HashFunction;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.UpsertConfig;
-import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.mockito.Mockito;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
-
-public class ImmutableSegmentImplUpsertSnapshotTest {
-  private static final String AVRO_FILE = "data/test_data-mv.avro";
-  private static final String SCHEMA = "data/testDataMVSchema.json";
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ImmutableSegmentImplTest");
-
-  private SegmentDirectory _segmentDirectory;
-  private SegmentMetadataImpl _segmentMetadata;
-  private PinotConfiguration _configuration;
-  private TableConfig _tableConfig;
-  private Schema _schema;
-
-  private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
-  private ImmutableSegmentImpl _immutableSegmentImpl;
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    FileUtils.deleteQuietly(INDEX_DIR);
-
-    Map<String, Object> props = new HashMap<>();
-    props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap.toString());
-    _configuration = new PinotConfiguration(props);
-
-    _segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(INDEX_DIR.toURI(),
-        new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
-
-    URL resourceUrl = ImmutableSegmentImpl.class.getClassLoader().getResource(AVRO_FILE);
-    Assert.assertNotNull(resourceUrl);
-    File avroFile = new File(resourceUrl.getFile());
-
-    IngestionConfig ingestionConfig = new IngestionConfig();
-    ingestionConfig.setRowTimeValueCheck(false);
-    ingestionConfig.setSegmentTimeValueCheck(false);
-
-    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
-    upsertConfig.setEnableSnapshot(true);
-
-    _tableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
-            .setIngestionConfig(ingestionConfig).setUpsertConfig(upsertConfig).build();
-
-    resourceUrl = ImmutableSegmentImpl.class.getClassLoader().getResource(SCHEMA);
-    _schema = Schema.fromFile(new File(resourceUrl.getFile()));
-
-    SegmentGeneratorConfig config =
-        SegmentTestUtils.getSegmentGeneratorConfigWithSchema(avroFile, INDEX_DIR, "testTable", _tableConfig, _schema);
-
-    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
-    driver.init(config);
-    driver.build();
-
-    _segmentMetadata = Mockito.mock(SegmentMetadataImpl.class);
-    Mockito.when(_segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap<>());
-    Mockito.when(_segmentMetadata.getIndexDir()).thenReturn(INDEX_DIR);
-
-    _immutableSegmentImpl = new ImmutableSegmentImpl(_segmentDirectory, _segmentMetadata, new HashMap<>(), null);
-
-    ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
-    _partitionUpsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
-            Collections.singletonList("daysSinceEpoch"), HashFunction.NONE, null, true, serverMetrics);
-
-    _immutableSegmentImpl.enableUpsert(_partitionUpsertMetadataManager, new ThreadSafeMutableRoaringBitmap());
-  }
-
-  @Test
-  public void testPersistValidDocIdsSnapshot() {
-    int[] docIds1 = new int[]{1, 4, 6, 10, 15, 17, 18, 20};
-    MutableRoaringBitmap validDocIds = new MutableRoaringBitmap();
-    validDocIds.add(docIds1);
-
-    _immutableSegmentImpl.persistValidDocIdsSnapshot(validDocIds);
-    assertTrue(new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()),
-        V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists());
-
-    MutableRoaringBitmap bitmap = _immutableSegmentImpl.loadValidDocIdsFromSnapshot();
-    assertEquals(bitmap.toArray(), docIds1);
-
-    _immutableSegmentImpl.deleteValidDocIdsSnapshot();
-    assertFalse(new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()),
-        V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists());
-  }
-
-  @AfterMethod
-  public void tearDown() {
-    FileUtils.deleteQuietly(INDEX_DIR);
-  }
-}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 1e3f130059..a89f7e9d64 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
@@ -53,7 +54,6 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertSame;
-import static org.testng.Assert.assertTrue;
 
 
 public class ConcurrentMapPartitionUpsertMetadataManagerTest {
@@ -78,6 +78,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
             Collections.singletonList(comparisonColumn), hashFunction, null, false, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
 
     // Add the first segment
     int numRecords = 6;
@@ -99,6 +100,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
       recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps);
     }
     upsertMetadataManager.addSegment(segment1, validDocIds1, recordInfoList1.iterator());
+    trackedSegments.add(segment1);
     // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
     assertEquals(recordLocationMap.size(), 3);
     checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
@@ -125,6 +127,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
       recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps);
     }
     upsertMetadataManager.addSegment(segment2, validDocIds2, recordInfoList2.iterator());
+    trackedSegments.add(segment2);
 
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
@@ -153,6 +156,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, primaryKeys1);
     upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, recordInfoList1.iterator(), segment1);
+    trackedSegments.add(newSegment1);
+    trackedSegments.remove(segment1);
     // original segment1: 1 -> {4, 120} (not in the map)
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
@@ -164,7 +169,6 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
-    assertEquals(upsertMetadataManager._replacedSegments, Collections.singleton(segment1));
 
     // Remove the original segment1
     upsertMetadataManager.removeSegment(segment1);
@@ -178,7 +182,6 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
-    assertTrue(upsertMetadataManager._replacedSegments.isEmpty());
 
     // Remove the empty segment
     upsertMetadataManager.removeSegment(emptySegment);
@@ -200,6 +203,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(trackedSegments, Collections.singleton(newSegment1));
 
     // Stop the metadata manager
     upsertMetadataManager.stop();
@@ -210,6 +214,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(recordLocationMap.size(), 1);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(trackedSegments, Collections.singleton(newSegment1));
 
     // Close the metadata manager
     upsertMetadataManager.close();
@@ -218,8 +223,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) {
     List<RecordInfo> recordInfoList = new ArrayList<>();
     for (int i = 0; i < numRecords; i++) {
-      recordInfoList.add(
-          new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i])));
+      recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i])));
     }
     return recordInfoList;
   }
@@ -288,8 +292,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertNotNull(recordLocation);
     assertSame(recordLocation.getSegment(), segment);
     assertEquals(recordLocation.getDocId(), docId);
-    assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value,
-        comparisonValue);
+    assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value, comparisonValue);
   }
 
   @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org