This is an automated email from the ASF dual-hosted git repository. yupeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7add772 Add option to store the hash of the upsert primary key (#7246) 7add772 is described below commit 7add77202e543f0932db6c0d8557cab9baa95bc4 Author: Yupeng Fu <yupe...@users.noreply.github.com> AuthorDate: Tue Aug 10 11:42:49 2021 -0700 Add option to store the hash of the upsert primary key (#7246) * add option to store the hash of the upsert primary key * use byte equality * fix ut * fix * fix integration tests * add header * comments * comments * comments * fix ut * comments --- .../common/utils/config/TableConfigSerDeTest.java | 3 +- .../controller/helix/PinotResourceManagerTest.java | 2 +- .../realtime/LLRealtimeSegmentDataManager.java | 1 + .../manager/realtime/RealtimeTableDataManager.java | 3 +- ...adataAndDictionaryAggregationPlanMakerTest.java | 6 +- .../tests/BaseClusterIntegrationTest.java | 2 +- .../indexsegment/mutable/MutableSegmentImpl.java | 5 +- .../local/realtime/impl/RealtimeSegmentConfig.java | 19 ++- .../upsert/PartitionUpsertMetadataManager.java | 28 +++- .../local/upsert/TableUpsertMetadataManager.java | 8 +- .../pinot/segment/local/utils/HashUtils.java | 25 ++-- .../mutable/MutableSegmentImplTestUtils.java | 7 +- .../MutableSegmentImplUpsertComparisonColTest.java | 7 +- .../mutable/MutableSegmentImplUpsertTest.java | 32 +++-- .../upsert/PartitionUpsertMetadataManagerTest.java | 148 +++++++++++++-------- .../pinot/segment/local/utils/HashUtilsTest.java | 24 ++-- .../segment/local/utils/TableConfigUtilsTest.java | 14 +- .../apache/pinot/spi/config/table/TableConfig.java | 5 + .../pinot/spi/config/table/UpsertConfig.java | 15 ++- .../apache/pinot/spi/data/readers/PrimaryKey.java | 5 + .../java/org/apache/pinot/spi/utils/ByteArray.java | 3 +- .../pinot/spi/config/table/UpsertConfigTest.java | 9 +- .../pinot/spi/data/readers/PrimaryKeyTest.java | 11 ++ 23 files changed, 253 insertions(+), 129 deletions(-) diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java index 08b364c..cb48ef5 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java @@ -246,7 +246,8 @@ public class TableConfigSerDeTest { } { // with upsert config - UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison"); + UpsertConfig upsertConfig = + new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", UpsertConfig.HashFunction.NONE); TableConfig tableConfig = tableConfigBuilder.setUpsertConfig(upsertConfig).build(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java index 58f993a..939cfe9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java @@ -68,7 +68,7 @@ public class PinotResourceManagerTest { realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING); realtimeTableConfig.getValidationConfig() .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)); - realtimeTableConfig.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null)); + realtimeTableConfig.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)); ControllerTestUtils.getHelixResourceManager().addTable(realtimeTableConfig); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 31b4151..2a9e4b5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1260,6 +1260,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { .setAggregateMetrics(indexingConfig.isAggregateMetrics()).setNullHandlingEnabled(_nullHandlingEnabled) .setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode()) .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager) + .setHashFunction(tableConfig.getHashFunction()) .setUpsertComparisonColumn(tableConfig.getUpsertComparisonColumn()); // Create message decoder diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 64fdb4d..1aae911 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -168,8 +168,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager { partialUpsertHandler = new PartialUpsertHandler(_helixManager, _tableNameWithType, upsertConfig.getPartialUpsertStrategies()); } + UpsertConfig.HashFunction hashFunction = upsertConfig.getHashFunction(); _tableUpsertMetadataManager = - new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics, partialUpsertHandler); + new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics, partialUpsertHandler, hashFunction); _primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(!CollectionUtils.isEmpty(_primaryKeyColumns), "Primary key columns must be configured for upsert"); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java index 956ad31..b691773 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java @@ -44,6 +44,7 @@ import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver; import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; @@ -123,8 +124,9 @@ public class MetadataAndDictionaryAggregationPlanMakerTest { _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap); ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class); _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap); - ((ImmutableSegmentImpl) _upsertIndexSegment) - .enableUpsert(new PartitionUpsertMetadataManager("testTable_REALTIME", 0, serverMetrics, null), + ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert( + new PartitionUpsertMetadataManager("testTable_REALTIME", 0, serverMetrics, null, + UpsertConfig.HashFunction.NONE), new ThreadSafeMutableRoaringBitmap()); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index 50da64b..de4ff03 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -386,7 +386,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap)) .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1)) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null)).build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).build(); } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index 73c6d4d..9cd4153 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -512,8 +512,9 @@ public class MutableSegmentImpl implements MutableSegment { private GenericRow handleUpsert(GenericRow row, int docId) { PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); Object upsertComparisonValue = row.getValue(_upsertComparisonColumn); - Preconditions.checkState(upsertComparisonValue instanceof Comparable, - "Upsert comparison column: %s must be comparable", _upsertComparisonColumn); + Preconditions + .checkState(upsertComparisonValue instanceof Comparable, "Upsert comparison column: %s must be comparable", + _upsertComparisonColumn); return _partitionUpsertMetadataManager.updateRecord(this, new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue), row); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java index 43f9343..0a1252f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java @@ -57,6 +57,7 @@ public class RealtimeSegmentConfig { private final boolean _aggregateMetrics; private final boolean _nullHandlingEnabled; private final UpsertConfig.Mode _upsertMode; + private final UpsertConfig.HashFunction _hashFunction; private final String _upsertComparisonColumn; private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager; private final String _consumerDir; @@ -69,7 +70,7 @@ public class RealtimeSegmentConfig { RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, String consumerDir, - UpsertConfig.Mode upsertMode, String upsertComparisonColumn, + UpsertConfig.Mode upsertMode, String upsertComparisonColumn, UpsertConfig.HashFunction hashFunction, PartitionUpsertMetadataManager partitionUpsertMetadataManager) { _tableNameWithType = tableNameWithType; _segmentName = segmentName; @@ -96,6 +97,7 @@ public class RealtimeSegmentConfig { _nullHandlingEnabled = nullHandlingEnabled; _consumerDir = consumerDir; _upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE; + _hashFunction = hashFunction != null ? hashFunction : UpsertConfig.HashFunction.NONE; _upsertComparisonColumn = upsertComparisonColumn; _partitionUpsertMetadataManager = partitionUpsertMetadataManager; } @@ -205,6 +207,10 @@ public class RealtimeSegmentConfig { return _upsertMode; } + public UpsertConfig.HashFunction getHashFunction() { + return _hashFunction; + } + public String getUpsertComparisonColumn() { return _upsertComparisonColumn; } @@ -239,6 +245,7 @@ public class RealtimeSegmentConfig { private boolean _nullHandlingEnabled = false; private String _consumerDir; private UpsertConfig.Mode _upsertMode; + private UpsertConfig.HashFunction _hashFunction; private String _upsertComparisonColumn; private PartitionUpsertMetadataManager _partitionUpsertMetadataManager; @@ -378,11 +385,16 @@ public class RealtimeSegmentConfig { return this; } + public Builder setHashFunction(UpsertConfig.HashFunction hashFunction) { + _hashFunction = hashFunction; + return this; + } + public Builder setUpsertComparisonColumn(String upsertComparisonColumn) { _upsertComparisonColumn = upsertComparisonColumn; return this; } - + public Builder setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager partitionUpsertMetadataManager) { _partitionUpsertMetadataManager = partitionUpsertMetadataManager; return this; @@ -393,7 +405,8 @@ public class RealtimeSegmentConfig { _capacity, _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns, _textIndexColumns, _fstIndexColumns, _jsonIndexColumns, _h3IndexConfigs, _realtimeSegmentZKMetadata, _offHeap, _memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics, - _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumn, _partitionUpsertMetadataManager); + _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumn, _hashFunction, + _partitionUpsertMetadataManager); } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java index 6bd9d1b..64d3192 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java @@ -26,10 +26,13 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.segment.local.utils.HashUtils; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.apache.pinot.spi.utils.ByteArray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,10 +69,11 @@ public class PartitionUpsertMetadataManager { private final int _partitionId; private final ServerMetrics _serverMetrics; private final PartialUpsertHandler _partialUpsertHandler; + private final UpsertConfig.HashFunction _hashFunction; // TODO(upsert): consider an off-heap KV store to persist this mapping to improve the recovery speed. @VisibleForTesting - final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>(); + final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>(); // Reused for reading previous record during partial upsert private final GenericRow _reuse = new GenericRow(); @@ -77,11 +81,12 @@ public class PartitionUpsertMetadataManager { private GenericRow _result; public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics, - @Nullable PartialUpsertHandler partialUpsertHandler) { + @Nullable PartialUpsertHandler partialUpsertHandler, UpsertConfig.HashFunction hashFunction) { _tableNameWithType = tableNameWithType; _partitionId = partitionId; _serverMetrics = serverMetrics; _partialUpsertHandler = partialUpsertHandler; + _hashFunction = hashFunction; } /** @@ -95,7 +100,8 @@ public class PartitionUpsertMetadataManager { while (recordInfoIterator.hasNext()) { RecordInfo recordInfo = recordInfoIterator.next(); - _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> { + _primaryKeyToRecordLocationMap + .compute(hashPrimaryKey(recordInfo._primaryKey, _hashFunction), (primaryKey, currentRecordLocation) -> { if (currentRecordLocation != null) { // Existing primary key @@ -176,7 +182,8 @@ public class PartitionUpsertMetadataManager { } _result = record; - _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> { + _primaryKeyToRecordLocationMap + .compute(hashPrimaryKey(recordInfo._primaryKey, _hashFunction), (primaryKey, currentRecordLocation) -> { if (currentRecordLocation != null) { // Existing primary key @@ -238,6 +245,19 @@ public class PartitionUpsertMetadataManager { _primaryKeyToRecordLocationMap.size()); } + protected static Object hashPrimaryKey(PrimaryKey primaryKey, UpsertConfig.HashFunction hashFunction) { + switch (hashFunction) { + case NONE: + return primaryKey; + case MD5: + return new ByteArray(HashUtils.hashMD5(primaryKey.asBytes())); + case MURMUR3: + return new ByteArray(HashUtils.hashMurmur3(primaryKey.asBytes())); + default: + throw new IllegalArgumentException(String.format("Unrecognized hash function %s", hashFunction)); + } + } + public static final class RecordInfo { private final PrimaryKey _primaryKey; private final int _docId; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java index ae8fb3f..d78cfc0 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.spi.config.table.UpsertConfig; /** @@ -34,16 +35,19 @@ public class TableUpsertMetadataManager { private final String _tableNameWithType; private final ServerMetrics _serverMetrics; private final PartialUpsertHandler _partialUpsertHandler; + private final UpsertConfig.HashFunction _hashFunction; public TableUpsertMetadataManager(String tableNameWithType, ServerMetrics serverMetrics, - @Nullable PartialUpsertHandler partialUpsertHandler) { + @Nullable PartialUpsertHandler partialUpsertHandler, UpsertConfig.HashFunction hashFunction) { _tableNameWithType = tableNameWithType; _serverMetrics = serverMetrics; _partialUpsertHandler = partialUpsertHandler; + _hashFunction = hashFunction; } public PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) { return _partitionMetadataManagerMap.computeIfAbsent(partitionId, - k -> new PartitionUpsertMetadataManager(_tableNameWithType, k, _serverMetrics, _partialUpsertHandler)); + k -> new PartitionUpsertMetadataManager(_tableNameWithType, k, _serverMetrics, _partialUpsertHandler, + _hashFunction)); } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HashUtils.java similarity index 57% copy from pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HashUtils.java index 97faad8..dfd9a5c 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HashUtils.java @@ -16,25 +16,20 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.data.readers; +package org.apache.pinot.segment.local.utils; -import org.testng.annotations.Test; +import com.google.common.hash.Hashing; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; +public class HashUtils { + private HashUtils() { + } -public class PrimaryKeyTest { - - @Test - public void testPrimaryKeyComparison() { - PrimaryKey left = new PrimaryKey(new Object[]{"111", 2}); - PrimaryKey right = new PrimaryKey(new Object[]{"111", 2}); - assertEquals(left, right); - assertEquals(left.hashCode(), right.hashCode()); + public static byte[] hashMurmur3(byte[] bytes) { + return Hashing.murmur3_128().hashBytes(bytes).asBytes(); + } - right = new PrimaryKey(new Object[]{"222", 2}); - assertNotEquals(left, right); - assertNotEquals(left.hashCode(), right.hashCode()); + public static byte[] hashMD5(byte[] bytes) { + return Hashing.md5().hashBytes(bytes).asBytes(); } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java index 4f52009..e88ec3c 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java @@ -63,6 +63,8 @@ public class MutableSegmentImplTestUtils { UpsertConfig.Mode upsertMode = upsertConfig == null ? UpsertConfig.Mode.NONE : upsertConfig.getMode(); String comparisonColumn = upsertConfig == null ? null : upsertConfig.getComparisonColumn(); + UpsertConfig.HashFunction hashFunction = + upsertConfig == null ? UpsertConfig.HashFunction.NONE : upsertConfig.getHashFunction(); RealtimeSegmentConfig realtimeSegmentConfig = new RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME) .setStreamName(STEAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000) @@ -71,9 +73,8 @@ public class MutableSegmentImplTestUtils { .setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata()) .setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory) .setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode) - .setUpsertComparisonColumn(comparisonColumn) - .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager) - .build(); + .setUpsertComparisonColumn(comparisonColumn).setHashFunction(hashFunction) + .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager).build(); return new MutableSegmentImpl(realtimeSegmentConfig, null); } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java index b7ae126..5a08556 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java @@ -57,15 +57,16 @@ public class MutableSegmentImplUpsertComparisonColTest { URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH); _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable") - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, "offset")).build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, "offset", null)).build(); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); _partitionUpsertMetadataManager = - new TableUpsertMetadataManager("testTable_REALTIME", Mockito.mock(ServerMetrics.class), null) + new TableUpsertMetadataManager("testTable_REALTIME", Mockito.mock(ServerMetrics.class), null, + UpsertConfig.HashFunction.NONE) .getOrCreatePartitionManager(0); _mutableSegmentImpl = MutableSegmentImplTestUtils .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), - false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, "offset"), "secondsSinceEpoch", + false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, "offset", null), "secondsSinceEpoch", _partitionUpsertMetadataManager); GenericRow reuse = new GenericRow(); try (RecordReader recordReader = RecordReaderFactory diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java index 2b367cd..d62a6ed 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java @@ -37,35 +37,34 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.mockito.Mockito; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.testng.Assert; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class MutableSegmentImplUpsertTest { private static final String SCHEMA_FILE_PATH = "data/test_upsert_schema.json"; private static final String DATA_FILE_PATH = "data/test_upsert_data.json"; - private static CompositeTransformer _recordTransformer; - private static Schema _schema; - private static TableConfig _tableConfig; - private static MutableSegmentImpl _mutableSegmentImpl; - private static PartitionUpsertMetadataManager _partitionUpsertMetadataManager; + private CompositeTransformer _recordTransformer; + private Schema _schema; + private TableConfig _tableConfig; + private MutableSegmentImpl _mutableSegmentImpl; + private PartitionUpsertMetadataManager _partitionUpsertMetadataManager; - @BeforeClass - public void setup() + private void setup(UpsertConfig.HashFunction hashFunction) throws Exception { URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH); URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH); _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable") - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null)).build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, hashFunction)).build(); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); _partitionUpsertMetadataManager = - new TableUpsertMetadataManager("testTable_REALTIME", Mockito.mock(ServerMetrics.class), null) + new TableUpsertMetadataManager("testTable_REALTIME", Mockito.mock(ServerMetrics.class), null, + hashFunction) .getOrCreatePartitionManager(0); _mutableSegmentImpl = MutableSegmentImplTestUtils .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), - false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null), "secondsSinceEpoch", + false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null, hashFunction), "secondsSinceEpoch", _partitionUpsertMetadataManager); GenericRow reuse = new GenericRow(); try (RecordReader recordReader = RecordReaderFactory @@ -80,7 +79,16 @@ public class MutableSegmentImplUpsertTest { } @Test - public void testUpsertIngestion() { + public void testHashFunctions() + throws Exception { + testUpsertIngestion(UpsertConfig.HashFunction.NONE); + testUpsertIngestion(UpsertConfig.HashFunction.MD5); + testUpsertIngestion(UpsertConfig.HashFunction.MURMUR3); + } + + private void testUpsertIngestion(UpsertConfig.HashFunction hashFunction) + throws Exception { + setup(hashFunction); ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap(); Assert.assertFalse(bitmap.contains(0)); Assert.assertTrue(bitmap.contains(1)); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java index 6812d63..ce91f01 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java @@ -26,9 +26,13 @@ import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.PrimaryKey; +import org.apache.pinot.spi.utils.ByteArray; +import org.apache.pinot.spi.utils.BytesUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.testng.Assert; import org.testng.annotations.Test; import static org.mockito.Mockito.mock; @@ -45,9 +49,15 @@ public class PartitionUpsertMetadataManagerTest { @Test public void testAddSegment() { + verifyAddSegment(UpsertConfig.HashFunction.NONE); + verifyAddSegment(UpsertConfig.HashFunction.MD5); + verifyAddSegment(UpsertConfig.HashFunction.MURMUR3); + } + + private void verifyAddSegment(UpsertConfig.HashFunction hashFunction) { PartitionUpsertMetadataManager upsertMetadataManager = - new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, mock(ServerMetrics.class), null); - Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, mock(ServerMetrics.class), null, hashFunction); + Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new ArrayList<>(); @@ -61,9 +71,9 @@ public class PartitionUpsertMetadataManagerTest { ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1); upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator()); // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100} - checkRecordLocation(recordLocationMap, 0, segment1, 5, 100); - checkRecordLocation(recordLocationMap, 1, segment1, 4, 120); - checkRecordLocation(recordLocationMap, 2, segment1, 2, 100); + checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction); + checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); + checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5}); // Add the second segment @@ -78,10 +88,10 @@ public class PartitionUpsertMetadataManagerTest { upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator()); // segment1: 1 -> {4, 120} // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} - checkRecordLocation(recordLocationMap, 0, segment2, 0, 100); - checkRecordLocation(recordLocationMap, 1, segment1, 4, 120); - checkRecordLocation(recordLocationMap, 2, segment2, 2, 120); - checkRecordLocation(recordLocationMap, 3, segment2, 3, 80); + checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); + checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction); + checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); + checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); @@ -92,26 +102,28 @@ public class PartitionUpsertMetadataManagerTest { // original segment1: 1 -> {4, 120} // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} - checkRecordLocation(recordLocationMap, 0, segment2, 0, 100); - checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120); - checkRecordLocation(recordLocationMap, 2, segment2, 2, 120); - checkRecordLocation(recordLocationMap, 3, segment2, 3, 80); + checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); + checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); + checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); + checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); - assertSame(recordLocationMap.get(getPrimaryKey(1)).getSegment(), newSegment1); + assertSame(recordLocationMap.get(PartitionUpsertMetadataManager.hashPrimaryKey(getPrimaryKey(1), hashFunction)) + .getSegment(), newSegment1); // Remove the original segment1 upsertMetadataManager.removeSegment(segment1); // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} // new segment1: 1 -> {4, 120} - checkRecordLocation(recordLocationMap, 0, segment2, 0, 100); - checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120); - checkRecordLocation(recordLocationMap, 2, segment2, 2, 120); - checkRecordLocation(recordLocationMap, 3, segment2, 3, 80); + checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction); + checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction); + checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction); + checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); - assertSame(recordLocationMap.get(getPrimaryKey(1)).getSegment(), newSegment1); + assertSame(recordLocationMap.get(PartitionUpsertMetadataManager.hashPrimaryKey(getPrimaryKey(1), hashFunction)) + .getSegment(), newSegment1); } private static ImmutableSegmentImpl mockSegment(int sequenceNumber, ThreadSafeMutableRoaringBitmap validDocIds) { @@ -130,9 +142,10 @@ public class PartitionUpsertMetadataManagerTest { return new PrimaryKey(new Object[]{value}); } - private static void checkRecordLocation(Map<PrimaryKey, RecordLocation> recordLocationMap, int keyValue, - IndexSegment segment, int docId, int timestamp) { - RecordLocation recordLocation = recordLocationMap.get(getPrimaryKey(keyValue)); + private static void checkRecordLocation(Map<Object, RecordLocation> recordLocationMap, int keyValue, + IndexSegment segment, int docId, int timestamp, UpsertConfig.HashFunction hashFunction) { + RecordLocation recordLocation = + recordLocationMap.get(PartitionUpsertMetadataManager.hashPrimaryKey(getPrimaryKey(keyValue), hashFunction)); assertNotNull(recordLocation); assertSame(recordLocation.getSegment(), segment); assertEquals(recordLocation.getDocId(), docId); @@ -141,9 +154,15 @@ public class PartitionUpsertMetadataManagerTest { @Test public void testUpdateRecord() { + verifyUpdateRecord(UpsertConfig.HashFunction.NONE); + verifyUpdateRecord(UpsertConfig.HashFunction.MD5); + verifyUpdateRecord(UpsertConfig.HashFunction.MURMUR3); + } + + private void verifyUpdateRecord(UpsertConfig.HashFunction hashFunction) { PartitionUpsertMetadataManager upsertMetadataManager = - new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, mock(ServerMetrics.class), null); - Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, mock(ServerMetrics.class), null, hashFunction); + Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add the first segment // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100} @@ -160,56 +179,62 @@ public class PartitionUpsertMetadataManagerTest { IndexSegment segment2 = mockSegment(1, validDocIds2); GenericRow row = mock(GenericRow.class); - upsertMetadataManager - .updateRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100), row); + upsertMetadataManager.updateRecord(segment2, + new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100), row); // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100} // segment2: 3 -> {0, 100} - checkRecordLocation(recordLocationMap, 0, segment1, 0, 100); - checkRecordLocation(recordLocationMap, 1, segment1, 1, 120); - checkRecordLocation(recordLocationMap, 2, segment1, 2, 100); - checkRecordLocation(recordLocationMap, 3, segment2, 0, 100); + checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction); + checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0}); - upsertMetadataManager - .updateRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120), row); + upsertMetadataManager.updateRecord(segment2, + new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120), row); // segment1: 0 -> {0, 100}, 1 -> {1, 120} // segment2: 2 -> {1, 120}, 3 -> {0, 100} - checkRecordLocation(recordLocationMap, 0, segment1, 0, 100); - checkRecordLocation(recordLocationMap, 1, segment1, 1, 120); - checkRecordLocation(recordLocationMap, 2, segment2, 1, 120); - checkRecordLocation(recordLocationMap, 3, segment2, 0, 100); + checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); - upsertMetadataManager - .updateRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100), row); + upsertMetadataManager.updateRecord(segment2, + new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100), row); // segment1: 0 -> {0, 100}, 1 -> {1, 120} // segment2: 2 -> {1, 120}, 3 -> {0, 100} - checkRecordLocation(recordLocationMap, 0, segment1, 0, 100); - checkRecordLocation(recordLocationMap, 1, segment1, 1, 120); - checkRecordLocation(recordLocationMap, 2, segment2, 1, 120); - checkRecordLocation(recordLocationMap, 3, segment2, 0, 100); + checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); - upsertMetadataManager - .updateRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100), row); + upsertMetadataManager.updateRecord(segment2, + new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100), row); // segment1: 1 -> {1, 120} // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100} - checkRecordLocation(recordLocationMap, 0, segment2, 3, 100); - checkRecordLocation(recordLocationMap, 1, segment1, 1, 120); - checkRecordLocation(recordLocationMap, 2, segment2, 1, 120); - checkRecordLocation(recordLocationMap, 3, segment2, 0, 100); + checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction); + checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction); + checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction); + checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1}); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3}); } @Test public void testRemoveSegment() { + verifyRemoveSegment(UpsertConfig.HashFunction.NONE); + verifyRemoveSegment(UpsertConfig.HashFunction.MD5); + verifyRemoveSegment(UpsertConfig.HashFunction.MURMUR3); + } + + private void verifyRemoveSegment(UpsertConfig.HashFunction hashFunction) { PartitionUpsertMetadataManager upsertMetadataManager = - new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, mock(ServerMetrics.class), null); - Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, mock(ServerMetrics.class), null, hashFunction); + Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; // Add 2 segments // segment1: 0 -> {0, 100}, 1 -> {1, 100} @@ -232,8 +257,27 @@ public class PartitionUpsertMetadataManagerTest { // segment2: 2 -> {0, 100}, 3 -> {0, 100} assertNull(recordLocationMap.get(getPrimaryKey(0))); assertNull(recordLocationMap.get(getPrimaryKey(1))); - checkRecordLocation(recordLocationMap, 2, segment2, 0, 100); - checkRecordLocation(recordLocationMap, 3, segment2, 1, 100); + checkRecordLocation(recordLocationMap, 2, segment2, 0, 100, hashFunction); + checkRecordLocation(recordLocationMap, 3, segment2, 1, 100, hashFunction); assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); } + + @Test + public void testHashPrimaryKey() { + PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"}); + Assert.assertEquals(BytesUtils.toHexString( + ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk, UpsertConfig.HashFunction.MD5)).getBytes()), + "58de44997505014e02982846a4d1cbbd"); + Assert.assertEquals(BytesUtils.toHexString( + ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk, UpsertConfig.HashFunction.MURMUR3)).getBytes()), + "7e6b4a98296292a4012225fff037fa8c"); + // reorder + pk = new PrimaryKey(new Object[]{"uuid-3", "uuid-2", "uuid-1"}); + Assert.assertEquals(BytesUtils.toHexString( + ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk, UpsertConfig.HashFunction.MD5)).getBytes()), + "d2df12c6dea7b83f965613614eee58e2"); + Assert.assertEquals(BytesUtils.toHexString( + ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk, UpsertConfig.HashFunction.MURMUR3)).getBytes()), + "8d68b314cc0c8de4dbd55f4dad3c3e66"); + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/HashUtilsTest.java similarity index 60% copy from pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java copy to pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/HashUtilsTest.java index 97faad8..e704cb1 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/HashUtilsTest.java @@ -16,25 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.spi.data.readers; +package org.apache.pinot.segment.local.utils; +import org.apache.pinot.spi.utils.BytesUtils; +import org.testng.Assert; import org.testng.annotations.Test; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; - - -public class PrimaryKeyTest { +public class HashUtilsTest { @Test - public void testPrimaryKeyComparison() { - PrimaryKey left = new PrimaryKey(new Object[]{"111", 2}); - PrimaryKey right = new PrimaryKey(new Object[]{"111", 2}); - assertEquals(left, right); - assertEquals(left.hashCode(), right.hashCode()); - - right = new PrimaryKey(new Object[]{"222", 2}); - assertNotEquals(left, right); - assertNotEquals(left.hashCode(), right.hashCode()); + public void testHashPlainValues() { + Assert.assertEquals(BytesUtils.toHexString(HashUtils.hashMD5("hello world".getBytes())), + "5eb63bbbe01eeed093cb22bb8f5acdc3"); + Assert.assertEquals(BytesUtils.toHexString(HashUtils.hashMurmur3("hello world".getBytes())), + "0e617feb46603f53b163eb607d4697ab"); } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index fc0888a..c028069 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -1036,7 +1036,7 @@ public class TableConfigUtilsTest { new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING) .build(); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null)).build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).build(); try { TableConfigUtils.validateUpsertConfig(tableConfig, schema); Assert.fail(); @@ -1045,7 +1045,7 @@ public class TableConfigUtilsTest { } tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null)).build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).build(); try { TableConfigUtils.validateUpsertConfig(tableConfig, schema); Assert.fail(); @@ -1066,7 +1066,7 @@ public class TableConfigUtilsTest { Map<String, String> streamConfigs = getStreamConfigs(); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null)).setStreamConfigs(streamConfigs).build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).setStreamConfigs(streamConfigs).build(); try { TableConfigUtils.validateUpsertConfig(tableConfig, schema); Assert.fail(); @@ -1076,7 +1076,7 @@ public class TableConfigUtilsTest { streamConfigs.put("stream.kafka.consumer.type", "simple"); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null)).setStreamConfigs(streamConfigs).build(); + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)).setStreamConfigs(streamConfigs).build(); try { TableConfigUtils.validateUpsertConfig(tableConfig, schema); Assert.fail(); @@ -1086,7 +1086,7 @@ public class TableConfigUtilsTest { } tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null)) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)) .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) .setStreamConfigs(streamConfigs).build(); TableConfigUtils.validateUpsertConfig(tableConfig, schema); @@ -1094,7 +1094,7 @@ public class TableConfigUtilsTest { StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Lists.newArrayList("myCol"), null, Collections .singletonList(new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, "myCol").toColumnName()), 10); tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null)) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null)) .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) .setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build(); try { @@ -1119,7 +1119,7 @@ public class TableConfigUtilsTest { partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT); TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) - .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, null)) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, null, null)) .setNullHandlingEnabled(false) .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE)) .setStreamConfigs(streamConfigs).build(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java index 3156c2d..2c3c597 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java @@ -286,6 +286,11 @@ public class TableConfig extends BaseJsonConfig { } @JsonIgnore + public UpsertConfig.HashFunction getHashFunction() { + return _upsertConfig == null ? UpsertConfig.HashFunction.NONE : _upsertConfig.getHashFunction(); + } + + @JsonIgnore public UpsertConfig.Mode getUpsertMode() { return _upsertConfig == null ? UpsertConfig.Mode.NONE : _upsertConfig.getMode(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java index 3ad2b36..7e0be94 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java @@ -40,9 +40,16 @@ public class UpsertConfig extends BaseJsonConfig { APPEND, INCREMENT, OVERWRITE, UNION } + public enum HashFunction { + NONE, MD5, MURMUR3 + } + @JsonPropertyDescription("Upsert mode.") private final Mode _mode; + @JsonPropertyDescription("Function to hash the primary key.") + private final HashFunction _hashFunction; + @JsonPropertyDescription("Partial update strategies.") private final Map<String, Strategy> _partialUpsertStrategies; @@ -52,7 +59,8 @@ public class UpsertConfig extends BaseJsonConfig { @JsonCreator public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode, @JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy> partialUpsertStrategies, - @JsonProperty("comparisonColumn") @Nullable String comparisonColumn) { + @JsonProperty("comparisonColumn") @Nullable String comparisonColumn, + @JsonProperty("hashFunction") @Nullable HashFunction hashFunction) { Preconditions.checkArgument(mode != null, "Upsert mode must be configured"); _mode = mode; @@ -63,12 +71,17 @@ public class UpsertConfig extends BaseJsonConfig { } _comparisonColumn = comparisonColumn; + _hashFunction = hashFunction == null ? HashFunction.NONE : hashFunction; } public Mode getMode() { return _mode; } + public HashFunction getHashFunction() { + return _hashFunction; + } + @Nullable public Map<String, Strategy> getPartialUpsertStrategies() { return _partialUpsertStrategies; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java index c6a2edf..b663cf6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java @@ -19,6 +19,7 @@ package org.apache.pinot.spi.data.readers; import java.util.Arrays; +import org.apache.commons.lang3.SerializationUtils; /** @@ -35,6 +36,10 @@ public class PrimaryKey { return _values; } + public byte[] asBytes() { + return SerializationUtils.serialize(_values); + } + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java index 95cfd6e..6189025 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.spi.utils; +import java.io.Serializable; import java.util.Arrays; import javax.annotation.Nonnull; @@ -29,7 +30,7 @@ import javax.annotation.Nonnull; * <li> Implements equals() and hashCode(), so it can be used as key for HashMap/Set. </li> * </ul> */ -public class ByteArray implements Comparable<ByteArray> { +public class ByteArray implements Comparable<ByteArray>, Serializable { private final byte[] _bytes; public ByteArray(byte[] bytes) { diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java index ab071f7..37b18a2 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java @@ -29,15 +29,18 @@ public class UpsertConfigTest { @Test public void testUpsertConfig() { - UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null); + UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, null, null); assertEquals(upsertConfig1.getMode(), UpsertConfig.Mode.FULL); - upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison"); + upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", null); assertEquals(upsertConfig1.getComparisonColumn(), "comparison"); + upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", UpsertConfig.HashFunction.MURMUR3); + assertEquals(upsertConfig1.getHashFunction(), UpsertConfig.HashFunction.MURMUR3); + Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new HashMap<>(); partialUpsertStratgies.put("myCol", UpsertConfig.Strategy.INCREMENT); - UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, null); + UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL, partialUpsertStratgies, null, null); assertEquals(upsertConfig2.getPartialUpsertStrategies(), partialUpsertStratgies); } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java index 97faad8..e1c531d 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java @@ -18,6 +18,8 @@ */ package org.apache.pinot.spi.data.readers; +import org.apache.commons.lang3.SerializationUtils; +import org.apache.pinot.spi.utils.ByteArray; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -37,4 +39,13 @@ public class PrimaryKeyTest { assertNotEquals(left, right); assertNotEquals(left.hashCode(), right.hashCode()); } + + @Test + public void testSerialization() { + byte[] rawbytes = {0xa, 0x2, (byte) 0xff}; + PrimaryKey pk = new PrimaryKey(new Object[]{"111", 2, new ByteArray(rawbytes)}); + byte[] bytes = pk.asBytes(); + PrimaryKey deserialized = new PrimaryKey((Object[]) SerializationUtils.deserialize(bytes)); + assertEquals(deserialized, pk); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org