This is an automated email from the ASF dual-hosted git repository.

yupeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7add772  Add option to store the hash of the upsert primary key (#7246)
7add772 is described below

commit 7add77202e543f0932db6c0d8557cab9baa95bc4
Author: Yupeng Fu <yupe...@users.noreply.github.com>
AuthorDate: Tue Aug 10 11:42:49 2021 -0700

    Add option to store the hash of the upsert primary key (#7246)
    
    * add option to store the hash of the upsert primary key
    
    * use byte equality
    
    * fix unit tests
    
    * fix
    
    * fix integration tests
    
    * add header
    
    * comments
    
    * comments
    
    * comments
    
    * fix unit tests
    
    * comments
---
 .../common/utils/config/TableConfigSerDeTest.java  |   3 +-
 .../controller/helix/PinotResourceManagerTest.java |   2 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |   1 +
 .../manager/realtime/RealtimeTableDataManager.java |   3 +-
 ...adataAndDictionaryAggregationPlanMakerTest.java |   6 +-
 .../tests/BaseClusterIntegrationTest.java          |   2 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   |   5 +-
 .../local/realtime/impl/RealtimeSegmentConfig.java |  19 ++-
 .../upsert/PartitionUpsertMetadataManager.java     |  28 +++-
 .../local/upsert/TableUpsertMetadataManager.java   |   8 +-
 .../pinot/segment/local/utils/HashUtils.java       |  25 ++--
 .../mutable/MutableSegmentImplTestUtils.java       |   7 +-
 .../MutableSegmentImplUpsertComparisonColTest.java |   7 +-
 .../mutable/MutableSegmentImplUpsertTest.java      |  32 +++--
 .../upsert/PartitionUpsertMetadataManagerTest.java | 148 +++++++++++++--------
 .../pinot/segment/local/utils/HashUtilsTest.java   |  24 ++--
 .../segment/local/utils/TableConfigUtilsTest.java  |  14 +-
 .../apache/pinot/spi/config/table/TableConfig.java |   5 +
 .../pinot/spi/config/table/UpsertConfig.java       |  15 ++-
 .../apache/pinot/spi/data/readers/PrimaryKey.java  |   5 +
 .../java/org/apache/pinot/spi/utils/ByteArray.java |   3 +-
 .../pinot/spi/config/table/UpsertConfigTest.java   |   9 +-
 .../pinot/spi/data/readers/PrimaryKeyTest.java     |  11 ++
 23 files changed, 253 insertions(+), 129 deletions(-)

diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 08b364c..cb48ef5 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -246,7 +246,8 @@ public class TableConfigSerDeTest {
     }
     {
       // with upsert config
-      UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL, 
null, "comparison");
+      UpsertConfig upsertConfig =
+          new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison", 
UpsertConfig.HashFunction.NONE);
 
       TableConfig tableConfig = 
tableConfigBuilder.setUpsertConfig(upsertConfig).build();
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 58f993a..939cfe9 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -68,7 +68,7 @@ public class PinotResourceManagerTest {
     
realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING);
     realtimeTableConfig.getValidationConfig()
         .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
-    realtimeTableConfig.setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL, null, null));
+    realtimeTableConfig.setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL, null, null, null));
     
ControllerTestUtils.getHelixResourceManager().addTable(realtimeTableConfig);
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 31b4151..2a9e4b5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1260,6 +1260,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
             
.setAggregateMetrics(indexingConfig.isAggregateMetrics()).setNullHandlingEnabled(_nullHandlingEnabled)
             
.setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
             .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
+            .setHashFunction(tableConfig.getHashFunction())
             
.setUpsertComparisonColumn(tableConfig.getUpsertComparisonColumn());
 
     // Create message decoder
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 64fdb4d..1aae911 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -168,8 +168,9 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
         partialUpsertHandler =
             new PartialUpsertHandler(_helixManager, _tableNameWithType, 
upsertConfig.getPartialUpsertStrategies());
       }
+      UpsertConfig.HashFunction hashFunction = upsertConfig.getHashFunction();
       _tableUpsertMetadataManager =
-          new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics, 
partialUpsertHandler);
+          new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics, 
partialUpsertHandler, hashFunction);
       _primaryKeyColumns = schema.getPrimaryKeyColumns();
       Preconditions.checkState(!CollectionUtils.isEmpty(_primaryKeyColumns),
           "Primary key columns must be configured for upsert");
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 956ad31..b691773 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -44,6 +44,7 @@ import 
org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
@@ -123,8 +124,9 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, 
SEGMENT_NAME), ReadMode.heap);
     ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
     _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, 
