This is an automated email from the ASF dual-hosted git repository. richardstartin pushed a commit to branch pushdown-comparison in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 4aac923bcd8acf9acdca35865c580fee1edde640 Author: Richard Startin <richardstar...@apache.org> AuthorDate: Fri Dec 30 20:05:38 2022 +0000 push unsigned byte comparison down into ValueReader --- .../local/io/util/FixedByteValueReaderWriter.java | 6 + .../pinot/segment/local/io/util/ValueReader.java | 53 +++++ .../local/io/util/VarLengthValueReader.java | 9 + .../index/readers/BaseImmutableDictionary.java | 11 +- .../readerwriter/ValueReaderComparisonTest.java | 220 +++++++++++++++++++++ 5 files changed, 293 insertions(+), 6 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedByteValueReaderWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedByteValueReaderWriter.java index 110cf21276..51f692192f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedByteValueReaderWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedByteValueReaderWriter.java @@ -122,6 +122,12 @@ public final class FixedByteValueReaderWriter implements ValueReader { return value; } + @Override + public int compareUnsignedBytes(int index, int numBytesPerValue, byte[] bytes) { + long startOffset = (long) index * numBytesPerValue; + return ValueReader.compareUnsignedBytes(_dataBuffer, startOffset, numBytesPerValue, true, bytes); + } + public void writeInt(int index, int value) { _dataBuffer.putInt((long) index * Integer.BYTES, value); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/ValueReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/ValueReader.java index 1dbe9f2984..99cba6ea2b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/ValueReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/ValueReader.java @@ -20,6 +20,9 @@ package org.apache.pinot.segment.local.io.util; import java.io.Closeable; import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.utils.BigDecimalUtils; @@ -65,4 +68,54 @@ public interface ValueReader extends Closeable { * NOTE: Do not reuse buffer for BYTES because the return value can have variable length. */ byte[] getBytes(int index, int numBytesPerValue); + + /** + * Assumes a zero padding byte, caller responsible for checking before use. + */ + int compareUnsignedBytes(int index, int numBytesPerValue, byte[] bytes); + + static int compareUnsignedBytes(PinotDataBuffer dataBuffer, long startOffset, int length, boolean padded, + byte[] bytes) { + // can use MethodHandles.byteArrayViewVarHandle here after dropping JDK8 + ByteBuffer buffer = ByteBuffer.wrap(bytes); + boolean littleEndian = dataBuffer.order() == ByteOrder.LITTLE_ENDIAN; + if (littleEndian) { + buffer.order(ByteOrder.LITTLE_ENDIAN); + } + int limit = Math.min(length, bytes.length); + int loopBound = limit & ~0x3F; + int mismatchPosition = -1; + int i = 0; + for (; i < loopBound && mismatchPosition == -1; i += 8) { + long ours = dataBuffer.getLong(startOffset + i); + long theirs = buffer.getLong(i); + if (ours != theirs) { + long difference = ours ^ theirs; + mismatchPosition = i + ((littleEndian + ? Long.numberOfTrailingZeros(difference) + : Long.numberOfLeadingZeros(difference)) >>> 3); + } + } + if (mismatchPosition == -1) { + for (; i < limit && mismatchPosition == -1; i++) { + byte ours = dataBuffer.getByte(startOffset + i); + byte theirs = buffer.get(i); + if (ours != theirs) { + mismatchPosition = i; + } + } + } + if (mismatchPosition == -1) { + // assumes padding byte is zero + if (padded) { + if (i == bytes.length && i < length) { + return dataBuffer.getByte(startOffset + i) == 0 ? 0 : 1; + } + return 0; + } else { + return length - bytes.length; + } + } + return Byte.compareUnsigned(dataBuffer.getByte(startOffset + mismatchPosition), buffer.get(mismatchPosition)); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java index cb081d9a5f..16f6cbceb3 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java @@ -119,6 +119,15 @@ public class VarLengthValueReader implements ValueReader { return value; } + @Override + public int compareUnsignedBytes(int index, int numBytesPerValue, byte[] bytes) { + int offsetPosition = _dataSectionStartOffSet + Integer.BYTES * index; + int startOffset = _dataBuffer.getInt(offsetPosition); + int endOffset = _dataBuffer.getInt(offsetPosition + Integer.BYTES); + int length = endOffset - startOffset; + return ValueReader.compareUnsignedBytes(_dataBuffer, startOffset, length, false, bytes); + } + @Override public void close() { // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java index 1dbbc994fc..de0c8ad4c2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java @@ -28,7 +28,6 @@ import org.apache.pinot.segment.local.io.util.ValueReader; import org.apache.pinot.segment.local.io.util.VarLengthValueReader; import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; -import org.apache.pinot.spi.utils.ByteArray; import static java.nio.charset.StandardCharsets.UTF_8; @@ -219,14 +218,14 @@ public abstract class BaseImmutableDictionary implements Dictionary { * TODO: Clean up the segments with legacy non-zero padding byte, and remove the support for non-zero padding byte */ protected int binarySearch(String value) { - byte[] buffer = getBuffer(); int low = 0; int high = _length - 1; if (_paddingByte == 0) { + byte[] utf8 = value.getBytes(UTF_8); while (low <= high) { int mid = (low + high) >>> 1; - String midValue = _valueReader.getUnpaddedString(mid, _numBytesPerValue, _paddingByte, buffer); - int compareResult = midValue.compareTo(value); + // method requires zero padding byte, not safe to call in nonzero padding byte branch below + int compareResult = _valueReader.compareUnsignedBytes(mid, _numBytesPerValue, utf8); if (compareResult < 0) { low = mid + 1; } else if (compareResult > 0) { @@ -236,6 +235,7 @@ public abstract class BaseImmutableDictionary implements Dictionary { } } } else { + byte[] buffer = getBuffer(); String paddedValue = padString(value); while (low <= high) { int mid = (low + high) >>> 1; @@ -259,8 +259,7 @@ public abstract class BaseImmutableDictionary implements Dictionary { while (low <= high) { int mid = (low + high) >>> 1; - byte[] midValue = _valueReader.getBytes(mid, _numBytesPerValue); - int compareResult = ByteArray.compare(midValue, value); + int compareResult = _valueReader.compareUnsignedBytes(mid, _numBytesPerValue, value); if (compareResult < 0) { low = mid + 1; } else if (compareResult > 0) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readerwriter/ValueReaderComparisonTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readerwriter/ValueReaderComparisonTest.java new file mode 100644 index 0000000000..c7d99208e2 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readerwriter/ValueReaderComparisonTest.java @@ -0,0 +1,220 @@ +package org.apache.pinot.segment.local.segment.index.readerwriter; + +import com.google.common.primitives.Ints; +import java.io.IOException; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.segment.local.io.util.FixedByteValueReaderWriter; +import org.apache.pinot.segment.local.io.util.ValueReader; +import org.apache.pinot.segment.local.io.util.VarLengthValueReader; +import org.apache.pinot.segment.local.io.util.VarLengthValueWriter; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.utils.BytesUtils; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +public class ValueReaderComparisonTest { + @DataProvider + public static Object[] params() { + return Stream.of( + Pair.of(ByteOrder.BIG_ENDIAN, true), + Pair.of(ByteOrder.LITTLE_ENDIAN, true), + // there is no little endian support for var length at the time of writing + Pair.of(ByteOrder.BIG_ENDIAN, false) + ).flatMap(pair -> Stream.of( + new AsciiTestCase(pair.getLeft(), pair.getRight()), + new HexTestCase(pair.getLeft(), pair.getRight()), + new Utf8TestCase(pair.getLeft(), pair.getRight()) + )).toArray(Object[]::new); + } + + static abstract class TestCase { + + private final ByteOrder _byteOrder; + private final boolean _fixed; + + public TestCase(ByteOrder byteOrder, boolean fixed) { + _byteOrder = byteOrder; + _fixed = fixed; + } + + ByteOrder getByteOrder() { + return _byteOrder; + } + + boolean isFixed() { + return _fixed; + } + + abstract String[] generateStrings(); + + void testEqual(ValueReader reader, int numBytesPerValue, String... strings) { + for (int i = 0; i < strings.length; i++) { + assertComparisonCorrect(reader, i, numBytesPerValue, strings[i], 0); + } + } + + void testCommonPrefixStoredLonger(ValueReader reader, int numBytesPerValue, String... strings) { + for (int i = 0; i < strings.length; i++) { + assertComparisonCorrect(reader, i, numBytesPerValue, + strings[i].substring(0, strings[i].length() - 1), 1); + } + } + + void testCommonPrefixParameterLonger(ValueReader reader, int numBytesPerValue, String... strings) { + for (int i = 0; i < strings.length; i++) { + assertComparisonCorrect(reader, i, numBytesPerValue, strings[i] + "a", -1); + } + } + + void testInferiorPrefixes(ValueReader reader, int numBytesPerValue, String... strings) { + for (int i = 0; i < strings.length; i++) { + char[] chars = strings[i].toCharArray(); + for (int j = 0; j < strings[i].length(); j++) { + chars[j]--; + assertComparisonCorrect(reader, i, numBytesPerValue, new String(chars), 1); + chars[j]++; // fix the string for the next iteration + } + } + } + + void testSuperiorPrefixes(ValueReader reader, int numBytesPerValue, String... strings) { + for (int i = 0; i < strings.length; i++) { + char[] chars = strings[i].toCharArray(); + for (int j = 0; j < strings[i].length(); j++) { + chars[j]++; + assertComparisonCorrect(reader, i, numBytesPerValue, new String(chars), -1); + chars[j]--; // fix the string for the next iteration + } + } + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } + } + + static class AsciiTestCase extends TestCase { + + public AsciiTestCase(ByteOrder byteOrder, boolean fixed) { + super(byteOrder, fixed); + } + + @Override + public String[] generateStrings() { + // generate [a, ab, abc, ...] + return IntStream.range(1, 26).mapToObj(size -> { + char[] chars = new char[size]; + for (int i = 0; i < size; i++) { + chars[i] = (char) ('a' + i); + } + return new String(chars); + }).toArray(String[]::new); + } + } + + static class HexTestCase extends TestCase { + + public HexTestCase(ByteOrder byteOrder, boolean fixed) { + super(byteOrder, fixed); + } + + @Override + public String[] generateStrings() { + return IntStream.range(0, 1000) + .map(i -> i * 10000) + .mapToObj(i -> BytesUtils.toHexString(Ints.toByteArray(i))) + .toArray(String[]::new); + } + } + + static class Utf8TestCase extends TestCase { + public Utf8TestCase(ByteOrder byteOrder, boolean fixed) { + super(byteOrder, fixed); + } + + @Override + public String[] generateStrings() { + String[] symbols = new String[] {"a", "ß", "道", "\uD841\uDF0E"}; + return IntStream.range(1, 100) + .mapToObj(size -> { + StringBuilder sb = new StringBuilder(); + int offset = size % symbols.length; + for (int i = 0; i < size; i++) { + sb.append(symbols[(i + offset) % symbols.length]); + } + return sb.toString(); + }) + .toArray(String[]::new); + } + } + + @Test(dataProvider = "params") + public void testValueReaderComparison(TestCase testCase) + throws IOException { + String[] strings = testCase.generateStrings(); + int maxUtf8Length = Integer.MIN_VALUE; + for (String string : strings) { + maxUtf8Length = Math.max(string.getBytes(StandardCharsets.UTF_8).length, maxUtf8Length); + } + // round up to next power of 2 + int numBytesPerValue = 1 << (32 - Integer.numberOfLeadingZeros(maxUtf8Length)); + Pair<ValueReader, PinotDataBuffer> readerAndBuffer = prepare(testCase.isFixed(), numBytesPerValue, + testCase.getByteOrder(), strings); + ValueReader reader = readerAndBuffer.getKey(); + try { + testCase.testEqual(reader, numBytesPerValue, strings); + testCase.testCommonPrefixStoredLonger(reader, numBytesPerValue, strings); + testCase.testCommonPrefixParameterLonger(reader, numBytesPerValue, strings); + testCase.testInferiorPrefixes(reader, numBytesPerValue, strings); + testCase.testSuperiorPrefixes(reader, numBytesPerValue, strings); + } finally { + readerAndBuffer.getValue().close(); + } + } + + private static void assertComparisonCorrect(ValueReader readerWriter, int index, int numBytesPerValue, + String string, int signum) { + byte[] value = string.getBytes(StandardCharsets.UTF_8); + int comparison = readerWriter.compareUnsignedBytes(index, numBytesPerValue, value); + assertEquals(signum, Integer.compare(comparison, 0), index + ", " + string); + int comparisonViaMaterialization = readerWriter.getUnpaddedString(index, numBytesPerValue, (byte) 0, + new byte[numBytesPerValue]).compareTo(string); + assertEquals(signum, Integer.compare(comparisonViaMaterialization, 0)); + } + + private static Pair<ValueReader, PinotDataBuffer> prepare(boolean fixed, int numBytesPerValue, ByteOrder byteOrder, + String... storedValues) throws IOException { + if (fixed) { + PinotDataBuffer buffer = PinotDataBuffer.allocateDirect(1000L + + (long) storedValues.length * numBytesPerValue, byteOrder, + "ValueReaderComparisonTest"); + FixedByteValueReaderWriter readerWriter = new FixedByteValueReaderWriter(buffer); + for (int i = 0; i < storedValues.length; i++) { + readerWriter.writeBytes(i, numBytesPerValue, storedValues[i].getBytes(StandardCharsets.UTF_8)); + } + return Pair.of(readerWriter, buffer); + } else { + assert byteOrder == ByteOrder.BIG_ENDIAN : "little endian unsupported by VarLengthValueWriter"; + Path file = Files.createTempFile(ValueReaderComparisonTest.class.getName() + + "-" + UUID.randomUUID(), ".tmp"); + VarLengthValueWriter writer = new VarLengthValueWriter(file.toFile(), storedValues.length); + for (String storedValue : storedValues) { + writer.add(storedValue.getBytes(StandardCharsets.UTF_8)); + } + writer.close(); + PinotDataBuffer buffer = PinotDataBuffer.mapFile(file.toFile(), true, 0, Files.size(file), byteOrder, + "ValueReaderComparisonTest"); + return Pair.of(new VarLengthValueReader(buffer), buffer); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org