Copilot commented on code in PR #17994:
URL: https://github.com/apache/pinot/pull/17994#discussion_r2999873456
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/vector/VectorIndexType.java:
##########
@@ -161,10 +198,15 @@ public MutableIndex createMutableIndex(MutableIndexContext context, VectorIndexC
return null;
}
- return new MutableVectorIndex(context.getSegmentName(), context.getFieldSpec().getName(), config);
- }
-
- public enum IndexType {
- HNSW
+ VectorBackendType backendType = config.resolveBackendType();
+ switch (backendType) {
+ case HNSW:
+ return new MutableVectorIndex(context.getSegmentName(), context.getFieldSpec().getName(), config);
+ case IVF_FLAT:
+ // IVF_FLAT does not support mutable indexes in phase 1; return null to skip index creation.
+ return null;
Review Comment:
`createMutableIndex()` silently returns null for IVF_FLAT. This allows
IVF_FLAT to be configured on realtime/mutable segments but results in no index
being built and queries potentially falling back to expensive exact scans at
runtime. Since the PR states IVF_FLAT is immutable-only in phase 1, it would be
safer to fail validation (or throw/log loudly here) when IVF_FLAT is configured
for mutable indexes.
```suggestion
// IVF_FLAT does not support mutable indexes in phase 1; fail fast on misconfiguration.
throw new IllegalArgumentException(
"Vector backend type IVF_FLAT does not support mutable indexes (segment: " + context.getSegmentName()
+ ", column: " + context.getFieldSpec().getName() + ')');
```
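The same fail-fast rule could also be checked when a realtime table config is validated, so the problem surfaces before any segment is consumed rather than at index-creation time. A minimal sketch: `BackendType` is a local stand-in for the PR's `VectorBackendType` (case-insensitive parsing is an assumption), `MutableVectorIndexCheck` is a hypothetical helper, and where Pinot would call it is left open.

```java
import java.util.Locale;

// Local stand-in for the PR's VectorBackendType (assumption: HNSW and IVF_FLAT are the only values).
enum BackendType {
  HNSW,
  IVF_FLAT;

  // Mirrors the "default to HNSW when unset" rule in VectorIndexConfigValidator.resolveBackendType().
  static BackendType resolve(String vectorIndexType) {
    if (vectorIndexType == null || vectorIndexType.isEmpty()) {
      return HNSW;
    }
    // Case-insensitive matching is an assumption; valueOf throws IllegalArgumentException if unknown.
    return valueOf(vectorIndexType.trim().toUpperCase(Locale.ROOT));
  }
}

// Hypothetical helper intended to run during realtime table-config validation.
final class MutableVectorIndexCheck {
  private MutableVectorIndexCheck() {
  }

  static void checkMutableSupported(String vectorIndexType, String column) {
    if (BackendType.resolve(vectorIndexType) == BackendType.IVF_FLAT) {
      throw new IllegalArgumentException(
          "Vector backend type IVF_FLAT does not support mutable (realtime) indexes, column: " + column);
    }
  }
}
```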
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/IvfFlatVectorIndexReader.java:
##########
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.vector;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.PriorityQueue;
+import org.apache.pinot.common.function.scalar.VectorFunctions;
+import org.apache.pinot.segment.local.segment.index.vector.IvfFlatVectorIndexCreator;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.creator.VectorIndexConfig;
+import org.apache.pinot.segment.spi.index.reader.NprobeAware;
+import org.apache.pinot.segment.spi.index.reader.VectorIndexReader;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Reader for IVF_FLAT (Inverted File with flat vectors) index.
+ *
+ * <p>Loads the entire index into memory at construction time for fast search.
+ * The search algorithm:
+ * <ol>
+ * <li>Computes distance from the query to all centroids.</li>
+ * <li>Selects the {@code nprobe} closest centroids.</li>
+ * <li>Scans all vectors in those centroids' inverted lists.</li>
+ * <li>Returns the top-K doc IDs as a bitmap.</li>
+ * </ol>
+ *
+ * <h3>Thread safety</h3>
+ * <p>This class is thread-safe for concurrent reads. The loaded index data is immutable
+ * after construction. The only mutable state is {@code _nprobe}, which is volatile to
+ * allow query-time tuning from another thread. However, the typical pattern is
+ * single-threaded: set nprobe, then call getDocIds.</p>
+ */
+public class IvfFlatVectorIndexReader implements VectorIndexReader, NprobeAware {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IvfFlatVectorIndexReader.class);
+
+ /** Default nprobe value when not explicitly set. */
+ static final int DEFAULT_NPROBE = 4;
+
+ // Index data loaded from file
+ private final int _dimension;
+ private final int _numVectors;
+ private final int _nlist;
+ private final VectorIndexConfig.VectorDistanceFunction _distanceFunction;
+ private final float[][] _centroids;
+ private final int[][] _listDocIds;
+ private final float[][][] _listVectors;
+ private final String _column;
+
+ /** Number of centroids to probe during search. */
+ private volatile int _nprobe;
+
+ /**
+ * Opens and loads an IVF_FLAT index from disk.
+ *
+ * @param column the column name
+ * @param indexDir the segment index directory
+ * @param config the vector index configuration
+ * @throws RuntimeException if the index file cannot be read or is corrupt
+ */
+ public IvfFlatVectorIndexReader(String column, File indexDir, VectorIndexConfig config) {
+ _column = column;
+
+ // Initialize nprobe to the default; query-time tuning should use NprobeAware#setNprobe.
+ int configuredNprobe = DEFAULT_NPROBE;
+
+ File indexFile = findIvfFlatIndexFile(indexDir, column);
+ if (indexFile == null) {
+ throw new IllegalStateException(
+ "Failed to find IVF_FLAT index file for column: " + column + " in dir: " + indexDir);
+ }
+
+ try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
+ // --- Header ---
+ int magic = in.readInt();
+ Preconditions.checkState(magic == IvfFlatVectorIndexCreator.MAGIC,
+ "Invalid IVF_FLAT magic: 0x%s, expected 0x%s",
+ Integer.toHexString(magic), Integer.toHexString(IvfFlatVectorIndexCreator.MAGIC));
+
+ int version = in.readInt();
+ Preconditions.checkState(version == IvfFlatVectorIndexCreator.FORMAT_VERSION,
+ "Unsupported IVF_FLAT format version: %s, expected: %s",
+ version, IvfFlatVectorIndexCreator.FORMAT_VERSION);
+
+ _dimension = in.readInt();
+ _numVectors = in.readInt();
+ _nlist = in.readInt();
+ int distanceFunctionOrdinal = in.readInt();
+ _distanceFunction = VectorIndexConfig.VectorDistanceFunction.values()[distanceFunctionOrdinal];
+
+ // Clamp nprobe to valid range
+ _nprobe = Math.min(configuredNprobe, _nlist);
+ if (_nprobe <= 0) {
+ _nprobe = Math.min(DEFAULT_NPROBE, _nlist);
+ }
+
+ // --- Centroids ---
+ _centroids = new float[_nlist][_dimension];
+ for (int c = 0; c < _nlist; c++) {
+ for (int d = 0; d < _dimension; d++) {
+ _centroids[c][d] = in.readFloat();
+ }
+ }
+
+ // --- Inverted Lists ---
+ _listDocIds = new int[_nlist][];
+ _listVectors = new float[_nlist][][];
+
+ for (int c = 0; c < _nlist; c++) {
+ int listSize = in.readInt();
+ _listDocIds[c] = new int[listSize];
+ for (int i = 0; i < listSize; i++) {
+ _listDocIds[c][i] = in.readInt();
+ }
+ _listVectors[c] = new float[listSize][_dimension];
+ for (int i = 0; i < listSize; i++) {
+ for (int d = 0; d < _dimension; d++) {
+ _listVectors[c][i][d] = in.readFloat();
+ }
+ }
+ }
+
+ // We skip reading the offset table and footer since we read sequentially
+
+ LOGGER.info("Loaded IVF_FLAT index for column: {}: {} vectors, {} centroids, dim={}, nprobe={}, distance={}",
+ column, _numVectors, _nlist, _dimension, _nprobe, _distanceFunction);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to load IVF_FLAT index for column: " + column + " from file: " + indexFile, e);
+ }
+ }
+
+ @Override
+ public MutableRoaringBitmap getDocIds(float[] searchQuery, int topK) {
+ Preconditions.checkArgument(searchQuery.length == _dimension,
+ "Query dimension mismatch: expected %s, got %s", _dimension, searchQuery.length);
+ Preconditions.checkArgument(topK > 0, "topK must be positive, got: %s", topK);
+
+ if (_numVectors == 0 || _nlist == 0) {
+ return new MutableRoaringBitmap();
+ }
+
+ int effectiveNprobe = Math.min(_nprobe, _nlist);
+
+ // Step 1: Find the nprobe closest centroids
+ int[] probeCentroids = findClosestCentroids(searchQuery, effectiveNprobe);
+
+ // Step 2: Scan all vectors in the selected inverted lists, maintaining a max-heap of size topK
+ // Max-heap: the largest distance is at the top, so we can efficiently evict the worst candidate.
+ int effectiveTopK = Math.min(topK, _numVectors);
+ PriorityQueue<ScoredDoc> maxHeap = new PriorityQueue<>(effectiveTopK,
+ (a, b) -> Float.compare(b._distance, a._distance));
+
+ for (int probeIdx : probeCentroids) {
+ int[] docIds = _listDocIds[probeIdx];
+ float[][] vectors = _listVectors[probeIdx];
+
+ for (int i = 0; i < docIds.length; i++) {
+ float dist = computeDistance(searchQuery, vectors[i]);
+ if (maxHeap.size() < effectiveTopK) {
+ maxHeap.offer(new ScoredDoc(docIds[i], dist));
+ } else if (dist < maxHeap.peek()._distance) {
+ maxHeap.poll();
+ maxHeap.offer(new ScoredDoc(docIds[i], dist));
+ }
+ }
+ }
+
+ // Step 3: Collect results into a bitmap
+ MutableRoaringBitmap result = new MutableRoaringBitmap();
+ for (ScoredDoc doc : maxHeap) {
+ result.add(doc._docId);
+ }
+ return result;
+ }
+
+ /**
+ * Sets the number of centroids to probe during search.
+ * This allows query-time tuning of the recall/speed tradeoff.
+ *
+ * @param nprobe number of centroids to probe (clamped to [1, nlist])
+ */
+ public void setNprobe(int nprobe) {
+ _nprobe = Math.max(1, Math.min(nprobe, _nlist));
+ }
Review Comment:
`setNprobe()` mutates a volatile field on the reader instance, but readers
are shared across concurrent queries. This means one query can change `_nprobe`
while another query is executing, leading to non-deterministic recall/latency.
Consider making nprobe a per-call argument (preferred) or storing it in
per-thread state so concurrent queries don’t interfere.
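A per-call variant keeps all query-specific state on the stack, so concurrent queries cannot observe each other's settings. The sketch below reimplements the search steps from the reader's Javadoc (rank centroids, scan the `nprobe` closest inverted lists, keep a bounded max-heap of size top-K) with `nprobe` as a method argument. It assumes squared-L2 distance and an in-memory layout like the reader's `_centroids` / `_listDocIds` / `_listVectors`; the class `IvfFlatSearchSketch` is hypothetical, and it returns a sorted `int[]` instead of a `MutableRoaringBitmap` only to stay dependency-free.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

// Hypothetical immutable IVF_FLAT searcher: nprobe is a method argument, never shared mutable state.
final class IvfFlatSearchSketch {
  private final float[][] _centroids;     // [nlist][dim]
  private final int[][] _listDocIds;      // [nlist][listSize]
  private final float[][][] _listVectors; // [nlist][listSize][dim]

  IvfFlatSearchSketch(float[][] centroids, int[][] listDocIds, float[][][] listVectors) {
    _centroids = centroids;
    _listDocIds = listDocIds;
    _listVectors = listVectors;
  }

  private static final class Scored {
    final int _docId;
    final float _distance;

    Scored(int docId, float distance) {
      _docId = docId;
      _distance = distance;
    }
  }

  static float l2Squared(float[] a, float[] b) {
    float sum = 0f;
    for (int i = 0; i < a.length; i++) {
      float diff = a[i] - b[i];
      sum += diff * diff;
    }
    return sum;
  }

  /** Returns up to topK approximate nearest doc IDs, sorted ascending (the order a bitmap iterates). */
  int[] search(float[] query, int topK, int nprobe) {
    int nlist = _centroids.length;
    if (nlist == 0 || topK <= 0) {
      return new int[0];
    }
    int effectiveNprobe = Math.max(1, Math.min(nprobe, nlist)); // clamp to [1, nlist]

    // Step 1: order centroids by distance to the query.
    float[] centroidDist = new float[nlist];
    Integer[] order = new Integer[nlist];
    for (int c = 0; c < nlist; c++) {
      centroidDist[c] = l2Squared(query, _centroids[c]);
      order[c] = c;
    }
    Arrays.sort(order, (x, y) -> Float.compare(centroidDist[x], centroidDist[y]));

    // Step 2: scan the effectiveNprobe closest lists, keeping a max-heap of the best topK candidates.
    PriorityQueue<Scored> maxHeap = new PriorityQueue<>((x, y) -> Float.compare(y._distance, x._distance));
    for (int p = 0; p < effectiveNprobe; p++) {
      int list = order[p];
      for (int i = 0; i < _listDocIds[list].length; i++) {
        float dist = l2Squared(query, _listVectors[list][i]);
        if (maxHeap.size() < topK) {
          maxHeap.offer(new Scored(_listDocIds[list][i], dist));
        } else if (dist < maxHeap.peek()._distance) {
          maxHeap.poll();
          maxHeap.offer(new Scored(_listDocIds[list][i], dist));
        }
      }
    }

    // Step 3: collect the surviving doc IDs.
    int[] result = new int[maxHeap.size()];
    int k = 0;
    for (Scored s : maxHeap) {
      result[k++] = s._docId;
    }
    Arrays.sort(result);
    return result;
  }
}
```

Because the searcher holds only final fields and every call allocates its own heap, one query can run `search(q, 10, 2)` while another runs `search(q, 10, 16)` on the same instance without either seeing the other's setting.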
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectoryPaths.java:
##########
@@ -156,6 +156,12 @@ public static File findVectorIndexIndexFile(File segmentIndexDir, String column)
vectorIndexDirectory = column + V1Constants.Indexes.VECTOR_HNSW_INDEX_FILE_EXTENSION;
formatFile = findFormatFile(segmentIndexDir, vectorIndexDirectory);
}
+
+ // check for IVF_FLAT index, if null
+ if (formatFile == null) {
+ String ivfFlatFile = column + V1Constants.Indexes.VECTOR_IVF_FLAT_INDEX_FILE_EXTENSION;
+ formatFile = findFormatFile(segmentIndexDir, ivfFlatFile);
+ }
return formatFile;
}
Review Comment:
`SegmentDirectoryPaths.findVectorIndexIndexFile()` is used by
`HnswVectorIndexReader` and several tests assuming it returns a Lucene
directory. Adding the IVF_FLAT file fallback here can cause HNSW reader code to
receive a flat file path (and then fail when opening it as a directory),
producing confusing errors in misconfigured/migrating deployments. Consider
keeping this helper HNSW-only and adding a separate IVF_FLAT lookup method, or
ensure callers validate the returned path type/extension.
```suggestion
return formatFile;
}
@Nullable
@VisibleForTesting
public static File findIvfFlatVectorIndexFile(File segmentIndexDir, String column) {
String ivfFlatFile = column + V1Constants.Indexes.VECTOR_IVF_FLAT_INDEX_FILE_EXTENSION;
return findFormatFile(segmentIndexDir, ivfFlatFile);
}
```
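Separate lookups also let each reader check that the path has the shape it expects (a directory for the Lucene-based HNSW index, a regular file for IVF_FLAT) and fail with a clear message otherwise. A minimal sketch using only `java.io.File`; the `VectorIndexPaths` class and both extension strings are hypothetical placeholders, not the real `V1Constants.Indexes` values.

```java
import java.io.File;

// Hypothetical per-backend lookup helpers; the extensions are placeholders, not Pinot's constants.
final class VectorIndexPaths {
  static final String HNSW_EXTENSION = ".hnsw.placeholder";
  static final String IVF_FLAT_EXTENSION = ".ivfflat.placeholder";

  private VectorIndexPaths() {
  }

  /** Returns the HNSW index directory for the column, or null if it does not exist. */
  static File findHnswIndexDir(File segmentDir, String column) {
    File candidate = new File(segmentDir, column + HNSW_EXTENSION);
    if (!candidate.exists()) {
      return null;
    }
    if (!candidate.isDirectory()) {
      throw new IllegalStateException("Expected an HNSW index directory but found a file: " + candidate);
    }
    return candidate;
  }

  /** Returns the IVF_FLAT index file for the column, or null if it does not exist. */
  static File findIvfFlatIndexFile(File segmentDir, String column) {
    File candidate = new File(segmentDir, column + IVF_FLAT_EXTENSION);
    if (!candidate.exists()) {
      return null;
    }
    if (!candidate.isFile()) {
      throw new IllegalStateException("Expected an IVF_FLAT index file but found: " + candidate);
    }
    return candidate;
  }
}
```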
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/VectorSimilarityFilterOperator.java:
##########
@@ -120,6 +157,106 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) {
attributeBuilder.putString("vectorIdentifier", _predicate.getLhs().getIdentifier());
attributeBuilder.putString("vectorLiteral", Arrays.toString(_predicate.getValue()));
attributeBuilder.putLongIdempotent("topKtoSearch", _predicate.getTopK());
+ if (_searchParams.isExactRerank()) {
+ attributeBuilder.putString("exactRerank", "true");
+ }
+ }
+
+ /**
+ * Executes the vector search with backend-specific parameter dispatch and optional rerank.
+ */
+ private ImmutableRoaringBitmap executeSearch() {
+ String column = _predicate.getLhs().getIdentifier();
+ float[] queryVector = _predicate.getValue();
+ int topK = _predicate.getTopK();
+
+ // 1. Configure backend-specific parameters via interfaces
+ configureBackendParams(column);
+
+ // 2. Determine effective search count (higher if rerank is enabled)
+ int searchCount = topK;
+ if (_searchParams.isExactRerank()) {
+ searchCount = _searchParams.getEffectiveMaxCandidates(topK);
+ }
+
+ // 3. Execute ANN search
+ ImmutableRoaringBitmap annResults = _vectorIndexReader.getDocIds(queryVector, searchCount);
+ int annCandidateCount = annResults.getCardinality();
+
+ LOGGER.debug("Vector search on column: {}, backend: {}, topK: {}, searchCount: {}, annCandidates: {}",
+ column, getBackendName(), topK, searchCount, annCandidateCount);
+
+ // 4. Apply exact rerank if requested
+ if (_searchParams.isExactRerank() && _forwardIndexReader != null && annCandidateCount > 0) {
+ ImmutableRoaringBitmap reranked = applyExactRerank(annResults, queryVector, topK, column);
+ LOGGER.debug("Exact rerank on column: {}, candidates: {} -> final: {}",
+ column, annCandidateCount, reranked.getCardinality());
+ return reranked;
+ }
+
+ return annResults;
+ }
+
+ /**
+ * Configures backend-specific search parameters on the reader if it supports them.
+ */
+ private void configureBackendParams(String column) {
+ // Set nprobe on IVF_FLAT readers
+ if (_vectorIndexReader instanceof NprobeAware) {
+ int nprobe = _searchParams.getNprobe();
+ ((NprobeAware) _vectorIndexReader).setNprobe(nprobe);
+ LOGGER.debug("Set nprobe={} on IVF_FLAT reader for column: {}", nprobe, column);
+ }
+ }
+
+ /**
+ * Re-scores ANN candidates using exact distance from the forward index and returns top-K.
+ */
+ @SuppressWarnings("unchecked")
+ private ImmutableRoaringBitmap applyExactRerank(ImmutableRoaringBitmap annResults, float[] queryVector,
+ int topK, String column) {
+ // Max-heap: largest distance on top for efficient eviction
+ PriorityQueue<DocDistance> maxHeap = new PriorityQueue<>(topK + 1,
+ (a, b) -> Float.compare(b._distance, a._distance));
+
+ ForwardIndexReader rawReader = _forwardIndexReader;
+ try (ForwardIndexReaderContext context = rawReader.createContext()) {
+ org.roaringbitmap.IntIterator it = annResults.getIntIterator();
+ while (it.hasNext()) {
+ int docId = it.next();
+ float[] docVector = rawReader.getFloatMV(docId, context);
+ if (docVector == null || docVector.length == 0) {
+ continue;
+ }
+ // TODO: derive distance function from segment's vector index config instead of hardcoding L2.
+ // Currently correct for EUCLIDEAN/L2; may produce suboptimal rerank ordering for COSINE/DOT_PRODUCT.
+ float distance = ExactVectorScanFilterOperator.computeL2SquaredDistance(queryVector, docVector);
+ if (maxHeap.size() < topK) {
+ maxHeap.add(new DocDistance(docId, distance));
+ } else if (distance < maxHeap.peek()._distance) {
+ maxHeap.poll();
+ maxHeap.add(new DocDistance(docId, distance));
+ }
Review Comment:
Exact rerank currently hard-codes L2 squared distance
(`computeL2SquaredDistance`) regardless of the column’s configured vector
distance function. This will produce incorrect ordering for COSINE /
DOT_PRODUCT / INNER_PRODUCT indexes. Rerank should use the same distance
function as the underlying vector index (derive it from the column’s vector
index config or expose it via the reader) before re-sorting top-K.
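A rerank that honors the configured metric only needs a distance in which smaller means closer for every metric. The sketch below is one such mapping; it assumes a local `Metric` enum mirroring the names in the comment and uses `1 - cos` for cosine and the negated dot product for inner/dot product, which are common conventions but not necessarily the ones Pinot's HNSW backend uses internally. It also shows why the current code is only safe with normalized data: for unit-length vectors, ||a - b||² = 2 - 2·cos(a, b), so squared-L2 ranks candidates in cosine order only when vectors are pre-normalized.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

// Local stand-in for the column's configured distance function (names taken from the review comment).
enum Metric {
  EUCLIDEAN, COSINE, INNER_PRODUCT, DOT_PRODUCT
}

final class MetricAwareRerank {
  private MetricAwareRerank() {
  }

  /** Returns a distance where smaller always means "more similar", whatever the metric. */
  static float distance(Metric metric, float[] a, float[] b) {
    float dot = 0f;
    float squaredL2 = 0f;
    float normA = 0f;
    float normB = 0f;
    for (int i = 0; i < a.length; i++) {
      dot += a[i] * b[i];
      float diff = a[i] - b[i];
      squaredL2 += diff * diff;
      normA += a[i] * a[i];
      normB += b[i] * b[i];
    }
    switch (metric) {
      case EUCLIDEAN:
        return squaredL2;
      case COSINE:
        // Zero vectors have no direction; treat them as maximally dissimilar (assumption).
        if (normA == 0f || normB == 0f) {
          return 2f;
        }
        return 1f - (float) (dot / Math.sqrt((double) normA * normB));
      case INNER_PRODUCT:
      case DOT_PRODUCT:
        return -dot; // a larger product means more similar, so negate it
      default:
        throw new IllegalArgumentException("Unsupported metric: " + metric);
    }
  }

  /** Keeps the topK candidates closest to the query and returns their doc IDs in ascending order. */
  static int[] rerank(Metric metric, float[] query, int[] docIds, float[][] vectors, int topK) {
    if (topK <= 0) {
      return new int[0];
    }
    float[] dist = new float[docIds.length];
    for (int i = 0; i < docIds.length; i++) {
      dist[i] = distance(metric, query, vectors[i]);
    }
    // Max-heap of candidate positions: the worst (largest distance) candidate sits on top.
    PriorityQueue<Integer> maxHeap = new PriorityQueue<>((x, y) -> Float.compare(dist[y], dist[x]));
    for (int i = 0; i < dist.length; i++) {
      if (maxHeap.size() < topK) {
        maxHeap.offer(i);
      } else if (dist[i] < dist[maxHeap.peek()]) {
        maxHeap.poll();
        maxHeap.offer(i);
      }
    }
    int[] result = new int[maxHeap.size()];
    int k = 0;
    for (int position : maxHeap) {
      result[k++] = docIds[position];
    }
    Arrays.sort(result);
    return result;
  }
}
```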
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/VectorIndexConfigValidator.java:
##########
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.creator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Validates {@link VectorIndexConfig} for backend-specific correctness.
+ *
+ * <p>This validator ensures that:
+ * <ul>
+ * <li>Required common fields (vectorDimension, vectorDistanceFunction) are present and valid.</li>
+ * <li>The vectorIndexType resolves to a known {@link VectorBackendType}.</li>
+ * <li>Backend-specific properties are valid for the resolved backend type.</li>
+ * <li>Properties belonging to a different backend are rejected with a clear error message.</li>
+ * </ul>
+ *
+ * <p>Thread-safe: this class is stateless and all methods are static.</p>
+ */
+public final class VectorIndexConfigValidator {
+
+ // HNSW-specific property keys
+ static final Set<String> HNSW_PROPERTIES = Collections.unmodifiableSet(new HashSet<>(
+ Arrays.asList("maxCon", "beamWidth", "maxDimensions", "maxBufferSizeMB",
+ "useCompoundFile", "mode", "commit", "commitIntervalMs", "commitDocs")));
+
+ // IVF_FLAT-specific property keys
+ static final Set<String> IVF_FLAT_PROPERTIES = Collections.unmodifiableSet(new HashSet<>(
+ Arrays.asList("nlist", "trainSampleSize", "trainingSeed", "minRowsForIndex")));
+
+ // Common property keys that appear in the properties map (legacy format stores common fields there too)
+ private static final Set<String> COMMON_PROPERTIES = Collections.unmodifiableSet(new HashSet<>(
+ Arrays.asList("vectorIndexType", "vectorDimension", "vectorDistanceFunction", "version")));
+
+ private VectorIndexConfigValidator() {
+ }
+
+ /**
+ * Validates the given {@link VectorIndexConfig} for backend-specific correctness.
+ *
+ * @param config the config to validate
+ * @throws IllegalArgumentException if validation fails
+ */
+ public static void validate(VectorIndexConfig config) {
+ if (config.isDisabled()) {
+ return;
+ }
+
+ VectorBackendType backendType = resolveBackendType(config);
+ validateCommonFields(config);
+ validateBackendSpecificProperties(config, backendType);
+ }
+
+ /**
+ * Resolves the {@link VectorBackendType} from the config. Defaults to HNSW if the
+ * vectorIndexType field is null or empty, preserving backward compatibility.
+ *
+ * @param config the config to resolve from
+ * @return the resolved backend type
+ * @throws IllegalArgumentException if the vectorIndexType is not recognized
+ */
+ public static VectorBackendType resolveBackendType(VectorIndexConfig config) {
+ String typeString = config.getVectorIndexType();
+ if (typeString == null || typeString.isEmpty()) {
+ return VectorBackendType.HNSW;
+ }
+ return VectorBackendType.fromString(typeString);
+ }
+
+ /**
+ * Validates common fields shared across all backend types.
+ */
+ private static void validateCommonFields(VectorIndexConfig config) {
+ if (config.getVectorDimension() <= 0) {
+ throw new IllegalArgumentException(
+ "vectorDimension must be a positive integer, got: " + config.getVectorDimension());
+ }
+
+ if (config.getVectorDistanceFunction() == null) {
+ throw new IllegalArgumentException("vectorDistanceFunction is required");
+ }
+ }
+
+ /**
+ * Validates that the properties map only contains keys valid for the resolved backend type,
+ * and that backend-specific property values are within acceptable ranges.
+ */
+ private static void validateBackendSpecificProperties(VectorIndexConfig config, VectorBackendType backendType) {
+ Map<String, String> properties = config.getProperties();
+ if (properties == null || properties.isEmpty()) {
+ return;
+ }
+
+ switch (backendType) {
+ case HNSW:
+ validateNoForeignProperties(properties, HNSW_PROPERTIES, IVF_FLAT_PROPERTIES, "HNSW", "IVF_FLAT");
+ validateHnswProperties(properties);
+ break;
+ case IVF_FLAT:
+ validateNoForeignProperties(properties, IVF_FLAT_PROPERTIES, HNSW_PROPERTIES, "IVF_FLAT", "HNSW");
+ validateIvfFlatProperties(properties);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported vector backend type: " + backendType);
+ }
+ }
+
+ /**
+ * Ensures that properties belonging to a foreign backend are not present.
+ * Note: this only rejects known foreign-backend keys; arbitrary unknown keys are allowed
+ * to support forward-compatible extensibility.
+ */
+ private static void validateNoForeignProperties(Map<String, String> properties,
+ Set<String> ownProperties, Set<String> foreignProperties,
+ String ownType, String foreignType) {
+ for (String key : properties.keySet()) {
+ if (COMMON_PROPERTIES.contains(key)) {
+ continue;
+ }
+ if (foreignProperties.contains(key)) {
+ throw new IllegalArgumentException(
+ "Property '" + key + "' is specific to " + foreignType
+ + " and cannot be used with vectorIndexType " + ownType);
+ }
+ }
+ }
+
+ /**
+ * Validates HNSW-specific property values.
+ */
+ private static void validateHnswProperties(Map<String, String> properties) {
+ validatePositiveIntProperty(properties, "maxCon", "HNSW maxCon");
+ validatePositiveIntProperty(properties, "beamWidth", "HNSW beamWidth");
+ validatePositiveIntProperty(properties, "maxDimensions", "HNSW maxDimensions");
+ validatePositiveDoubleProperty(properties, "maxBufferSizeMB", "HNSW maxBufferSizeMB");
+ }
+
+ /**
+ * Validates IVF_FLAT-specific property values.
+ */
+ private static void validateIvfFlatProperties(Map<String, String> properties) {
+ validatePositiveIntProperty(properties, "nlist", "IVF_FLAT nlist");
+ validatePositiveIntProperty(properties, "trainSampleSize", "IVF_FLAT trainSampleSize");
+ validatePositiveIntProperty(properties, "minRowsForIndex", "IVF_FLAT minRowsForIndex");
Review Comment:
`minRowsForIndex` is treated as an IVF_FLAT-specific property and validated
here, but there are no usages of this property in the IVF_FLAT creator/handler
code in this PR. If it’s intended to gate index creation, it should be enforced
at build time; otherwise consider removing it from the supported/validated
properties to avoid a misleading no-op configuration.
```suggestion
```
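If `minRowsForIndex` is kept rather than removed, one way to make it meaningful is to consult it when the segment is built and skip index creation for small segments, leaving queries to the exact-scan fallback. A minimal sketch; the `IvfFlatBuildGate` class, treating an absent property as "no minimum", and skipping rather than failing are all assumptions.

```java
import java.util.Map;

// Hypothetical build-time gate for the minRowsForIndex property.
final class IvfFlatBuildGate {
  static final String MIN_ROWS_FOR_INDEX = "minRowsForIndex";

  private IvfFlatBuildGate() {
  }

  /**
   * Returns true when an IVF_FLAT index should be built for a segment with numDocs rows.
   * An absent property means "no minimum" (assumption).
   */
  static boolean shouldBuildIndex(Map<String, String> properties, int numDocs) {
    String value = properties == null ? null : properties.get(MIN_ROWS_FOR_INDEX);
    if (value == null) {
      return true;
    }
    int minRows = Integer.parseInt(value.trim());
    return numDocs >= minRows;
  }
}
```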
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/VectorSimilarityFilterOperator.java:
##########
@@ -22,58 +22,95 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.PriorityQueue;
+import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.predicate.VectorSimilarityPredicate;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.ExplainAttributeBuilder;
import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.index.reader.NprobeAware;
import org.apache.pinot.segment.spi.index.reader.VectorIndexReader;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.trace.FilterType;
import org.apache.pinot.spi.trace.InvocationRecording;
import org.apache.pinot.spi.trace.Tracing;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Operator for Vector Search query.
- * <p>Currently, we only support vector similarity search on float array column.
- * Example:
- * {
- * "type": "vectorSimilarity",
- * "leftValue": "embedding",
- * "rightValue": [1.0, 2.0, 3.0],
- * "topK": 10
- * }
+ * Operator for vector similarity search using an ANN index (HNSW or IVF_FLAT).
*
+ * <p>This operator supports backend-neutral vector search with the following capabilities:</p>
+ * <ul>
+ * <li><b>nprobe dispatch:</b> If the underlying reader implements {@link NprobeAware}, the
+ * {@code vector.nprobe} query option is applied before search.</li>
+ * <li><b>Exact rerank:</b> When {@code vector.exactRerank=true}, ANN candidates are re-scored
+ * using exact distance from the forward index and re-sorted before final top-K selection.</li>
+ * <li><b>maxCandidates:</b> Controls how many ANN candidates are retrieved before rerank. Only
+ * meaningful when rerank is enabled.</li>
Review Comment:
The Javadoc refers to query options as `vector.nprobe`,
`vector.exactRerank`, and `vector.maxCandidates`, but the actual query option
keys added in `CommonConstants.QueryOptionKey` are `vectorNprobe`,
`vectorExactRerank`, and `vectorMaxCandidates`. Align the documentation with
the real option names to avoid user confusion.
```suggestion
* {@code vectorNprobe} query option is applied before search.</li>
* <li><b>Exact rerank:</b> When {@code vectorExactRerank=true}, ANN candidates are re-scored
* using exact distance from the forward index and re-sorted before final top-K selection.</li>
* <li><b>maxCandidates:</b> The {@code vectorMaxCandidates} query option controls how many ANN
* candidates are retrieved before rerank. Only meaningful when rerank is enabled.</li>
```
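For reference, a sketch of how the three option keys named in the comment could be read from a query's options map. The key strings come from the comment; the `VectorSearchOptions` class, the defaults (nprobe 4, matching the reader's `DEFAULT_NPROBE`; rerank off; 10 × topK candidates when rerank is on but no count is given), and the "never fewer than topK" rule are assumptions, not the PR's `getEffectiveMaxCandidates` logic.

```java
import java.util.Map;

// Hypothetical parser for the vector-search query options.
final class VectorSearchOptions {
  static final String NPROBE = "vectorNprobe";
  static final String EXACT_RERANK = "vectorExactRerank";
  static final String MAX_CANDIDATES = "vectorMaxCandidates";

  final int _nprobe;
  final boolean _exactRerank;
  final Integer _maxCandidates; // null when the option is not set

  private VectorSearchOptions(int nprobe, boolean exactRerank, Integer maxCandidates) {
    _nprobe = nprobe;
    _exactRerank = exactRerank;
    _maxCandidates = maxCandidates;
  }

  static VectorSearchOptions fromQueryOptions(Map<String, String> options) {
    String nprobe = options.get(NPROBE);
    String maxCandidates = options.get(MAX_CANDIDATES);
    return new VectorSearchOptions(
        nprobe == null ? 4 : Integer.parseInt(nprobe.trim()),
        Boolean.parseBoolean(options.get(EXACT_RERANK)), // null or anything but "true" -> false
        maxCandidates == null ? null : Integer.valueOf(maxCandidates.trim()));
  }

  /** Number of ANN candidates to fetch: topK without rerank, otherwise never fewer than topK. */
  int effectiveMaxCandidates(int topK) {
    if (!_exactRerank) {
      return topK;
    }
    int requested = _maxCandidates != null ? _maxCandidates : topK * 10;
    return Math.max(topK, requested);
  }
}
```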
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/VectorSimilarityFilterOperator.java:
##########
@@ -120,6 +157,106 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) {
attributeBuilder.putString("vectorIdentifier", _predicate.getLhs().getIdentifier());
attributeBuilder.putString("vectorLiteral", Arrays.toString(_predicate.getValue()));
attributeBuilder.putLongIdempotent("topKtoSearch", _predicate.getTopK());
+ if (_searchParams.isExactRerank()) {
+ attributeBuilder.putString("exactRerank", "true");
+ }
+ }
+
+ /**
+ * Executes the vector search with backend-specific parameter dispatch and optional rerank.
+ */
+ private ImmutableRoaringBitmap executeSearch() {
+ String column = _predicate.getLhs().getIdentifier();
+ float[] queryVector = _predicate.getValue();
+ int topK = _predicate.getTopK();
+
+ // 1. Configure backend-specific parameters via interfaces
+ configureBackendParams(column);
+
+ // 2. Determine effective search count (higher if rerank is enabled)
+ int searchCount = topK;
+ if (_searchParams.isExactRerank()) {
+ searchCount = _searchParams.getEffectiveMaxCandidates(topK);
+ }
+
+ // 3. Execute ANN search
+ ImmutableRoaringBitmap annResults = _vectorIndexReader.getDocIds(queryVector, searchCount);
+ int annCandidateCount = annResults.getCardinality();
+
+ LOGGER.debug("Vector search on column: {}, backend: {}, topK: {}, searchCount: {}, annCandidates: {}",
+ column, getBackendName(), topK, searchCount, annCandidateCount);
+
+ // 4. Apply exact rerank if requested
+ if (_searchParams.isExactRerank() && _forwardIndexReader != null && annCandidateCount > 0) {
+ ImmutableRoaringBitmap reranked = applyExactRerank(annResults, queryVector, topK, column);
+ LOGGER.debug("Exact rerank on column: {}, candidates: {} -> final: {}",
+ column, annCandidateCount, reranked.getCardinality());
+ return reranked;
+ }
+
+ return annResults;
+ }
+
+ /**
+ * Configures backend-specific search parameters on the reader if it supports them.
+ */
+ private void configureBackendParams(String column) {
+ // Set nprobe on IVF_FLAT readers
+ if (_vectorIndexReader instanceof NprobeAware) {
+ int nprobe = _searchParams.getNprobe();
+ ((NprobeAware) _vectorIndexReader).setNprobe(nprobe);
+ LOGGER.debug("Set nprobe={} on IVF_FLAT reader for column: {}", nprobe, column);
Review Comment:
`VectorSimilarityFilterOperator` calls `setNprobe()` on the shared
`VectorIndexReader` instance. Index readers are created once per segment and
reused across queries, so mutating reader state introduces cross-query race
conditions (concurrent queries can overwrite each other’s nprobe). Prefer
passing nprobe as a parameter to the search call, or store it in a
per-thread/per-query context (e.g., ThreadLocal) rather than on the shared
reader instance.
```suggestion
*
* <p>NOTE: We intentionally avoid mutating the shared {@link VectorIndexReader} instance here
* to prevent cross-query race conditions. Per-query nprobe tuning must be implemented in a
* thread-safe manner (e.g., via per-query context passed to the reader) rather than by
* changing shared reader state.</p>
*/
private void configureBackendParams(String column) {
// nprobe is currently not applied to the shared reader to avoid cross-query races.
if (_vectorIndexReader instanceof NprobeAware) {
int nprobe = _searchParams.getNprobe();
LOGGER.debug(
"Requested nprobe={} for NprobeAware reader on column '{}' is currently ignored to avoid "
+ "mutating shared VectorIndexReader state.",
nprobe, column);
```
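The comment's preferred fix, passing nprobe per call, could be expressed as an optional reader capability with a backward-compatible default, so the operator never writes to the shared reader. A minimal sketch, assuming a hypothetical `SearchParamsAware` interface, a hypothetical immutable `VectorSearchParams` value class, and a simplified `BasicVectorReader` stand-in for `VectorIndexReader` that returns `int[]` instead of a bitmap; none of these exist in the PR.

```java
// Hypothetical immutable per-query parameters, built once per query by the operator.
final class VectorSearchParams {
  final int _nprobe;

  VectorSearchParams(int nprobe) {
    _nprobe = nprobe;
  }
}

// Simplified stand-in for the reader contract: search by query vector and topK.
interface BasicVectorReader {
  int[] getDocIds(float[] query, int topK);
}

// Hypothetical capability: readers that accept per-call parameters instead of a setter.
interface SearchParamsAware extends BasicVectorReader {
  default int[] getDocIds(float[] query, int topK, VectorSearchParams params) {
    return getDocIds(query, topK); // readers that ignore the params keep their old behavior
  }
}

final class ReaderDispatch {
  private ReaderDispatch() {
  }

  // Passes params through the call stack; no shared reader field is ever written.
  static int[] search(BasicVectorReader reader, float[] query, int topK, VectorSearchParams params) {
    if (reader instanceof SearchParamsAware) {
      return ((SearchParamsAware) reader).getDocIds(query, topK, params);
    }
    return reader.getDocIds(query, topK);
  }
}
```

With this shape, an IVF_FLAT reader would read `params._nprobe` inside its search method, and the `setNprobe()` call in `configureBackendParams` would no longer be needed.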
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExactVectorScanFilterOperator.java:
##########
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.google.common.base.CaseFormat;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.PriorityQueue;
+import org.apache.pinot.common.function.scalar.VectorFunctions;
+import org.apache.pinot.common.request.context.predicate.VectorSimilarityPredicate;
+import org.apache.pinot.core.common.BlockDocIdSet;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.trace.FilterType;
+import org.apache.pinot.spi.trace.InvocationRecording;
+import org.apache.pinot.spi.trace.Tracing;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Fallback operator that performs exact brute-force vector similarity search by scanning the forward index.
+ *
+ * <p>This operator is used when no ANN vector index exists on a segment for the target column
+ * (e.g., the segment was built before the vector index was added, or the index type is not
+ * supported). It reads all vectors from the forward index, computes exact distances to the
+ * query vector, and returns the top-K closest document IDs.</p>
+ *
+ * <p>The distance computation uses L2 (Euclidean) squared distance. For COSINE similarity,
+ * vectors should be pre-normalized. This matches the behavior of Lucene's HNSW implementation.</p>
+ *
+ * <p>This operator is intentionally simple and correct rather than fast -- it is a safety net.
+ * A warning is logged when this operator is used because it scans all documents in the segment.</p>
+ *
+ * <p>This class is thread-safe for single-threaded execution per query (same as other filter operators).</p>
+ */
+public class ExactVectorScanFilterOperator extends BaseFilterOperator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExactVectorScanFilterOperator.class);
+ private static final String EXPLAIN_NAME = "VECTOR_SIMILARITY_EXACT_SCAN";
+
+ private final ForwardIndexReader<?> _forwardIndexReader;
+ private final VectorSimilarityPredicate _predicate;
+ private final String _column;
+ private ImmutableRoaringBitmap _matches;
+
+ /**
+ * Creates an exact scan operator.
+ *
+ * @param forwardIndexReader the forward index reader for the vector column
+ * @param predicate the vector similarity predicate containing query vector and top-K
+ * @param column the column name (for logging and explain)
+ * @param numDocs the total number of documents in the segment
+ */
+ public ExactVectorScanFilterOperator(ForwardIndexReader<?> forwardIndexReader,
+ VectorSimilarityPredicate predicate, String column, int numDocs) {
+ super(numDocs, false);
+ _forwardIndexReader = forwardIndexReader;
+ _predicate = predicate;
+ _column = column;
+ }
+
+ @Override
+ protected BlockDocIdSet getTrues() {
+ if (_matches == null) {
+ _matches = computeExactTopK();
+ }
+ return new BitmapDocIdSet(_matches, _numDocs);
+ }
+
+ @Override
+ public int getNumMatchingDocs() {
+ if (_matches == null) {
+ _matches = computeExactTopK();
+ }
+ return _matches.getCardinality();
+ }
+
+ @Override
+ public boolean canProduceBitmaps() {
+ return true;
+ }
+
+ @Override
+ public BitmapCollection getBitmaps() {
+ if (_matches == null) {
+ _matches = computeExactTopK();
+ }
+ record(_matches);
+ return new BitmapCollection(_numDocs, false, _matches);
+ }
+
+ @Override
+ public List<Operator> getChildOperators() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME + "(indexLookUp:exact_scan"
+ + ", operator:" + _predicate.getType()
+ + ", vector identifier:" + _column
+ + ", vector literal:" + Arrays.toString(_predicate.getValue())
+ + ", topK to search:" + _predicate.getTopK()
+ + ')';
+ }
+
+ @Override
+ protected String getExplainName() {
+ return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, EXPLAIN_NAME);
+ }
+
+ @Override
+ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) {
+ super.explainAttributes(attributeBuilder);
+ attributeBuilder.putString("indexLookUp", "exact_scan");
+ attributeBuilder.putString("operator", _predicate.getType().name());
+ attributeBuilder.putString("vectorIdentifier", _column);
+ attributeBuilder.putString("vectorLiteral", Arrays.toString(_predicate.getValue()));
+ attributeBuilder.putLongIdempotent("topKtoSearch", _predicate.getTopK());
+ }
+
+ /**
+ * Performs brute-force exact search over all documents in the segment.
+ * Uses a max-heap to maintain the top-K closest vectors.
+ */
+ @SuppressWarnings("unchecked")
+ private ImmutableRoaringBitmap computeExactTopK() {
+ LOGGER.warn("Performing exact vector scan fallback on column: {} for segment with {} docs. "
+ + "This is expensive -- consider adding a vector index.", _column, _numDocs);
+
+ float[] queryVector = _predicate.getValue();
+ int topK = _predicate.getTopK();
+
+ // Max-heap: entry with largest distance is at the top so we can efficiently evict it
+ PriorityQueue<DocDistance> maxHeap = new PriorityQueue<>(topK + 1,
+ (a, b) -> Float.compare(b._distance, a._distance));
+
+ ForwardIndexReader rawReader = _forwardIndexReader;
+ try (ForwardIndexReaderContext context = rawReader.createContext()) {
+ for (int docId = 0; docId < _numDocs; docId++) {
+ float[] docVector = rawReader.getFloatMV(docId, context);
+ if (docVector == null || docVector.length == 0) {
+ continue;
+ }
+ float distance = computeL2SquaredDistance(queryVector, docVector);
+ if (maxHeap.size() < topK) {
+ maxHeap.add(new DocDistance(docId, distance));
+ } else if (distance < maxHeap.peek()._distance) {
+ maxHeap.poll();
+ maxHeap.add(new DocDistance(docId, distance));
+ }
+ }
Review Comment:
The exact-scan fallback always ranks by L2 squared distance. For segments
without an ANN index, this can return wrong results when the table’s vector
index config uses COSINE / DOT_PRODUCT / INNER_PRODUCT. The fallback operator
should compute distances using the column’s configured distance function (or
fail fast if that cannot be determined), rather than assuming L2.
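For context on why L2 is only safe under an unstated precondition: for unit vectors, ||q - v||^2 = 2 - 2 * cos(q, v), so ranking by squared L2 matches cosine ranking only when both the query and every stored vector are normalized, and nothing in this operator enforces that. Below is a minimal, Pinot-independent sketch of a metric-aware exact scan, assuming the metric can be resolved from the column's index config. The `Metric` enum, `score`, and `topK` names are hypothetical and do not correspond to Pinot's actual distance-function type; INNER_PRODUCT is assumed to map the same way as DOT_PRODUCT. The diff already imports `VectorFunctions`, which may offer equivalent helpers; that is worth checking rather than assumed here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical sketch of a metric-aware exact top-K scan. The Metric enum is
 * illustrative only; it is not Pinot's vector distance-function config type.
 */
final class MetricAwareExactScan {

  /** Hypothetical metric names; in a real fix these would come from the column's index config. */
  public enum Metric { EUCLIDEAN, COSINE, DOT_PRODUCT }

  /** Candidate document and its score (smaller score = closer). */
  private static final class Candidate {
    final int docId;
    final double score;

    Candidate(int docId, double score) {
      this.docId = docId;
      this.score = score;
    }
  }

  private MetricAwareExactScan() {
  }

  /** Scores a vector so that smaller always means "more similar", letting one max-heap serve every metric. */
  public static double score(Metric metric, float[] query, float[] doc) {
    if (query.length != doc.length) {
      // Fail fast instead of silently comparing vectors of different dimensions.
      throw new IllegalArgumentException("Dimension mismatch: query=" + query.length + ", doc=" + doc.length);
    }
    double dot = 0;
    double queryNormSq = 0;
    double docNormSq = 0;
    double l2Sq = 0;
    for (int i = 0; i < query.length; i++) {
      dot += (double) query[i] * doc[i];
      queryNormSq += (double) query[i] * query[i];
      docNormSq += (double) doc[i] * doc[i];
      double diff = (double) query[i] - doc[i];
      l2Sq += diff * diff;
    }
    switch (metric) {
      case EUCLIDEAN:
        // Squared L2 is monotonic in L2, so the ranking is identical.
        return l2Sq;
      case COSINE:
        if (queryNormSq == 0 || docNormSq == 0) {
          // Cosine is undefined for zero vectors; rank them last.
          return Double.POSITIVE_INFINITY;
        }
        return 1.0 - dot / (Math.sqrt(queryNormSq) * Math.sqrt(docNormSq));
      case DOT_PRODUCT:
        // A larger inner product means more similar, so negate it.
        return -dot;
      default:
        throw new IllegalStateException("Unsupported metric: " + metric);
    }
  }

  /** Returns the exact top-K doc IDs, closest first; null or empty vectors are skipped, as in the PR's loop. */
  public static List<Integer> topK(Metric metric, float[] query, float[][] docs, int k) {
    List<Integer> result = new ArrayList<>();
    if (k <= 0) {
      return result;
    }
    // Max-heap on score: the worst of the current top-K sits at the head and is evicted first.
    PriorityQueue<Candidate> maxHeap =
        new PriorityQueue<>(Math.min(k, docs.length) + 1, (a, b) -> Double.compare(b.score, a.score));
    for (int docId = 0; docId < docs.length; docId++) {
      float[] doc = docs[docId];
      if (doc == null || doc.length == 0) {
        continue;
      }
      double s = score(metric, query, doc);
      if (maxHeap.size() < k) {
        maxHeap.add(new Candidate(docId, s));
      } else if (s < maxHeap.peek().score) {
        maxHeap.poll();
        maxHeap.add(new Candidate(docId, s));
      }
    }
    // Polling yields worst-first, so reverse to get closest-first.
    while (!maxHeap.isEmpty()) {
      result.add(maxHeap.poll().docId);
    }
    Collections.reverse(result);
    return result;
  }
}
```

Mapping every metric to "smaller is closer" keeps the PR's heap and eviction logic unchanged; only the scoring function varies. With `query = {1, 0}` and documents `{10, 0}`, `{0, 1}`, `{2, 0.5}`, EUCLIDEAN ranks doc 2 first while COSINE and DOT_PRODUCT rank doc 0 first, which is the kind of divergence an L2-only fallback would hide.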
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.