This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 74877be2aa Refactor murmur functions to pinot-spi (#15160) 74877be2aa is described below commit 74877be2aa599baa8a2ea16529a07a6dbf611f2c Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Mon Mar 3 15:53:33 2025 -0800 Refactor murmur functions to pinot-spi (#15160) --- .../common/function/scalar/HashFunctions.java | 168 ++++++- .../org/apache/pinot/common/utils/HashUtil.java | 120 ----- .../common/utils/helix/LeadControllerUtils.java | 6 +- .../common/function/scalar/HashFunctionsTest.java | 142 ++++++ .../spi/partition/Murmur3PartitionFunction.java | 203 +------- .../spi/partition/MurmurPartitionFunction.java | 56 +-- .../spi/partition/PartitionFunctionTest.java | 19 +- .../pinot/spi/utils/hash/MurmurHashFunctions.java | 514 +++++++++++++++++++++ .../admin/command/StreamAvroIntoKafkaCommand.java | 4 +- 9 files changed, 843 insertions(+), 389 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java index ccf14a122d..6d167af166 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java @@ -18,8 +18,11 @@ */ package org.apache.pinot.common.function.scalar; +import com.google.common.hash.Hashing; +import java.nio.charset.StandardCharsets; import org.apache.commons.codec.digest.DigestUtils; import org.apache.pinot.spi.annotations.ScalarFunction; +import org.apache.pinot.spi.utils.hash.MurmurHashFunctions; /** @@ -45,7 +48,18 @@ public class HashFunctions { */ @ScalarFunction public static String sha(byte[] input) { - return DigestUtils.shaHex(input); + return DigestUtils.sha1Hex(input); + } + + /** + * Return SHA-224 digest as hex string. 
+ * + * @param input the byte array representing the data + * @return hash string in hex format + */ + @ScalarFunction + public static String sha224(byte[] input) { + return DigestUtils.sha3_224Hex(input); } /** @@ -70,6 +84,17 @@ public class HashFunctions { return DigestUtils.sha512Hex(input); } + /** + * Return MD2 digest as hex string. + * + * @param input the byte array representing the data + * @return hash string in hex format + */ + @ScalarFunction + public static String md2(byte[] input) { + return DigestUtils.md2Hex(input); + } + /** * Return MD5 digest as hex string. * @@ -80,4 +105,145 @@ public class HashFunctions { public static String md5(byte[] input) { return DigestUtils.md5Hex(input); } + + /** + * Computes 32-bit MurmurHash2 of the given byte array. + * + * @param input the byte array to hash + * @return 32-bit hash + */ + @ScalarFunction + public static int murmurHash2(byte[] input) { + return MurmurHashFunctions.murmurHash2(input); + } + + /** + * Computes 32-bit MurmurHash2 of the given string. + * + * @param input the string to hash (encoded as UTF-8) + * @return 32-bit hash + */ + @ScalarFunction + public static int murmurHash2UTF8(String input) { + return MurmurHashFunctions.murmurHash2(input.getBytes(StandardCharsets.UTF_8)); + } + + /** + * Computes 64-bit MurmurHash2 of the given byte array. + * + * @param input the byte array to hash + * @return 64-bit hash + */ + @ScalarFunction + public static long murmurHash2Bit64(byte[] input) { + return MurmurHashFunctions.murmurHash2Bit64(input); + } + + /** + * Computes 64-bit MurmurHash2 of the given byte array and seed. + * + * @param input the byte array to hash + * @return 64-bit hash + */ + @ScalarFunction + public static long murmurHash2Bit64(byte[] input, int seed) { + return MurmurHashFunctions.murmurHash2Bit64(input, input.length, seed); + } + + /** + * Computes 32-bit Murmur3 Hash of the given byte array and seed. 
+ * + * @param input the byte array to hash + * @return 32-bit hash + */ + @ScalarFunction + public static int murmurHash3Bit32(byte[] input, int seed) { + return Hashing.murmur3_32_fixed(seed).hashBytes(input).asInt(); + } + + /** + * Computes 64-bit Murmur3 Hash of the given byte array and seed. + * + * @param input the byte array to hash + * @return 64-bit hash + */ + @ScalarFunction + public static long murmurHash3Bit64(byte[] input, int seed) { + return Hashing.murmur3_128(seed).hashBytes(input).asLong(); + } + + /** + * Computes 128-bit Murmur3 Hash of the given byte array and seed. + * + * @param input the byte array to hash + * @return 128-bit hash represented in a 16-byte array + */ + @ScalarFunction + public static byte[] murmurHash3Bit128(byte[] input, int seed) { + return Hashing.murmur3_128(seed).hashBytes(input).asBytes(); + } + + /** + * Computes 32-bit Murmur3 Hash of the given byte array and seed for x64 platform. + * + * @param input the byte array to hash + * @return 32-bit hash + */ + @ScalarFunction + public static int murmurHash3X64Bit32(byte[] input, int seed) { + return MurmurHashFunctions.murmurHash3X64Bit32(input, seed); + } + + /** + * Computes 64-bit Murmur3 Hash of the given byte array and seed for x64 platform. + * + * @param input the byte array to hash + * @return 64-bit hash + */ + @ScalarFunction + public static long murmurHash3X64Bit64(byte[] input, int seed) { + return MurmurHashFunctions.murmurHash3X64Bit32(input, seed); + } + + /** + * Computes 128-bit Murmur3 Hash of the given byte array and seed for x64 platform. 
+ * + * @param input the byte array to hash + * @return 128-bit hash represented in a 16-byte array + */ + @ScalarFunction + public static byte[] murmurHash3X64Bit128(byte[] input, int seed) { + return MurmurHashFunctions.murmurHash3X64Bit128(input, seed); + } + + /** + * Computes 32-bit Adler Hash of the given byte array + * @param input the byte array to hash + * @return 32-bit hash + */ + @ScalarFunction + public static int adler32(byte[] input) { + return Hashing.adler32().hashBytes(input).asInt(); + } + + /** + * Computes 32-bit CRC (Cyclic Redundancy Check) of the given byte array + * @param input the byte array to hash + * @return 32-bit CRC32 hash + */ + @ScalarFunction + public static int crc32(byte[] input) { + return Hashing.crc32().hashBytes(input).asInt(); + } + + /** + * Computes 32-bit CRC32C (Cyclic Redundancy Check 32C) of the given byte array. + * + * @param input the byte array to hash + * @return 32-bit CRC32C hash + */ + @ScalarFunction + public static int crc32c(byte[] input) { + return Hashing.crc32c().hashBytes(input).asInt(); + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java index a8c5cc9985..de0409751e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/HashUtil.java @@ -19,8 +19,6 @@ package org.apache.pinot.common.utils; import com.google.common.primitives.Ints; -import java.nio.ByteBuffer; -import java.nio.IntBuffer; public class HashUtil { @@ -56,122 +54,4 @@ public class HashUtil { } return Integer.MAX_VALUE; } - - public static long compute(IntBuffer buff) { - buff.rewind(); - ByteBuffer bBuff = ByteBuffer.allocate(buff.array().length * 4); - for (int i : buff.array()) { - bBuff.putInt(i); - } - return compute(bBuff); - } - - public static long compute(ByteBuffer buff) { - return hash64(buff.array(), buff.array().length); - } - - 
public static long hash64(final byte[] data, int length) { - // Default seed is 0xe17a1465. - return hash64(data, length, 0xe17a1465); - } - - // Implement 64-bit Murmur2 hash. - public static long hash64(final byte[] data, int length, int seed) { - final long m = 0xc6a4a7935bd1e995L; - final int r = 47; - - long h = (seed & 0xffffffffL) ^ (length * m); - - int length8 = length / 8; - - for (int i = 0; i < length8; i++) { - final int i8 = i * 8; - long k = - ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8) + (((long) data[i8 + 2] & 0xff) << 16) + ( - ((long) data[i8 + 3] & 0xff) << 24) + (((long) data[i8 + 4] & 0xff) << 32) + (((long) data[i8 + 5] & 0xff) - << 40) + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 + 7] & 0xff) << 56); - - k *= m; - k ^= k >>> r; - k *= m; - - h ^= k; - h *= m; - } - - // CHECKSTYLE:OFF: checkstyle:coding - switch (length % 8) { - case 7: - h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48; - case 6: - h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40; - case 5: - h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32; - case 4: - h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24; - case 3: - h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16; - case 2: - h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8; - case 1: - h ^= data[length & ~7] & 0xff; - h *= m; - } - // CHECKSTYLE:ON: checkstyle:coding - - h ^= h >>> r; - h *= m; - h ^= h >>> r; - return h; - } - - /** - * Generates 32 bit murmur2 hash from byte array - * @param data byte array to hash - * @return 32 bit hash of the given array - */ - @SuppressWarnings("checkstyle") - public static int murmur2(final byte[] data) { - int length = data.length; - int seed = 0x9747b28c; - // 'm' and 'r' are mixing constants generated offline. - // They're not really 'magic', they just happen to work well. 
- final int m = 0x5bd1e995; - final int r = 24; - - // Initialize the hash to a random value - int h = seed ^ length; - int length4 = length / 4; - - for (int i = 0; i < length4; i++) { - final int i4 = i * 4; - int k = - (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) - << 24); - k *= m; - k ^= k >>> r; - k *= m; - h *= m; - h ^= k; - } - - // CHECKSTYLE:OFF: checkstyle:coding - // Handle the last few bytes of the input array - switch (length % 4) { - case 3: - h ^= (data[(length & ~3) + 2] & 0xff) << 16; - case 2: - h ^= (data[(length & ~3) + 1] & 0xff) << 8; - case 1: - h ^= data[length & ~3] & 0xff; - h *= m; - } - // CHECKSTYLE:ON: checkstyle:coding - - h ^= h >>> 13; - h *= m; - h ^= h >>> 15; - - return h; - } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/LeadControllerUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/LeadControllerUtils.java index c7dffb044c..9984f5f463 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/LeadControllerUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/LeadControllerUtils.java @@ -24,7 +24,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.ResourceConfig; -import org.apache.pinot.common.utils.HashUtil; +import org.apache.pinot.common.function.scalar.HashFunctions; import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +40,7 @@ public class LeadControllerUtils { /** * Given a raw table name and number of partitions, returns the partition id in lead controller resource. - * Uses murmur2 function to get hashcode for table, ignores the most significant bit. + * Uses murmurHash2 function to get hashcode for table, ignores the most significant bit. 
* Note: This method CANNOT be changed when lead controller resource is enabled. * Otherwise it will assign different controller for the same table, which will mess up the controller periodic * tasks and realtime segment completion. @@ -48,7 +48,7 @@ public class LeadControllerUtils { * @return partition id in lead controller resource. */ public static int getPartitionIdForTable(String rawTableName) { - return (HashUtil.murmur2(rawTableName.getBytes(UTF_8)) & Integer.MAX_VALUE) + return (HashFunctions.murmurHash2(rawTableName.getBytes(UTF_8)) & Integer.MAX_VALUE) % Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE; } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/HashFunctionsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/HashFunctionsTest.java new file mode 100644 index 0000000000..6f23c8c89a --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/HashFunctionsTest.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.function.scalar; + +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class HashFunctionsTest { + + @Test + public void testShaHash() { + String input = "testString"; + assertEquals(HashFunctions.sha(input.getBytes()), "956265657d0b637ef65b9b59f9f858eecf55ed6a"); + } + + @Test + public void testSha2224Hash() { + String input = "testString"; + assertEquals(HashFunctions.sha224(input.getBytes()), + "bb54d1095764bff72b570dcdc3172ed6d1b26695494528a0059c95ae"); + } + + @Test + public void testSha256Hash() { + String input = "testString"; + assertEquals(HashFunctions.sha256(input.getBytes()), + "4acf0b39d9c4766709a3689f553ac01ab550545ffa4544dfc0b2cea82fba02a3"); + } + + @Test + public void testSha512Hash() { + String input = "testString"; + assertEquals(HashFunctions.sha512(input.getBytes()), + "c48af5a7f6d4a851fc8a434eed638ab1a6ef68e19dbcae894ac67c9fbc5bcb01" + + "82b8e7123b3df3c9e4dcb7690c23103f03dc17f54352071ceb2a4eb204b26b91"); + } + + @Test + public void testMd2Hash() { + String input = "testString"; + assertEquals(HashFunctions.md2(input.getBytes()), "466c453913ba0d8325f96b2d47984fb5"); + } + + @Test + public void testMd5Hash() { + String input = "testString"; + assertEquals(HashFunctions.md5(input.getBytes()), "536788f4dbdffeecfbb8f350a941eea3"); + } + + @Test + public void testMurmurHash2() { + String input = "testString"; + assertEquals(HashFunctions.murmurHash2(input.getBytes()), -534425817); + } + + @Test + public void testMurmurHash2UTF8() { + String input = "testString"; + assertEquals(HashFunctions.murmurHash2UTF8(input), -534425817); + } + + @Test + public void testMurmurHash2Bit64() { + String input = "testString"; + assertEquals(HashFunctions.murmurHash2Bit64(input.getBytes()), 3907736674355139845L); + assertEquals(HashFunctions.murmurHash2Bit64(input.getBytes(), 12345), -2138976126980760436L); + } + + @Test + public void testMurmurHash3Bit32() { + String input = 
"testString"; + assertEquals(HashFunctions.murmurHash3Bit32(input.getBytes(), 0), -1435605585); + } + + @Test + public void testMurmurHash3Bit64() { + String input = "testString"; + assertEquals(HashFunctions.murmurHash3Bit64(input.getBytes(), 0), -3652179990542706350L); + } + + @Test + public void testMurmurHash3Bit128() { + String input = "testString"; + assertEquals(HashFunctions.murmurHash3Bit128(input.getBytes(), 0), + new byte[]{82, -103, -23, 15, -90, -39, 80, -51, 15, 73, -81, -28, 111, -21, -78, 108}); + } + + @Test + public void testMurmurHash3X64Bit32() { + String input = "testString"; + assertEquals(HashFunctions.murmurHash3X64Bit32(input.getBytes(), 0), -1096986291); + } + + @Test + public void testMurmurHash3X64Bit64() { + String input = "testString"; + assertEquals(HashFunctions.murmurHash3X64Bit64(input.getBytes(), 0), -1096986291L); + } + + @Test + public void testMurmurHash3X64Bit128() { + String input = "testString"; + assertEquals(HashFunctions.murmurHash3X64Bit128(input.getBytes(), 0), + new byte[]{-66, -99, 81, 77, -7, 29, 124, 76, 42, 38, -34, -42, -92, -83, 83, 13}); + } + + @Test + public void testAdler32() { + String input = "testString"; + assertEquals(HashFunctions.adler32(input.getBytes()), 392102968); + } + + @Test + public void testCrc32() { + String input = "testString"; + assertEquals(HashFunctions.crc32(input.getBytes()), 418708744); + } + + @Test + public void testCrc32c() { + String input = "testString"; + assertEquals(HashFunctions.crc32c(input.getBytes()), -1608760557); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java index c8f673a3dd..ba22c1a992 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java +++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java @@ -18,11 +18,10 @@ */ package org.apache.pinot.segment.spi.partition; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.hash.Hashing; import java.util.Map; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.utils.hash.MurmurHashFunctions; import static java.nio.charset.StandardCharsets.UTF_8; @@ -31,7 +30,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; * Implementation of {@link PartitionFunction} which partitions based on 32 bit murmur3 hash */ public class Murmur3PartitionFunction implements PartitionFunction { - public static final byte INVALID_CHAR = (byte) '?'; private static final String NAME = "Murmur3"; private static final String SEED_KEY = "seed"; private static final String VARIANT_KEY = "variant"; @@ -71,7 +69,8 @@ public class Murmur3PartitionFunction implements PartitionFunction { @Override public int getPartition(String value) { - int hash = _useX64 ? murmur3Hash32BitsX64(value, _seed) : murmur3Hash32BitsX86(value.getBytes(UTF_8), _seed); + int hash = _useX64 ? MurmurHashFunctions.murmurHash3X64Bit32(value, _seed) + : MurmurHashFunctions.murmurHash3X86Bit32(value.getBytes(UTF_8), _seed); return (hash & Integer.MAX_VALUE) % _numPartitions; } @@ -90,200 +89,4 @@ public class Murmur3PartitionFunction implements PartitionFunction { public String toString() { return NAME; } - - @VisibleForTesting - static int murmur3Hash32BitsX86(byte[] data, int seed) { - return Hashing.murmur3_32_fixed(seed).hashBytes(data).asInt(); - } - - /** - * Taken from <a href= - * "https://github.com/infinispan/infinispan/blob/main/commons/all/src/main/java/org/infinispan/commons/hash - * /MurmurHash3.java" - * >Infinispan code base</a>. 
- * - * MurmurHash3 implementation in Java, based on Austin Appleby's <a href= - * "https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp" - * >original in C</a> - * - * This is an implementation of MurmurHash3 to generate 32 bit hash for x64 architecture (not part of the original - * Murmur3 implementations) used by Infinispan and Debezium, Removed the parts that we don't need and formatted - * the code to Apache Pinot's Checkstyle. - * - * @author Patrick McFarland - * @see <a href="http://sites.google.com/site/murmurhash/">MurmurHash website</a> - * @see <a href="http://en.wikipedia.org/wiki/MurmurHash">MurmurHash entry on Wikipedia</a> - */ - - private static void bmix(State state) { - state._k1 *= state._c1; - state._k1 = (state._k1 << 23) | (state._k1 >>> 64 - 23); - state._k1 *= state._c2; - state._h1 ^= state._k1; - state._h1 += state._h2; - - state._h2 = (state._h2 << 41) | (state._h2 >>> 64 - 41); - - state._k2 *= state._c2; - state._k2 = (state._k2 << 23) | (state._k2 >>> 64 - 23); - state._k2 *= state._c1; - state._h2 ^= state._k2; - state._h2 += state._h1; - - state._h1 = state._h1 * 3 + 0x52dce729; - state._h2 = state._h2 * 3 + 0x38495ab5; - - state._c1 = state._c1 * 5 + 0x7b7d159c; - state._c2 = state._c2 * 5 + 0x6bce6396; - } - - private static long fmix(long k) { - k ^= k >>> 33; - k *= 0xff51afd7ed558ccdL; - k ^= k >>> 33; - k *= 0xc4ceb9fe1a85ec53L; - k ^= k >>> 33; - - return k; - } - - @VisibleForTesting - static int murmur3Hash32BitsX64(String s, int seed) { - State state = new State(); - - state._h1 = 0x9368e53c2f6af274L ^ seed; - state._h2 = 0x586dcd208f7cd3fdL ^ seed; - - state._c1 = 0x87c37b91114253d5L; - state._c2 = 0x4cf5ad432745937fL; - - int byteLen = 0; - int stringLen = s.length(); - - // CHECKSTYLE:OFF - for (int i = 0; i < stringLen; i++) { - char c1 = s.charAt(i); - int cp; - if (!Character.isSurrogate(c1)) { - cp = c1; - } else if (Character.isHighSurrogate(c1)) { - if (i + 1 < stringLen) { - char c2 = s.charAt(i 
+ 1); - if (Character.isLowSurrogate(c2)) { - i++; - cp = Character.toCodePoint(c1, c2); - } else { - cp = INVALID_CHAR; - } - } else { - cp = INVALID_CHAR; - } - } else { - cp = INVALID_CHAR; - } - - if (cp <= 0x7f) { - addByte(state, (byte) cp, byteLen++); - } else if (cp <= 0x07ff) { - byte b1 = (byte) (0xc0 | (0x1f & (cp >> 6))); - byte b2 = (byte) (0x80 | (0x3f & cp)); - addByte(state, b1, byteLen++); - addByte(state, b2, byteLen++); - } else if (cp <= 0xffff) { - byte b1 = (byte) (0xe0 | (0x0f & (cp >> 12))); - byte b2 = (byte) (0x80 | (0x3f & (cp >> 6))); - byte b3 = (byte) (0x80 | (0x3f & cp)); - addByte(state, b1, byteLen++); - addByte(state, b2, byteLen++); - addByte(state, b3, byteLen++); - } else { - byte b1 = (byte) (0xf0 | (0x07 & (cp >> 18))); - byte b2 = (byte) (0x80 | (0x3f & (cp >> 12))); - byte b3 = (byte) (0x80 | (0x3f & (cp >> 6))); - byte b4 = (byte) (0x80 | (0x3f & cp)); - addByte(state, b1, byteLen++); - addByte(state, b2, byteLen++); - addByte(state, b3, byteLen++); - addByte(state, b4, byteLen++); - } - } - - long savedK1 = state._k1; - long savedK2 = state._k2; - state._k1 = 0; - state._k2 = 0; - switch (byteLen & 15) { - case 15: - state._k2 ^= (long) ((byte) (savedK2 >> 48)) << 48; - case 14: - state._k2 ^= (long) ((byte) (savedK2 >> 40)) << 40; - case 13: - state._k2 ^= (long) ((byte) (savedK2 >> 32)) << 32; - case 12: - state._k2 ^= (long) ((byte) (savedK2 >> 24)) << 24; - case 11: - state._k2 ^= (long) ((byte) (savedK2 >> 16)) << 16; - case 10: - state._k2 ^= (long) ((byte) (savedK2 >> 8)) << 8; - case 9: - state._k2 ^= ((byte) savedK2); - case 8: - state._k1 ^= (long) ((byte) (savedK1 >> 56)) << 56; - case 7: - state._k1 ^= (long) ((byte) (savedK1 >> 48)) << 48; - case 6: - state._k1 ^= (long) ((byte) (savedK1 >> 40)) << 40; - case 5: - state._k1 ^= (long) ((byte) (savedK1 >> 32)) << 32; - case 4: - state._k1 ^= (long) ((byte) (savedK1 >> 24)) << 24; - case 3: - state._k1 ^= (long) ((byte) (savedK1 >> 16)) << 16; - case 2: - 
state._k1 ^= (long) ((byte) (savedK1 >> 8)) << 8; - case 1: - state._k1 ^= ((byte) savedK1); - bmix(state); - } - // CHECKSTYLE:ON - - state._h2 ^= byteLen; - - state._h1 += state._h2; - state._h2 += state._h1; - - state._h1 = fmix(state._h1); - state._h2 = fmix(state._h2); - - state._h1 += state._h2; - state._h2 += state._h1; - - return (int) (state._h1 >> 32); - } - - private static void addByte(State state, byte b, int len) { - int shift = (len & 0x7) * 8; - long bb = (b & 0xffL) << shift; - if ((len & 0x8) == 0) { - state._k1 |= bb; - } else { - state._k2 |= bb; - if ((len & 0xf) == 0xf) { - bmix(state); - state._k1 = 0; - state._k2 = 0; - } - } - } - - private static class State { - long _h1; - long _h2; - - long _k1; - long _k2; - - long _c1; - long _c2; - } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java index a43a49f789..8251450c44 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java @@ -18,8 +18,8 @@ */ package org.apache.pinot.segment.spi.partition; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.pinot.spi.utils.hash.MurmurHashFunctions; import static java.nio.charset.StandardCharsets.UTF_8; @@ -42,7 +42,7 @@ public class MurmurPartitionFunction implements PartitionFunction { @Override public int getPartition(String value) { - return (murmur2(value.getBytes(UTF_8)) & Integer.MAX_VALUE) % _numPartitions; + return (MurmurHashFunctions.murmurHash2(value.getBytes(UTF_8)) & Integer.MAX_VALUE) % _numPartitions; } @Override @@ -60,56 +60,4 @@ public class MurmurPartitionFunction implements PartitionFunction { public String toString() { return NAME; } - - /** - * 
NOTE: This code has been copied over from org.apache.kafka.common.utils.Utils::murmur2 - * - * Generates 32 bit murmur2 hash from byte array - * @param data byte array to hash - * @return 32 bit hash of the given array - */ - @VisibleForTesting - static int murmur2(final byte[] data) { - int length = data.length; - int seed = 0x9747b28c; - // 'm' and 'r' are mixing constants generated offline. - // They're not really 'magic', they just happen to work well. - final int m = 0x5bd1e995; - final int r = 24; - - // Initialize the hash to a random value - int h = seed ^ length; - int length4 = length / 4; - - for (int i = 0; i < length4; i++) { - final int i4 = i * 4; - int k = - (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) - << 24); - k *= m; - k ^= k >>> r; - k *= m; - h *= m; - h ^= k; - } - - // Handle the last few bytes of the input array - // CHECKSTYLE:OFF - switch (length % 4) { - case 3: - h ^= (data[(length & ~3) + 2] & 0xff) << 16; - case 2: - h ^= (data[(length & ~3) + 1] & 0xff) << 8; - case 1: - h ^= data[length & ~3] & 0xff; - h *= m; - } - // CHECKSTYLE:ON - - h ^= h >>> 13; - h *= m; - h ^= h >>> 15; - - return h; - } } diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java index a80f65d054..041920894b 100644 --- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.hash.MurmurHashFunctions; import org.testng.annotations.Test; import static java.nio.charset.StandardCharsets.UTF_8; @@ -383,15 +384,15 @@ public class 
PartitionFunctionTest { } /** - * Tests the equivalence of org.apache.kafka.common.utils.Utils::murmur2 and + * Tests the equivalence of org.apache.kafka.common.utils.Utils::murmurHash2 and * {@link MurmurPartitionFunction#getPartition} - * Our implementation of murmur2 has been copied over from Utils::murmur2 + * Our implementation of murmurHash2 has been copied over from Utils::murmurHash2 */ @Test public void testMurmurEquivalence() { // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100 - // Applied org.apache.kafka.common.utils.Utils::murmur2 to those values and stored in expectedMurmurValues + // Applied org.apache.kafka.common.utils.Utils::murmurHash2 to those values and stored in expectedMurmurValues int[] expectedMurmurValues = new int[]{ -1044832774, -594851693, 1441878663, 1766739604, 1034724141, -296671913, 443511156, 1483601453, 1819695080, -931669296 @@ -401,12 +402,12 @@ public class PartitionFunctionTest { Random random = new Random(seed); // Generate the same values as above - 10 random values of size 7, using {@link Random::nextBytes} with seed 100 - // Apply {@link MurmurPartitionFunction::murmur2 + // Apply {@link MurmurPartitionFunction::murmurHash2 // compare with stored results byte[] bytes = new byte[7]; for (int expectedMurmurValue : expectedMurmurValues) { random.nextBytes(bytes); - assertEquals(MurmurPartitionFunction.murmur2(bytes), expectedMurmurValue); + assertEquals(MurmurHashFunctions.murmurHash2(bytes), expectedMurmurValue); } } @@ -419,8 +420,8 @@ public class PartitionFunctionTest { // 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100 // Applied {@link MurmurPartitionFunction} initialized with 5 partitions, by overriding - // {@MurmurPartitionFunction::murmur2} with org - // .apache.kafka.common.utils.Utils::murmur2 + // {@MurmurPartitionFunction::murmurHash2} with org + // .apache.kafka.common.utils.Utils::murmurHash2 // stored the results 
in expectedPartitions int[] expectedPartitions = new int[]{1, 4, 4, 1, 1, 2, 0, 4, 2, 3}; @@ -549,8 +550,8 @@ public class PartitionFunctionTest { for (int expectedHashValue : expectedHashValues) { random.nextBytes(bytes); String nextString = new String(bytes, UTF_8); - int actualHashValue = useX64 ? Murmur3PartitionFunction.murmur3Hash32BitsX64(nextString, hashSeed) - : Murmur3PartitionFunction.murmur3Hash32BitsX86(bytes, hashSeed); + int actualHashValue = useX64 ? MurmurHashFunctions.murmurHash3X64Bit32(nextString, hashSeed) + : MurmurHashFunctions.murmurHash3X86Bit32(bytes, hashSeed); assertEquals(actualHashValue, expectedHashValue); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/hash/MurmurHashFunctions.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/hash/MurmurHashFunctions.java new file mode 100644 index 0000000000..276eb0f46e --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/hash/MurmurHashFunctions.java @@ -0,0 +1,514 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spi.utils.hash; + +import com.google.common.hash.Hashing; +import java.nio.ByteBuffer; + + +public class MurmurHashFunctions { + public static final byte INVALID_CHAR = (byte) '?'; + + private MurmurHashFunctions() { + } + + /** + * NOTE: This code has been copied over from org.apache.kafka.common.utils.Utils::murmur2 + * + * Generates 32 bit murmur2 hash from byte array + * @param data byte array to hash + * @return 32 bit hash of the given array + */ + public static int murmurHash2(final byte[] data) { + int length = data.length; + int seed = 0x9747b28c; + // 'm' and 'r' are mixing constants generated offline. + // They're not really 'magic', they just happen to work well. + final int m = 0x5bd1e995; + final int r = 24; + + // Initialize the hash to a random value + int h = seed ^ length; + int length4 = length / 4; + + for (int i = 0; i < length4; i++) { + final int i4 = i * 4; + int k = + (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) + << 24); + k *= m; + k ^= k >>> r; + k *= m; + h *= m; + h ^= k; + } + + // Handle the last few bytes of the input array + // CHECKSTYLE:OFF + switch (length % 4) { + case 3: + h ^= (data[(length & ~3) + 2] & 0xff) << 16; + case 2: + h ^= (data[(length & ~3) + 1] & 0xff) << 8; + case 1: + h ^= data[length & ~3] & 0xff; + h *= m; + } + // CHECKSTYLE:ON + + h ^= h >>> 13; + h *= m; + h ^= h >>> 15; + + return h; + } + + /** + * Implement 64-bit Murmur2 hash. + * @param data byte array to hash + * @return 64-bit hash + */ + public static long murmurHash2Bit64(final byte[] data) { + return murmurHash2Bit64(data, data.length, 0xe17a1465); + } + + /** + * Implement 64-bit Murmur2 hash. 
+ * @param data byte array to hash + * @param length byte array length + * @param seed hash seed + * @return 64-bit hash + */ + public static long murmurHash2Bit64(final byte[] data, int length, int seed) { + final long m = 0xc6a4a7935bd1e995L; + final int r = 47; + + long h = (seed & 0xffffffffL) ^ (length * m); + + int length8 = length / 8; + + for (int i = 0; i < length8; i++) { + final int i8 = i * 8; + long k = + ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8) + (((long) data[i8 + 2] & 0xff) << 16) + ( + ((long) data[i8 + 3] & 0xff) << 24) + (((long) data[i8 + 4] & 0xff) << 32) + (((long) data[i8 + 5] & 0xff) + << 40) + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 + 7] & 0xff) << 56); + + k *= m; + k ^= k >>> r; + k *= m; + + h ^= k; + h *= m; + } + + // CHECKSTYLE:OFF: checkstyle:coding + switch (length % 8) { + case 7: + h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48; + case 6: + h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40; + case 5: + h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32; + case 4: + h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24; + case 3: + h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16; + case 2: + h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8; + case 1: + h ^= data[length & ~7] & 0xff; + h *= m; + } + // CHECKSTYLE:ON: checkstyle:coding + + h ^= h >>> r; + h *= m; + h ^= h >>> r; + return h; + } + + public static int murmurHash3X86Bit32(byte[] data, int seed) { + return Hashing.murmur3_32_fixed(seed).hashBytes(data).asInt(); + } + + /** + * Taken from <a href= + * "https://github.com/infinispan/infinispan/blob/main/commons/all/src/main/java/org/infinispan/commons/hash + * /MurmurHash3.java" + * >Infinispan code base</a>. 
+ * + * MurmurHash3 implementation in Java, based on Austin Appleby's <a href= + * "https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp" + * >original in C</a>. + * + * This is an implementation of MurmurHash3 to generate 32 bit hash for x64 architecture (not part of the original + * Murmur3 implementations) used by Infinispan and Debezium. Removed the parts that we don't need and formatted + * the code to Apache Pinot's Checkstyle. + * + * @author Patrick McFarland + * @see <a href="http://sites.google.com/site/murmurhash/">MurmurHash website</a> + * @see <a href="http://en.wikipedia.org/wiki/MurmurHash">MurmurHash entry on Wikipedia</a> + */ + public static int murmurHash3X64Bit32(String s, int seed) { + State state = new State(); + + state._h1 = 0x9368e53c2f6af274L ^ seed; + state._h2 = 0x586dcd208f7cd3fdL ^ seed; + + state._c1 = 0x87c37b91114253d5L; + state._c2 = 0x4cf5ad432745937fL; + + int byteLen = 0; + int stringLen = s.length(); + + // CHECKSTYLE:OFF: checkstyle:coding + for (int i = 0; i < stringLen; i++) { + char c1 = s.charAt(i); + int cp; + if (!Character.isSurrogate(c1)) { + cp = c1; + } else if (Character.isHighSurrogate(c1)) { + if (i + 1 < stringLen) { + char c2 = s.charAt(i + 1); + if (Character.isLowSurrogate(c2)) { + i++; + cp = Character.toCodePoint(c1, c2); + } else { + cp = INVALID_CHAR; + } + } else { + cp = INVALID_CHAR; + } + } else { + cp = INVALID_CHAR; + } + + if (cp <= 0x7f) { + addByte(state, (byte) cp, byteLen++); + } else if (cp <= 0x07ff) { + byte b1 = (byte) (0xc0 | (0x1f & (cp >> 6))); + byte b2 = (byte) (0x80 | (0x3f & cp)); + addByte(state, b1, byteLen++); + addByte(state, b2, byteLen++); + } else if (cp <= 0xffff) { + byte b1 = (byte) (0xe0 | (0x0f & (cp >> 12))); + byte b2 = (byte) (0x80 | (0x3f & (cp >> 6))); + byte b3 = (byte) (0x80 | (0x3f & cp)); + addByte(state, b1, byteLen++); + addByte(state, b2, byteLen++); + addByte(state, b3, byteLen++); + } else { + byte b1 = (byte) (0xf0 | (0x07 & (cp >> 18))); 
+ byte b2 = (byte) (0x80 | (0x3f & (cp >> 12))); + byte b3 = (byte) (0x80 | (0x3f & (cp >> 6))); + byte b4 = (byte) (0x80 | (0x3f & cp)); + addByte(state, b1, byteLen++); + addByte(state, b2, byteLen++); + addByte(state, b3, byteLen++); + addByte(state, b4, byteLen++); + } + } + + long savedK1 = state._k1; + long savedK2 = state._k2; + state._k1 = 0; + state._k2 = 0; + switch (byteLen & 15) { + case 15: + state._k2 ^= (long) ((byte) (savedK2 >> 48)) << 48; + case 14: + state._k2 ^= (long) ((byte) (savedK2 >> 40)) << 40; + case 13: + state._k2 ^= (long) ((byte) (savedK2 >> 32)) << 32; + case 12: + state._k2 ^= (long) ((byte) (savedK2 >> 24)) << 24; + case 11: + state._k2 ^= (long) ((byte) (savedK2 >> 16)) << 16; + case 10: + state._k2 ^= (long) ((byte) (savedK2 >> 8)) << 8; + case 9: + state._k2 ^= ((byte) savedK2); + case 8: + state._k1 ^= (long) ((byte) (savedK1 >> 56)) << 56; + case 7: + state._k1 ^= (long) ((byte) (savedK1 >> 48)) << 48; + case 6: + state._k1 ^= (long) ((byte) (savedK1 >> 40)) << 40; + case 5: + state._k1 ^= (long) ((byte) (savedK1 >> 32)) << 32; + case 4: + state._k1 ^= (long) ((byte) (savedK1 >> 24)) << 24; + case 3: + state._k1 ^= (long) ((byte) (savedK1 >> 16)) << 16; + case 2: + state._k1 ^= (long) ((byte) (savedK1 >> 8)) << 8; + case 1: + state._k1 ^= ((byte) savedK1); + bmix(state); + } + // CHECKSTYLE:ON: checkstyle:coding + state._h2 ^= byteLen; + + state._h1 += state._h2; + state._h2 += state._h1; + + state._h1 = fmix(state._h1); + state._h2 = fmix(state._h2); + + state._h1 += state._h2; + state._h2 += state._h1; + + return (int) (state._h1 >> 32); + } + + /** + * Hash a value using the x64 128 bit variant of MurmurHash3 + * + * @param key value to hash + * @param seed random value + * @return 128 bit hashed key, as a 16-byte array holding the two 64-bit halves (h1 then h2, big-endian) + */ + public static byte[] murmurHash3X64Bit128(final byte[] key, final int seed) { + State state = new State(); + + state._h1 = 0x9368e53c2f6af274L ^ seed; + state._h2 = 0x586dcd208f7cd3fdL ^ 
seed; + + state._c1 = 0x87c37b91114253d5L; + state._c2 = 0x4cf5ad432745937fL; + + for (int i = 0; i < key.length / 16; i++) { + state._k1 = getblock(key, i * 2 * 8); + state._k2 = getblock(key, (i * 2 + 1) * 8); + + bmix(state); + } + + state._k1 = 0; + state._k2 = 0; + + int tail = (key.length >>> 4) << 4; + // CHECKSTYLE:OFF: checkstyle:coding + switch (key.length & 15) { + case 15: + state._k2 ^= (long) key[tail + 14] << 48; + case 14: + state._k2 ^= (long) key[tail + 13] << 40; + case 13: + state._k2 ^= (long) key[tail + 12] << 32; + case 12: + state._k2 ^= (long) key[tail + 11] << 24; + case 11: + state._k2 ^= (long) key[tail + 10] << 16; + case 10: + state._k2 ^= (long) key[tail + 9] << 8; + case 9: + state._k2 ^= key[tail + 8]; + case 8: + state._k1 ^= (long) key[tail + 7] << 56; + case 7: + state._k1 ^= (long) key[tail + 6] << 48; + case 6: + state._k1 ^= (long) key[tail + 5] << 40; + case 5: + state._k1 ^= (long) key[tail + 4] << 32; + case 4: + state._k1 ^= (long) key[tail + 3] << 24; + case 3: + state._k1 ^= (long) key[tail + 2] << 16; + case 2: + state._k1 ^= (long) key[tail + 1] << 8; + case 1: + state._k1 ^= key[tail + 0]; + bmix(state); + } + // CHECKSTYLE:ON: checkstyle:coding + + state._h2 ^= key.length; + + state._h1 += state._h2; + state._h2 += state._h1; + + state._h1 = fmix(state._h1); + state._h2 = fmix(state._h2); + + state._h1 += state._h2; + state._h2 += state._h1; + + ByteBuffer buffer = ByteBuffer.allocate(16); + buffer.putLong(state._h1); + buffer.putLong(state._h2); + return buffer.array(); + } + + /** + * Hash a value using the x64 64 bit variant of murmurHash3 + * + * @param key value to hash + * @param seed random value + * @return 64 bit hashed key + */ + public static long murmurHash3X64Bit64(final byte[] key, final int seed) { + // Exactly the same as murmurHash3X64Bit128, except it only returns state.h1 + State state = new State(); + + state._h1 = 0x9368e53c2f6af274L ^ seed; + state._h2 = 0x586dcd208f7cd3fdL ^ seed; + + state._c1 
= 0x87c37b91114253d5L; + state._c2 = 0x4cf5ad432745937fL; + + for (int i = 0; i < key.length / 16; i++) { + state._k1 = getblock(key, i * 2 * 8); + state._k2 = getblock(key, (i * 2 + 1) * 8); + + bmix(state); + } + + state._k1 = 0; + state._k2 = 0; + + int tail = (key.length >>> 4) << 4; + // CHECKSTYLE:OFF: checkstyle:coding + switch (key.length & 15) { + case 15: + state._k2 ^= (long) key[tail + 14] << 48; + case 14: + state._k2 ^= (long) key[tail + 13] << 40; + case 13: + state._k2 ^= (long) key[tail + 12] << 32; + case 12: + state._k2 ^= (long) key[tail + 11] << 24; + case 11: + state._k2 ^= (long) key[tail + 10] << 16; + case 10: + state._k2 ^= (long) key[tail + 9] << 8; + case 9: + state._k2 ^= key[tail + 8]; + case 8: + state._k1 ^= (long) key[tail + 7] << 56; + case 7: + state._k1 ^= (long) key[tail + 6] << 48; + case 6: + state._k1 ^= (long) key[tail + 5] << 40; + case 5: + state._k1 ^= (long) key[tail + 4] << 32; + case 4: + state._k1 ^= (long) key[tail + 3] << 24; + case 3: + state._k1 ^= (long) key[tail + 2] << 16; + case 2: + state._k1 ^= (long) key[tail + 1] << 8; + case 1: + state._k1 ^= key[tail]; + bmix(state); + } + // CHECKSTYLE:ON: checkstyle:coding + + state._h2 ^= key.length; + + state._h1 += state._h2; + state._h2 += state._h1; + + state._h1 = fmix(state._h1); + state._h2 = fmix(state._h2); + + state._h1 += state._h2; + state._h2 += state._h1; + + return state._h1; + } + + /** + * Hash a value using the x64 32 bit variant of murmurHash3 + * + * @param key value to hash + * @param seed random value + * @return 32 bit hashed key + */ + public static int murmurHash3X64Bit32(final byte[] key, final int seed) { + return (int) (murmurHash3X64Bit64(key, seed) >>> 32); + } + + private static void addByte(State state, byte b, int len) { + int shift = (len & 0x7) * 8; + long bb = (b & 0xffL) << shift; + if ((len & 0x8) == 0) { + state._k1 |= bb; + } else { + state._k2 |= bb; + if ((len & 0xf) == 0xf) { + bmix(state); + state._k1 = 0; + state._k2 = 0; + 
} + } + } + + static long getblock(byte[] key, int i) { + return ((key[i + 0] & 0x00000000000000FFL)) | ((key[i + 1] & 0x00000000000000FFL) << 8) | ( + (key[i + 2] & 0x00000000000000FFL) << 16) | ((key[i + 3] & 0x00000000000000FFL) << 24) | ( + (key[i + 4] & 0x00000000000000FFL) << 32) | ((key[i + 5] & 0x00000000000000FFL) << 40) | ( + (key[i + 6] & 0x00000000000000FFL) << 48) | ((key[i + 7] & 0x00000000000000FFL) << 56); + } + + private static void bmix(State state) { + state._k1 *= state._c1; + state._k1 = (state._k1 << 23) | (state._k1 >>> 64 - 23); + state._k1 *= state._c2; + state._h1 ^= state._k1; + state._h1 += state._h2; + + state._h2 = (state._h2 << 41) | (state._h2 >>> 64 - 41); + + state._k2 *= state._c2; + state._k2 = (state._k2 << 23) | (state._k2 >>> 64 - 23); + state._k2 *= state._c1; + state._h2 ^= state._k2; + state._h2 += state._h1; + + state._h1 = state._h1 * 3 + 0x52dce729; + state._h2 = state._h2 * 3 + 0x38495ab5; + + state._c1 = state._c1 * 5 + 0x7b7d159c; + state._c2 = state._c2 * 5 + 0x6bce6396; + } + + private static long fmix(long k) { + k ^= k >>> 33; + k *= 0xff51afd7ed558ccdL; + k ^= k >>> 33; + k *= 0xc4ceb9fe1a85ec53L; + k ^= k >>> 33; + + return k; + } + + private static class State { + long _h1; + long _h2; + + long _k1; + long _k2; + + long _c1; + long _c2; + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java index 114a6a4fa6..91e35b1ece 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java @@ -31,10 +31,10 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.EncoderFactory; -import org.apache.pinot.common.utils.HashUtil; 
import org.apache.pinot.plugin.inputformat.avro.AvroUtils; import org.apache.pinot.spi.stream.StreamDataProducer; import org.apache.pinot.spi.stream.StreamDataProvider; +import org.apache.pinot.spi.utils.hash.MurmurHashFunctions; import org.apache.pinot.tools.Command; import org.apache.pinot.tools.utils.KafkaStarterUtils; import org.slf4j.Logger; @@ -134,7 +134,7 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme break; } // Write the message to Kafka - streamDataProducer.produce(_kafkaTopic, Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), bytes); + streamDataProducer.produce(_kafkaTopic, Longs.toByteArray(MurmurHashFunctions.murmurHash2Bit64(bytes)), bytes); // Sleep between messages if (sleepRequired) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org