SEGMENT_NAME), ReadMode.heap);
-    ((ImmutableSegmentImpl) _upsertIndexSegment)
-        .enableUpsert(new PartitionUpsertMetadataManager("testTable_REALTIME", 
0, serverMetrics, null),
+    ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
+        new PartitionUpsertMetadataManager("testTable_REALTIME", 0, 
serverMetrics, null,
+            UpsertConfig.HashFunction.NONE),
             new ThreadSafeMutableRoaringBitmap());
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 50da64b..de4ff03 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -386,7 +386,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setSegmentPartitionConfig(new 
SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, 
null)).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null)).build();
   }
 
   /**
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 73c6d4d..9cd4153 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -512,8 +512,9 @@ public class MutableSegmentImpl implements MutableSegment {
   private GenericRow handleUpsert(GenericRow row, int docId) {
     PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
     Object upsertComparisonValue = row.getValue(_upsertComparisonColumn);
-    Preconditions.checkState(upsertComparisonValue instanceof Comparable,
-        "Upsert comparison column: %s must be comparable", 
_upsertComparisonColumn);
+    Preconditions
+        .checkState(upsertComparisonValue instanceof Comparable, "Upsert 
comparison column: %s must be comparable",
+            _upsertComparisonColumn);
     return _partitionUpsertMetadataManager.updateRecord(this,
         new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, 
(Comparable) upsertComparisonValue), row);
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 43f9343..0a1252f 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -57,6 +57,7 @@ public class RealtimeSegmentConfig {
   private final boolean _aggregateMetrics;
   private final boolean _nullHandlingEnabled;
   private final UpsertConfig.Mode _upsertMode;
+  private final UpsertConfig.HashFunction _hashFunction;
   private final String _upsertComparisonColumn;
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final String _consumerDir;
@@ -69,7 +70,7 @@ public class RealtimeSegmentConfig {
       RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, 
PinotDataBufferMemoryManager memoryManager,
       RealtimeSegmentStatsHistory statsHistory, String partitionColumn, 
PartitionFunction partitionFunction,
       int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, 
String consumerDir,
-      UpsertConfig.Mode upsertMode, String upsertComparisonColumn,
+      UpsertConfig.Mode upsertMode, String upsertComparisonColumn, 
UpsertConfig.HashFunction hashFunction,
       PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
     _tableNameWithType = tableNameWithType;
     _segmentName = segmentName;
@@ -96,6 +97,7 @@ public class RealtimeSegmentConfig {
     _nullHandlingEnabled = nullHandlingEnabled;
     _consumerDir = consumerDir;
     _upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE;
+    _hashFunction = hashFunction != null ? hashFunction : 
UpsertConfig.HashFunction.NONE;
     _upsertComparisonColumn = upsertComparisonColumn;
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
   }
@@ -205,6 +207,10 @@ public class RealtimeSegmentConfig {
     return _upsertMode;
   }
 
+  public UpsertConfig.HashFunction getHashFunction() {
+    return _hashFunction;
+  }
+
   public String getUpsertComparisonColumn() {
     return _upsertComparisonColumn;
   }
@@ -239,6 +245,7 @@ public class RealtimeSegmentConfig {
     private boolean _nullHandlingEnabled = false;
     private String _consumerDir;
     private UpsertConfig.Mode _upsertMode;
+    private UpsertConfig.HashFunction _hashFunction;
     private String _upsertComparisonColumn;
     private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
 
@@ -378,11 +385,16 @@ public class RealtimeSegmentConfig {
       return this;
     }
 
+    public Builder setHashFunction(UpsertConfig.HashFunction hashFunction) {
+      _hashFunction = hashFunction;
+      return this;
+    }
+
     public Builder setUpsertComparisonColumn(String upsertComparisonColumn) {
       _upsertComparisonColumn = upsertComparisonColumn;
       return this;
     }
-    
+
     public Builder 
setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager 
partitionUpsertMetadataManager) {
       _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
       return this;
@@ -393,7 +405,8 @@ public class RealtimeSegmentConfig {
           _capacity, _avgNumMultiValues, _noDictionaryColumns, 
_varLengthDictionaryColumns, _invertedIndexColumns,
           _textIndexColumns, _fstIndexColumns, _jsonIndexColumns, 
_h3IndexConfigs, _realtimeSegmentZKMetadata, _offHeap,
           _memoryManager, _statsHistory, _partitionColumn, _partitionFunction, 
_partitionId, _aggregateMetrics,
-          _nullHandlingEnabled, _consumerDir, _upsertMode, 
_upsertComparisonColumn, _partitionUpsertMetadataManager);
+          _nullHandlingEnabled, _consumerDir, _upsertMode, 
_upsertComparisonColumn, _hashFunction,
+          _partitionUpsertMetadataManager);
     }
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 6bd9d1b..64d3192 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -26,10 +26,13 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,10 +69,11 @@ public class PartitionUpsertMetadataManager {
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
   private final PartialUpsertHandler _partialUpsertHandler;
+  private final UpsertConfig.HashFunction _hashFunction;
 
   // TODO(upsert): consider an off-heap KV store to persist this mapping to 
improve the recovery speed.
   @VisibleForTesting
-  final ConcurrentHashMap<PrimaryKey, RecordLocation> 
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+  final ConcurrentHashMap<Object, RecordLocation> 
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
   // Reused for reading previous record during partial upsert
   private final GenericRow _reuse = new GenericRow();
@@ -77,11 +81,12 @@ public class PartitionUpsertMetadataManager {
   private GenericRow _result;
 
   public PartitionUpsertMetadataManager(String tableNameWithType, int 
partitionId, ServerMetrics serverMetrics,
-      @Nullable PartialUpsertHandler partialUpsertHandler) {
+      @Nullable PartialUpsertHandler partialUpsertHandler, 
UpsertConfig.HashFunction hashFunction) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
     _partialUpsertHandler = partialUpsertHandler;
+    _hashFunction = hashFunction;
   }
 
   /**
@@ -95,7 +100,8 @@ public class PartitionUpsertMetadataManager {
 
     while (recordInfoIterator.hasNext()) {
       RecordInfo recordInfo = recordInfoIterator.next();
-      _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, 
(primaryKey, currentRecordLocation) -> {
+      _primaryKeyToRecordLocationMap
+          .compute(hashPrimaryKey(recordInfo._primaryKey, _hashFunction), 
(primaryKey, currentRecordLocation) -> {
         if (currentRecordLocation != null) {
           // Existing primary key
 
@@ -176,7 +182,8 @@ public class PartitionUpsertMetadataManager {
     }
 
     _result = record;
-    _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, 
(primaryKey, currentRecordLocation) -> {
+    _primaryKeyToRecordLocationMap
+        .compute(hashPrimaryKey(recordInfo._primaryKey, _hashFunction), 
(primaryKey, currentRecordLocation) -> {
       if (currentRecordLocation != null) {
         // Existing primary key
 
@@ -238,6 +245,19 @@ public class PartitionUpsertMetadataManager {
         _primaryKeyToRecordLocationMap.size());
   }
 
+  protected static Object hashPrimaryKey(PrimaryKey primaryKey, 
UpsertConfig.HashFunction hashFunction) {
+    switch (hashFunction) {
+      case NONE:
+        return primaryKey;
+      case MD5:
+        return new ByteArray(HashUtils.hashMD5(primaryKey.asBytes()));
+      case MURMUR3:
+        return new ByteArray(HashUtils.hashMurmur3(primaryKey.asBytes()));
+      default:
+        throw new IllegalArgumentException(String.format("Unrecognized hash 
function %s", hashFunction));
+    }
+  }
+
   public static final class RecordInfo {
     private final PrimaryKey _primaryKey;
     private final int _docId;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index ae8fb3f..d78cfc0 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 
 
 /**
@@ -34,16 +35,19 @@ public class TableUpsertMetadataManager {
   private final String _tableNameWithType;
   private final ServerMetrics _serverMetrics;
   private final PartialUpsertHandler _partialUpsertHandler;
+  private final UpsertConfig.HashFunction _hashFunction;
 
   public TableUpsertMetadataManager(String tableNameWithType, ServerMetrics 
serverMetrics,
-      @Nullable PartialUpsertHandler partialUpsertHandler) {
+      @Nullable PartialUpsertHandler partialUpsertHandler, 
UpsertConfig.HashFunction hashFunction) {
     _tableNameWithType = tableNameWithType;
     _serverMetrics = serverMetrics;
     _partialUpsertHandler = partialUpsertHandler;
+    _hashFunction = hashFunction;
   }
 
   public PartitionUpsertMetadataManager getOrCreatePartitionManager(int 
partitionId) {
     return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
-        k -> new PartitionUpsertMetadataManager(_tableNameWithType, k, 
_serverMetrics, _partialUpsertHandler));
+        k -> new PartitionUpsertMetadataManager(_tableNameWithType, k, 
_serverMetrics, _partialUpsertHandler,
+            _hashFunction));
   }
 }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HashUtils.java
similarity index 57%
copy from 
pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HashUtils.java
index 97faad8..dfd9a5c 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HashUtils.java
@@ -16,25 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.data.readers;
+package org.apache.pinot.segment.local.utils;
 
-import org.testng.annotations.Test;
+import com.google.common.hash.Hashing;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
 
+public class HashUtils {
+  private HashUtils() {
+  }
 
-public class PrimaryKeyTest {
-
-  @Test
-  public void testPrimaryKeyComparison() {
-    PrimaryKey left = new PrimaryKey(new Object[]{"111", 2});
-    PrimaryKey right = new PrimaryKey(new Object[]{"111", 2});
-    assertEquals(left, right);
-    assertEquals(left.hashCode(), right.hashCode());
+  public static byte[] hashMurmur3(byte[] bytes) {
+    return Hashing.murmur3_128().hashBytes(bytes).asBytes();
+  }
 
-    right = new PrimaryKey(new Object[]{"222", 2});
-    assertNotEquals(left, right);
-    assertNotEquals(left.hashCode(), right.hashCode());
+  public static byte[] hashMD5(byte[] bytes) {
+    return Hashing.md5().hashBytes(bytes).asBytes();
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 4f52009..e88ec3c 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -63,6 +63,8 @@ public class MutableSegmentImplTestUtils {
 
     UpsertConfig.Mode upsertMode = upsertConfig == null ? 
UpsertConfig.Mode.NONE : upsertConfig.getMode();
     String comparisonColumn = upsertConfig == null ? null : 
upsertConfig.getComparisonColumn();
+    UpsertConfig.HashFunction hashFunction =
+        upsertConfig == null ? UpsertConfig.HashFunction.NONE : 
upsertConfig.getHashFunction();
     RealtimeSegmentConfig realtimeSegmentConfig =
         new 
RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
             
.setStreamName(STEAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000)
@@ -71,9 +73,8 @@ public class MutableSegmentImplTestUtils {
             .setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
             .setMemoryManager(new 
DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
             
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
-            .setUpsertComparisonColumn(comparisonColumn)
-            .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
-            .build();
+            
.setUpsertComparisonColumn(comparisonColumn).setHashFunction(hashFunction)
+            
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager).build();
     return new MutableSegmentImpl(realtimeSegmentConfig, null);
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index b7ae126..5a08556 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -57,15 +57,16 @@ public class MutableSegmentImplUpsertComparisonColTest {
     URL dataResourceUrl = 
this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, 
"offset")).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, 
"offset", null)).build();
     _recordTransformer = 
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     _partitionUpsertMetadataManager =
-        new TableUpsertMetadataManager("testTable_REALTIME", 
Mockito.mock(ServerMetrics.class), null)
+        new TableUpsertMetadataManager("testTable_REALTIME", 
Mockito.mock(ServerMetrics.class), null,
+            UpsertConfig.HashFunction.NONE)
             .getOrCreatePartitionManager(0);
     _mutableSegmentImpl = MutableSegmentImplTestUtils
         .createMutableSegmentImpl(_schema, Collections.emptySet(), 
Collections.emptySet(), Collections.emptySet(),
-            false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, 
"offset"), "secondsSinceEpoch",
+            false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, 
"offset", null), "secondsSinceEpoch",
             _partitionUpsertMetadataManager);
     GenericRow reuse = new GenericRow();
     try (RecordReader recordReader = RecordReaderFactory
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index 2b367cd..d62a6ed 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -37,35 +37,34 @@ import 
org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.mockito.Mockito;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
 public class MutableSegmentImplUpsertTest {
   private static final String SCHEMA_FILE_PATH = 
"data/test_upsert_schema.json";
   private static final String DATA_FILE_PATH = "data/test_upsert_data.json";
-  private static CompositeTransformer _recordTransformer;
-  private static Schema _schema;
-  private static TableConfig _tableConfig;
-  private static MutableSegmentImpl _mutableSegmentImpl;
-  private static PartitionUpsertMetadataManager 
_partitionUpsertMetadataManager;
+  private CompositeTransformer _recordTransformer;
+  private Schema _schema;
+  private TableConfig _tableConfig;
+  private MutableSegmentImpl _mutableSegmentImpl;
+  private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
 
-  @BeforeClass
-  public void setup()
+  private void setup(UpsertConfig.HashFunction hashFunction)
       throws Exception {
     URL schemaResourceUrl = 
this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
     URL dataResourceUrl = 
this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, 
null)).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
hashFunction)).build();
     _recordTransformer = 
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     _partitionUpsertMetadataManager =
-        new TableUpsertMetadataManager("testTable_REALTIME", 
Mockito.mock(ServerMetrics.class), null)
+        new TableUpsertMetadataManager("testTable_REALTIME", 
Mockito.mock(ServerMetrics.class), null,
+            hashFunction)
             .getOrCreatePartitionManager(0);
     _mutableSegmentImpl = MutableSegmentImplTestUtils
         .createMutableSegmentImpl(_schema, Collections.emptySet(), 
Collections.emptySet(), Collections.emptySet(),
-            false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null), 
"secondsSinceEpoch",
+            false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
hashFunction), "secondsSinceEpoch",
             _partitionUpsertMetadataManager);
     GenericRow reuse = new GenericRow();
     try (RecordReader recordReader = RecordReaderFactory
@@ -80,7 +79,16 @@ public class MutableSegmentImplUpsertTest {
   }
 
   @Test
-  public void testUpsertIngestion() {
+  public void testHashFunctions()
+      throws Exception {
+    testUpsertIngestion(UpsertConfig.HashFunction.NONE);
+    testUpsertIngestion(UpsertConfig.HashFunction.MD5);
+    testUpsertIngestion(UpsertConfig.HashFunction.MURMUR3);
+  }
+
+  private void testUpsertIngestion(UpsertConfig.HashFunction hashFunction)
+      throws Exception {
+    setup(hashFunction);
     ImmutableRoaringBitmap bitmap = 
_mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap();
     Assert.assertFalse(bitmap.contains(0));
     Assert.assertTrue(bitmap.contains(1));
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
index 6812d63..ce91f01 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
@@ -26,9 +26,13 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -45,9 +49,15 @@ public class PartitionUpsertMetadataManagerTest {
 
   @Test
   public void testAddSegment() {
+    verifyAddSegment(UpsertConfig.HashFunction.NONE);
+    verifyAddSegment(UpsertConfig.HashFunction.MD5);
+    verifyAddSegment(UpsertConfig.HashFunction.MURMUR3);
+  }
+
+  private void verifyAddSegment(UpsertConfig.HashFunction hashFunction) {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
mock(ServerMetrics.class), null);
-    Map<PrimaryKey, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
mock(ServerMetrics.class), null, hashFunction);
+    Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
     List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new 
ArrayList<>();
@@ -61,9 +71,9 @@ public class PartitionUpsertMetadataManagerTest {
     ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
     upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
     // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4, 5});
 
     // Add the second segment
@@ -78,10 +88,10 @@ public class PartitionUpsertMetadataManagerTest {
     upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
 
@@ -92,26 +102,28 @@ public class PartitionUpsertMetadataManagerTest {
     // original segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
-    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
-    assertSame(recordLocationMap.get(getPrimaryKey(1)).getSegment(), 
newSegment1);
+    
assertSame(recordLocationMap.get(PartitionUpsertMetadataManager.hashPrimaryKey(getPrimaryKey(1),
 hashFunction))
+        .getSegment(), newSegment1);
 
     // Remove the original segment1
     upsertMetadataManager.removeSegment(segment1);
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
-    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
-    assertSame(recordLocationMap.get(getPrimaryKey(1)).getSegment(), 
newSegment1);
+    
assertSame(recordLocationMap.get(PartitionUpsertMetadataManager.hashPrimaryKey(getPrimaryKey(1),
 hashFunction))
+        .getSegment(), newSegment1);
   }
 
   private static ImmutableSegmentImpl mockSegment(int sequenceNumber, 
ThreadSafeMutableRoaringBitmap validDocIds) {
@@ -130,9 +142,10 @@ public class PartitionUpsertMetadataManagerTest {
     return new PrimaryKey(new Object[]{value});
   }
 
-  private static void checkRecordLocation(Map<PrimaryKey, RecordLocation> 
recordLocationMap, int keyValue,
-      IndexSegment segment, int docId, int timestamp) {
-    RecordLocation recordLocation = 
recordLocationMap.get(getPrimaryKey(keyValue));
+  private static void checkRecordLocation(Map<Object, RecordLocation> 
recordLocationMap, int keyValue,
+      IndexSegment segment, int docId, int timestamp, 
UpsertConfig.HashFunction hashFunction) {
+    RecordLocation recordLocation =
+        
recordLocationMap.get(PartitionUpsertMetadataManager.hashPrimaryKey(getPrimaryKey(keyValue),
 hashFunction));
     assertNotNull(recordLocation);
     assertSame(recordLocation.getSegment(), segment);
     assertEquals(recordLocation.getDocId(), docId);
@@ -141,9 +154,15 @@ public class PartitionUpsertMetadataManagerTest {
 
   @Test
   public void testUpdateRecord() {
+    verifyUpdateRecord(UpsertConfig.HashFunction.NONE);
+    verifyUpdateRecord(UpsertConfig.HashFunction.MD5);
+    verifyUpdateRecord(UpsertConfig.HashFunction.MURMUR3);
+  }
+
+  private void verifyUpdateRecord(UpsertConfig.HashFunction hashFunction) {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
mock(ServerMetrics.class), null);
-    Map<PrimaryKey, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
mock(ServerMetrics.class), null, hashFunction);
+    Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
@@ -160,56 +179,62 @@ public class PartitionUpsertMetadataManagerTest {
     IndexSegment segment2 = mockSegment(1, validDocIds2);
 
     GenericRow row = mock(GenericRow.class);
-    upsertMetadataManager
-        .updateRecord(segment2, new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100), row);
+    upsertMetadataManager.updateRecord(segment2,
+        new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 
100), row);
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     // segment2: 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0});
 
-    upsertMetadataManager
-        .updateRecord(segment2, new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120), row);
+    upsertMetadataManager.updateRecord(segment2,
+        new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 
120), row);
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
 
-    upsertMetadataManager
-        .updateRecord(segment2, new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100), row);
+    upsertMetadataManager.updateRecord(segment2,
+        new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 
100), row);
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
 
-    upsertMetadataManager
-        .updateRecord(segment2, new 
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100), row);
+    upsertMetadataManager.updateRecord(segment2,
+        new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 
100), row);
     // segment1: 1 -> {1, 120}
     // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
+    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 3});
   }
 
   @Test
   public void testRemoveSegment() {
+    verifyRemoveSegment(UpsertConfig.HashFunction.NONE);
+    verifyRemoveSegment(UpsertConfig.HashFunction.MD5);
+    verifyRemoveSegment(UpsertConfig.HashFunction.MURMUR3);
+  }
+
+  private void verifyRemoveSegment(UpsertConfig.HashFunction hashFunction) {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
mock(ServerMetrics.class), null);
-    Map<PrimaryKey, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, 
mock(ServerMetrics.class), null, hashFunction);
+    Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add 2 segments
     // segment1: 0 -> {0, 100}, 1 -> {1, 100}
@@ -232,8 +257,27 @@ public class PartitionUpsertMetadataManagerTest {
     // segment2: 2 -> {0, 100}, 3 -> {0, 100}
     assertNull(recordLocationMap.get(getPrimaryKey(0)));
     assertNull(recordLocationMap.get(getPrimaryKey(1)));
-    checkRecordLocation(recordLocationMap, 2, segment2, 0, 100);
-    checkRecordLocation(recordLocationMap, 3, segment2, 1, 100);
+    checkRecordLocation(recordLocationMap, 2, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 1, 100, hashFunction);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
   }
+
+  @Test
+  public void testHashPrimaryKey() {
+    PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
+    Assert.assertEquals(BytesUtils.toHexString(
+            ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk, 
UpsertConfig.HashFunction.MD5)).getBytes()),
+        "58de44997505014e02982846a4d1cbbd");
+    Assert.assertEquals(BytesUtils.toHexString(
+            ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk, 
UpsertConfig.HashFunction.MURMUR3)).getBytes()),
+        "7e6b4a98296292a4012225fff037fa8c");
+    // reorder
+    pk = new PrimaryKey(new Object[]{"uuid-3", "uuid-2", "uuid-1"});
+    Assert.assertEquals(BytesUtils.toHexString(
+            ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk, 
UpsertConfig.HashFunction.MD5)).getBytes()),
+        "d2df12c6dea7b83f965613614eee58e2");
+    Assert.assertEquals(BytesUtils.toHexString(
+            ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk, 
UpsertConfig.HashFunction.MURMUR3)).getBytes()),
+        "8d68b314cc0c8de4dbd55f4dad3c3e66");
+  }
 }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/HashUtilsTest.java
similarity index 60%
copy from 
pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
copy to 
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/HashUtilsTest.java
index 97faad8..e704cb1 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/HashUtilsTest.java
@@ -16,25 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.data.readers;
+package org.apache.pinot.segment.local.utils;
 
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
-
-
-public class PrimaryKeyTest {
 
+public class HashUtilsTest {
   @Test
-  public void testPrimaryKeyComparison() {
-    PrimaryKey left = new PrimaryKey(new Object[]{"111", 2});
-    PrimaryKey right = new PrimaryKey(new Object[]{"111", 2});
-    assertEquals(left, right);
-    assertEquals(left.hashCode(), right.hashCode());
-
-    right = new PrimaryKey(new Object[]{"222", 2});
-    assertNotEquals(left, right);
-    assertNotEquals(left.hashCode(), right.hashCode());
+  public void testHashPlainValues() {
+    Assert.assertEquals(BytesUtils.toHexString(HashUtils.hashMD5("hello 
world".getBytes())),
+        "5eb63bbbe01eeed093cb22bb8f5acdc3");
+    Assert.assertEquals(BytesUtils.toHexString(HashUtils.hashMurmur3("hello 
world".getBytes())),
+        "0e617feb46603f53b163eb607d4697ab");
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index fc0888a..c028069 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1036,7 +1036,7 @@ public class TableConfigUtilsTest {
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
 FieldSpec.DataType.STRING)
             .build();
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, 
null)).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null)).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
       Assert.fail();
@@ -1045,7 +1045,7 @@ public class TableConfigUtilsTest {
     }
 
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, 
null)).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null)).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
       Assert.fail();
@@ -1066,7 +1066,7 @@ public class TableConfigUtilsTest {
 
     Map<String, String> streamConfigs = getStreamConfigs();
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, 
null)).setStreamConfigs(streamConfigs).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null)).setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
       Assert.fail();
@@ -1076,7 +1076,7 @@ public class TableConfigUtilsTest {
 
     streamConfigs.put("stream.kafka.consumer.type", "simple");
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, 
null)).setStreamConfigs(streamConfigs).build();
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null)).setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
       Assert.fail();
@@ -1086,7 +1086,7 @@ public class TableConfigUtilsTest {
     }
 
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null))
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null))
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setStreamConfigs(streamConfigs).build();
     TableConfigUtils.validateUpsertConfig(tableConfig, schema);
@@ -1094,7 +1094,7 @@ public class TableConfigUtilsTest {
     StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Lists.newArrayList("myCol"), null, Collections
         .singletonList(new 
AggregationFunctionColumnPair(AggregationFunctionType.COUNT, 
"myCol").toColumnName()), 10);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null))
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null, 
null))
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
     try {
@@ -1119,7 +1119,7 @@ public class TableConfigUtilsTest {
     partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
 
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL, 
partialUpsertStratgies, null))
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL, 
partialUpsertStratgies, null, null))
         .setNullHandlingEnabled(false)
         .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setStreamConfigs(streamConfigs).build();
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 3156c2d..2c3c597 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -286,6 +286,11 @@ public class TableConfig extends BaseJsonConfig {
   }
 
   @JsonIgnore
+  public UpsertConfig.HashFunction getHashFunction() {
+    return _upsertConfig == null ? UpsertConfig.HashFunction.NONE : 
_upsertConfig.getHashFunction();
+  }
+
+  @JsonIgnore
   public UpsertConfig.Mode getUpsertMode() {
     return _upsertConfig == null ? UpsertConfig.Mode.NONE : 
_upsertConfig.getMode();
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 3ad2b36..7e0be94 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -40,9 +40,16 @@ public class UpsertConfig extends BaseJsonConfig {
     APPEND, INCREMENT, OVERWRITE, UNION
   }
 
+  public enum HashFunction {
+    NONE, MD5, MURMUR3
+  }
+
   @JsonPropertyDescription("Upsert mode.")
   private final Mode _mode;
 
+  @JsonPropertyDescription("Function to hash the primary key.")
+  private final HashFunction _hashFunction;
+
   @JsonPropertyDescription("Partial update strategies.")
   private final Map<String, Strategy> _partialUpsertStrategies;
 
@@ -52,7 +59,8 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonCreator
   public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode,
       @JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy> 
partialUpsertStrategies,
-      @JsonProperty("comparisonColumn") @Nullable String comparisonColumn) {
+      @JsonProperty("comparisonColumn") @Nullable String comparisonColumn,
+      @JsonProperty("hashFunction") @Nullable HashFunction hashFunction) {
     Preconditions.checkArgument(mode != null, "Upsert mode must be 
configured");
     _mode = mode;
 
@@ -63,12 +71,17 @@ public class UpsertConfig extends BaseJsonConfig {
     }
 
     _comparisonColumn = comparisonColumn;
+    _hashFunction = hashFunction == null ? HashFunction.NONE : hashFunction;
   }
 
   public Mode getMode() {
     return _mode;
   }
 
+  public HashFunction getHashFunction() {
+    return _hashFunction;
+  }
+
   @Nullable
   public Map<String, Strategy> getPartialUpsertStrategies() {
     return _partialUpsertStrategies;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
index c6a2edf..b663cf6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.data.readers;
 
 import java.util.Arrays;
+import org.apache.commons.lang3.SerializationUtils;
 
 
 /**
@@ -35,6 +36,10 @@ public class PrimaryKey {
     return _values;
   }
 
+  public byte[] asBytes() {
+    return SerializationUtils.serialize(_values);
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java
index 95cfd6e..6189025 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.spi.utils;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import javax.annotation.Nonnull;
 
@@ -29,7 +30,7 @@ import javax.annotation.Nonnull;
  *   <li> Implements equals() and hashCode(), so it can be used as key for 
HashMap/Set. </li>
  * </ul>
  */
