This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 517b8813b8  For dedup/partial-upsert, check all segments loaded when creating the consuming segment (#8923)
517b8813b8 is described below

commit 517b8813b8029bfcb75f90db043817256efce997
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Sun Jun 19 12:10:23 2022 -0700

    For dedup/partial-upsert, check all segments loaded when creating the consuming segment (#8923)
---
 .../manager/realtime/RealtimeTableDataManager.java | 181 +++++++++++----------
 .../local/dedup/PartitionDedupMetadataManager.java |  73 +++------
 .../local/dedup/TableDedupMetadataManager.java     |   9 +-
 .../segment/local/upsert/PartialUpsertHandler.java |  29 +---
 .../upsert/PartitionUpsertMetadataManager.java     |  13 +-
 .../local/upsert/TableUpsertMetadataManager.java   |   4 +
 .../local/utils/tablestate/TableStateUtils.java    |  18 +-
 .../dedup/PartitionDedupMetadataManagerTest.java   | 109 +++++--------
 .../mutable/MutableSegmentDedupeTest.java          |  19 +--
 .../local/upsert/PartialUpsertHandlerTest.java     |  16 +-
 10 files changed, 186 insertions(+), 285 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 36740212fc..195e5e023e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.data.manager.realtime;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
@@ -33,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
@@ -64,10 +64,10 @@ import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
 import org.apache.pinot.segment.local.utils.RecordInfo;
 import org.apache.pinot.segment.local.utils.SchemaUtils;
+import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.config.table.DedupConfig;
-import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -115,9 +115,10 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   // likely that we get fresh data each time instead of multiple copies of roughly same data.
   private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
 
-  private UpsertConfig.Mode _upsertMode;
-  private TableUpsertMetadataManager _tableUpsertMetadataManager;
+  private final AtomicBoolean _allSegmentsLoaded = new AtomicBoolean();
+  private TableDedupMetadataManager _tableDedupMetadataManager;
+  private TableUpsertMetadataManager _tableUpsertMetadataManager;
   private List<String> _primaryKeyColumns;
   private String _upsertComparisonColumn;
 
@@ -133,9 +134,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     try {
       _statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
     } catch (IOException | ClassNotFoundException e) {
-      _logger
-          .error("Error reading history object for table {} from {}", _tableNameWithType, statsFile.getAbsolutePath(),
-              e);
+      _logger.error("Error reading history object for table {} from {}", _tableNameWithType,
+          statsFile.getAbsolutePath(), e);
       File savedFile = new File(_tableDataDir, STATS_FILE_NAME + "." + UUID.randomUUID());
       try {
         FileUtils.moveFile(statsFile, savedFile);
@@ -156,64 +156,62 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     String consumerDirPath = getConsumerDir();
     File consumerDir = new File(consumerDirPath);
+    if (consumerDir.exists()) {
+      File[] segmentFiles = consumerDir.listFiles((dir, name) -> !name.equals(STATS_FILE_NAME));
+      Preconditions.checkState(segmentFiles != null, "Failed to list segment files from consumer dir: {} for table: {}",
+          consumerDirPath, _tableNameWithType);
+      for (File file : segmentFiles) {
+        if (file.delete()) {
+          _logger.info("Deleted old file {}", file.getAbsolutePath());
+        } else {
+          _logger.error("Cannot delete file {}", file.getAbsolutePath());
+        }
+      }
+    }
 
-    // NOTE: Upsert has to be set up when starting the server. Changing the table config without restarting the server
-    // won't enable/disable the upsert on the fly.
+    // Set up dedup/upsert metadata manager
+    // NOTE: Dedup/upsert has to be set up when starting the server. Changing the table config without restarting the
+    // server won't enable/disable them on the fly.
     TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
     Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType);
-    _upsertMode = tableConfig.getUpsertMode();
-    if (tableConfig.getDedupConfig() != null && tableConfig.getDedupConfig().isDedupEnabled()) {
+
+    DedupConfig dedupConfig = tableConfig.getDedupConfig();
+    boolean dedupEnabled = dedupConfig != null && dedupConfig.isDedupEnabled();
+    if (dedupEnabled) {
       Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
       Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
+
       _primaryKeyColumns = schema.getPrimaryKeyColumns();
-      DedupConfig dedupConfig = tableConfig.getDedupConfig();
-      HashFunction dedupHashFunction = dedupConfig.getHashFunction();
-      _tableDedupMetadataManager =
-          new TableDedupMetadataManager(_helixManager, _tableNameWithType, _primaryKeyColumns, _serverMetrics,
-              dedupHashFunction);
+      Preconditions.checkState(!CollectionUtils.isEmpty(_primaryKeyColumns),
+          "Primary key columns must be configured for dedup");
+      _tableDedupMetadataManager = new TableDedupMetadataManager(_tableNameWithType, _primaryKeyColumns, _serverMetrics,
+          dedupConfig.getHashFunction());
     }
-    if (isUpsertEnabled()) {
-      UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
-      assert upsertConfig != null;
+
+    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+    if (upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE) {
+      Preconditions.checkState(!dedupEnabled, "Dedup and upsert cannot be both enabled for table: %s",
+          _tableUpsertMetadataManager);
       Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
       Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
-      PartialUpsertHandler partialUpsertHandler = null;
-      if (isPartialUpsertEnabled()) {
-        String comparisonColumn = upsertConfig.getComparisonColumn();
-        if (comparisonColumn == null) {
-          comparisonColumn = tableConfig.getValidationConfig().getTimeColumnName();
-        }
-        partialUpsertHandler = new PartialUpsertHandler(_helixManager, _tableNameWithType, schema,
-            upsertConfig.getPartialUpsertStrategies(), upsertConfig.getDefaultPartialUpsertStrategy(),
-            comparisonColumn);
-      }
-      HashFunction hashFunction = upsertConfig.getHashFunction();
-      _tableUpsertMetadataManager =
-          new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics, partialUpsertHandler, hashFunction);
       _primaryKeyColumns = schema.getPrimaryKeyColumns();
       Preconditions.checkState(!CollectionUtils.isEmpty(_primaryKeyColumns),
          "Primary key columns must be configured for upsert");
       String comparisonColumn = upsertConfig.getComparisonColumn();
       _upsertComparisonColumn =
           comparisonColumn != null ? comparisonColumn : tableConfig.getValidationConfig().getTimeColumnName();
-    }
 
-    if (consumerDir.exists()) {
-      File[] segmentFiles = consumerDir.listFiles(new FilenameFilter() {
-        @Override
-        public boolean accept(File dir, String name) {
-          return !name.equals(STATS_FILE_NAME);
-        }
-      });
-      for (File file : segmentFiles) {
-        if (file.delete()) {
-          _logger.info("Deleted old file {}", file.getAbsolutePath());
-        } else {
-          _logger.error("Cannot delete file {}", file.getAbsolutePath());
-        }
+      PartialUpsertHandler partialUpsertHandler = null;
+      if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
+        assert upsertConfig.getPartialUpsertStrategies() != null;
+        partialUpsertHandler = new PartialUpsertHandler(schema, upsertConfig.getPartialUpsertStrategies(),
+            upsertConfig.getDefaultPartialUpsertStrategy(), _upsertComparisonColumn);
       }
+
+      _tableUpsertMetadataManager =
+          new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics, partialUpsertHandler,
+              upsertConfig.getHashFunction());
     }
   }
@@ -266,11 +264,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   }
 
   public boolean isUpsertEnabled() {
-    return _upsertMode != UpsertConfig.Mode.NONE;
+    return _tableUpsertMetadataManager != null;
   }
 
   public boolean isPartialUpsertEnabled() {
-    return _upsertMode == UpsertConfig.Mode.PARTIAL;
+    return _tableUpsertMetadataManager != null && _tableUpsertMetadataManager.isPartialUpsertEnabled();
   }
 
   /*
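The rewritten setup block above makes dedup and upsert mutually exclusive and derives "enabled" state from whether a metadata manager was actually constructed, instead of caching an UpsertConfig.Mode at startup. A minimal sketch of that shape, using simplified stand-in types (SetupSketch and the Object-typed managers are illustrative, not the Pinot classes):

    import java.util.List;

    class SetupSketch {
      private Object _dedupManager;   // non-null iff dedup is enabled
      private Object _upsertManager;  // non-null iff upsert is enabled

      void init(boolean dedupEnabled, boolean upsertEnabled, List<String> primaryKeyColumns) {
        if (dedupEnabled) {
          if (primaryKeyColumns == null || primaryKeyColumns.isEmpty()) {
            throw new IllegalStateException("Primary key columns must be configured for dedup");
          }
          _dedupManager = new Object();
        }
        if (upsertEnabled) {
          // Mirrors the Preconditions.checkState(!dedupEnabled, ...) guard above
          if (dedupEnabled) {
            throw new IllegalStateException("Dedup and upsert cannot be both enabled");
          }
          _upsertManager = new Object();
        }
      }

      // Enabled-ness is read off the manager reference, not a cached mode enum
      boolean isUpsertEnabled() {
        return _upsertManager != null;
      }
    }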
@@ -358,8 +356,19 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
         _tableUpsertMetadataManager != null ? _tableUpsertMetadataManager.getOrCreatePartitionManager(
             partitionGroupId) : null;
     PartitionDedupMetadataManager partitionDedupMetadataManager =
-        _tableDedupMetadataManager != null ? _tableDedupMetadataManager
-            .getOrCreatePartitionManager(partitionGroupId) : null;
+        _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
+            : null;
+    // For dedup and partial-upsert, wait for all segments loaded before creating the consuming segment
+    if (isDedupEnabled() || isPartialUpsertEnabled()) {
+      if (!_allSegmentsLoaded.get()) {
+        synchronized (_allSegmentsLoaded) {
+          if (!_allSegmentsLoaded.get()) {
+            TableStateUtils.waitForAllSegmentsLoaded(_helixManager, _tableNameWithType);
+            _allSegmentsLoaded.set(true);
+          }
+        }
+      }
+    }
     segmentDataManager =
         new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
             indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager,
@@ -390,10 +399,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   private void buildDedupMeta(ImmutableSegmentImpl immutableSegment) {
     // TODO(saurabh) refactor commons code with handleUpsert
     String segmentName = immutableSegment.getSegmentName();
-    Integer partitionGroupId = SegmentUtils
-        .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0));
-    Preconditions.checkNotNull(partitionGroupId, String
-        .format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
+    Integer partitionGroupId =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager,
+            _primaryKeyColumns.get(0));
+    Preconditions.checkNotNull(partitionGroupId,
+        String.format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
             _tableNameWithType));
     PartitionDedupMetadataManager partitionDedupMetadataManager =
         _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId);
@@ -403,10 +413,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   private void handleUpsert(ImmutableSegmentImpl immutableSegment) {
     String segmentName = immutableSegment.getSegmentName();
-    Integer partitionGroupId = SegmentUtils
-        .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0));
-    Preconditions.checkNotNull(partitionGroupId, String
-        .format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
+    Integer partitionGroupId =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager,
+            _primaryKeyColumns.get(0));
+    Preconditions.checkNotNull(partitionGroupId,
+        String.format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
             _tableNameWithType));
     PartitionUpsertMetadataManager partitionUpsertMetadataManager =
         _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId);
@@ -417,37 +428,35 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     for (String primaryKeyColumn : _primaryKeyColumns) {
       columnToReaderMap.put(primaryKeyColumn, new PinotSegmentColumnReader(immutableSegment, primaryKeyColumn));
     }
-    columnToReaderMap
-        .put(_upsertComparisonColumn, new PinotSegmentColumnReader(immutableSegment, _upsertComparisonColumn));
+    columnToReaderMap.put(_upsertComparisonColumn,
+        new PinotSegmentColumnReader(immutableSegment, _upsertComparisonColumn));
     int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs();
     int numPrimaryKeyColumns = _primaryKeyColumns.size();
-    Iterator<RecordInfo> recordInfoIterator =
-        new Iterator<RecordInfo>() {
-          private int _docId = 0;
+    Iterator<RecordInfo> recordInfoIterator = new Iterator<RecordInfo>() {
+      private int _docId = 0;
 
-          @Override
-          public boolean hasNext() {
-            return _docId < numTotalDocs;
-          }
+      @Override
+      public boolean hasNext() {
+        return _docId < numTotalDocs;
+      }
 
-          @Override
-          public RecordInfo next() {
-            Object[] values = new Object[numPrimaryKeyColumns];
-            for (int i = 0; i < numPrimaryKeyColumns; i++) {
-              Object value = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(_docId);
-              if (value instanceof byte[]) {
-                value = new ByteArray((byte[]) value);
-              }
-              values[i] = value;
-            }
-            PrimaryKey primaryKey = new PrimaryKey(values);
-            Object upsertComparisonValue = columnToReaderMap.get(_upsertComparisonColumn).getValue(_docId);
-            Preconditions.checkState(upsertComparisonValue instanceof Comparable,
-                "Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
-            return new RecordInfo(primaryKey, _docId++,
-                (Comparable) upsertComparisonValue);
+      @Override
+      public RecordInfo next() {
+        Object[] values = new Object[numPrimaryKeyColumns];
+        for (int i = 0; i < numPrimaryKeyColumns; i++) {
+          Object value = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(_docId);
+          if (value instanceof byte[]) {
+            value = new ByteArray((byte[]) value);
           }
-        };
+          values[i] = value;
+        }
+        PrimaryKey primaryKey = new PrimaryKey(values);
+        Object upsertComparisonValue = columnToReaderMap.get(_upsertComparisonColumn).getValue(_docId);
+        Preconditions.checkState(upsertComparisonValue instanceof Comparable,
+            "Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
+        return new RecordInfo(primaryKey, _docId++, (Comparable) upsertComparisonValue);
+      }
+    };
     partitionUpsertMetadataManager.addSegment(immutableSegment, recordInfoIterator);
   }
@@ -526,8 +535,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
     return
         CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())
-        || CommonConstants.HTTPS_PROTOCOL
-        .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+        || CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(
+            tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
   }
 
   private void downloadSegmentFromPeer(String segmentName, String downloadScheme,
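The new consuming-segment path gates on _allSegmentsLoaded with a double-checked pattern: a lock-free fast path once loading has completed, and a synchronized slow path so only one creator performs the potentially long wait. Note that the removed flag in PartitionDedupMetadataManager was static and therefore shared across tables; the replacement is a per-table instance field. A standalone sketch of the same shape (SegmentLoadGate and ensureLoaded are hypothetical names, not part of the commit):

    import java.util.concurrent.atomic.AtomicBoolean;

    class SegmentLoadGate {
      private final AtomicBoolean _allSegmentsLoaded = new AtomicBoolean();

      void ensureLoaded(Runnable waitForAllSegmentsLoaded) {
        if (!_allSegmentsLoaded.get()) {        // fast path: no locking once set
          synchronized (_allSegmentsLoaded) {   // serialize concurrent waiters
            if (!_allSegmentsLoaded.get()) {    // re-check under the lock
              waitForAllSegmentsLoaded.run();   // e.g. poll Helix segment states
              _allSegmentsLoaded.set(true);     // later calls skip the lock entirely
            }
          }
        }
      }
    }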
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
index 353d33c951..258aad7a5d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
@@ -24,25 +24,17 @@ package org.apache.pinot.segment.local.dedup;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.ByteArray;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class PartitionDedupMetadataManager {
-  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionDedupMetadataManager.class);
-  private static boolean _allSegmentsLoaded;
-
-  private final HelixManager _helixManager;
   private final String _tableNameWithType;
   private final List<String> _primaryKeyColumns;
   private final int _partitionId;
@@ -52,9 +44,8 @@ public class PartitionDedupMetadataManager {
   @VisibleForTesting
   final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();
 
-  public PartitionDedupMetadataManager(HelixManager helixManager, String tableNameWithType,
-      List<String> primaryKeyColumns, int partitionId, ServerMetrics serverMetrics, HashFunction hashFunction) {
-    _helixManager = helixManager;
+  public PartitionDedupMetadataManager(String tableNameWithType, List<String> primaryKeyColumns, int partitionId,
+      ServerMetrics serverMetrics, HashFunction hashFunction) {
     _tableNameWithType = tableNameWithType;
     _primaryKeyColumns = primaryKeyColumns;
     _partitionId = partitionId;
@@ -64,7 +55,7 @@ public class PartitionDedupMetadataManager {
 
   public void addSegment(IndexSegment segment) {
     // Add all PKs to _primaryKeyToSegmentMap
-    Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment, _primaryKeyColumns);
+    Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
     while (primaryKeyIterator.hasNext()) {
       PrimaryKey pk = primaryKeyIterator.next();
       _primaryKeyToSegmentMap.put(HashUtils.hashPrimaryKey(pk, _hashFunction), segment);
@@ -75,30 +66,29 @@ public class PartitionDedupMetadataManager {
 
   public void removeSegment(IndexSegment segment) {
     // TODO(saurabh): Explain reload scenario here
-    Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment, _primaryKeyColumns);
+    Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
     while (primaryKeyIterator.hasNext()) {
       PrimaryKey pk = primaryKeyIterator.next();
-      _primaryKeyToSegmentMap.compute(HashUtils.hashPrimaryKey(pk, _hashFunction),
-          (primaryKey, currentSegment) -> {
-            if (currentSegment == segment) {
-              return null;
-            } else {
-              return currentSegment;
-            }
-          });
+      _primaryKeyToSegmentMap.compute(HashUtils.hashPrimaryKey(pk, _hashFunction), (primaryKey, currentSegment) -> {
+        if (currentSegment == segment) {
+          return null;
+        } else {
+          return currentSegment;
+        }
+      });
     }
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
         _primaryKeyToSegmentMap.size());
   }
 
   @VisibleForTesting
-  public static Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment, List<String> primaryKeyColumns) {
+  Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment) {
     Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>();
-    for (String primaryKeyColumn : primaryKeyColumns) {
+    for (String primaryKeyColumn : _primaryKeyColumns) {
       columnToReaderMap.put(primaryKeyColumn, new PinotSegmentColumnReader(segment, primaryKeyColumn));
     }
     int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
-    int numPrimaryKeyColumns = primaryKeyColumns.size();
+    int numPrimaryKeyColumns = _primaryKeyColumns.size();
     return new Iterator<PrimaryKey>() {
       private int _docId = 0;
 
@@ -111,7 +101,7 @@ public class PartitionDedupMetadataManager {
       public PrimaryKey next() {
         Object[] values = new Object[numPrimaryKeyColumns];
         for (int i = 0; i < numPrimaryKeyColumns; i++) {
-          Object value = columnToReaderMap.get(primaryKeyColumns.get(i)).getValue(_docId);
+          Object value = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(_docId);
           if (value instanceof byte[]) {
             value = new ByteArray((byte[]) value);
           }
@@ -123,34 +113,13 @@ public class PartitionDedupMetadataManager {
     };
   }
 
-  private synchronized void waitTillAllSegmentsLoaded() {
-    if (_allSegmentsLoaded) {
-      return;
-    }
-
-    while (!TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType)) {
-      LOGGER.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);
-      try {
-        //noinspection BusyWait
-        Thread.sleep(1000L);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    _allSegmentsLoaded = true;
-  }
-
   public boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment indexSegment) {
-    if (!_allSegmentsLoaded) {
-      waitTillAllSegmentsLoaded();
+    boolean present =
+        _primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk, _hashFunction), indexSegment) != null;
+    if (!present) {
+      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
+          _primaryKeyToSegmentMap.size());
     }
-
-    boolean result =
-        (_primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk, _hashFunction), indexSegment)
-            != null);
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
-        _primaryKeyToSegmentMap.size());
-
-    return result;
+    return present;
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
index 6e25974970..dedcebfff0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
@@ -21,22 +21,19 @@ package org.apache.pinot.segment.local.dedup;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.spi.config.table.HashFunction;
 
 
 public class TableDedupMetadataManager {
   private final Map<Integer, PartitionDedupMetadataManager> _partitionMetadataManagerMap = new ConcurrentHashMap<>();
-  private final HelixManager _helixManager;
   private final String _tableNameWithType;
   private final List<String> _primaryKeyColumns;
   private final ServerMetrics _serverMetrics;
   private final HashFunction _hashFunction;
 
-  public TableDedupMetadataManager(HelixManager helixManager, String tableNameWithType, List<String> primaryKeyColumns,
+  public TableDedupMetadataManager(String tableNameWithType, List<String> primaryKeyColumns,
       ServerMetrics serverMetrics, HashFunction hashFunction) {
-    _helixManager = helixManager;
     _tableNameWithType = tableNameWithType;
     _primaryKeyColumns = primaryKeyColumns;
     _serverMetrics = serverMetrics;
@@ -45,7 +42,7 @@ public class TableDedupMetadataManager {
 
   public PartitionDedupMetadataManager getOrCreatePartitionManager(int partitionId) {
     return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
-        k -> new PartitionDedupMetadataManager(_helixManager, _tableNameWithType, _primaryKeyColumns, k,
-            _serverMetrics, _hashFunction));
+        k -> new PartitionDedupMetadataManager(_tableNameWithType, _primaryKeyColumns, k, _serverMetrics,
+            _hashFunction));
  }
}
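checkRecordPresentOrUpdate above now relies on a single atomic ConcurrentHashMap#putIfAbsent to both test for and claim a primary key, and only touches the metrics gauge when a new key was actually inserted. A sketch with simplified types (DedupCheckSketch and reportPrimaryKeyCount are illustrative stand-ins):

    import java.util.concurrent.ConcurrentHashMap;

    class DedupCheckSketch {
      private final ConcurrentHashMap<Object, Object> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();

      /** Returns true if the key was already present, i.e. the record is a duplicate. */
      boolean checkRecordPresentOrUpdate(Object hashedPrimaryKey, Object segment) {
        boolean present = _primaryKeyToSegmentMap.putIfAbsent(hashedPrimaryKey, segment) != null;
        if (!present) {
          // The map only grows on a successful insert, so update the gauge only then
          reportPrimaryKeyCount(_primaryKeyToSegmentMap.size());
        }
        return present;
      }

      private void reportPrimaryKeyCount(int count) {
        // stand-in for ServerMetrics.setValueOfPartitionGauge(...)
      }
    }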
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 720dc8a181..4a1cfad39f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -20,29 +20,22 @@ package org.apache.pinot.segment.local.upsert;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.helix.HelixManager;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
-import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
+
 /**
  * Handler for partial-upsert.
  */
 public class PartialUpsertHandler {
   // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final HelixManager _helixManager;
-  private final String _tableNameWithType;
   private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private boolean _allSegmentsLoaded;
 
-  public PartialUpsertHandler(HelixManager helixManager, String tableNameWithType, Schema schema,
-      Map<String, UpsertConfig.Strategy> partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy,
-      String comparisonColumn) {
-    _helixManager = helixManager;
-    _tableNameWithType = tableNameWithType;
+  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
+      UpsertConfig.Strategy defaultPartialUpsertStrategy, String comparisonColumn) {
     for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
       _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
     }
@@ -56,18 +49,6 @@ public class PartialUpsertHandler {
     }
   }
 
-  /**
-   * Returns {@code true} if all segments assigned to the current instance are loaded, {@code false} otherwise.
-   * Consuming segment should perform this check to ensure all previous records are loaded before inserting new records.
-   */
-  public synchronized boolean isAllSegmentsLoaded() {
-    if (_allSegmentsLoaded) {
-      return true;
-    }
-    _allSegmentsLoaded = TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType);
-    return _allSegmentsLoaded;
-  }
-
   /**
    * Merges 2 records and returns the merged record.
    * We used a map to indicate all configured fields for partial upsert. For these fields
@@ -91,8 +72,8 @@ public class PartialUpsertHandler {
         newRecord.putValue(column, previousRecord.getValue(column));
         newRecord.removeNullValueField(column);
       } else {
-        newRecord
-            .putValue(column, entry.getValue().merge(previousRecord.getValue(column), newRecord.getValue(column)));
+        newRecord.putValue(column,
+            entry.getValue().merge(previousRecord.getValue(column), newRecord.getValue(column)));
       }
     }
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 02289cd1c1..041a86443a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -61,7 +61,7 @@ import org.slf4j.LoggerFactory;
  *   </li>
  * </ul>
  */
-@SuppressWarnings({"rawtypes", "unchecked"})
+@SuppressWarnings("unchecked")
 @ThreadSafe
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
@@ -204,17 +204,6 @@ public class PartitionUpsertMetadataManager {
       return record;
     }
 
-    // Ensure all previous records are loaded before inserting new records
-    while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
-      LOGGER.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);
-      try {
-        //noinspection BusyWait
-        Thread.sleep(1000L);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
     RecordLocation currentRecordLocation =
         _primaryKeyToRecordLocationMap.get(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction));
     if (currentRecordLocation != null) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index 6ccd9315a6..384268a494 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -50,4 +50,8 @@ public class TableUpsertMetadataManager {
         k -> new PartitionUpsertMetadataManager(_tableNameWithType, k, _serverMetrics, _partialUpsertHandler,
             _hashFunction));
   }
+
+  public boolean isPartialUpsertEnabled() {
+    return _partialUpsertHandler != null;
+  }
 }
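For reference, the per-column rule that PartialUpsertHandler#merge applies (partially visible in the hunk above): a column that is null in the new record keeps the previous value, otherwise the configured merger combines the two. A simplified sketch, modeling strategies as plain strings and showing only the INCREMENT/OVERWRITE cases that the tests later in this diff exercise:

    import java.util.Map;

    class PartialUpsertMergeSketch {
      static Object mergeColumn(String strategy, Object previousValue, Object newValue) {
        if (newValue == null) {
          return previousValue;   // keep the previous value for null fields
        }
        if (previousValue == null) {
          return newValue;
        }
        if ("INCREMENT".equals(strategy)) {
          return ((Number) previousValue).longValue() + ((Number) newValue).longValue();
        }
        return newValue;          // OVERWRITE, the default strategy in the tests
      }

      public static void main(String[] args) {
        Map<String, String> strategies = Map.of("field1", "INCREMENT");
        System.out.println(mergeColumn(strategies.get("field1"), 2L, 3L)); // prints 5
      }
    }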
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index b83ac7d51d..2392a0bcd1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -29,6 +29,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class TableStateUtils {
   private static final Logger LOGGER = LoggerFactory.getLogger(TableStateUtils.class);
 
@@ -72,8 +73,8 @@ public class TableStateUtils {
         String actualState = currentStateMap.get(segmentName);
         if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(actualState)) {
           if (CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(actualState)) {
-            LOGGER
-                .error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, tableNameWithType, expectedState);
+            LOGGER.error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, tableNameWithType,
+                expectedState);
           } else {
             LOGGER.info("Find unloaded segment: {}, table: {}, expected: {}, actual: {}", segmentName,
                 tableNameWithType, expectedState, actualState);
@@ -85,4 +86,17 @@ public class TableStateUtils {
     LOGGER.info("All segments loaded for table: {}", tableNameWithType);
     return true;
   }
+
+  public static void waitForAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
+    try {
+      while (!TableStateUtils.isAllSegmentsLoaded(helixManager, tableNameWithType)) {
+        LOGGER.info("Sleeping 1 second waiting for all segments loaded for table: {}", tableNameWithType);
+        //noinspection BusyWait
+        Thread.sleep(1000L);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Caught exception while waiting for all segments loaded for table: " + tableNameWithType, e);
+    }
+  }
 }
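The new TableStateUtils.waitForAllSegmentsLoaded centralizes the one-second busy-wait that previously lived in PartitionDedupMetadataManager and in PartitionUpsertMetadataManager's addRecord path. Its generic shape, as a sketch with illustrative names:

    import java.util.function.BooleanSupplier;

    final class PollSketch {
      static void waitUntil(BooleanSupplier condition, String description) {
        try {
          while (!condition.getAsBoolean()) {
            System.out.println("Sleeping 1 second waiting for " + description);
            Thread.sleep(1000L); // the check is cheap, so coarse 1s polling is acceptable
          }
        } catch (Exception e) {
          throw new RuntimeException("Caught exception while waiting for " + description, e);
        }
      }
    }

Moving the wait out of the per-record ingestion path and into consuming-segment creation means it runs at most once per table, instead of being re-checked for every ingested record.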
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
index 124aad1c63..39f7d6ae98 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
@@ -19,27 +19,22 @@
 package org.apache.pinot.segment.local.dedup;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.local.utils.RecordInfo;
-import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.mockito.MockedStatic;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertSame;
@@ -49,19 +44,12 @@ public class PartitionDedupMetadataManagerTest {
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
 
-  @BeforeClass
-  public void init() {
-    MockedStatic mocked = mockStatic(TableStateUtils.class);
-    mocked.when(() -> TableStateUtils.isAllSegmentsLoaded(any(), any())).thenReturn(true);
-  }
-
   @Test
   public void verifyAddRemoveSegment() {
     HashFunction hashFunction = HashFunction.NONE;
-    PartitionDedupMetadataManager partitionDedupMetadataManager =
-        new PartitionDedupMetadataManager(mock(HelixManager.class), REALTIME_TABLE_NAME, null, 0,
-            mock(ServerMetrics.class), hashFunction);
-    Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeyToSegmentMap;
+    TestMetadataManager metadataManager =
+        new TestMetadataManager(REALTIME_TABLE_NAME, null, 0, mock(ServerMetrics.class), hashFunction);
+    Map<Object, IndexSegment> recordLocationMap = metadataManager._primaryKeyToSegmentMap;
 
     // Add the first segment
     List<PrimaryKey> pkList1 = new ArrayList<>();
@@ -71,41 +59,24 @@ public class PartitionDedupMetadataManagerTest {
     pkList1.add(getPrimaryKey(0));
     pkList1.add(getPrimaryKey(1));
     pkList1.add(getPrimaryKey(0));
+    metadataManager._primaryKeyIterator = pkList1.iterator();
     ImmutableSegmentImpl segment1 = mockSegment(1);
-    MockedStatic mocked = mockStatic(PartitionDedupMetadataManager.class);
-    mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
-        .thenReturn(pkList1.iterator());
-
-    partitionDedupMetadataManager.addSegment(segment1);
+    metadataManager.addSegment(segment1);
     checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment1, hashFunction);
 
-    pkList1 = new ArrayList<>();
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(2));
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(0));
-
-    mocked.close();
-    mocked = mockStatic(PartitionDedupMetadataManager.class);
-    mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
-        .thenReturn(pkList1.iterator());
-
-    partitionDedupMetadataManager.removeSegment(segment1);
+    metadataManager._primaryKeyIterator = pkList1.iterator();
+    metadataManager.removeSegment(segment1);
     Assert.assertEquals(recordLocationMap.size(), 0);
-    mocked.close();
   }
 
   @Test
   public void verifyReloadSegment() {
     HashFunction hashFunction = HashFunction.NONE;
-    PartitionDedupMetadataManager partitionDedupMetadataManager =
-        new PartitionDedupMetadataManager(mock(HelixManager.class), REALTIME_TABLE_NAME, null, 0,
-            mock(ServerMetrics.class), hashFunction);
-    Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeyToSegmentMap;
+    TestMetadataManager metadataManager =
+        new TestMetadataManager(REALTIME_TABLE_NAME, null, 0, mock(ServerMetrics.class), hashFunction);
+    Map<Object, IndexSegment> recordLocationMap = metadataManager._primaryKeyToSegmentMap;
 
     // Add the first segment
     List<PrimaryKey> pkList1 = new ArrayList<>();
@@ -115,45 +86,28 @@ public class PartitionDedupMetadataManagerTest {
     pkList1.add(getPrimaryKey(0));
     pkList1.add(getPrimaryKey(1));
     pkList1.add(getPrimaryKey(0));
+    metadataManager._primaryKeyIterator = pkList1.iterator();
     ImmutableSegmentImpl segment1 = mockSegment(1);
-    MockedStatic mocked = mockStatic(PartitionDedupMetadataManager.class);
-    mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
-        .thenReturn(pkList1.iterator());
-
-    partitionDedupMetadataManager.addSegment(segment1);
+    metadataManager.addSegment(segment1);
 
     // Remove another segment with same PK rows
-    pkList1 = new ArrayList<>();
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(2));
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(0));
+    metadataManager._primaryKeyIterator = pkList1.iterator();
     ImmutableSegmentImpl segment2 = mockSegment(1);
-
-    mocked.close();
-    mocked = mockStatic(PartitionDedupMetadataManager.class);
-    mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
-        .thenReturn(pkList1.iterator());
-
-    partitionDedupMetadataManager.removeSegment(segment2);
+    metadataManager.removeSegment(segment2);
     Assert.assertEquals(recordLocationMap.size(), 3);
 
     // Keys should still exist
     checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment1, hashFunction);
-    mocked.close();
   }
 
   @Test
   public void verifyAddRow() {
     HashFunction hashFunction = HashFunction.NONE;
-    PartitionDedupMetadataManager partitionDedupMetadataManager =
-        new PartitionDedupMetadataManager(mock(HelixManager.class), REALTIME_TABLE_NAME, null, 0,
-            mock(ServerMetrics.class), hashFunction);
-    Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeyToSegmentMap;
+    TestMetadataManager metadataManager =
+        new TestMetadataManager(REALTIME_TABLE_NAME, null, 0, mock(ServerMetrics.class), hashFunction);
+    Map<Object, IndexSegment> recordLocationMap = metadataManager._primaryKeyToSegmentMap;
 
     // Add the first segment
     List<PrimaryKey> pkList1 = new ArrayList<>();
@@ -163,28 +117,25 @@ public class PartitionDedupMetadataManagerTest {
     pkList1.add(getPrimaryKey(0));
     pkList1.add(getPrimaryKey(1));
     pkList1.add(getPrimaryKey(0));
+    metadataManager._primaryKeyIterator = pkList1.iterator();
     ImmutableSegmentImpl segment1 = mockSegment(1);
-    MockedStatic mocked = mockStatic(PartitionDedupMetadataManager.class);
-    mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
-        .thenReturn(pkList1.iterator());
-    partitionDedupMetadataManager.addSegment(segment1);
-    mocked.close();
+    metadataManager.addSegment(segment1);
 
     // Same PK exists
     RecordInfo recordInfo = mock(RecordInfo.class);
     when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(0));
     ImmutableSegmentImpl segment2 = mockSegment(2);
-    Assert.assertTrue(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+    Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
     checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
 
     // New PK
     when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(3));
-    Assert.assertFalse(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+    Assert.assertFalse(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
     checkRecordLocation(recordLocationMap, 3, segment2, hashFunction);
 
     // Same PK as the one recently ingested
     when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(3));
-    Assert.assertTrue(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+    Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
   }
 
   private static ImmutableSegmentImpl mockSegment(int sequenceNumber) {
@@ -208,4 +159,18 @@ public class PartitionDedupMetadataManagerTest {
     assertNotNull(indexSegment);
     assertSame(indexSegment, segment);
   }
+
+  private static class TestMetadataManager extends PartitionDedupMetadataManager {
+    Iterator<PrimaryKey> _primaryKeyIterator;
+
+    TestMetadataManager(String tableNameWithType, List<String> primaryKeyColumns, int partitionId,
+        ServerMetrics serverMetrics, HashFunction hashFunction) {
+      super(tableNameWithType, primaryKeyColumns, partitionId, serverMetrics, hashFunction);
+    }
+
+    @Override
+    Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment) {
+      return _primaryKeyIterator;
+    }
+  }
 }
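The rewritten test above replaces MockedStatic with a small subclass seam: getPrimaryKeyIterator is now a package-private instance method, so TestMetadataManager can simply return a canned iterator. The pattern in miniature (all names here are illustrative, not the Pinot classes):

    import java.util.Iterator;
    import java.util.List;

    class ManagerSketch {
      Iterator<String> getPrimaryKeyIterator(Object segment) {
        return List.<String>of().iterator(); // real code reads keys from the segment
      }

      void addSegment(Object segment) {
        Iterator<String> it = getPrimaryKeyIterator(segment);
        while (it.hasNext()) {
          it.next(); // index each primary key
        }
      }
    }

    class TestManagerSketch extends ManagerSketch {
      Iterator<String> _primaryKeyIterator; // fixture assigned by each test

      @Override
      Iterator<String> getPrimaryKeyIterator(Object segment) {
        return _primaryKeyIterator;
      }
    }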
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
index 7acfbdccdc..6f310c863c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
@@ -22,12 +22,10 @@ package org.apache.pinot.segment.local.indexsegment.mutable;
 import java.io.File;
 import java.net.URL;
 import java.util.Collections;
-import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
 import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
-import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -38,28 +36,16 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFactory;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mockStatic;
-
 
 public class MutableSegmentDedupeTest {
-
   private static final String SCHEMA_FILE_PATH = "data/test_dedup_schema.json";
   private static final String DATA_FILE_PATH = "data/test_dedup_data.json";
   private MutableSegmentImpl _mutableSegmentImpl;
 
-  @BeforeClass
-  public void init() {
-    MockedStatic mocked = mockStatic(TableStateUtils.class);
-    mocked.when(() -> TableStateUtils.isAllSegmentsLoaded(any(), any())).thenReturn(true);
-  }
-
   private void setup(boolean dedupEnabled)
       throws Exception {
     URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
@@ -70,9 +56,8 @@ public class MutableSegmentDedupeTest {
     CompositeTransformer recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     PartitionDedupMetadataManager partitionDedupMetadataManager =
-        (dedupEnabled) ? new TableDedupMetadataManager(Mockito.mock(HelixManager.class), "testTable_REALTIME",
-            schema.getPrimaryKeyColumns(), Mockito.mock(ServerMetrics.class),
-            HashFunction.NONE).getOrCreatePartitionManager(0) : null;
+        (dedupEnabled) ? new TableDedupMetadataManager("testTable_REALTIME", schema.getPrimaryKeyColumns(),
+            Mockito.mock(ServerMetrics.class), HashFunction.NONE).getOrCreatePartitionManager(0) : null;
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Collections.emptySet(), Collections.emptySet(),
             Collections.emptySet(), false, true, null, "secondsSinceEpoch", null, partitionDedupMetadataManager);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index 471a46c803..31a508e988 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -21,12 +21,10 @@ package org.apache.pinot.segment.local.upsert;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.helix.HelixManager;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -38,19 +36,14 @@ public class PartialUpsertHandlerTest {
 
   @Test
   public void testMerge() {
-    HelixManager helixManager = Mockito.mock(HelixManager.class);
-
     Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING)
         .addSingleValueDimension("field1", FieldSpec.DataType.LONG)
         .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS")
         .setPrimaryKeyColumns(Arrays.asList("pk")).build();
-
-    String realtimeTableName = "testTable_REALTIME";
     Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
     partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
     PartialUpsertHandler handler =
-        new PartialUpsertHandler(helixManager, realtimeTableName, schema, partialUpsertStrategies,
-            UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
+        new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
 
     // both records are null.
     GenericRow previousRecord = new GenericRow();
@@ -97,19 +90,14 @@ public class PartialUpsertHandlerTest {
 
   @Test
   public void testMergeWithDefaultPartialUpsertStrategy() {
-    HelixManager helixManager = Mockito.mock(HelixManager.class);
-
     Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING)
         .addSingleValueDimension("field1", FieldSpec.DataType.LONG).addMetric("field2", FieldSpec.DataType.LONG)
         .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS")
         .setPrimaryKeyColumns(Arrays.asList("pk")).build();
-
-    String realtimeTableName = "testTable_REALTIME";
     Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
     partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
     PartialUpsertHandler handler =
-        new PartialUpsertHandler(helixManager, realtimeTableName, schema, partialUpsertStrategies,
-            UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
+        new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
 
     // previousRecord is null default value, while newRecord is not.
     GenericRow previousRecord = new GenericRow();