This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 767f32fc11  add preloading support for dedup tables (#14187)
767f32fc11 is described below

commit 767f32fc113dbf5c027bc10b1cb695611194614e
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Tue Oct 22 08:25:03 2024 -0700

    add preloading support for dedup tables (#14187)

    * add preloading support for dedup tables
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   1 +
 .../apache/pinot/common/metrics/ServerTimer.java   |   1 +
 .../common/utils/config/TableConfigSerDeTest.java  |   7 +-
 .../manager/realtime/RealtimeTableDataManager.java |  59 +++++--
 .../tests/DedupPreloadIntegrationTest.java         | 162 +++++++++++++++++++
 .../dedup/BasePartitionDedupMetadataManager.java   | 171 +++++++++++++++++----
 .../local/dedup/BaseTableDedupMetadataManager.java |  33 ++--
 ...ConcurrentMapPartitionDedupMetadataManager.java |  11 ++
 .../pinot/segment/local/dedup/DedupContext.java    |  33 ++--
 .../local/dedup/PartitionDedupMetadataManager.java |  18 +++
 .../local/dedup/TableDedupMetadataManager.java     |   2 +
 .../BasePartitionDedupMetadataManagerTest.java     |  86 +++++++++++
 ...apPartitionDedupMetadataManagerWithTTLTest.java |   7 +-
 ...artitionDedupMetadataManagerWithoutTTLTest.java |   4 +-
 .../TableDedupMetadataManagerFactoryTest.java      |  68 ++++++++
 .../mutable/MutableSegmentDedupeTest.java          |   2 +-
 .../apache/pinot/spi/config/table/DedupConfig.java |  14 +-
 17 files changed, 592 insertions(+), 87 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 77595d6181..a68e77f144 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -52,6 +52,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   // number of times partition of a record did not match the partition of the stream
   REALTIME_PARTITION_MISMATCH("mismatch", false),
   REALTIME_DEDUP_DROPPED("rows", false),
+  DEDUP_PRELOAD_FAILURE("count", false),
   UPSERT_KEYS_IN_WRONG_SEGMENT("rows", false),
   PARTIAL_UPSERT_OUT_OF_ORDER("rows", false),
   PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 63b42440a6..6738dbe194 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -58,6 +58,7 @@ public enum ServerTimer implements AbstractMetrics.Timer {
   DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
       "Total time taken to delete expired dedup primary keys based on metadataTTL or deletedKeysTTL"),
+  DEDUP_PRELOAD_TIME_MS("milliseconds", false, "Total time taken to preload a table partition of a dedup table"),
   SECONDARY_Q_WAIT_TIME_MS("milliseconds", false,
       "Time spent waiting in the secondary queue when BinaryWorkloadScheduler is used."),
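The serde test below exercises the widened DedupConfig constructor, whose last argument
is the new enablePreload flag (JSON property "enablePreload", declared in DedupConfig.java
at the end of this diff). A minimal sketch of building a dedup table config with preloading
turned on; the calls mirror the tests in this commit, and the table name is illustrative:

    import org.apache.pinot.spi.config.table.DedupConfig;
    import org.apache.pinot.spi.config.table.HashFunction;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class DedupPreloadConfigSketch {
      public static void main(String[] args) {
        // Constructor args follow the JsonCreator order: dedupEnabled, hashFunction,
        // metadataManagerClass, metadataManagerConfigs, metadataTTL, dedupTimeColumn,
        // and the new enablePreload flag.
        DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null, null, 0, null, true);
        TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("myTable")
            .setDedupConfig(dedupConfig).build();
        // The serialized table config carries "enablePreload": true under dedupConfig.
        System.out.println(tableConfig.toJsonString());
      }
    }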
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 47de3d6225..5972994e2d 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -65,6 +65,7 @@ import static org.testng.Assert.*;

 public class TableConfigSerDeTest {
   private static final double NO_DICTIONARY_THRESHOLD_RATIO = 0.72;
+
   @Test
   public void testSerDe()
       throws IOException {
@@ -192,8 +193,8 @@ public class TableConfigSerDeTest {
     }
     {
       // With query config
-      QueryConfig queryConfig = new QueryConfig(1000L, true, true, Collections.singletonMap("func(a)", "b"), null,
-          null);
+      QueryConfig queryConfig =
+          new QueryConfig(1000L, true, true, Collections.singletonMap("func(a)", "b"), null, null);
       TableConfig tableConfig = tableConfigBuilder.setQueryConfig(queryConfig).build();
       checkQueryConfig(tableConfig);
@@ -270,7 +271,7 @@ public class TableConfigSerDeTest {
     }
     {
       // with dedup config - with metadata ttl and metadata time column
-      DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5, null, null, 10, "dedupTimeColumn");
+      DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5, null, null, 10, "dedupTimeColumn", false);
       TableConfig tableConfig = tableConfigBuilder.setDedupConfig(dedupConfig).build();
       // Serialize then de-serialize
       checkTableConfigWithDedupConfigWithTTL(JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 297254529d..b4b33baa02 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -402,6 +402,15 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
         && _tableUpsertMetadataManager.getUpsertMode() == UpsertConfig.Mode.PARTIAL;
   }

+  private void handleSegmentPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
+    // Today a table can use either upsert or dedup but not both at the same time, so preloading is done by either the
+    // upsert manager or the dedup manager.
+    // TODO: if a table can enable both dedup and upsert in the future, we need to revisit the preloading logic here,
+    //  as we can only preload segments once but have to restore metadata for both dedup and upsert managers.
+    handleUpsertPreload(zkMetadata, indexLoadingConfig);
+    handleDedupPreload(zkMetadata, indexLoadingConfig);
+  }
+
   /**
    * Handles upsert preload if the upsert preload is enabled.
    */
@@ -417,6 +426,21 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId).preloadSegments(indexLoadingConfig);
   }

+  /**
+   * Handles dedup preload if the dedup preload is enabled.
+   */
+  private void handleDedupPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
+    if (_tableDedupMetadataManager == null || !_tableDedupMetadataManager.isEnablePreload()) {
+      return;
+    }
+    String segmentName = zkMetadata.getSegmentName();
+    Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, zkMetadata, null);
+    Preconditions.checkState(partitionId != null,
+        String.format("Failed to get partition id for segment: %s in dedup-enabled table: %s", segmentName,
+            _tableNameWithType));
+    _tableDedupMetadataManager.getOrCreatePartitionManager(partitionId).preloadSegments(indexLoadingConfig);
+  }
+
   protected void doAddOnlineSegment(String segmentName)
       throws Exception {
     SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
@@ -424,7 +448,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
         "Segment: %s of table: %s is not committed, cannot make it ONLINE", segmentName, _tableNameWithType);
     IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
     indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
-    handleUpsertPreload(zkMetadata, indexLoadingConfig);
+    handleSegmentPreload(zkMetadata, indexLoadingConfig);
     SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
     if (segmentDataManager == null) {
       addNewOnlineSegment(zkMetadata, indexLoadingConfig);
@@ -470,7 +494,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
       return;
     }
     IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
-    handleUpsertPreload(zkMetadata, indexLoadingConfig);
+    handleSegmentPreload(zkMetadata, indexLoadingConfig);
     SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
     if (segmentDataManager != null) {
       _logger.warn("Segment: {} ({}) already exists, skipping adding it as CONSUMING segment", segmentName,
@@ -567,22 +591,29 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   private void handleDedup(ImmutableSegmentImpl immutableSegment) {
     // TODO(saurabh) refactor commons code with handleUpsert
     String segmentName = immutableSegment.getSegmentName();
-    Integer partitionGroupId =
+    _logger.info("Adding immutable segment: {} with dedup enabled", segmentName);
+    Integer partitionId =
         SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, null);
-    Preconditions.checkNotNull(partitionGroupId,
-        String.format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
+    Preconditions.checkNotNull(partitionId,
+        String.format("PartitionId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
             _tableNameWithType));
     PartitionDedupMetadataManager partitionDedupMetadataManager =
-        _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId);
+        _tableDedupMetadataManager.getOrCreatePartitionManager(partitionId);
     immutableSegment.enableDedup(partitionDedupMetadataManager);
     SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName);
-    if (oldSegmentManager != null) {
-      LOGGER.info("Replacing mutable segment: {} with immutable segment: {} in partition dedup metadata manager",
-          oldSegmentManager.getSegment().getSegmentName(), segmentName);
-      partitionDedupMetadataManager.replaceSegment(oldSegmentManager.getSegment(), immutableSegment);
-    } else {
-      LOGGER.info("Adding immutable segment: {} to partition dedup metadata manager", segmentName);
+    if (partitionDedupMetadataManager.isPreloading()) {
+      partitionDedupMetadataManager.preloadSegment(immutableSegment);
+      LOGGER.info("Preloaded immutable segment: {} with dedup enabled", segmentName);
+      return;
+    }
+    if (oldSegmentManager == null) {
       partitionDedupMetadataManager.addSegment(immutableSegment);
+      LOGGER.info("Added new immutable segment: {} with dedup enabled", segmentName);
+    } else {
+      IndexSegment oldSegment = oldSegmentManager.getSegment();
+      partitionDedupMetadataManager.replaceSegment(oldSegment, immutableSegment);
+      LOGGER.info("Replaced {} segment: {} with dedup enabled",
+          oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName);
     }
   }
@@ -603,8 +634,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
     ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
     if (partitionUpsertMetadataManager.isPreloading()) {
-      // Preloading segment is ensured to be handled by a single thread, so no need to take the segment upsert lock.
-      // Besides, preloading happens before the table partition is made ready for any queries.
+      // Register segment after it is preloaded and has initialized its validDocIds. The order of preloading and
+      // registering segment doesn't matter much as preloading happens before table partition is ready for queries.
       partitionUpsertMetadataManager.preloadSegment(immutableSegment);
       registerSegment(segmentName, newSegmentManager, partitionUpsertMetadataManager);
       _logger.info("Preloaded immutable segment: {} with upsert enabled", segmentName);
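Note that setting enablePreload in the table config is not sufficient by itself:
BaseTableDedupMetadataManager (later in this diff) only turns preloading on when the
server also provides a segment preload executor. A sketch of the server-side knob,
mirroring overrideServerConf() in the integration test that follows; the thread count
here is illustrative, and the claim that a positive count creates the executor is an
assumption based on the factory test later in this diff:

    import org.apache.pinot.spi.env.PinotConfiguration;
    import org.apache.pinot.spi.utils.CommonConstants;

    public class PreloadServerConfSketch {
      public static void main(String[] args) {
        PinotConfiguration serverConf = new PinotConfiguration();
        // Assumed: a positive thread count makes the instance data manager create the
        // segment preload executor that preloadSegments() runs on.
        serverConf.setProperty(
            CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + ".max.segment.preload.threads", "4");
      }
    }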
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupPreloadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupPreloadIntegrationTest.java
new file mode 100644
index 0000000000..c2589bb520
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupPreloadIntegrationTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.DedupConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
+import org.apache.pinot.spi.config.table.RoutingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class DedupPreloadIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  private List<File> _avroFiles;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    // Start a customized controller with more frequent realtime segment validation
+    startController();
+    startBroker();
+    startServer();
+
+    _avroFiles = unpackAvroData(_tempDir);
+    startKafka();
+    pushAvroIntoKafka(_avroFiles);
+
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createDedupTableConfig(_avroFiles.get(0), "id", getNumKafkaPartitions());
+    addTableConfig(tableConfig);
+
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    serverConf.setProperty(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + ".max.segment.preload.threads",
+        "1");
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  @Override
+  protected int getRealtimeSegmentFlushSize() {
+    // Create > 1 segments
+    return 2;
+  }
+
+  @Override
+  protected String getSchemaFileName() {
+    return "dedupIngestionTestSchema.schema";
+  }
+
+  @Override
+  protected String getAvroTarFileName() {
+    return "dedupIngestionTestData.tar.gz";
+  }
+
+  @Override
+  protected String getPartitionColumn() {
+    return "id";
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    // Three distinct records are expected with pk values of 100000, 100001, 100002
+    return 5;
+  }
+
+  @Test
+  public void testValues()
+      throws Exception {
+    assertEquals(getCurrentCountStarResult(), getCountStarResult());
+
+    // Validate the older value persist
+    for (int i = 0; i < getCountStarResult(); i++) {
+      assertEquals(
+          getPinotConnection().execute("SELECT name FROM " + getTableName() + " WHERE id = " + i).getResultSet(0)
+              .getString(0), "" + i);
+    }
+
+    // Restart the servers and check again
+    restartServers();
+    waitForAllDocsLoaded(600_000L);
+
+    // Validate the older value persist
+    for (int i = 0; i < getCountStarResult(); i++) {
+      assertEquals(
+          getPinotConnection().execute("SELECT name FROM " + getTableName() + " WHERE id = " + i).getResultSet(0)
+              .getString(0), "" + i);
+    }
+  }
+
+  @Override
+  protected TableConfig createDedupTableConfig(File sampleAvroFile, String primaryKeyColumn, int numPartitions) {
+    AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
+    Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
+    columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions));
+
+    DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null, null, 0, null, true);
+
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName())
+        .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
+        .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
+        .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
+        .setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).setRoutingConfig(
+            new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
+        .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1)).setDedupConfig(dedupConfig)
+        .build();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
index 3892d36d92..08ca8633a7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
@@ -24,14 +24,23 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.SegmentPreloadUtils;
 import org.apache.pinot.segment.local.utils.WatermarkUtils;
+import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.spi.config.table.HashFunction;
@@ -46,6 +55,7 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
   protected final String _tableNameWithType;
   protected final List<String> _primaryKeyColumns;
   protected final int _partitionId;
+  protected final DedupContext _context;
   protected final ServerMetrics _serverMetrics;
   protected final HashFunction _hashFunction;
   protected final double _metadataTTL;
@@ -58,16 +68,21 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
   // Initialize with 1 pending operation to indicate the metadata manager can take more operations
   private int _numPendingOperations = 1;
   private boolean _closed;
+  // The lock and boolean flag ensure only one thread can start preloading and preloading happens only once.
+  private final Lock _preloadLock = new ReentrantLock();
+  private volatile boolean _isPreloading;

   protected BasePartitionDedupMetadataManager(String tableNameWithType, int partitionId, DedupContext dedupContext) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
+    _context = dedupContext;
     _primaryKeyColumns = dedupContext.getPrimaryKeyColumns();
     _hashFunction = dedupContext.getHashFunction();
-    _serverMetrics = dedupContext.getServerMetrics();
+    _isPreloading = dedupContext.isPreloadEnabled();
     _metadataTTL = dedupContext.getMetadataTTL() >= 0 ? dedupContext.getMetadataTTL() : 0;
     _dedupTimeColumn = dedupContext.getDedupTimeColumn();
     _tableIndexDir = dedupContext.getTableIndexDir();
+    _serverMetrics = ServerMetrics.get();
     _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
     if (_metadataTTL > 0) {
       Preconditions.checkArgument(_dedupTimeColumn != null,
@@ -87,7 +102,58 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
   }

   @Override
-  public void addSegment(IndexSegment segment) {
+  public boolean isPreloading() {
+    return _isPreloading;
+  }
+
+  @Override
+  public void preloadSegments(IndexLoadingConfig indexLoadingConfig) {
+    if (!_isPreloading) {
+      return;
+    }
+    TableDataManager tableDataManager = _context.getTableDataManager();
+    Preconditions.checkNotNull(tableDataManager, "Preloading segments requires tableDataManager");
+    HelixManager helixManager = tableDataManager.getHelixManager();
+    ExecutorService segmentPreloadExecutor = tableDataManager.getSegmentPreloadExecutor();
+    // Preloading the segments for dedup table for fast metadata recovery, as done for upsert table.
+    _preloadLock.lock();
+    try {
+      // Check the flag again to ensure preloading happens only once.
+      if (!_isPreloading) {
+        return;
+      }
+      // From now on, the _isPreloading flag is true until the segments are preloaded.
+      long startTime = System.currentTimeMillis();
+      doPreloadSegments(tableDataManager, indexLoadingConfig, helixManager, segmentPreloadExecutor);
+      long duration = System.currentTimeMillis() - startTime;
+      _serverMetrics.addTimedTableValue(_tableNameWithType, ServerTimer.DEDUP_PRELOAD_TIME_MS, duration,
+          TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      // We should continue even if preloading fails, so that segments not being preloaded successfully can get
+      // loaded via the normal segment loading logic as done on the Helix task threads.
+      _logger.warn("Failed to preload segments from partition: {} of table: {}, skipping", _partitionId,
+          _tableNameWithType, e);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DEDUP_PRELOAD_FAILURE, 1);
+      if (e instanceof InterruptedException) {
+        // Restore the interrupted status in case the upper callers want to check.
+        Thread.currentThread().interrupt();
+      }
+    } finally {
+      _isPreloading = false;
+      _preloadLock.unlock();
+    }
+  }
+
+  // Keep this hook method for subclasses to modify the preloading logic.
+  protected void doPreloadSegments(TableDataManager tableDataManager, IndexLoadingConfig indexLoadingConfig,
+      HelixManager helixManager, ExecutorService segmentPreloadExecutor)
+      throws Exception {
+    SegmentPreloadUtils.preloadSegments(tableDataManager, _partitionId, indexLoadingConfig, helixManager,
+        segmentPreloadExecutor, null);
+  }
+
+  @Override
+  public void preloadSegment(ImmutableSegment segment) {
     String segmentName = segment.getSegmentName();
     if (segment instanceof EmptyIndexSegment) {
       _logger.info("Skip adding empty segment: {}", segmentName);
@@ -97,21 +163,58 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
         "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
         _tableNameWithType);
     if (!startOperation()) {
-      _logger.info("Skip adding segment: {} because dedup metadata manager is already stopped",
-          segment.getSegmentName());
+      _logger.info("Skip preloading segment: {} because dedup metadata manager is already stopped", segmentName);
       return;
     }
     try {
-      addOrReplaceSegment(null, segment);
+      if (skipSegmentOutOfTTL(segment, true)) {
+        return;
+      }
+      try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(segment,
+          _primaryKeyColumns, _dedupTimeColumn)) {
+        Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+            DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, segment.getSegmentMetadata().getTotalDocs());
+        doPreloadSegment(segment, dedupRecordInfoIterator);
+        updatePrimaryKeyGauge();
+      }
     } catch (Exception e) {
       throw new RuntimeException(
-          String.format("Caught exception while adding segment: %s of table: %s to %s", segment.getSegmentName(),
+          String.format("Caught exception while preloading segment: %s of table: %s in %s", segmentName,
               _tableNameWithType, this.getClass().getSimpleName()), e);
     } finally {
       finishOperation();
     }
   }

+  protected abstract void doPreloadSegment(ImmutableSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator);
+
+  @Override
+  public void addSegment(IndexSegment segment) {
+    String segmentName = segment.getSegmentName();
+    if (segment instanceof EmptyIndexSegment) {
+      _logger.info("Skip adding empty segment: {}", segmentName);
+      return;
+    }
+    Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+        "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
+        _tableNameWithType);
+    if (!startOperation()) {
+      _logger.info("Skip adding segment: {} because dedup metadata manager is already stopped", segmentName);
+      return;
+    }
+    try {
+      if (!skipSegmentOutOfTTL(segment, true)) {
+        addOrReplaceSegment(null, segment);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Caught exception while adding segment: %s of table: %s to %s", segmentName, _tableNameWithType,
+              this.getClass().getSimpleName()), e);
+    } finally {
+      finishOperation();
+    }
+  }
+
   @Override
   public void replaceSegment(IndexSegment oldSegment, IndexSegment newSegment) {
     if (!startOperation()) {
@@ -120,7 +223,9 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
       return;
     }
     try {
-      addOrReplaceSegment(oldSegment, newSegment);
+      if (!skipSegmentOutOfTTL(newSegment, true)) {
+        addOrReplaceSegment(oldSegment, newSegment);
+      }
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while replacing segment: %s with segment: %s of table: %s in %s",
@@ -131,19 +236,27 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
     }
   }

-  private void addOrReplaceSegment(@Nullable IndexSegment oldSegment, IndexSegment newSegment)
-      throws IOException {
-    // If metadataTTL is enabled, we can skip adding dedup metadata for segment that's already out of the TTL.
-    if (_metadataTTL > 0) {
-      double maxDedupTime = getMaxDedupTime(newSegment);
+  protected boolean skipSegmentOutOfTTL(IndexSegment segment, boolean updateWatermark) {
+    if (_metadataTTL <= 0) {
+      return false;
+    }
+    // If metadataTTL is enabled, we can skip adding dedup metadata for segment already out of the TTL. Different
+    // from upsert table, there is no need to initialize things like validDocIds bitmap for those skipped segments.
+    double maxDedupTime = getMaxDedupTime(segment);
+    if (updateWatermark) {
       _largestSeenTime.getAndUpdate(time -> Math.max(time, maxDedupTime));
-      if (isOutOfMetadataTTL(maxDedupTime)) {
-        String action = oldSegment == null ? "adding" : "replacing";
-        _logger.info("Skip {} segment: {} as max dedupTime: {} is out of TTL: {}", action, newSegment.getSegmentName(),
-            maxDedupTime, _metadataTTL);
-        return;
-      }
     }
+    if (!isOutOfMetadataTTL(maxDedupTime)) {
+      return false;
+    }
+    _logger.info("Skip segment: {} as max dedupTime: {} is out of TTL: {}", segment.getSegmentName(), maxDedupTime,
+        _metadataTTL);
+    // Return true if skipped. Boolean value allows subclasses to disable skipping.
+    return true;
+  }
+
+  private void addOrReplaceSegment(@Nullable IndexSegment oldSegment, IndexSegment newSegment)
+      throws IOException {
     try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(newSegment,
         _primaryKeyColumns, _dedupTimeColumn)) {
       Iterator<DedupRecordInfo> dedupRecordInfoIterator =
@@ -169,21 +282,17 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
       _logger.info("Skip removing segment: {} because metadata manager is already stopped", segment.getSegmentName());
       return;
     }
-    // Skip removing the dedup metadata of segment out of TTL. The expired metadata is removed in batches.
-    if (_metadataTTL > 0) {
-      double maxDedupTime = getMaxDedupTime(segment);
-      if (isOutOfMetadataTTL(maxDedupTime)) {
-        _logger.info("Skip removing segment: {} as max dedupTime: {} is out of TTL: {}", segment.getSegmentName(),
-            maxDedupTime, _metadataTTL);
+    try {
+      if (skipSegmentOutOfTTL(segment, false)) {
         return;
       }
-    }
-    try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(segment,
-        _primaryKeyColumns, _dedupTimeColumn)) {
-      Iterator<DedupRecordInfo> dedupRecordInfoIterator =
-          DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, segment.getSegmentMetadata().getTotalDocs());
-      doRemoveSegment(segment, dedupRecordInfoIterator);
-      updatePrimaryKeyGauge();
+      try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(segment,
+          _primaryKeyColumns, _dedupTimeColumn)) {
+        Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+            DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, segment.getSegmentMetadata().getTotalDocs());
+        doRemoveSegment(segment, dedupRecordInfoIterator);
+        updatePrimaryKeyGauge();
+      }
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while removing segment: %s of table: %s from %s", segment.getSegmentName(),
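The preloadSegments() method above guards the preload with a volatile flag plus a lock so
that concurrent callers coordinate and the preload runs at most once, flipping the flag
off afterwards even on failure. A distilled sketch of just that gating, with the metrics,
Helix wiring, and error handling stripped out; class and method names are illustrative:

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class PreloadGateSketch {
      private final Lock _preloadLock = new ReentrantLock();
      private volatile boolean _isPreloading = true;

      public boolean isPreloading() {
        return _isPreloading;
      }

      public void preloadOnce(Runnable doPreload) {
        if (!_isPreloading) {
          return; // fast path: preloading already finished (or failed)
        }
        _preloadLock.lock();
        try {
          if (!_isPreloading) {
            return; // another thread completed the preload while we waited on the lock
          }
          doPreload.run();
        } finally {
          // Flip the flag before releasing the lock so later callers take the fast path,
          // whether the preload succeeded or threw.
          _isPreloading = false;
          _preloadLock.unlock();
        }
      }
    }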
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
index 80639ebd5e..8172bb86f3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.dedup;

 import com.google.common.base.Preconditions;
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -27,14 +28,20 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.spi.config.table.DedupConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;


 public abstract class BaseTableDedupMetadataManager implements TableDedupMetadataManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableDedupMetadataManager.class);
+
   protected final Map<Integer, PartitionDedupMetadataManager> _partitionMetadataManagerMap = new ConcurrentHashMap<>();
   protected String _tableNameWithType;
   protected DedupContext _dedupContext;
+  private boolean _enablePreload;

   @Override
   public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager,
@@ -57,19 +64,18 @@ public abstract class BaseTableDedupMetadataManager implements TableDedupMetadat
           "When metadataTTL is configured, metadata time column or time column must be configured for "
               + "dedup enabled table: %s", _tableNameWithType);
     }
-
+    _enablePreload = dedupConfig.isEnablePreload() && tableDataManager.getSegmentPreloadExecutor() != null;
+    HashFunction hashFunction = dedupConfig.getHashFunction();
+    File tableIndexDir = tableDataManager.getTableDataDir();
     DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
-    dedupContextBuider
-        .setTableConfig(tableConfig)
-        .setSchema(schema)
-        .setPrimaryKeyColumns(primaryKeyColumns)
-        .setHashFunction(dedupConfig.getHashFunction())
-        .setMetadataTTL(metadataTTL)
-        .setDedupTimeColumn(dedupTimeColumn)
-        .setTableIndexDir(tableDataManager.getTableDataDir())
-        .setTableDataManager(tableDataManager)
-        .setServerMetrics(serverMetrics);
+    dedupContextBuider.setTableConfig(tableConfig).setSchema(schema).setPrimaryKeyColumns(primaryKeyColumns)
+        .setHashFunction(hashFunction).setEnablePreload(_enablePreload).setMetadataTTL(metadataTTL)
+        .setDedupTimeColumn(dedupTimeColumn).setTableIndexDir(tableIndexDir).setTableDataManager(tableDataManager);
     _dedupContext = dedupContextBuider.build();
+    LOGGER.info(
+        "Initialized {} for table: {} with primary key columns: {}, hash function: {}, enable preload: {}, metadata "
+            + "TTL: {}, dedup time column: {}, table index dir: {}", getClass().getSimpleName(), _tableNameWithType,
+        primaryKeyColumns, hashFunction, _enablePreload, metadataTTL, dedupTimeColumn, tableIndexDir);
     initCustomVariables();
   }
@@ -89,6 +95,11 @@ public abstract class BaseTableDedupMetadataManager implements TableDedupMetadat
   protected void initCustomVariables() {
   }

+  @Override
+  public boolean isEnablePreload() {
+    return _enablePreload;
+  }
+
   @Override
   public void stop() {
     for (PartitionDedupMetadataManager metadataManager : _partitionMetadataManagerMap.values()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
index 4461266e35..b4ef9ca63a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;


@@ -38,6 +39,16 @@ class ConcurrentMapPartitionDedupMetadataManager extends BasePartitionDedupMetad
     super(tableNameWithType, partitionId, dedupContext);
   }

+  @Override
+  protected void doPreloadSegment(ImmutableSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator) {
+    while (dedupRecordInfoIterator.hasNext()) {
+      DedupRecordInfo dedupRecordInfo = dedupRecordInfoIterator.next();
+      double dedupTime = dedupRecordInfo.getDedupTime();
+      _primaryKeyToSegmentAndTimeMap.put(HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(), _hashFunction),
+          Pair.of(segment, dedupTime));
+    }
+  }
+
   @Override
   protected void doAddOrReplaceSegment(IndexSegment oldSegment, IndexSegment newSegment,
       Iterator<DedupRecordInfo> dedupRecordInfoIteratorOfNewSegment) {
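doPreloadSegment() above is where preloading pays off: per the preloadSegment() javadoc in
PartitionDedupMetadataManager below, the dedup metadata can be written directly per primary
key, skipping the costlier read-compare-update that ingestion-time adds must do, presumably
because records in a committed segment already passed dedup checks when first ingested. An
illustrative contrast over a simplified map from hashed key to dedup time (class and method
names are hypothetical):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class PreloadVsAddSketch {
      private final ConcurrentMap<Object, Double> _primaryKeyToTimeMap = new ConcurrentHashMap<>();

      // Preload path: a plain put per key suffices, since the segment's contents were
      // already deduplicated when the segment was built.
      void preloadKey(Object hashedPrimaryKey, double dedupTime) {
        _primaryKeyToTimeMap.put(hashedPrimaryKey, dedupTime);
      }

      // Ingestion path: writers race, so the insert must be atomic; the record is dropped
      // as a duplicate when its key is already present.
      boolean addKey(Object hashedPrimaryKey, double dedupTime) {
        return _primaryKeyToTimeMap.putIfAbsent(hashedPrimaryKey, dedupTime) == null;
      }
    }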
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
index a523f26957..4407676ad7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.util.List;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -34,24 +33,24 @@ public class DedupContext {
   private final Schema _schema;
   private final List<String> _primaryKeyColumns;
   private final HashFunction _hashFunction;
+  private final boolean _enablePreload;
   private final double _metadataTTL;
   private final String _dedupTimeColumn;
   private final File _tableIndexDir;
   private final TableDataManager _tableDataManager;
-  private final ServerMetrics _serverMetrics;

   private DedupContext(TableConfig tableConfig, Schema schema, List<String> primaryKeyColumns,
-      HashFunction hashFunction, double metadataTTL, String dedupTimeColumn, File tableIndexDir,
-      TableDataManager tableDataManager, ServerMetrics serverMetrics) {
+      HashFunction hashFunction, boolean enablePreload, double metadataTTL, String dedupTimeColumn, File tableIndexDir,
+      TableDataManager tableDataManager) {
     _tableConfig = tableConfig;
     _schema = schema;
     _primaryKeyColumns = primaryKeyColumns;
     _hashFunction = hashFunction;
+    _enablePreload = enablePreload;
     _metadataTTL = metadataTTL;
     _dedupTimeColumn = dedupTimeColumn;
     _tableIndexDir = tableIndexDir;
     _tableDataManager = tableDataManager;
-    _serverMetrics = serverMetrics;
   }

   public TableConfig getTableConfig() {
@@ -70,6 +69,10 @@ public class DedupContext {
     return _hashFunction;
   }

+  public boolean isPreloadEnabled() {
+    return _enablePreload;
+  }
+
   public double getMetadataTTL() {
     return _metadataTTL;
   }
@@ -86,20 +89,16 @@ public class DedupContext {
     return _tableDataManager;
   }

-  public ServerMetrics getServerMetrics() {
-    return _serverMetrics;
-  }
-
   public static class Builder {
     private TableConfig _tableConfig;
     private Schema _schema;
     private List<String> _primaryKeyColumns;
     private HashFunction _hashFunction;
+    private boolean _enablePreload;
     private double _metadataTTL;
     private String _dedupTimeColumn;
     private File _tableIndexDir;
    private TableDataManager _tableDataManager;
-    private ServerMetrics _serverMetrics;

     public Builder setTableConfig(TableConfig tableConfig) {
       _tableConfig = tableConfig;
@@ -121,6 +120,11 @@ public class DedupContext {
       return this;
     }

+    public Builder setEnablePreload(boolean enablePreload) {
+      _enablePreload = enablePreload;
+      return this;
+    }
+
     public Builder setMetadataTTL(double metadataTTL) {
       _metadataTTL = metadataTTL;
       return this;
@@ -141,19 +145,14 @@ public class DedupContext {
       return this;
     }

-    public Builder setServerMetrics(ServerMetrics serverMetrics) {
-      _serverMetrics = serverMetrics;
-      return this;
-    }
-
     public DedupContext build() {
       Preconditions.checkState(_tableConfig != null, "Table config must be set");
       Preconditions.checkState(_schema != null, "Schema must be set");
       Preconditions.checkState(CollectionUtils.isNotEmpty(_primaryKeyColumns), "Primary key columns must be set");
       Preconditions.checkState(_hashFunction != null, "Hash function must be set");
       Preconditions.checkState(_tableIndexDir != null, "Table index directory must be set");
-      return new DedupContext(_tableConfig, _schema, _primaryKeyColumns, _hashFunction, _metadataTTL, _dedupTimeColumn,
-          _tableIndexDir, _tableDataManager, _serverMetrics);
+      return new DedupContext(_tableConfig, _schema, _primaryKeyColumns, _hashFunction, _enablePreload, _metadataTTL,
+          _dedupTimeColumn, _tableIndexDir, _tableDataManager);
     }
   }
 }
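For completeness, constructing a DedupContext now flows the preload flag through the
builder, and build() enforces the preconditions shown above (table config, schema, primary
key columns, hash function, and table index dir). A sketch, with argument values
illustrative and the surrounding objects assumed to come from the caller:

    import java.io.File;
    import java.util.List;
    import org.apache.pinot.segment.local.data.manager.TableDataManager;
    import org.apache.pinot.segment.local.dedup.DedupContext;
    import org.apache.pinot.spi.config.table.HashFunction;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.data.Schema;

    public class DedupContextSketch {
      static DedupContext buildContext(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager) {
        return new DedupContext.Builder()
            .setTableConfig(tableConfig)
            .setSchema(schema)
            .setPrimaryKeyColumns(List.of("id"))
            .setHashFunction(HashFunction.NONE)
            .setEnablePreload(true) // requires a non-null TableDataManager for preloadSegments()
            .setTableIndexDir(new File("/tmp/myTable_REALTIME"))
            .setTableDataManager(tableDataManager)
            .build();
      }
    }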
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
index 835ce6dfa7..ff2667ec14 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.segment.local.dedup;

 import java.io.Closeable;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
@@ -37,6 +39,22 @@ public interface PartitionDedupMetadataManager extends Closeable {
     addSegment(newSegment);
   }

+  /**
+   * Preload segments for the table partition. Segments can be added differently during preloading.
+   * TODO: As commented in PartitionUpsertMetadataManager, revisit this method and see if we can use the same
+   *   IndexLoadingConfig for all segments. Tier info might be different for different segments.
+   */
+  void preloadSegments(IndexLoadingConfig indexLoadingConfig);
+
+  boolean isPreloading();
+
+  /**
+   * Different from adding a segment, when preloading a segment, the dedup metadata may be updated more efficiently.
+   * Basically the dedup metadata can be directly updated for each primary key, without doing the more costly
+   * read-compare-update.
+   */
+  void preloadSegment(ImmutableSegment immutableSegment);
+
   /**
    * Removes the dedup metadata for the given segment.
    */
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
index 5c0bd1d830..949f7ab669 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
@@ -36,6 +36,8 @@ public interface TableDedupMetadataManager extends Closeable {
    */
   PartitionDedupMetadataManager getOrCreatePartitionManager(int partitionId);

+  boolean isEnablePreload();
+
   /**
    * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
    */
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java
new file mode 100644
index 0000000000..475d4a3929
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import java.io.IOException;
+import java.util.Iterator;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class BasePartitionDedupMetadataManagerTest {
+  @Test
+  public void testPreloadSegments()
+      throws IOException {
+    String realtimeTableName = "testTable_REALTIME";
+    DedupContext dedupContext = mock(DedupContext.class);
+    when(dedupContext.isPreloadEnabled()).thenReturn(true);
+    TableDataManager tableDataManager = mock(TableDataManager.class);
+    when(dedupContext.getTableDataManager()).thenReturn(tableDataManager);
+    IndexLoadingConfig indexLoadingConfig = mock(IndexLoadingConfig.class);
+    when(indexLoadingConfig.getTableConfig()).thenReturn(mock(TableConfig.class));
+
+    try (DummyPartitionDedupMetadataManager dedupMetadataManager = new DummyPartitionDedupMetadataManager(
+        realtimeTableName, 0, dedupContext)) {
+      assertTrue(dedupMetadataManager.isPreloading());
+      dedupMetadataManager.preloadSegments(indexLoadingConfig);
+      assertFalse(dedupMetadataManager.isPreloading());
+      dedupMetadataManager.stop();
+    }
+  }
+
+  private static class DummyPartitionDedupMetadataManager extends BasePartitionDedupMetadataManager {
+
+    protected DummyPartitionDedupMetadataManager(String tableNameWithType, int partitionId, DedupContext context) {
+      super(tableNameWithType, partitionId, context);
+    }
+
+    @Override
+    protected void doPreloadSegment(ImmutableSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator) {
+    }
+
+    @Override
+    protected void doAddOrReplaceSegment(@Nullable IndexSegment oldSegment, IndexSegment newSegment,
+        Iterator<DedupRecordInfo> dedupRecordInfoIteratorOfNewSegment) {
+    }
+
+    @Override
+    protected void doRemoveSegment(IndexSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator) {
+    }
+
+    @Override
+    protected void doRemoveExpiredPrimaryKeys() {
+    }
+
+    @Override
+    protected long getNumPrimaryKeys() {
+      return 0;
+    }
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
index c0697eb4c3..2823d68d34 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.TreeMap;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -67,8 +66,7 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
     _dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
         .setPrimaryKeyColumns(List.of("primaryKeyColumn")).setMetadataTTL(METADATA_TTL)
         .setDedupTimeColumn(DEDUP_TIME_COLUMN_NAME).setTableIndexDir(mock(File.class))
-        .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class))
-        .setTableIndexDir(TEMP_DIR);
+        .setTableDataManager(mock(TableDataManager.class)).setTableIndexDir(TEMP_DIR);
   }

   @AfterMethod
@@ -81,8 +79,7 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
     DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
     dedupContextBuider.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
         .setPrimaryKeyColumns(List.of("primaryKeyColumn")).setHashFunction(HashFunction.NONE).setMetadataTTL(1)
-        .setDedupTimeColumn(null).setTableIndexDir(mock(File.class)).setTableDataManager(mock(TableDataManager.class))
-        .setServerMetrics(mock(ServerMetrics.class));
+        .setDedupTimeColumn(null).setTableIndexDir(mock(File.class)).setTableDataManager(mock(TableDataManager.class));
     DedupContext dedupContext = dedupContextBuider.build();
     assertThrows(IllegalArgumentException.class,
         () -> new ConcurrentMapPartitionDedupMetadataManager(DedupTestUtils.REALTIME_TABLE_NAME, 0, dedupContext));
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
index f7eadeaf09..90c68f1f42 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -58,8 +57,7 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest {
     _dedupContextBuilder = new DedupContext.Builder();
     _dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
         .setPrimaryKeyColumns(List.of("primaryKeyColumn")).setTableIndexDir(mock(File.class))
-        .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class))
-        .setTableIndexDir(TEMP_DIR);
+        .setTableDataManager(mock(TableDataManager.class)).setTableIndexDir(TEMP_DIR);
   }

   @AfterMethod
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
new file mode 100644
index 0000000000..f3247c8227
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.DedupConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+
+
+public class TableDedupMetadataManagerFactoryTest {
+  @Test
+  public void testEnablePreload() {
+    DedupConfig dedupConfig =
+        new DedupConfig(true, HashFunction.MD5, null, Collections.emptyMap(), 10, "timeCol", true);
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName("mytable").addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+            .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName("mytable").setDedupConfig(dedupConfig).build();
+
+    // Preloading is not enabled as there is no preloading thread.
+    TableDataManager tableDataManager = mock(TableDataManager.class);
+    when(tableDataManager.getTableDataDir()).thenReturn(new File("mytable"));
+    when(tableDataManager.getSegmentPreloadExecutor()).thenReturn(null);
+    TableDedupMetadataManager tableDedupMetadataManager =
+        TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager, null);
+    assertNotNull(tableDedupMetadataManager);
+    assertFalse(tableDedupMetadataManager.isEnablePreload());
+
+    // Enabled as enablePreload is true and there is preloading thread.
+    tableDataManager = mock(TableDataManager.class);
+    when(tableDataManager.getTableDataDir()).thenReturn(new File("mytable"));
+    when(tableDataManager.getSegmentPreloadExecutor()).thenReturn(mock(ExecutorService.class));
+    tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager, null);
+    assertNotNull(tableDedupMetadataManager);
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
index 8c4594dc91..b4544979e3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
@@ -65,7 +65,7 @@ public class MutableSegmentDedupeTest {
         .setDedupConfig(new DedupConfig(dedupEnabled, HashFunction.NONE)).build();
     CompositeTransformer recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
     File jsonFile = new File(dataResourceUrl.getFile());
-    DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null, null, metadataTTL, dedupTimeColumn);
+    DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null, null, metadataTTL, dedupTimeColumn, false);
     PartitionDedupMetadataManager partitionDedupMetadataManager =
         (dedupEnabled) ? getTableDedupMetadataManager(schema, dedupConfig).getOrCreatePartitionManager(0) : null;
     _mutableSegmentImpl =
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
index 78d7b3b9f0..dfc8151e35 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonPropertyDescription;
 import java.util.Map;
 import org.apache.pinot.spi.config.BaseJsonConfig;

+
 public class DedupConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Whether dedup is enabled or not.")
   private final boolean _dedupEnabled;
@@ -43,9 +44,12 @@ public class DedupConfig extends BaseJsonConfig {
       + " from the table config will be used.")
   private final String _dedupTimeColumn;

+  @JsonPropertyDescription("Whether to preload segments for fast dedup metadata recovery")
+  private final boolean _enablePreload;
+
   public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true) boolean dedupEnabled,
       @JsonProperty(value = "hashFunction") HashFunction hashFunction) {
-    this(dedupEnabled, hashFunction, null, null, 0, null);
+    this(dedupEnabled, hashFunction, null, null, 0, null, false);
   }

   @JsonCreator
@@ -54,13 +58,15 @@ public class DedupConfig extends BaseJsonConfig {
       @JsonProperty(value = "metadataManagerClass") String metadataManagerClass,
       @JsonProperty(value = "metadataManagerConfigs") Map<String, String> metadataManagerConfigs,
       @JsonProperty(value = "metadataTTL") double metadataTTL,
-      @JsonProperty(value = "dedupTimeColumn") String dedupTimeColumn) {
+      @JsonProperty(value = "dedupTimeColumn") String dedupTimeColumn,
+      @JsonProperty(value = "enablePreload") boolean enablePreload) {
     _dedupEnabled = dedupEnabled;
     _hashFunction = hashFunction == null ? HashFunction.NONE : hashFunction;
     _metadataManagerClass = metadataManagerClass;
     _metadataManagerConfigs = metadataManagerConfigs;
     _metadataTTL = metadataTTL;
     _dedupTimeColumn = dedupTimeColumn;
+    _enablePreload = enablePreload;
   }

   public HashFunction getHashFunction() {
@@ -86,4 +92,8 @@ public class DedupConfig extends BaseJsonConfig {
   public String getDedupTimeColumn() {
     return _dedupTimeColumn;
   }
+
+  public boolean isEnablePreload() {
+    return _enablePreload;
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org