-public class ByteArray implements Comparable<ByteArray> {
+public class ByteArray implements Comparable<ByteArray>, Serializable {
   private final byte[] _bytes;
 
   public ByteArray(byte[] bytes) {
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
index ab071f7..37b18a2 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
@@ -29,15 +29,18 @@ public class UpsertConfigTest {
 
   @Test
   public void testUpsertConfig() {
-    UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, 
null, null);
+    UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, 
null, null, null);
     assertEquals(upsertConfig1.getMode(), UpsertConfig.Mode.FULL);
 
-    upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, 
"comparison");
+    upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, 
"comparison", null);
     assertEquals(upsertConfig1.getComparisonColumn(), "comparison");
 
+    upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null, 
"comparison", UpsertConfig.HashFunction.MURMUR3);
+    assertEquals(upsertConfig1.getHashFunction(), 
UpsertConfig.HashFunction.MURMUR3);
+
     Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new 
HashMap<>();
     partialUpsertStratgies.put("myCol", UpsertConfig.Strategy.INCREMENT);
-    UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL, 
partialUpsertStratgies, null);
+    UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL, 
partialUpsertStratgies, null, null);
     assertEquals(upsertConfig2.getPartialUpsertStrategies(), 
partialUpsertStratgies);
   }
 }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java 
b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
index 97faad8..e1c531d 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.spi.data.readers;
 
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.pinot.spi.utils.ByteArray;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -37,4 +39,13 @@ public class PrimaryKeyTest {
     assertNotEquals(left, right);
     assertNotEquals(left.hashCode(), right.hashCode());
   }
+
+  @Test
+  public void testSerialization() {
+    byte[] rawbytes = {0xa, 0x2, (byte) 0xff};
+    PrimaryKey pk = new PrimaryKey(new Object[]{"111", 2, new 
ByteArray(rawbytes)});
+    byte[] bytes = pk.asBytes();
+    PrimaryKey deserialized = new PrimaryKey((Object[]) 
SerializationUtils.deserialize(bytes));
+    assertEquals(deserialized, pk);
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to