This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 38ac70a9c1 [Upsert] persist validDocsIndex snapshot for Pinot upsert optimization (#9062) 38ac70a9c1 is described below commit 38ac70a9c17c647dbbad090ca9fd60faebac5418 Author: deemoliu <qiao...@uber.com> AuthorDate: Mon Oct 3 15:14:16 2022 -0700 [Upsert] persist validDocsIndex snapshot for Pinot upsert optimization (#9062) --- ...adataAndDictionaryAggregationPlanMakerTest.java | 2 +- .../immutable/ImmutableSegmentImpl.java | 62 +++++++++ .../upsert/BasePartitionUpsertMetadataManager.java | 34 ++++- .../upsert/BaseTableUpsertMetadataManager.java | 3 + ...oncurrentMapPartitionUpsertMetadataManager.java | 4 +- .../ConcurrentMapTableUpsertMetadataManager.java | 2 +- .../pinot/segment/local/upsert/UpsertUtils.java | 45 +++++-- .../ImmutableSegmentImplUpsertSnapshotTest.java | 148 +++++++++++++++++++++ ...rrentMapPartitionUpsertMetadataManagerTest.java | 58 ++++++-- .../org/apache/pinot/segment/spi/V1Constants.java | 1 + .../pinot/spi/config/table/UpsertConfig.java | 11 ++ 11 files changed, 347 insertions(+), 23 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java index a887d3751b..f6df4e9b4e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java @@ -130,7 +130,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest { _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap); ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert( new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"), - "daysSinceEpoch", HashFunction.NONE, null, serverMetrics), new ThreadSafeMutableRoaringBitmap()); + "daysSinceEpoch", HashFunction.NONE, null, false, serverMetrics), new ThreadSafeMutableRoaringBitmap()); } @AfterClass diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java index c84454a041..de6555ff43 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java @@ -19,12 +19,17 @@ package org.apache.pinot.segment.local.indexsegment.immutable; import com.google.common.base.Preconditions; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager; import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource; @@ -34,6 +39,7 @@ import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.FetchContext; import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; @@ -43,7 +49,10 @@ import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader; import org.apache.pinot.segment.spi.index.startree.StarTreeV2; import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; import org.apache.pinot.spi.data.readers.GenericRow; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +102,59 @@ public class ImmutableSegmentImpl implements ImmutableSegment { _validDocIds = validDocIds; } + @Nullable + public MutableRoaringBitmap loadValidDocIdsFromSnapshot() { + File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile(); + if (validDocIdsSnapshotFile.exists()) { + try { + byte[] bytes = FileUtils.readFileToByteArray(validDocIdsSnapshotFile); + MutableRoaringBitmap validDocIds = new ImmutableRoaringBitmap(ByteBuffer.wrap(bytes)).toMutableRoaringBitmap(); + LOGGER.info("Loaded valid doc ids for segment: {} with: {} valid docs", getSegmentName(), + validDocIds.getCardinality()); + return validDocIds; + } catch (Exception e) { + LOGGER.warn("Caught exception while loading valid doc ids from snapshot file: {}, ignoring the snapshot", + validDocIdsSnapshotFile); + } + } + return null; + } + + public void persistValidDocIdsSnapshot(MutableRoaringBitmap validDocIds) { + File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile(); + try { + if (validDocIdsSnapshotFile.exists()) { + FileUtils.delete(validDocIdsSnapshotFile); + } + try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(validDocIdsSnapshotFile))) { + validDocIds.serialize(dataOutputStream); + } + LOGGER.info("Persisted valid doc ids for segment: {} with: {} valid docs", getSegmentName(), + validDocIds.getCardinality()); + } catch (Exception e) { + LOGGER.warn("Caught exception while persisting valid doc ids to snapshot file: {}, skipping", + validDocIdsSnapshotFile); + } + } + + public void deleteValidDocIdsSnapshot() { + File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile(); + if (validDocIdsSnapshotFile.exists()) { + try { + FileUtils.delete(validDocIdsSnapshotFile); + LOGGER.info("Deleted valid doc ids snapshot for segment: {}", getSegmentName()); + } catch (Exception e) { + LOGGER.warn("Caught exception while deleting valid doc ids snapshot file: {}, skipping", + validDocIdsSnapshotFile); + } + } + } + + private File getValidDocIdsSnapshotFile() { + return new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()), + V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME); + } + @Override public Dictionary getDictionary(String column) { ColumnIndexContainer container = _indexContainerMap.get(column); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 41f6fa9f3c..7bcb33cf19 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -54,6 +54,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps protected final String _comparisonColumn; protected final HashFunction _hashFunction; protected final PartialUpsertHandler _partialUpsertHandler; + protected final boolean _enableSnapshot; protected final ServerMetrics _serverMetrics; protected final Logger _logger; @@ -65,13 +66,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction, - @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics serverMetrics) { + @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) { _tableNameWithType = tableNameWithType; _partitionId = partitionId; _primaryKeyColumns = primaryKeyColumns; _comparisonColumn = comparisonColumn; _hashFunction = hashFunction; _partialUpsertHandler = partialUpsertHandler; + _enableSnapshot = enableSnapshot; _serverMetrics = serverMetrics; _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName()); } @@ -83,9 +85,25 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps @Override public void addSegment(ImmutableSegment segment) { - addSegment(segment, null, null); + Iterator<RecordInfo> recordInfoIterator = null; + if (segment instanceof ImmutableSegmentImpl) { + if (_enableSnapshot) { + MutableRoaringBitmap validDocIds = ((ImmutableSegmentImpl) segment).loadValidDocIdsFromSnapshot(); + if (validDocIds != null) { + recordInfoIterator = + UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn, validDocIds); + } + } else { + ((ImmutableSegmentImpl) segment).deleteValidDocIdsSnapshot(); + } + } + addSegment(segment, null, recordInfoIterator); } + /** + * NOTE: We allow passing in validDocIds here so that the value can be easily accessed from the tests. The passed in + * validDocIds should always be empty. + */ @VisibleForTesting public void addSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds, @Nullable Iterator<RecordInfo> recordInfoIterator) { @@ -133,6 +151,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps replaceSegment(segment, null, null, oldSegment); } + /** + * NOTE: We allow passing in validDocIds here so that the value can be easily accessed from the tests. The passed in + * validDocIds should always be empty. + */ @VisibleForTesting public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds, @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) { @@ -157,6 +179,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps if (validDocIds == null) { validDocIds = new ThreadSafeMutableRoaringBitmap(); } + // New segment doesn't necessary have the same docs as the old segment. + // Even for consuming segment, we might re-order the docs. + // As a result, we iterate all docIds of the new segment instead of loading it from old segment's snapshot. if (recordInfoIterator == null) { recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn); } @@ -215,6 +240,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps try { MutableRoaringBitmap validDocIds = segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null; + + if (_enableSnapshot && segment instanceof ImmutableSegmentImpl && validDocIds != null) { + ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(validDocIds); + } + if (validDocIds == null || validDocIds.isEmpty()) { _logger.info("Skip removing segment without valid docs: {}", segmentName); return; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 95666d3ea2..7147341b69 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -38,6 +38,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad protected String _comparisonColumn; protected HashFunction _hashFunction; protected PartialUpsertHandler _partialUpsertHandler; + protected boolean _enableSnapshot; protected ServerMetrics _serverMetrics; @Override @@ -69,6 +70,8 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad _comparisonColumn); } + _enableSnapshot = upsertConfig.isEnableSnapshot(); + _serverMetrics = serverMetrics; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index 125f61fe47..611153fd13 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -58,9 +58,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction, - @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics serverMetrics) { + @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) { super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumn, hashFunction, partialUpsertHandler, - serverMetrics); + enableSnapshot, serverMetrics); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java index 9c22316703..3f830bb3a7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java @@ -36,7 +36,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) { return _partitionMetadataManagerMap.computeIfAbsent(partitionId, k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns, - _comparisonColumn, _hashFunction, _partialUpsertHandler, _serverMetrics)); + _comparisonColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot, _serverMetrics)); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java index a7ea1b92f7..a1ee7b3266 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java @@ -24,6 +24,8 @@ import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.spi.data.readers.PrimaryKey; import org.apache.pinot.spi.utils.ByteArray; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.buffer.MutableRoaringBitmap; @SuppressWarnings("rawtypes") @@ -32,7 +34,7 @@ public class UpsertUtils { } /** - * Returns an iterator of {@link RecordInfo} from the segment. + * Returns an iterator of {@link RecordInfo} for all the documents from the segment. */ public static Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment, List<String> primaryKeyColumns, String comparisonColumn) { @@ -47,18 +49,45 @@ public class UpsertUtils { @Override public RecordInfo next() { - PrimaryKey primaryKey = new PrimaryKey(new Object[primaryKeyColumns.size()]); - getPrimaryKey(segment, primaryKeyColumns, _docId, primaryKey); + return getRecordInfo(segment, primaryKeyColumns, comparisonColumn, _docId++); + } + }; + } + + /** + * Returns an iterator of {@link RecordInfo} for the valid documents from the segment. + */ + public static Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment, List<String> primaryKeyColumns, + String comparisonColumn, MutableRoaringBitmap validDocIds) { + return new Iterator<RecordInfo>() { + private final PeekableIntIterator _docIdIterator = validDocIds.getIntIterator(); - Object comparisonValue = segment.getValue(_docId, comparisonColumn); - if (comparisonValue instanceof byte[]) { - comparisonValue = new ByteArray((byte[]) comparisonValue); - } - return new RecordInfo(primaryKey, _docId++, (Comparable) comparisonValue); + @Override + public boolean hasNext() { + return _docIdIterator.hasNext(); + } + + @Override + public RecordInfo next() { + return getRecordInfo(segment, primaryKeyColumns, comparisonColumn, _docIdIterator.next()); } }; } + /** + * Reads a {@link RecordInfo} from the segment. + */ + public static RecordInfo getRecordInfo(ImmutableSegment segment, List<String> primaryKeyColumns, + String comparisonColumn, int docId) { + PrimaryKey primaryKey = new PrimaryKey(new Object[primaryKeyColumns.size()]); + getPrimaryKey(segment, primaryKeyColumns, docId, primaryKey); + Object comparisonValue = segment.getValue(docId, comparisonColumn); + if (comparisonValue instanceof byte[]) { + comparisonValue = new ByteArray((byte[]) comparisonValue); + } + return new RecordInfo(primaryKey, docId, (Comparable) comparisonValue); + } + /** * Reads a primary key from the segment. */ diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java new file mode 100644 index 0000000000..6c31c3981f --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.indexsegment.immutable; + +import java.io.File; +import java.net.URL; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager; +import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; +import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; +import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; +import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.mockito.Mockito; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +public class ImmutableSegmentImplUpsertSnapshotTest { + private static final String AVRO_FILE = "data/test_data-mv.avro"; + private static final String SCHEMA = "data/testDataMVSchema.json"; + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ImmutableSegmentImplTest"); + + private SegmentDirectory _segmentDirectory; + private SegmentMetadataImpl _segmentMetadata; + private PinotConfiguration _configuration; + private TableConfig _tableConfig; + private Schema _schema; + + private PartitionUpsertMetadataManager _partitionUpsertMetadataManager; + private ImmutableSegmentImpl _immutableSegmentImpl; + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteQuietly(INDEX_DIR); + + Map<String, Object> props = new HashMap<>(); + props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap.toString()); + _configuration = new PinotConfiguration(props); + + _segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(INDEX_DIR.toURI(), + new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); + + URL resourceUrl = ImmutableSegmentImpl.class.getClassLoader().getResource(AVRO_FILE); + Assert.assertNotNull(resourceUrl); + File avroFile = new File(resourceUrl.getFile()); + + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setRowTimeValueCheck(false); + ingestionConfig.setSegmentTimeValueCheck(false); + + UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); + upsertConfig.setEnableSnapshot(true); + + _tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch") + .setIngestionConfig(ingestionConfig).setUpsertConfig(upsertConfig).build(); + + resourceUrl = ImmutableSegmentImpl.class.getClassLoader().getResource(SCHEMA); + _schema = Schema.fromFile(new File(resourceUrl.getFile())); + + SegmentGeneratorConfig config = + SegmentTestUtils.getSegmentGeneratorConfigWithSchema(avroFile, INDEX_DIR, "testTable", _tableConfig, _schema); + + SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl(); + driver.init(config); + driver.build(); + _segmentMetadata = Mockito.mock(SegmentMetadataImpl.class); + Mockito.when(_segmentMetadata.getColumnMetadataMap()).thenReturn(new HashMap<>()); + Mockito.when(_segmentMetadata.getIndexDir()).thenReturn(INDEX_DIR); + _immutableSegmentImpl = new ImmutableSegmentImpl(_segmentDirectory, _segmentMetadata, new HashMap<>(), null); + + ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class); + _partitionUpsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"), + "daysSinceEpoch", HashFunction.NONE, null, true, serverMetrics); + + _immutableSegmentImpl.enableUpsert(_partitionUpsertMetadataManager, new ThreadSafeMutableRoaringBitmap()); + } + + @Test + public void testPersistValidDocIdsSnapshot() { + int[] docIds1 = new int[]{1, 4, 6, 10, 15, 17, 18, 20}; + MutableRoaringBitmap validDocIds = new MutableRoaringBitmap(); + validDocIds.add(docIds1); + + _immutableSegmentImpl.persistValidDocIdsSnapshot(validDocIds); + assertTrue(new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()), + V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists()); + + MutableRoaringBitmap bitmap = _immutableSegmentImpl.loadValidDocIdsFromSnapshot(); + assertEquals(bitmap.toArray(), docIds1); + + _immutableSegmentImpl.deleteValidDocIdsSnapshot(); + assertFalse(new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()), + V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists()); + } + + @AfterMethod + public void tearDown() { + FileUtils.deleteQuietly(INDEX_DIR); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index 5a6e45573c..3df1f45caa 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.upsert; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.pinot.common.metrics.ServerMetrics; @@ -37,6 +38,7 @@ import org.apache.pinot.spi.data.readers.PrimaryKey; import org.apache.pinot.spi.utils.ByteArray; import org.apache.pinot.spi.utils.BytesUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.testng.annotations.Test; import static org.mockito.ArgumentMatchers.anyInt; @@ -55,15 +57,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { @Test public void testAddReplaceRemoveSegment() { - verifyAddReplaceRemoveSegment(HashFunction.NONE); - verifyAddReplaceRemoveSegment(HashFunction.MD5); - verifyAddReplaceRemoveSegment(HashFunction.MURMUR3); + verifyAddReplaceRemoveSegment(HashFunction.NONE, false); + verifyAddReplaceRemoveSegment(HashFunction.MD5, false); + verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, false); + verifyAddReplaceRemoveSegment(HashFunction.NONE, true); + verifyAddReplaceRemoveSegment(HashFunction.MD5, true); + verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, true); } - private void verifyAddReplaceRemoveSegment(HashFunction hashFunction) { + private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot) { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - "timeCol", hashFunction, null, mock(ServerMetrics.class)); + "timeCol", hashFunction, null, false, mock(ServerMetrics.class)); Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment @@ -73,7 +78,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap(); List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys); ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, primaryKeys1); - List<RecordInfo> recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps); + List<RecordInfo> recordInfoList1; + if (enableSnapshot) { + // get recordInfo from validDocIdSnapshot. + // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100} + int[] docIds1 = new int[]{2, 4, 5}; + MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap(); + validDocIdsSnapshot1.add(docIds1); + recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps); + } else { + // get recordInfo by iterating all records. + recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps); + } upsertMetadataManager.addSegment(segment1, validDocIds1, recordInfoList1.iterator()); // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100} assertEquals(recordLocationMap.size(), 3); @@ -88,8 +104,20 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { timestamps = new int[]{100, 100, 120, 80, 80}; ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap(); ImmutableSegmentImpl segment2 = mockImmutableSegment(2, validDocIds2, getPrimaryKeyList(numRecords, primaryKeys)); - upsertMetadataManager.addSegment(segment2, validDocIds2, - getRecordInfoList(numRecords, primaryKeys, timestamps).iterator()); + List<RecordInfo> recordInfoList2; + if (enableSnapshot) { + // get recordInfo from validDocIdSnapshot. + // segment2 snapshot: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} + // segment1 snapshot: 1 -> {4, 120} + MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap(); + validDocIdsSnapshot2.add(new int[]{0, 2, 3}); + recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps); + } else { + // get recordInfo by iterating all records. + recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps); + } + upsertMetadataManager.addSegment(segment2, validDocIds2, recordInfoList2.iterator()); + // segment1: 1 -> {4, 120} // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} assertEquals(recordLocationMap.size(), 4); @@ -174,6 +202,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { return recordInfoList; } + /** + * Get recordInfo from validDocIdsSnapshot (enabledSnapshot = True). + */ + private List<RecordInfo> getRecordInfoList(MutableRoaringBitmap validDocIdsSnapshot, int[] primaryKeys, + int[] timestamps) { + List<RecordInfo> recordInfoList = new ArrayList<>(); + Iterator<Integer> validDocIdsIterator = validDocIdsSnapshot.iterator(); + validDocIdsIterator.forEachRemaining((docId) -> recordInfoList.add( + new RecordInfo(makePrimaryKey(primaryKeys[docId]), docId, new IntWrapper(timestamps[docId])))); + return recordInfoList; + } + private List<PrimaryKey> getPrimaryKeyList(int numRecords, int[] primaryKeys) { List<PrimaryKey> primaryKeyList = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { @@ -233,7 +273,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { private void verifyAddRecord(HashFunction hashFunction) { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), - "timeCol", hashFunction, null, mock(ServerMetrics.class)); + "timeCol", hashFunction, null, false, mock(ServerMetrics.class)); Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java index 3a99c99346..c9d4213b63 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java @@ -25,6 +25,7 @@ public class V1Constants { public static final String SEGMENT_CREATION_META = "creation.meta"; public static final String INDEX_MAP_FILE_NAME = "index_map"; public static final String INDEX_FILE_NAME = "columns.psf"; + public static final String VALID_DOC_IDS_SNAPSHOT_FILE_NAME = "validdocids.bitmap.snapshot"; public static class Str { public static final char DEFAULT_STRING_PAD_CHAR = '\0'; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index 162e6b3030..ae0522f6cd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -54,6 +54,9 @@ public class UpsertConfig extends BaseJsonConfig { @JsonPropertyDescription("Column for upsert comparison, default to time column") private String _comparisonColumn; + @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery") + private boolean _enableSnapshot; + @JsonPropertyDescription("Custom class for upsert metadata manager") private String _metadataManagerClass; @@ -111,6 +114,10 @@ public class UpsertConfig extends BaseJsonConfig { return _comparisonColumn; } + public boolean isEnableSnapshot() { + return _enableSnapshot; + } + @Nullable public String getMetadataManagerClass() { return _metadataManagerClass; @@ -153,6 +160,10 @@ public class UpsertConfig extends BaseJsonConfig { _comparisonColumn = comparisonColumn; } + public void setEnableSnapshot(boolean enableSnapshot) { + _enableSnapshot = enableSnapshot; + } + public void setMetadataManagerClass(String metadataManagerClass) { _metadataManagerClass = metadataManagerClass; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org