This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 74877be2aa Refactor murmur functions to pinot-spi (#15160)
74877be2aa is described below

commit 74877be2aa599baa8a2ea16529a07a6dbf611f2c
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Mon Mar 3 15:53:33 2025 -0800

    Refactor murmur functions to pinot-spi (#15160)
---
 .../common/function/scalar/HashFunctions.java      | 168 ++++++-
 .../org/apache/pinot/common/utils/HashUtil.java    | 120 -----
 .../common/utils/helix/LeadControllerUtils.java    |   6 +-
 .../common/function/scalar/HashFunctionsTest.java  | 142 ++++++
 .../spi/partition/Murmur3PartitionFunction.java    | 203 +-------
 .../spi/partition/MurmurPartitionFunction.java     |  56 +--
 .../spi/partition/PartitionFunctionTest.java       |  19 +-
 .../pinot/spi/utils/hash/MurmurHashFunctions.java  | 514 +++++++++++++++++++++
 .../admin/command/StreamAvroIntoKafkaCommand.java  |   4 +-
 9 files changed, 843 insertions(+), 389 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java
index ccf14a122d..6d167af166 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pinot.common.function.scalar;
 
+import com.google.common.hash.Hashing;
+import java.nio.charset.StandardCharsets;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.pinot.spi.annotations.ScalarFunction;
+import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
 
 
 /**
@@ -45,7 +48,18 @@ public class HashFunctions {
    */
   @ScalarFunction
   public static String sha(byte[] input) {
-    return DigestUtils.shaHex(input);
+    return DigestUtils.sha1Hex(input);
+  }
+
+  /**
+   * Return SHA3-224 digest as hex string (calls sha3_224Hex; NOTE(review): name suggests SHA-224 - confirm).
+   *
+   * @param input the byte array representing the data
+   * @return hash string in hex format
+   */
+  @ScalarFunction
+  public static String sha224(byte[] input) {
+    return DigestUtils.sha3_224Hex(input);
   }
 
   /**
@@ -70,6 +84,17 @@ public class HashFunctions {
     return DigestUtils.sha512Hex(input);
   }
 
+  /**
+   * Return MD2 digest as hex string.
+   *
+   * @param input the byte array representing the data
+   * @return hash string in hex format
+   */
+  @ScalarFunction
+  public static String md2(byte[] input) {
+    return DigestUtils.md2Hex(input);
+  }
+
   /**
    * Return MD5 digest as hex string.
    *
@@ -80,4 +105,145 @@ public class HashFunctions {
   public static String md5(byte[] input) {
     return DigestUtils.md5Hex(input);
   }
+
+  /**
+   * Computes 32-bit MurmurHash2 of the given byte array.
+   *
+   * @param input the byte array to hash
+   * @return 32-bit hash
+   */
+  @ScalarFunction
+  public static int murmurHash2(byte[] input) {
+    return MurmurHashFunctions.murmurHash2(input);
+  }
+
+  /**
+   * Computes 32-bit MurmurHash2 of the given string.
+   *
+   * @param input the string to hash, encoded as UTF-8
+   * @return 32-bit hash
+   */
+  @ScalarFunction
+  public static int murmurHash2UTF8(String input) {
+    return 
MurmurHashFunctions.murmurHash2(input.getBytes(StandardCharsets.UTF_8));
+  }
+
+  /**
+   * Computes 64-bit MurmurHash2 of the given byte array.
+   *
+   * @param input the byte array to hash
+   * @return 64-bit hash
+   */
+  @ScalarFunction
+  public static long murmurHash2Bit64(byte[] input) {
+    return MurmurHashFunctions.murmurHash2Bit64(input);
+  }
+
+  /**
+   * Computes 64-bit MurmurHash2 of the given byte array and seed.
+   *
+   * @param input the byte array to hash
+   * @return 64-bit hash
+   */
+  @ScalarFunction
+  public static long murmurHash2Bit64(byte[] input, int seed) {
+    return MurmurHashFunctions.murmurHash2Bit64(input, input.length, seed);
+  }
+
+  /**
+   * Computes 32-bit Murmur3 Hash of the given byte array and seed.
+   *
+   * @param input the byte array to hash
+   * @return 32-bit hash
+   */
+  @ScalarFunction
+  public static int murmurHash3Bit32(byte[] input, int seed) {
+    return Hashing.murmur3_32_fixed(seed).hashBytes(input).asInt();
+  }
+
+  /**
+   * Computes 64-bit Murmur3 Hash of the given byte array and seed.
+   *
+   * @param input the byte array to hash
+   * @return 64-bit hash
+   */
+  @ScalarFunction
+  public static long murmurHash3Bit64(byte[] input, int seed) {
+    return Hashing.murmur3_128(seed).hashBytes(input).asLong();
+  }
+
+  /**
+   * Computes 128-bit Murmur3 Hash of the given byte array and seed.
+   *
+   * @param input the byte array to hash
+   * @return 128-bit hash represented in a 16-byte array
+   */
+  @ScalarFunction
+  public static byte[] murmurHash3Bit128(byte[] input, int seed) {
+    return Hashing.murmur3_128(seed).hashBytes(input).asBytes();
+  }
+
+  /**
+   * Computes 32-bit Murmur3 Hash of the given byte array and seed for x64 
platform.
+   *
+   * @param input the byte array to hash
+   * @return 32-bit hash
+   */
+  @ScalarFunction
+  public static int murmurHash3X64Bit32(byte[] input, int seed) {
+    return MurmurHashFunctions.murmurHash3X64Bit32(input, seed);
+  }
+
+  /**
+   * Computes 64-bit Murmur3 Hash of the given byte array and seed for x64 
platform.
+   *
+   * @param input the byte array to hash
+   * @return hash as a long; NOTE(review): body calls murmurHash3X64Bit32, so only a 32-bit hash is returned - confirm
+   */
+  @ScalarFunction
+  public static long murmurHash3X64Bit64(byte[] input, int seed) {
+    return MurmurHashFunctions.murmurHash3X64Bit32(input, seed);
+  }
+
+  /**
+   * Computes 128-bit Murmur3 Hash of the given byte array and seed for x64 
platform.
+   *
+   * @param input the byte array to hash
+   * @return 128-bit hash represented in a 16-byte array
+   */
+  @ScalarFunction
+  public static byte[] murmurHash3X64Bit128(byte[] input, int seed) {
+    return MurmurHashFunctions.murmurHash3X64Bit128(input, seed);
+  }
+
+  /**
+   * Computes 32-bit Adler Hash of the given byte array
+   * @param input the byte array to hash
+   * @return 32-bit hash
+   */
+  @ScalarFunction
+  public static int adler32(byte[] input) {
+    return Hashing.adler32().hashBytes(input).asInt();
+  }
+
+  /**
+   * Computes 32-bit CRC (Cyclic Redundancy Check) of the given byte array
+   * @param input the byte array to hash
+   * @return 32-bit CRC32 hash
+   */
+  @ScalarFunction
+  public static int crc32(byte[] input) {
+    return Hashing.crc32().hashBytes(input).asInt();
+  }
+
+  /**
+   * Computes 32-bit CRC32C (Cyclic Redundancy Check 32C) of the given byte 
array.
+   *
+   * @param input the byte array to hash
+   * @return 32-bit CRC32C hash
+   */
+  @ScalarFunction
+  public static int crc32c(byte[] input) {
+    return Hashing.crc32c().hashBytes(input).asInt();
+  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
index a8c5cc9985..de0409751e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java
@@ -19,8 +19,6 @@
 package org.apache.pinot.common.utils;
 
 import com.google.common.primitives.Ints;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
 
 
 public class HashUtil {
@@ -56,122 +54,4 @@ public class HashUtil {
     }
     return Integer.MAX_VALUE;
   }
-
-  public static long compute(IntBuffer buff) {
-    buff.rewind();
-    ByteBuffer bBuff = ByteBuffer.allocate(buff.array().length * 4);
-    for (int i : buff.array()) {
-      bBuff.putInt(i);
-    }
-    return compute(bBuff);
-  }
-
-  public static long compute(ByteBuffer buff) {
-    return hash64(buff.array(), buff.array().length);
-  }
-
-  public static long hash64(final byte[] data, int length) {
-    // Default seed is 0xe17a1465.
-    return hash64(data, length, 0xe17a1465);
-  }
-
-  // Implement 64-bit Murmur2 hash.
-  public static long hash64(final byte[] data, int length, int seed) {
-    final long m = 0xc6a4a7935bd1e995L;
-    final int r = 47;
-
-    long h = (seed & 0xffffffffL) ^ (length * m);
-
-    int length8 = length / 8;
-
-    for (int i = 0; i < length8; i++) {
-      final int i8 = i * 8;
-      long k =
-          ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8) + 
(((long) data[i8 + 2] & 0xff) << 16) + (
-              ((long) data[i8 + 3] & 0xff) << 24) + (((long) data[i8 + 4] & 
0xff) << 32) + (((long) data[i8 + 5] & 0xff)
-              << 40) + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 
+ 7] & 0xff) << 56);
-
-      k *= m;
-      k ^= k >>> r;
-      k *= m;
-
-      h ^= k;
-      h *= m;
-    }
-
-    // CHECKSTYLE:OFF: checkstyle:coding
-    switch (length % 8) {
-      case 7:
-        h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48;
-      case 6:
-        h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40;
-      case 5:
-        h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32;
-      case 4:
-        h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24;
-      case 3:
-        h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16;
-      case 2:
-        h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8;
-      case 1:
-        h ^= data[length & ~7] & 0xff;
-        h *= m;
-    }
-    // CHECKSTYLE:ON: checkstyle:coding
-
-    h ^= h >>> r;
-    h *= m;
-    h ^= h >>> r;
-    return h;
-  }
-
-  /**
-   * Generates 32 bit murmur2 hash from byte array
-   * @param data byte array to hash
-   * @return 32 bit hash of the given array
-   */
-  @SuppressWarnings("checkstyle")
-  public static int murmur2(final byte[] data) {
-    int length = data.length;
-    int seed = 0x9747b28c;
-    // 'm' and 'r' are mixing constants generated offline.
-    // They're not really 'magic', they just happen to work well.
-    final int m = 0x5bd1e995;
-    final int r = 24;
-
-    // Initialize the hash to a random value
-    int h = seed ^ length;
-    int length4 = length / 4;
-
-    for (int i = 0; i < length4; i++) {
-      final int i4 = i * 4;
-      int k =
-          (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 
2] & 0xff) << 16) + ((data[i4 + 3] & 0xff)
-              << 24);
-      k *= m;
-      k ^= k >>> r;
-      k *= m;
-      h *= m;
-      h ^= k;
-    }
-
-    // CHECKSTYLE:OFF: checkstyle:coding
-    // Handle the last few bytes of the input array
-    switch (length % 4) {
-      case 3:
-        h ^= (data[(length & ~3) + 2] & 0xff) << 16;
-      case 2:
-        h ^= (data[(length & ~3) + 1] & 0xff) << 8;
-      case 1:
-        h ^= data[length & ~3] & 0xff;
-        h *= m;
-    }
-    // CHECKSTYLE:ON: checkstyle:coding
-
-    h ^= h >>> 13;
-    h *= m;
-    h ^= h >>> 15;
-
-    return h;
-  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/LeadControllerUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/LeadControllerUtils.java
index c7dffb044c..9984f5f463 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/LeadControllerUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/LeadControllerUtils.java
@@ -24,7 +24,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.ResourceConfig;
-import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.function.scalar.HashFunctions;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +40,7 @@ public class LeadControllerUtils {
 
   /**
    * Given a raw table name and number of partitions, returns the partition id 
in lead controller resource.
-   * Uses murmur2 function to get hashcode for table, ignores the most 
significant bit.
+   * Uses murmurHash2 function to get hashcode for table, ignores the most 
significant bit.
    * Note: This method CANNOT be changed when lead controller resource is 
enabled.
    * Otherwise it will assign different controller for the same table, which 
will mess up the controller periodic
    * tasks and realtime segment completion.
@@ -48,7 +48,7 @@ public class LeadControllerUtils {
    * @return partition id in lead controller resource.
    */
   public static int getPartitionIdForTable(String rawTableName) {
-    return (HashUtil.murmur2(rawTableName.getBytes(UTF_8)) & Integer.MAX_VALUE)
+    return (HashFunctions.murmurHash2(rawTableName.getBytes(UTF_8)) & 
Integer.MAX_VALUE)
         % Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
   }
 
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/HashFunctionsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/HashFunctionsTest.java
new file mode 100644
index 0000000000..6f23c8c89a
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/HashFunctionsTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.function.scalar;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class HashFunctionsTest {
+
+  @Test
+  public void testShaHash() {
+    String input = "testString";
+    assertEquals(HashFunctions.sha(input.getBytes()), 
"956265657d0b637ef65b9b59f9f858eecf55ed6a");
+  }
+
+  @Test
+  public void testSha2224Hash() {
+    String input = "testString";
+    assertEquals(HashFunctions.sha224(input.getBytes()),
+        "bb54d1095764bff72b570dcdc3172ed6d1b26695494528a0059c95ae");
+  }
+
+  @Test
+  public void testSha256Hash() {
+    String input = "testString";
+    assertEquals(HashFunctions.sha256(input.getBytes()),
+        "4acf0b39d9c4766709a3689f553ac01ab550545ffa4544dfc0b2cea82fba02a3");
+  }
+
+  @Test
+  public void testSha512Hash() {
+    String input = "testString";
+    assertEquals(HashFunctions.sha512(input.getBytes()),
+        "c48af5a7f6d4a851fc8a434eed638ab1a6ef68e19dbcae894ac67c9fbc5bcb01"
+            + 
"82b8e7123b3df3c9e4dcb7690c23103f03dc17f54352071ceb2a4eb204b26b91");
+  }
+
+  @Test
+  public void testMd2Hash() {
+    String input = "testString";
+    assertEquals(HashFunctions.md2(input.getBytes()), 
"466c453913ba0d8325f96b2d47984fb5");
+  }
+
+  @Test
+  public void testMd5Hash() {
+    String input = "testString";
+    assertEquals(HashFunctions.md5(input.getBytes()), 
"536788f4dbdffeecfbb8f350a941eea3");
+  }
+
+  @Test
+  public void testMurmurHash2() {
+    String input = "testString";
+    assertEquals(HashFunctions.murmurHash2(input.getBytes()), -534425817);
+  }
+
+  @Test
+  public void testMurmurHash2UTF8() {
+    String input = "testString";
+    assertEquals(HashFunctions.murmurHash2UTF8(input), -534425817);
+  }
+
+  @Test
+  public void testMurmurHash2Bit64() {
+    String input = "testString";
+    assertEquals(HashFunctions.murmurHash2Bit64(input.getBytes()), 
3907736674355139845L);
+    assertEquals(HashFunctions.murmurHash2Bit64(input.getBytes(), 12345), 
-2138976126980760436L);
+  }
+
+  @Test
+  public void testMurmurHash3Bit32() {
+    String input = "testString";
+    assertEquals(HashFunctions.murmurHash3Bit32(input.getBytes(), 0), 
-1435605585);
+  }
+
+  @Test
+  public void testMurmurHash3Bit64() {
+    String input = "testString";
+    assertEquals(HashFunctions.murmurHash3Bit64(input.getBytes(), 0), 
-3652179990542706350L);
+  }
+
+  @Test
+  public void testMurmurHash3Bit128() {
+    String input = "testString";
+    assertEquals(HashFunctions.murmurHash3Bit128(input.getBytes(), 0),
+        new byte[]{82, -103, -23, 15, -90, -39, 80, -51, 15, 73, -81, -28, 
111, -21, -78, 108});
+  }
+
+  @Test
+  public void testMurmurHash3X64Bit32() {
+    String input = "testString";
+    assertEquals(HashFunctions.murmurHash3X64Bit32(input.getBytes(), 0), 
-1096986291);
+  }
+
+  @Test
+  public void testMurmurHash3X64Bit64() {
+    String input = "testString";
+    assertEquals(HashFunctions.murmurHash3X64Bit64(input.getBytes(), 0), 
-1096986291L);
+  }
+
+  @Test
+  public void testMurmurHash3X64Bit128() {
+    String input = "testString";
+    assertEquals(HashFunctions.murmurHash3X64Bit128(input.getBytes(), 0),
+        new byte[]{-66, -99, 81, 77, -7, 29, 124, 76, 42, 38, -34, -42, -92, 
-83, 83, 13});
+  }
+
+  @Test
+  public void testAdler32() {
+    String input = "testString";
+    assertEquals(HashFunctions.adler32(input.getBytes()), 392102968);
+  }
+
+  @Test
+  public void testCrc32() {
+    String input = "testString";
+    assertEquals(HashFunctions.crc32(input.getBytes()), 418708744);
+  }
+
+  @Test
+  public void testCrc32c() {
+    String input = "testString";
+    assertEquals(HashFunctions.crc32c(input.getBytes()), -1608760557);
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
index c8f673a3dd..ba22c1a992 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
@@ -18,11 +18,10 @@
  */
 package org.apache.pinot.segment.spi.partition;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.hash.Hashing;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -31,7 +30,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
  * Implementation of {@link PartitionFunction} which partitions based on 32 
bit murmur3 hash
  */
 public class Murmur3PartitionFunction implements PartitionFunction {
-  public static final byte INVALID_CHAR = (byte) '?';
   private static final String NAME = "Murmur3";
   private static final String SEED_KEY = "seed";
   private static final String VARIANT_KEY = "variant";
@@ -71,7 +69,8 @@ public class Murmur3PartitionFunction implements 
PartitionFunction {
 
   @Override
   public int getPartition(String value) {
-    int hash = _useX64 ? murmur3Hash32BitsX64(value, _seed) : 
murmur3Hash32BitsX86(value.getBytes(UTF_8), _seed);
+    int hash = _useX64 ? MurmurHashFunctions.murmurHash3X64Bit32(value, _seed)
+        : MurmurHashFunctions.murmurHash3X86Bit32(value.getBytes(UTF_8), 
_seed);
     return (hash & Integer.MAX_VALUE) % _numPartitions;
   }
 
@@ -90,200 +89,4 @@ public class Murmur3PartitionFunction implements 
PartitionFunction {
   public String toString() {
     return NAME;
   }
-
-  @VisibleForTesting
-  static int murmur3Hash32BitsX86(byte[] data, int seed) {
-    return Hashing.murmur3_32_fixed(seed).hashBytes(data).asInt();
-  }
-
-  /**
-   * Taken from <a href=
-   * 
"https://github.com/infinispan/infinispan/blob/main/commons/all/src/main/java/org/infinispan/commons/hash
-   * /MurmurHash3.java"
-   * >Infinispan code base</a>.
-   *
-   * MurmurHash3 implementation in Java, based on Austin Appleby's <a href=
-   * "https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp";
-   * >original in C</a>
-   *
-   * This is an implementation of MurmurHash3 to generate 32 bit hash for x64 
architecture (not part of the original
-   * Murmur3 implementations) used by Infinispan and Debezium, Removed the 
parts that we don't need and formatted
-   * the code to Apache Pinot's Checkstyle.
-   *
-   * @author Patrick McFarland
-   * @see <a href="http://sites.google.com/site/murmurhash/";>MurmurHash 
website</a>
-   * @see <a href="http://en.wikipedia.org/wiki/MurmurHash";>MurmurHash entry 
on Wikipedia</a>
-   */
-
-  private static void bmix(State state) {
-    state._k1 *= state._c1;
-    state._k1 = (state._k1 << 23) | (state._k1 >>> 64 - 23);
-    state._k1 *= state._c2;
-    state._h1 ^= state._k1;
-    state._h1 += state._h2;
-
-    state._h2 = (state._h2 << 41) | (state._h2 >>> 64 - 41);
-
-    state._k2 *= state._c2;
-    state._k2 = (state._k2 << 23) | (state._k2 >>> 64 - 23);
-    state._k2 *= state._c1;
-    state._h2 ^= state._k2;
-    state._h2 += state._h1;
-
-    state._h1 = state._h1 * 3 + 0x52dce729;
-    state._h2 = state._h2 * 3 + 0x38495ab5;
-
-    state._c1 = state._c1 * 5 + 0x7b7d159c;
-    state._c2 = state._c2 * 5 + 0x6bce6396;
-  }
-
-  private static long fmix(long k) {
-    k ^= k >>> 33;
-    k *= 0xff51afd7ed558ccdL;
-    k ^= k >>> 33;
-    k *= 0xc4ceb9fe1a85ec53L;
-    k ^= k >>> 33;
-
-    return k;
-  }
-
-  @VisibleForTesting
-  static int murmur3Hash32BitsX64(String s, int seed) {
-    State state = new State();
-
-    state._h1 = 0x9368e53c2f6af274L ^ seed;
-    state._h2 = 0x586dcd208f7cd3fdL ^ seed;
-
-    state._c1 = 0x87c37b91114253d5L;
-    state._c2 = 0x4cf5ad432745937fL;
-
-    int byteLen = 0;
-    int stringLen = s.length();
-
-    // CHECKSTYLE:OFF
-    for (int i = 0; i < stringLen; i++) {
-      char c1 = s.charAt(i);
-      int cp;
-      if (!Character.isSurrogate(c1)) {
-        cp = c1;
-      } else if (Character.isHighSurrogate(c1)) {
-        if (i + 1 < stringLen) {
-          char c2 = s.charAt(i + 1);
-          if (Character.isLowSurrogate(c2)) {
-            i++;
-            cp = Character.toCodePoint(c1, c2);
-          } else {
-            cp = INVALID_CHAR;
-          }
-        } else {
-          cp = INVALID_CHAR;
-        }
-      } else {
-        cp = INVALID_CHAR;
-      }
-
-      if (cp <= 0x7f) {
-        addByte(state, (byte) cp, byteLen++);
-      } else if (cp <= 0x07ff) {
-        byte b1 = (byte) (0xc0 | (0x1f & (cp >> 6)));
-        byte b2 = (byte) (0x80 | (0x3f & cp));
-        addByte(state, b1, byteLen++);
-        addByte(state, b2, byteLen++);
-      } else if (cp <= 0xffff) {
-        byte b1 = (byte) (0xe0 | (0x0f & (cp >> 12)));
-        byte b2 = (byte) (0x80 | (0x3f & (cp >> 6)));
-        byte b3 = (byte) (0x80 | (0x3f & cp));
-        addByte(state, b1, byteLen++);
-        addByte(state, b2, byteLen++);
-        addByte(state, b3, byteLen++);
-      } else {
-        byte b1 = (byte) (0xf0 | (0x07 & (cp >> 18)));
-        byte b2 = (byte) (0x80 | (0x3f & (cp >> 12)));
-        byte b3 = (byte) (0x80 | (0x3f & (cp >> 6)));
-        byte b4 = (byte) (0x80 | (0x3f & cp));
-        addByte(state, b1, byteLen++);
-        addByte(state, b2, byteLen++);
-        addByte(state, b3, byteLen++);
-        addByte(state, b4, byteLen++);
-      }
-    }
-
-    long savedK1 = state._k1;
-    long savedK2 = state._k2;
-    state._k1 = 0;
-    state._k2 = 0;
-    switch (byteLen & 15) {
-      case 15:
-        state._k2 ^= (long) ((byte) (savedK2 >> 48)) << 48;
-      case 14:
-        state._k2 ^= (long) ((byte) (savedK2 >> 40)) << 40;
-      case 13:
-        state._k2 ^= (long) ((byte) (savedK2 >> 32)) << 32;
-      case 12:
-        state._k2 ^= (long) ((byte) (savedK2 >> 24)) << 24;
-      case 11:
-        state._k2 ^= (long) ((byte) (savedK2 >> 16)) << 16;
-      case 10:
-        state._k2 ^= (long) ((byte) (savedK2 >> 8)) << 8;
-      case 9:
-        state._k2 ^= ((byte) savedK2);
-      case 8:
-        state._k1 ^= (long) ((byte) (savedK1 >> 56)) << 56;
-      case 7:
-        state._k1 ^= (long) ((byte) (savedK1 >> 48)) << 48;
-      case 6:
-        state._k1 ^= (long) ((byte) (savedK1 >> 40)) << 40;
-      case 5:
-        state._k1 ^= (long) ((byte) (savedK1 >> 32)) << 32;
-      case 4:
-        state._k1 ^= (long) ((byte) (savedK1 >> 24)) << 24;
-      case 3:
-        state._k1 ^= (long) ((byte) (savedK1 >> 16)) << 16;
-      case 2:
-        state._k1 ^= (long) ((byte) (savedK1 >> 8)) << 8;
-      case 1:
-        state._k1 ^= ((byte) savedK1);
-        bmix(state);
-    }
-    // CHECKSTYLE:ON
-
-    state._h2 ^= byteLen;
-
-    state._h1 += state._h2;
-    state._h2 += state._h1;
-
-    state._h1 = fmix(state._h1);
-    state._h2 = fmix(state._h2);
-
-    state._h1 += state._h2;
-    state._h2 += state._h1;
-
-    return (int) (state._h1 >> 32);
-  }
-
-  private static void addByte(State state, byte b, int len) {
-    int shift = (len & 0x7) * 8;
-    long bb = (b & 0xffL) << shift;
-    if ((len & 0x8) == 0) {
-      state._k1 |= bb;
-    } else {
-      state._k2 |= bb;
-      if ((len & 0xf) == 0xf) {
-        bmix(state);
-        state._k1 = 0;
-        state._k2 = 0;
-      }
-    }
-  }
-
-  private static class State {
-    long _h1;
-    long _h2;
-
-    long _k1;
-    long _k2;
-
-    long _c1;
-    long _c2;
-  }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
index a43a49f789..8251450c44 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.segment.spi.partition;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -42,7 +42,7 @@ public class MurmurPartitionFunction implements 
PartitionFunction {
 
   @Override
   public int getPartition(String value) {
-    return (murmur2(value.getBytes(UTF_8)) & Integer.MAX_VALUE) % 
_numPartitions;
+    return (MurmurHashFunctions.murmurHash2(value.getBytes(UTF_8)) & 
Integer.MAX_VALUE) % _numPartitions;
   }
 
   @Override
@@ -60,56 +60,4 @@ public class MurmurPartitionFunction implements 
PartitionFunction {
   public String toString() {
     return NAME;
   }
-
-  /**
-   * NOTE: This code has been copied over from 
org.apache.kafka.common.utils.Utils::murmur2
-   *
-   * Generates 32 bit murmur2 hash from byte array
-   * @param data byte array to hash
-   * @return 32 bit hash of the given array
-   */
-  @VisibleForTesting
-  static int murmur2(final byte[] data) {
-    int length = data.length;
-    int seed = 0x9747b28c;
-    // 'm' and 'r' are mixing constants generated offline.
-    // They're not really 'magic', they just happen to work well.
-    final int m = 0x5bd1e995;
-    final int r = 24;
-
-    // Initialize the hash to a random value
-    int h = seed ^ length;
-    int length4 = length / 4;
-
-    for (int i = 0; i < length4; i++) {
-      final int i4 = i * 4;
-      int k =
-          (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 
2] & 0xff) << 16) + ((data[i4 + 3] & 0xff)
-              << 24);
-      k *= m;
-      k ^= k >>> r;
-      k *= m;
-      h *= m;
-      h ^= k;
-    }
-
-    // Handle the last few bytes of the input array
-    // CHECKSTYLE:OFF
-    switch (length % 4) {
-      case 3:
-        h ^= (data[(length & ~3) + 2] & 0xff) << 16;
-      case 2:
-        h ^= (data[(length & ~3) + 1] & 0xff) << 8;
-      case 1:
-        h ^= data[length & ~3] & 0xff;
-        h *= m;
-    }
-    // CHECKSTYLE:ON
-
-    h ^= h >>> 13;
-    h *= m;
-    h ^= h >>> 15;
-
-    return h;
-  }
 }
diff --git 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
index a80f65d054..041920894b 100644
--- 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
+++ 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
 import org.testng.annotations.Test;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -383,15 +384,15 @@ public class PartitionFunctionTest {
   }
 
   /**
-   * Tests the equivalence of org.apache.kafka.common.utils.Utils::murmur2 and
+   * Tests the equivalence of Kafka's org.apache.kafka.common.utils.Utils::murmur2 and
    * {@link MurmurPartitionFunction#getPartition}
-   * Our implementation of murmur2 has been copied over from Utils::murmur2
+   * Our implementation of murmurHash2 has been copied over from Utils::murmur2
    */
   @Test
   public void testMurmurEquivalence() {
 
     // 10 values of size 7, were randomly generated, using {@link 
Random::nextBytes} with seed 100
-    // Applied org.apache.kafka.common.utils.Utils::murmur2 to those values 
and stored in expectedMurmurValues
+    // Applied org.apache.kafka.common.utils.Utils::murmur2 to those values and stored in expectedMurmurValues
     int[] expectedMurmurValues = new int[]{
         -1044832774, -594851693, 1441878663, 1766739604, 1034724141, 
-296671913, 443511156, 1483601453, 1819695080,
         -931669296
@@ -401,12 +402,12 @@ public class PartitionFunctionTest {
     Random random = new Random(seed);
 
     // Generate the same values as above - 10 random values of size 7, using 
{@link Random::nextBytes} with seed 100
-    // Apply {@link MurmurPartitionFunction::murmur2
+    // Apply {@link MurmurHashFunctions#murmurHash2}
     // compare with stored results
     byte[] bytes = new byte[7];
     for (int expectedMurmurValue : expectedMurmurValues) {
       random.nextBytes(bytes);
-      assertEquals(MurmurPartitionFunction.murmur2(bytes), 
expectedMurmurValue);
+      assertEquals(MurmurHashFunctions.murmurHash2(bytes), 
expectedMurmurValue);
     }
   }
 
@@ -419,8 +420,8 @@ public class PartitionFunctionTest {
 
     // 10 String values of size 7, were randomly generated, using {@link 
Random::nextBytes} with seed 100
     // Applied {@link MurmurPartitionFunction} initialized with 5 partitions, 
by overriding
-    // {@MurmurPartitionFunction::murmur2} with org
-    // .apache.kafka.common.utils.Utils::murmur2
+    // {@MurmurPartitionFunction::murmurHash2} with org
+    // .apache.kafka.common.utils.Utils::murmur2
     // stored the results in expectedPartitions
     int[] expectedPartitions = new int[]{1, 4, 4, 1, 1, 2, 0, 4, 2, 3};
 
@@ -549,8 +550,8 @@ public class PartitionFunctionTest {
     for (int expectedHashValue : expectedHashValues) {
       random.nextBytes(bytes);
       String nextString = new String(bytes, UTF_8);
-      int actualHashValue = useX64 ? 
Murmur3PartitionFunction.murmur3Hash32BitsX64(nextString, hashSeed)
-          : Murmur3PartitionFunction.murmur3Hash32BitsX86(bytes, hashSeed);
+      int actualHashValue = useX64 ? 
MurmurHashFunctions.murmurHash3X64Bit32(nextString, hashSeed)
+          : MurmurHashFunctions.murmurHash3X86Bit32(bytes, hashSeed);
       assertEquals(actualHashValue, expectedHashValue);
     }
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/hash/MurmurHashFunctions.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/hash/MurmurHashFunctions.java
new file mode 100644
index 0000000000..276eb0f46e
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/hash/MurmurHashFunctions.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils.hash;
+
+import com.google.common.hash.Hashing;
+import java.nio.ByteBuffer;
+
+
+public class MurmurHashFunctions {
+  public static final byte INVALID_CHAR = (byte) '?';
+
+  private MurmurHashFunctions() {
+  }
+
+  /**
+   * NOTE: This code has been copied over from 
org.apache.kafka.common.utils.Utils::murmur2
+   *
+   * Generates 32 bit murmurHash2 hash from byte array
+   * @param data byte array to hash
+   * @return 32 bit hash of the given array
+   */
+  public static int murmurHash2(final byte[] data) {
+    int length = data.length;
+    int seed = 0x9747b28c;
+    // 'm' and 'r' are mixing constants generated offline.
+    // They're not really 'magic', they just happen to work well.
+    final int m = 0x5bd1e995;
+    final int r = 24;
+
+    // Initialize the hash to a random value
+    int h = seed ^ length;
+    int length4 = length / 4;
+
+    for (int i = 0; i < length4; i++) {
+      final int i4 = i * 4;
+      int k =
+          (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 
2] & 0xff) << 16) + ((data[i4 + 3] & 0xff)
+              << 24);
+      k *= m;
+      k ^= k >>> r;
+      k *= m;
+      h *= m;
+      h ^= k;
+    }
+
+    // Handle the last few bytes of the input array
+    // CHECKSTYLE:OFF
+    switch (length % 4) {
+      case 3:
+        h ^= (data[(length & ~3) + 2] & 0xff) << 16;
+      case 2:
+        h ^= (data[(length & ~3) + 1] & 0xff) << 8;
+      case 1:
+        h ^= data[length & ~3] & 0xff;
+        h *= m;
+    }
+    // CHECKSTYLE:ON
+
+    h ^= h >>> 13;
+    h *= m;
+    h ^= h >>> 15;
+
+    return h;
+  }
+
+  /**
+   * Implement 64-bit Murmur2 hash.
+   * @param data byte array to hash
+   * @return 64-bit hash
+   */
+  public static long murmurHash2Bit64(final byte[] data) {
+    return murmurHash2Bit64(data, data.length, 0xe17a1465);
+  }
+
+  /**
+   * Implement 64-bit Murmur2 hash.
+   * @param data byte array to hash
+   * @param length byte array length
+   * @param seed hash seed
+   * @return 64-bit hash
+   */
+  public static long murmurHash2Bit64(final byte[] data, int length, int seed) 
{
+    final long m = 0xc6a4a7935bd1e995L;
+    final int r = 47;
+
+    long h = (seed & 0xffffffffL) ^ (length * m);
+
+    int length8 = length / 8;
+
+    for (int i = 0; i < length8; i++) {
+      final int i8 = i * 8;
+      long k =
+          ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8) + 
(((long) data[i8 + 2] & 0xff) << 16) + (
+              ((long) data[i8 + 3] & 0xff) << 24) + (((long) data[i8 + 4] & 
0xff) << 32) + (((long) data[i8 + 5] & 0xff)
+              << 40) + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 
+ 7] & 0xff) << 56);
+
+      k *= m;
+      k ^= k >>> r;
+      k *= m;
+
+      h ^= k;
+      h *= m;
+    }
+
+    // CHECKSTYLE:OFF: checkstyle:coding
+    switch (length % 8) {
+      case 7:
+        h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48;
+      case 6:
+        h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40;
+      case 5:
+        h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32;
+      case 4:
+        h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24;
+      case 3:
+        h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16;
+      case 2:
+        h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8;
+      case 1:
+        h ^= data[length & ~7] & 0xff;
+        h *= m;
+    }
+    // CHECKSTYLE:ON: checkstyle:coding
+
+    h ^= h >>> r;
+    h *= m;
+    h ^= h >>> r;
+    return h;
+  }
+
+  public static int murmurHash3X86Bit32(byte[] data, int seed) {
+    return Hashing.murmur3_32_fixed(seed).hashBytes(data).asInt();
+  }
+
+  /**
+   * Taken from <a href=
+   * 
"https://github.com/infinispan/infinispan/blob/main/commons/all/src/main/java/org/infinispan/commons/hash
+   * /MurmurHash3.java"
+   * >Infinispan code base</a>.
+   *
+   * MurmurHash3 implementation in Java, based on Austin Appleby's <a href=
+   * "https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp"
+   * >original in C</a>.
+   *
+   * This is an implementation of MurmurHash3 to generate 32 bit hash for x64 
architecture (not part of the original
+   * Murmur3 implementations) used by Infinispan and Debezium. Removed the 
parts that we don't need and formatted
+   * the code to Apache Pinot's Checkstyle.
+   *
+   * @author Patrick McFarland
+   * @see <a href="http://sites.google.com/site/murmurhash/">MurmurHash 
website</a>
+   * @see <a href="http://en.wikipedia.org/wiki/MurmurHash">MurmurHash entry 
on Wikipedia</a>
+   */
+  public static int murmurHash3X64Bit32(String s, int seed) {
+    State state = new State();
+
+    state._h1 = 0x9368e53c2f6af274L ^ seed;
+    state._h2 = 0x586dcd208f7cd3fdL ^ seed;
+
+    state._c1 = 0x87c37b91114253d5L;
+    state._c2 = 0x4cf5ad432745937fL;
+
+    int byteLen = 0;
+    int stringLen = s.length();
+
+    // CHECKSTYLE:OFF: checkstyle:coding
+    for (int i = 0; i < stringLen; i++) {
+      char c1 = s.charAt(i);
+      int cp;
+      if (!Character.isSurrogate(c1)) {
+        cp = c1;
+      } else if (Character.isHighSurrogate(c1)) {
+        if (i + 1 < stringLen) {
+          char c2 = s.charAt(i + 1);
+          if (Character.isLowSurrogate(c2)) {
+            i++;
+            cp = Character.toCodePoint(c1, c2);
+          } else {
+            cp = INVALID_CHAR;
+          }
+        } else {
+          cp = INVALID_CHAR;
+        }
+      } else {
+        cp = INVALID_CHAR;
+      }
+
+      if (cp <= 0x7f) {
+        addByte(state, (byte) cp, byteLen++);
+      } else if (cp <= 0x07ff) {
+        byte b1 = (byte) (0xc0 | (0x1f & (cp >> 6)));
+        byte b2 = (byte) (0x80 | (0x3f & cp));
+        addByte(state, b1, byteLen++);
+        addByte(state, b2, byteLen++);
+      } else if (cp <= 0xffff) {
+        byte b1 = (byte) (0xe0 | (0x0f & (cp >> 12)));
+        byte b2 = (byte) (0x80 | (0x3f & (cp >> 6)));
+        byte b3 = (byte) (0x80 | (0x3f & cp));
+        addByte(state, b1, byteLen++);
+        addByte(state, b2, byteLen++);
+        addByte(state, b3, byteLen++);
+      } else {
+        byte b1 = (byte) (0xf0 | (0x07 & (cp >> 18)));
+        byte b2 = (byte) (0x80 | (0x3f & (cp >> 12)));
+        byte b3 = (byte) (0x80 | (0x3f & (cp >> 6)));
+        byte b4 = (byte) (0x80 | (0x3f & cp));
+        addByte(state, b1, byteLen++);
+        addByte(state, b2, byteLen++);
+        addByte(state, b3, byteLen++);
+        addByte(state, b4, byteLen++);
+      }
+    }
+
+    long savedK1 = state._k1;
+    long savedK2 = state._k2;
+    state._k1 = 0;
+    state._k2 = 0;
+    switch (byteLen & 15) {
+      case 15:
+        state._k2 ^= (long) ((byte) (savedK2 >> 48)) << 48;
+      case 14:
+        state._k2 ^= (long) ((byte) (savedK2 >> 40)) << 40;
+      case 13:
+        state._k2 ^= (long) ((byte) (savedK2 >> 32)) << 32;
+      case 12:
+        state._k2 ^= (long) ((byte) (savedK2 >> 24)) << 24;
+      case 11:
+        state._k2 ^= (long) ((byte) (savedK2 >> 16)) << 16;
+      case 10:
+        state._k2 ^= (long) ((byte) (savedK2 >> 8)) << 8;
+      case 9:
+        state._k2 ^= ((byte) savedK2);
+      case 8:
+        state._k1 ^= (long) ((byte) (savedK1 >> 56)) << 56;
+      case 7:
+        state._k1 ^= (long) ((byte) (savedK1 >> 48)) << 48;
+      case 6:
+        state._k1 ^= (long) ((byte) (savedK1 >> 40)) << 40;
+      case 5:
+        state._k1 ^= (long) ((byte) (savedK1 >> 32)) << 32;
+      case 4:
+        state._k1 ^= (long) ((byte) (savedK1 >> 24)) << 24;
+      case 3:
+        state._k1 ^= (long) ((byte) (savedK1 >> 16)) << 16;
+      case 2:
+        state._k1 ^= (long) ((byte) (savedK1 >> 8)) << 8;
+      case 1:
+        state._k1 ^= ((byte) savedK1);
+        bmix(state);
+    }
+    // CHECKSTYLE:ON: checkstyle:coding
+    state._h2 ^= byteLen;
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    state._h1 = fmix(state._h1);
+    state._h2 = fmix(state._h2);
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    return (int) (state._h1 >> 32);
+  }
+
+  /**
+   * Hash a value using the x64 128 bit variant of MurmurHash3
+   *
+   * @param key value to hash
+   * @param seed random value
+   * @return 128 bit hashed key, as a 16-byte big-endian array (h1 followed by h2)
+   */
+  public static byte[] murmurHash3X64Bit128(final byte[] key, final int seed) {
+    State state = new State();
+
+    state._h1 = 0x9368e53c2f6af274L ^ seed;
+    state._h2 = 0x586dcd208f7cd3fdL ^ seed;
+
+    state._c1 = 0x87c37b91114253d5L;
+    state._c2 = 0x4cf5ad432745937fL;
+
+    for (int i = 0; i < key.length / 16; i++) {
+      state._k1 = getblock(key, i * 2 * 8);
+      state._k2 = getblock(key, (i * 2 + 1) * 8);
+
+      bmix(state);
+    }
+
+    state._k1 = 0;
+    state._k2 = 0;
+
+    int tail = (key.length >>> 4) << 4;
+    // CHECKSTYLE:OFF: checkstyle:coding
+    switch (key.length & 15) {
+      case 15:
+        state._k2 ^= (long) key[tail + 14] << 48;
+      case 14:
+        state._k2 ^= (long) key[tail + 13] << 40;
+      case 13:
+        state._k2 ^= (long) key[tail + 12] << 32;
+      case 12:
+        state._k2 ^= (long) key[tail + 11] << 24;
+      case 11:
+        state._k2 ^= (long) key[tail + 10] << 16;
+      case 10:
+        state._k2 ^= (long) key[tail + 9] << 8;
+      case 9:
+        state._k2 ^= key[tail + 8];
+      case 8:
+        state._k1 ^= (long) key[tail + 7] << 56;
+      case 7:
+        state._k1 ^= (long) key[tail + 6] << 48;
+      case 6:
+        state._k1 ^= (long) key[tail + 5] << 40;
+      case 5:
+        state._k1 ^= (long) key[tail + 4] << 32;
+      case 4:
+        state._k1 ^= (long) key[tail + 3] << 24;
+      case 3:
+        state._k1 ^= (long) key[tail + 2] << 16;
+      case 2:
+        state._k1 ^= (long) key[tail + 1] << 8;
+      case 1:
+        state._k1 ^= key[tail + 0];
+        bmix(state);
+    }
+    // CHECKSTYLE:ON: checkstyle:coding
+
+    state._h2 ^= key.length;
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    state._h1 = fmix(state._h1);
+    state._h2 = fmix(state._h2);
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    ByteBuffer buffer = ByteBuffer.allocate(16);
+    buffer.putLong(state._h1);
+    buffer.putLong(state._h2);
+    return buffer.array();
+  }
+
+  /**
+   * Hash a value using the x64 64 bit variant of murmurHash3
+   *
+   * @param key value to hash
+   * @param seed random value
+   * @return 64 bit hashed key
+   */
+  public static long murmurHash3X64Bit64(final byte[] key, final int seed) {
+    // Exactly the same as murmurHash3X64Bit128, except it only returns 
state.h1
+    State state = new State();
+
+    state._h1 = 0x9368e53c2f6af274L ^ seed;
+    state._h2 = 0x586dcd208f7cd3fdL ^ seed;
+
+    state._c1 = 0x87c37b91114253d5L;
+    state._c2 = 0x4cf5ad432745937fL;
+
+    for (int i = 0; i < key.length / 16; i++) {
+      state._k1 = getblock(key, i * 2 * 8);
+      state._k2 = getblock(key, (i * 2 + 1) * 8);
+
+      bmix(state);
+    }
+
+    state._k1 = 0;
+    state._k2 = 0;
+
+    int tail = (key.length >>> 4) << 4;
+    // CHECKSTYLE:OFF: checkstyle:coding
+    switch (key.length & 15) {
+      case 15:
+        state._k2 ^= (long) key[tail + 14] << 48;
+      case 14:
+        state._k2 ^= (long) key[tail + 13] << 40;
+      case 13:
+        state._k2 ^= (long) key[tail + 12] << 32;
+      case 12:
+        state._k2 ^= (long) key[tail + 11] << 24;
+      case 11:
+        state._k2 ^= (long) key[tail + 10] << 16;
+      case 10:
+        state._k2 ^= (long) key[tail + 9] << 8;
+      case 9:
+        state._k2 ^= key[tail + 8];
+      case 8:
+        state._k1 ^= (long) key[tail + 7] << 56;
+      case 7:
+        state._k1 ^= (long) key[tail + 6] << 48;
+      case 6:
+        state._k1 ^= (long) key[tail + 5] << 40;
+      case 5:
+        state._k1 ^= (long) key[tail + 4] << 32;
+      case 4:
+        state._k1 ^= (long) key[tail + 3] << 24;
+      case 3:
+        state._k1 ^= (long) key[tail + 2] << 16;
+      case 2:
+        state._k1 ^= (long) key[tail + 1] << 8;
+      case 1:
+        state._k1 ^= key[tail];
+        bmix(state);
+    }
+    // CHECKSTYLE:ON: checkstyle:coding
+
+    state._h2 ^= key.length;
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    state._h1 = fmix(state._h1);
+    state._h2 = fmix(state._h2);
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    return state._h1;
+  }
+
+  /**
+   * Hash a value using the x64 32 bit variant of murmurHash3
+   *
+   * @param key value to hash
+   * @param seed random value
+   * @return 32 bit hashed key
+   */
+  public static int murmurHash3X64Bit32(final byte[] key, final int seed) {
+    return (int) (murmurHash3X64Bit64(key, seed) >>> 32);
+  }
+
+  private static void addByte(State state, byte b, int len) {
+    int shift = (len & 0x7) * 8;
+    long bb = (b & 0xffL) << shift;
+    if ((len & 0x8) == 0) {
+      state._k1 |= bb;
+    } else {
+      state._k2 |= bb;
+      if ((len & 0xf) == 0xf) {
+        bmix(state);
+        state._k1 = 0;
+        state._k2 = 0;
+      }
+    }
+  }
+
+  static long getblock(byte[] key, int i) {
+    return ((key[i + 0] & 0x00000000000000FFL)) | ((key[i + 1] & 
0x00000000000000FFL) << 8) | (
+        (key[i + 2] & 0x00000000000000FFL) << 16) | ((key[i + 3] & 
0x00000000000000FFL) << 24) | (
+        (key[i + 4] & 0x00000000000000FFL) << 32) | ((key[i + 5] & 
0x00000000000000FFL) << 40) | (
+        (key[i + 6] & 0x00000000000000FFL) << 48) | ((key[i + 7] & 
0x00000000000000FFL) << 56);
+  }
+
+  private static void bmix(State state) {
+    state._k1 *= state._c1;
+    state._k1 = (state._k1 << 23) | (state._k1 >>> 64 - 23);
+    state._k1 *= state._c2;
+    state._h1 ^= state._k1;
+    state._h1 += state._h2;
+
+    state._h2 = (state._h2 << 41) | (state._h2 >>> 64 - 41);
+
+    state._k2 *= state._c2;
+    state._k2 = (state._k2 << 23) | (state._k2 >>> 64 - 23);
+    state._k2 *= state._c1;
+    state._h2 ^= state._k2;
+    state._h2 += state._h1;
+
+    state._h1 = state._h1 * 3 + 0x52dce729;
+    state._h2 = state._h2 * 3 + 0x38495ab5;
+
+    state._c1 = state._c1 * 5 + 0x7b7d159c;
+    state._c2 = state._c2 * 5 + 0x6bce6396;
+  }
+
+  private static long fmix(long k) {
+    k ^= k >>> 33;
+    k *= 0xff51afd7ed558ccdL;
+    k ^= k >>> 33;
+    k *= 0xc4ceb9fe1a85ec53L;
+    k ^= k >>> 33;
+
+    return k;
+  }
+
+  private static class State {
+    long _h1;
+    long _h2;
+
+    long _k1;
+    long _k2;
+
+    long _c1;
+    long _c2;
+  }
+}
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index 114a6a4fa6..91e35b1ece 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -31,10 +31,10 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.utils.hash.MurmurHashFunctions;
 import org.apache.pinot.tools.Command;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.slf4j.Logger;
@@ -134,7 +134,7 @@ public class StreamAvroIntoKafkaCommand extends 
AbstractBaseAdminCommand impleme
             break;
         }
         // Write the message to Kafka
-        streamDataProducer.produce(_kafkaTopic, 
Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), bytes);
+        streamDataProducer.produce(_kafkaTopic, 
Longs.toByteArray(MurmurHashFunctions.murmurHash2Bit64(bytes)), bytes);
 
         // Sleep between messages
         if (sleepRequired) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to