Copilot commented on code in PR #17994:
URL: https://github.com/apache/pinot/pull/17994#discussion_r2998490271


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/IvfFlatVectorIndexReader.java:
##########
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.vector;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.PriorityQueue;
+import 
org.apache.pinot.segment.local.segment.index.vector.IvfFlatVectorIndexCreator;
+import org.apache.pinot.segment.local.utils.VectorDistanceFunction;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.creator.VectorIndexConfig;
+import org.apache.pinot.segment.spi.index.reader.NprobeAware;
+import org.apache.pinot.segment.spi.index.reader.VectorIndexReader;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Reader for IVF_FLAT (Inverted File with flat vectors) index.
+ *
+ * <p>Loads the entire index into memory at construction time for fast search.
+ * The search algorithm:
+ * <ol>
+ *   <li>Computes distance from the query to all centroids.</li>
+ *   <li>Selects the {@code nprobe} closest centroids.</li>
+ *   <li>Scans all vectors in those centroids' inverted lists.</li>
+ *   <li>Returns the top-K doc IDs as a bitmap.</li>
+ * </ol>
+ *
+ * <h3>Thread safety</h3>
+ * <p>This class is thread-safe for concurrent reads. The loaded index data is 
immutable
+ * after construction. The only mutable state is {@code _nprobe}, which is 
volatile to
+ * allow query-time tuning from another thread. However, the typical pattern is
+ * single-threaded: set nprobe, then call getDocIds.</p>
+ */
+public class IvfFlatVectorIndexReader implements VectorIndexReader, 
NprobeAware {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IvfFlatVectorIndexReader.class);
+
+  /** Default nprobe value when not explicitly set. */
+  static final int DEFAULT_NPROBE = 4;
+
+  // Index data loaded from file
+  private final int _dimension;
+  private final int _numVectors;
+  private final int _nlist;
+  private final VectorIndexConfig.VectorDistanceFunction _distanceFunction;
+  private final float[][] _centroids;
+  private final int[][] _listDocIds;
+  private final float[][][] _listVectors;
+  private final String _column;
+
+  /** Number of centroids to probe during search. */
+  private volatile int _nprobe;
+
+  /**
+   * Opens and loads an IVF_FLAT index from disk.
+   *
+   * @param column    the column name
+   * @param indexDir  the segment index directory
+   * @param config    the vector index configuration
+   * @throws RuntimeException if the index file cannot be read or is corrupt
+   */
+  public IvfFlatVectorIndexReader(String column, File indexDir, 
VectorIndexConfig config) {
+    _column = column;
+
+    // Determine nprobe from config
+    int configuredNprobe = DEFAULT_NPROBE;
+    if (config.getProperties() != null && 
config.getProperties().containsKey("nprobe")) {
+      configuredNprobe = 
Integer.parseInt(config.getProperties().get("nprobe"));
+    }
+
+    File indexFile = findIvfFlatIndexFile(indexDir, column);
+    if (indexFile == null) {
+      throw new IllegalStateException(
+          "Failed to find IVF_FLAT index file for column: " + column + " in 
dir: " + indexDir);
+    }
+
+    try (DataInputStream in = new DataInputStream(new 
FileInputStream(indexFile))) {
+      // --- Header ---
+      int magic = in.readInt();
+      Preconditions.checkState(magic == IvfFlatVectorIndexCreator.MAGIC,
+          "Invalid IVF_FLAT magic: 0x%s, expected 0x%s",
+          Integer.toHexString(magic), 
Integer.toHexString(IvfFlatVectorIndexCreator.MAGIC));
+
+      int version = in.readInt();
+      Preconditions.checkState(version == 
IvfFlatVectorIndexCreator.FORMAT_VERSION,
+          "Unsupported IVF_FLAT format version: %s, expected: %s",
+          version, IvfFlatVectorIndexCreator.FORMAT_VERSION);
+
+      _dimension = in.readInt();
+      _numVectors = in.readInt();
+      _nlist = in.readInt();
+      int distanceFunctionOrdinal = in.readInt();
+      _distanceFunction = 
VectorIndexConfig.VectorDistanceFunction.values()[distanceFunctionOrdinal];
+
+      // Clamp nprobe to valid range
+      _nprobe = Math.min(configuredNprobe, _nlist);
+      if (_nprobe <= 0) {
+        _nprobe = Math.min(DEFAULT_NPROBE, _nlist);
+      }
+
+      // --- Centroids ---
+      _centroids = new float[_nlist][_dimension];
+      for (int c = 0; c < _nlist; c++) {
+        for (int d = 0; d < _dimension; d++) {
+          _centroids[c][d] = in.readFloat();
+        }
+      }
+
+      // --- Inverted Lists ---
+      _listDocIds = new int[_nlist][];
+      _listVectors = new float[_nlist][][];
+
+      for (int c = 0; c < _nlist; c++) {
+        int listSize = in.readInt();
+        _listDocIds[c] = new int[listSize];
+        for (int i = 0; i < listSize; i++) {
+          _listDocIds[c][i] = in.readInt();
+        }
+        _listVectors[c] = new float[listSize][_dimension];
+        for (int i = 0; i < listSize; i++) {
+          for (int d = 0; d < _dimension; d++) {
+            _listVectors[c][i][d] = in.readFloat();
+          }
+        }
+      }
+
+      // We skip reading the offset table and footer since we read sequentially
+
+      LOGGER.info("Loaded IVF_FLAT index for column: {}: {} vectors, {} 
centroids, dim={}, nprobe={}, distance={}",
+          column, _numVectors, _nlist, _dimension, _nprobe, _distanceFunction);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "Failed to load IVF_FLAT index for column: " + column + " from file: 
" + indexFile, e);
+    }
+  }
+
+  @Override
+  public MutableRoaringBitmap getDocIds(float[] searchQuery, int topK) {
+    Preconditions.checkArgument(searchQuery.length == _dimension,
+        "Query dimension mismatch: expected %s, got %s", _dimension, 
searchQuery.length);
+    Preconditions.checkArgument(topK > 0, "topK must be positive, got: %s", 
topK);
+
+    if (_numVectors == 0 || _nlist == 0) {
+      return new MutableRoaringBitmap();
+    }
+
+    int effectiveNprobe = Math.min(_nprobe, _nlist);
+
+    // Step 1: Find the nprobe closest centroids
+    int[] probeCentroids = findClosestCentroids(searchQuery, effectiveNprobe);
+
+    // Step 2: Scan all vectors in the selected inverted lists, maintaining a 
max-heap of size topK
+    // Max-heap: the largest distance is at the top, so we can efficiently 
evict the worst candidate.
+    int effectiveTopK = Math.min(topK, _numVectors);
+    PriorityQueue<ScoredDoc> maxHeap = new PriorityQueue<>(effectiveTopK,
+        (a, b) -> Float.compare(b._distance, a._distance));
+
+    for (int probeIdx : probeCentroids) {
+      int[] docIds = _listDocIds[probeIdx];
+      float[][] vectors = _listVectors[probeIdx];
+
+      for (int i = 0; i < docIds.length; i++) {
+        float dist = VectorDistanceFunction.computeDistance(searchQuery, 
vectors[i], _distanceFunction);
+        if (maxHeap.size() < effectiveTopK) {
+          maxHeap.offer(new ScoredDoc(docIds[i], dist));
+        } else if (dist < maxHeap.peek()._distance) {
+          maxHeap.poll();
+          maxHeap.offer(new ScoredDoc(docIds[i], dist));
+        }
+      }
+    }
+
+    // Step 3: Collect results into a bitmap
+    MutableRoaringBitmap result = new MutableRoaringBitmap();
+    for (ScoredDoc doc : maxHeap) {
+      result.add(doc._docId);
+    }
+    return result;
+  }
+
+  /**
+   * Sets the number of centroids to probe during search.
+   * This allows query-time tuning of the recall/speed tradeoff.
+   *
+   * @param nprobe number of centroids to probe (clamped to [1, nlist])
+   */
+  public void setNprobe(int nprobe) {
+    _nprobe = Math.max(1, Math.min(nprobe, _nlist));
+  }
+
+  /**
+   * Returns the current nprobe setting.
+   */
+  public int getNprobe() {
+    return _nprobe;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // No resources to release -- all data is in Java heap arrays
+  }
+
+  // -----------------------------------------------------------------------
+  // Internal helpers
+  // -----------------------------------------------------------------------
+
+  /**
+   * Finds the n closest centroids to the given query vector.
+   *
+   * @param query  the query vector
+   * @param n      number of centroids to return
+   * @return array of centroid indices sorted by increasing distance
+   */
+  private int[] findClosestCentroids(float[] query, int n) {
+    // Compute distance to each centroid
+    float[] centroidDistances = new float[_nlist];
+    for (int c = 0; c < _nlist; c++) {
+      centroidDistances[c] = VectorDistanceFunction.computeDistance(query, 
_centroids[c], _distanceFunction);
+    }
+
+    // Find top-n using partial sort
+    // For simplicity, use an indexed sort on centroid distances
+    Integer[] indices = new Integer[_nlist];
+    for (int i = 0; i < _nlist; i++) {
+      indices[i] = i;
+    }
+    Arrays.sort(indices, (a, b) -> Float.compare(centroidDistances[a], 
centroidDistances[b]));
+
+    int[] result = new int[n];
+    for (int i = 0; i < n; i++) {
+      result[i] = indices[i];
+    }
+    return result;

Review Comment:
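   `findClosestCentroids()` allocates an `Integer[]` and fully sorts all 
centroids for every query. For typical IVF settings this adds significant 
per-query CPU/GC overhead. Consider using a primitive top-N selection (e.g., 
a fixed-size max-heap over centroid distances or a partial selection 
algorithm) to avoid boxing and the `O(nlist log nlist)` sort when only the 
`nprobe` smallest distances are needed. The suggestion below keeps the 
current best `n` candidates in two primitive arrays sorted by ascending 
distance and inserts each closer centroid in place; this costs at most 
`O(nlist * n)` comparisons, which is cheap when `n` is small relative to 
`nlist`, and allocates no boxed objects.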
   ```suggestion
       // Handle the degenerate case explicitly to avoid index math on 
zero-length arrays
       int[] bestIndices = new int[n];
       if (n == 0) {
         return bestIndices;
       }
   
       // bestDistances[i] holds the distance for bestIndices[i]; arrays are 
kept sorted by distance ascending
       float[] bestDistances = new float[n];
       Arrays.fill(bestDistances, Float.POSITIVE_INFINITY);
   
       // Single pass over centroids: compute distance and maintain the current 
top-n in sorted order
       for (int c = 0; c < _nlist; c++) {
         float distance = VectorDistanceFunction.computeDistance(query, 
_centroids[c], _distanceFunction);
   
         // If this distance is not better than the current worst in the top-n, 
skip it
         if (distance >= bestDistances[n - 1]) {
           continue;
         }
   
         // Insert (c, distance) into the sorted arrays by shifting larger 
entries to the right
         int insertPos = n - 1;
         while (insertPos > 0 && distance < bestDistances[insertPos - 1]) {
           bestDistances[insertPos] = bestDistances[insertPos - 1];
           bestIndices[insertPos] = bestIndices[insertPos - 1];
           insertPos--;
         }
         bestDistances[insertPos] = distance;
         bestIndices[insertPos] = c;
       }
   
       return bestIndices;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to