This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch pushdown-comparison
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 4aac923bcd8acf9acdca35865c580fee1edde640
Author: Richard Startin <richardstar...@apache.org>
AuthorDate: Fri Dec 30 20:05:38 2022 +0000

    push unsigned byte comparison down into ValueReader
---
 .../local/io/util/FixedByteValueReaderWriter.java  |   6 +
 .../pinot/segment/local/io/util/ValueReader.java   |  53 +++++
 .../local/io/util/VarLengthValueReader.java        |   9 +
 .../index/readers/BaseImmutableDictionary.java     |  11 +-
 .../readerwriter/ValueReaderComparisonTest.java    | 220 +++++++++++++++++++++
 5 files changed, 293 insertions(+), 6 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedByteValueReaderWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedByteValueReaderWriter.java
index 110cf21276..51f692192f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedByteValueReaderWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedByteValueReaderWriter.java
@@ -122,6 +122,12 @@ public final class FixedByteValueReaderWriter implements ValueReader {
     return value;
   }
 
+  @Override
+  public int compareUnsignedBytes(int index, int numBytesPerValue, byte[] bytes) {
+    return ValueReader.compareUnsignedBytes(_dataBuffer, (long) index * numBytesPerValue, numBytesPerValue, true,
+        bytes);
+  }
+
   public void writeInt(int index, int value) {
     _dataBuffer.putInt((long) index * Integer.BYTES, value);
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/ValueReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/ValueReader.java
index 1dbe9f2984..99cba6ea2b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/ValueReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/ValueReader.java
@@ -20,6 +20,9 @@ package org.apache.pinot.segment.local.io.util;
 
 import java.io.Closeable;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 
 
@@ -65,4 +68,54 @@ public interface ValueReader extends Closeable {
    * NOTE: Do not reuse buffer for BYTES because the return value can have variable length.
    */
   byte[] getBytes(int index, int numBytesPerValue);
+
+  /**
+   * Compares the value at the given index with the given bytes, treating each byte as unsigned.
+   * Returns a negative number, zero or a positive number if the stored value is less than, equal to or greater than
+   * {@code bytes}. For fixed width values, trailing zero bytes are treated as padding; this assumes a zero padding
+   * byte, and the caller is responsible for checking the padding byte before use.
+   */
+  int compareUnsignedBytes(int index, int numBytesPerValue, byte[] bytes);
+
+  /**
+   * Lexicographically compares {@code length} bytes of {@code dataBuffer} starting at {@code startOffset} with
+   * {@code bytes}, treating each byte as unsigned; the sign of the result orders the stored value relative to
+   * {@code bytes}. If {@code padded}, the stored value is assumed to be padded with zero bytes, so a zero byte
+   * directly after a prefix equal to {@code bytes} means the values are equal.
+   */
+  static int compareUnsignedBytes(PinotDataBuffer dataBuffer, long startOffset, int length, boolean padded,
+      byte[] bytes) {
+    // can use MethodHandles.byteArrayViewVarHandle here after dropping JDK8
+    ByteBuffer buffer = ByteBuffer.wrap(bytes).order(dataBuffer.order());
+    boolean littleEndian = buffer.order() == ByteOrder.LITTLE_ENDIAN;
+    int limit = Math.min(length, bytes.length);
+    // compare a long at a time while at least 8 bytes remain, then finish byte by byte
+    int loopBound = limit & ~0x7;
+    int i = 0;
+    for (; i < loopBound; i += Long.BYTES) {
+      long difference = dataBuffer.getLong(startOffset + i) ^ buffer.getLong(i);
+      if (difference != 0) {
+        // first differing byte: least significant for little endian longs, most significant for big endian
+        int equalBits = littleEndian ? Long.numberOfTrailingZeros(difference) : Long.numberOfLeadingZeros(difference);
+        int mismatch = i + (equalBits >>> 3);
+        return Byte.toUnsignedInt(dataBuffer.getByte(startOffset + mismatch)) - Byte.toUnsignedInt(bytes[mismatch]);
+      }
+    }
+    for (; i < limit; i++) {
+      byte ours = dataBuffer.getByte(startOffset + i);
+      if (ours != bytes[i]) {
+        // not Byte.compareUnsigned, which requires JDK9
+        return Byte.toUnsignedInt(ours) - Byte.toUnsignedInt(bytes[i]);
+      }
+    }
+    if (bytes.length > length) {
+      // every stored byte matches a prefix of the longer parameter, so the parameter is greater, padded or not
+      return -1;
+    }
+    if (padded) {
+      // assumes padding byte is zero: a non-zero byte after the common prefix means the stored value is longer
+      return bytes.length < length && dataBuffer.getByte(startOffset + bytes.length) != 0 ? 1 : 0;
+    }
+    return length - bytes.length;
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java
index cb081d9a5f..16f6cbceb3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/VarLengthValueReader.java
@@ -119,6 +119,15 @@ public class VarLengthValueReader implements ValueReader {
     return value;
   }
 
+  @Override
+  public int compareUnsignedBytes(int index, int numBytesPerValue, byte[] bytes) {
+    // numBytesPerValue is unused: start/end offsets are the ints stored for this index and the next; no padding
+    int offsetPosition = _dataSectionStartOffSet + index * Integer.BYTES;
+    int valueStart = _dataBuffer.getInt(offsetPosition);
+    int valueLength = _dataBuffer.getInt(offsetPosition + Integer.BYTES) - valueStart;
+    return ValueReader.compareUnsignedBytes(_dataBuffer, valueStart, valueLength, false, bytes);
+  }
+
   @Override
   public void close() {
     // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
index 1dbbc994fc..de0c8ad4c2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BaseImmutableDictionary.java
@@ -28,7 +28,6 @@ import org.apache.pinot.segment.local.io.util.ValueReader;
 import org.apache.pinot.segment.local.io.util.VarLengthValueReader;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
-import org.apache.pinot.spi.utils.ByteArray;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -219,14 +218,16 @@ public abstract class BaseImmutableDictionary implements Dictionary {
    * TODO: Clean up the segments with legacy non-zero padding byte, and remove the support for non-zero padding byte
    */
   protected int binarySearch(String value) {
-    byte[] buffer = getBuffer();
     int low = 0;
     int high = _length - 1;
     if (_paddingByte == 0) {
+      byte[] utf8 = value.getBytes(UTF_8);
       while (low <= high) {
         int mid = (low + high) >>> 1;
-        String midValue = _valueReader.getUnpaddedString(mid, _numBytesPerValue, _paddingByte, buffer);
-        int compareResult = midValue.compareTo(value);
+        // method requires zero padding byte, not safe to call in nonzero padding byte branch below
+        // NOTE(review): UTF-8 byte order is code point order, unlike String.compareTo (UTF-16 code units); they
+        // differ for supplementary vs U+E000-U+FFFF characters - confirm how the dictionary values were sorted
+        int compareResult = _valueReader.compareUnsignedBytes(mid, _numBytesPerValue, utf8);
         if (compareResult < 0) {
           low = mid + 1;
         } else if (compareResult > 0) {
@@ -236,6 +237,7 @@ public abstract class BaseImmutableDictionary implements Dictionary {
         }
       }
     } else {
+      byte[] buffer = getBuffer();
       String paddedValue = padString(value);
       while (low <= high) {
         int mid = (low + high) >>> 1;
@@ -259,8 +261,9 @@ public abstract class BaseImmutableDictionary implements Dictionary {
 
     while (low <= high) {
       int mid = (low + high) >>> 1;
-      byte[] midValue = _valueReader.getBytes(mid, _numBytesPerValue);
-      int compareResult = ByteArray.compare(midValue, value);
+      // NOTE(review): fixed width readers treat a zero byte after a matching prefix as padding, so a shorter
+      // BYTES value may match a stored value that has trailing zero bytes - confirm that is intended
+      int compareResult = _valueReader.compareUnsignedBytes(mid, _numBytesPerValue, value);
       if (compareResult < 0) {
         low = mid + 1;
       } else if (compareResult > 0) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readerwriter/ValueReaderComparisonTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readerwriter/ValueReaderComparisonTest.java
new file mode 100644
index 0000000000..c7d99208e2
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readerwriter/ValueReaderComparisonTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readerwriter;
+
+import com.google.common.primitives.Ints;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.UUID;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.segment.local.io.util.FixedByteValueReaderWriter;
+import org.apache.pinot.segment.local.io.util.ValueReader;
+import org.apache.pinot.segment.local.io.util.VarLengthValueReader;
+import org.apache.pinot.segment.local.io.util.VarLengthValueWriter;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Checks that {@link ValueReader#compareUnsignedBytes(int, int, byte[])} has the same sign as comparing the
+ * materialized unpadded string, for zero padded fixed width readers and for variable length readers.
+ */
+public class ValueReaderComparisonTest {
+
+  @DataProvider
+  public static Object[] params() {
+    return Stream.of(
+        Pair.of(ByteOrder.BIG_ENDIAN, true),
+        Pair.of(ByteOrder.LITTLE_ENDIAN, true),
+        // there is no little endian support for var length at the time of writing
+        Pair.of(ByteOrder.BIG_ENDIAN, false)
+    ).flatMap(pair -> Stream.of(
+        new AsciiTestCase(pair.getLeft(), pair.getRight()),
+        new HexTestCase(pair.getLeft(), pair.getRight()),
+        new Utf8TestCase(pair.getLeft(), pair.getRight())
+    )).toArray(Object[]::new);
+  }
+
+  abstract static class TestCase {
+
+    private final ByteOrder _byteOrder;
+    private final boolean _fixed;
+
+    public TestCase(ByteOrder byteOrder, boolean fixed) {
+      _byteOrder = byteOrder;
+      _fixed = fixed;
+    }
+
+    ByteOrder getByteOrder() {
+      return _byteOrder;
+    }
+
+    boolean isFixed() {
+      return _fixed;
+    }
+
+    abstract String[] generateStrings();
+
+    void testEqual(ValueReader reader, int numBytesPerValue, String... strings) {
+      for (int i = 0; i < strings.length; i++) {
+        assertComparisonCorrect(reader, i, numBytesPerValue, strings[i], 0);
+      }
+    }
+
+    void testCommonPrefixStoredLonger(ValueReader reader, int numBytesPerValue, String... strings) {
+      for (int i = 0; i < strings.length; i++) {
+        assertComparisonCorrect(reader, i, numBytesPerValue, strings[i].substring(0, strings[i].length() - 1), 1);
+      }
+    }
+
+    void testCommonPrefixParameterLonger(ValueReader reader, int numBytesPerValue, String... strings) {
+      for (int i = 0; i < strings.length; i++) {
+        assertComparisonCorrect(reader, i, numBytesPerValue, strings[i] + "a", -1);
+      }
+    }
+
+    // decrements each character in turn, so the stored value is greater
+    void testInferiorPrefixes(ValueReader reader, int numBytesPerValue, String... strings) {
+      for (int i = 0; i < strings.length; i++) {
+        char[] chars = strings[i].toCharArray();
+        for (int j = 0; j < chars.length; j++) {
+          chars[j]--;
+          assertComparisonCorrect(reader, i, numBytesPerValue, new String(chars), 1);
+          chars[j]++; // fix the string for the next iteration
+        }
+      }
+    }
+
+    // increments each character in turn, so the stored value is smaller
+    void testSuperiorPrefixes(ValueReader reader, int numBytesPerValue, String... strings) {
+      for (int i = 0; i < strings.length; i++) {
+        char[] chars = strings[i].toCharArray();
+        for (int j = 0; j < chars.length; j++) {
+          chars[j]++;
+          assertComparisonCorrect(reader, i, numBytesPerValue, new String(chars), -1);
+          chars[j]--; // fix the string for the next iteration
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "(" + _byteOrder + ", " + (_fixed ? "fixed" : "var length") + ")";
+    }
+  }
+
+  static class AsciiTestCase extends TestCase {
+
+    public AsciiTestCase(ByteOrder byteOrder, boolean fixed) {
+      super(byteOrder, fixed);
+    }
+
+    @Override
+    public String[] generateStrings() {
+      // generate [a, ab, abc, ...]
+      return IntStream.range(1, 26).mapToObj(size -> {
+        char[] chars = new char[size];
+        for (int i = 0; i < size; i++) {
+          chars[i] = (char) ('a' + i);
+        }
+        return new String(chars);
+      }).toArray(String[]::new);
+    }
+  }
+
+  static class HexTestCase extends TestCase {
+
+    public HexTestCase(ByteOrder byteOrder, boolean fixed) {
+      super(byteOrder, fixed);
+    }
+
+    @Override
+    public String[] generateStrings() {
+      return IntStream.range(0, 1000)
+          .map(i -> i * 10000)
+          .mapToObj(i -> BytesUtils.toHexString(Ints.toByteArray(i)))
+          .toArray(String[]::new);
+    }
+  }
+
+  static class Utf8TestCase extends TestCase {
+
+    public Utf8TestCase(ByteOrder byteOrder, boolean fixed) {
+      super(byteOrder, fixed);
+    }
+
+    @Override
+    public String[] generateStrings() {
+      // 1, 2, 3 and 4 byte UTF-8 sequences; the last symbol is a surrogate pair in UTF-16
+      String[] symbols = new String[] {"a", "ß", "道", "\uD841\uDF0E"};
+      return IntStream.range(1, 100)
+          .mapToObj(size -> {
+            StringBuilder sb = new StringBuilder();
+            int offset = size % symbols.length;
+            for (int i = 0; i < size; i++) {
+              sb.append(symbols[(i + offset) % symbols.length]);
+            }
+            return sb.toString();
+          })
+          .toArray(String[]::new);
+    }
+  }
+
+  @Test(dataProvider = "params")
+  public void testValueReaderComparison(TestCase testCase)
+      throws IOException {
+    String[] strings = testCase.generateStrings();
+    int maxUtf8Length = 0;
+    for (String string : strings) {
+      maxUtf8Length = Math.max(string.getBytes(StandardCharsets.UTF_8).length, maxUtf8Length);
+    }
+    // smallest power of 2 strictly greater than the longest value, so every fixed width value is padded
+    int numBytesPerValue = 1 << (32 - Integer.numberOfLeadingZeros(maxUtf8Length));
+    try (Fixture fixture = prepare(testCase.isFixed(), numBytesPerValue, testCase.getByteOrder(), strings)) {
+      ValueReader reader = fixture._reader;
+      testCase.testEqual(reader, numBytesPerValue, strings);
+      testCase.testCommonPrefixStoredLonger(reader, numBytesPerValue, strings);
+      testCase.testCommonPrefixParameterLonger(reader, numBytesPerValue, strings);
+      testCase.testInferiorPrefixes(reader, numBytesPerValue, strings);
+      testCase.testSuperiorPrefixes(reader, numBytesPerValue, strings);
+    }
+  }
+
+  private static void assertComparisonCorrect(ValueReader reader, int index, int numBytesPerValue, String string,
+      int signum) {
+    byte[] value = string.getBytes(StandardCharsets.UTF_8);
+    int comparison = reader.compareUnsignedBytes(index, numBytesPerValue, value);
+    // TestNG's assertEquals takes (actual, expected)
+    assertEquals(Integer.signum(comparison), signum, index + ", " + string);
+    String materialized = reader.getUnpaddedString(index, numBytesPerValue, (byte) 0, new byte[numBytesPerValue]);
+    assertEquals(Integer.signum(materialized.compareTo(string)), signum, "materialized " + index + ", " + string);
+  }
+
+  private static Fixture prepare(boolean fixed, int numBytesPerValue, ByteOrder byteOrder, String... storedValues)
+      throws IOException {
+    if (fixed) {
+      PinotDataBuffer buffer = PinotDataBuffer.allocateDirect(1000L + (long) storedValues.length * numBytesPerValue,
+          byteOrder, "ValueReaderComparisonTest");
+      FixedByteValueReaderWriter readerWriter = new FixedByteValueReaderWriter(buffer);
+      for (int i = 0; i < storedValues.length; i++) {
+        readerWriter.writeBytes(i, numBytesPerValue, storedValues[i].getBytes(StandardCharsets.UTF_8));
+      }
+      return new Fixture(readerWriter, buffer, null);
+    }
+    assert byteOrder == ByteOrder.BIG_ENDIAN : "little endian unsupported by VarLengthValueWriter";
+    Path file = Files.createTempFile(ValueReaderComparisonTest.class.getName() + "-" + UUID.randomUUID(), ".tmp");
+    VarLengthValueWriter writer = new VarLengthValueWriter(file.toFile(), storedValues.length);
+    try {
+      for (String storedValue : storedValues) {
+        writer.add(storedValue.getBytes(StandardCharsets.UTF_8));
+      }
+    } finally {
+      writer.close();
+    }
+    PinotDataBuffer buffer = PinotDataBuffer.mapFile(file.toFile(), true, 0, Files.size(file), byteOrder,
+        "ValueReaderComparisonTest");
+    return new Fixture(new VarLengthValueReader(buffer), buffer, file);
+  }
+
+  /** Owns the reader under test together with its buffer and, for var length readers, its temporary file. */
+  private static final class Fixture implements Closeable {
+    private final ValueReader _reader;
+    private final PinotDataBuffer _buffer;
+    // null for fixed width readers, which are backed by a direct buffer rather than a file
+    private final Path _file;
+
+    Fixture(ValueReader reader, PinotDataBuffer buffer, Path file) {
+      _reader = reader;
+      _buffer = buffer;
+      _file = file;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      try {
+        _buffer.close();
+      } finally {
+        if (_file != null) {
+          Files.deleteIfExists(_file);
+        }
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to