jpountz commented on code in PR #14097:
URL: https://github.com/apache/lucene/pull/14097#discussion_r1905528784

##########
lucene/misc/src/java/org/apache/lucene/misc/index/BpVectorReorderer.java:
##########
@@ -0,0 +1,788 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.misc.index;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.RecursiveAction;
+import org.apache.lucene.index.CodecReader;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FloatVectorValues;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.Sorter;
+import org.apache.lucene.index.SortingCodecReader;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.TaskExecutor;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.CloseableThreadLocal;
+import org.apache.lucene.util.IntroSelector;
+import org.apache.lucene.util.IntsRef;
+import org.apache.lucene.util.VectorUtil;
+
+/**
+ * Implementation of "recursive graph bisection", also called "bipartite graph partitioning" and
+ * often abbreviated BP, an approach to doc ID assignment that aims at reducing the sum of the log
+ * gap between consecutive neighbor node ids. See BPIndexReorderer in misc module.
+ */
+public class BpVectorReorderer extends AbstractBPReorderer {
+
+  /*
+   * Note on using centroids (mean of vectors in a partition) to maximize scores of pairs of vectors within each partition:
+   * The function used to compare the vectors must have higher values for vectors
+   * that are more similar to each other, and must preserve inequalities over sums of vectors.
+   *
+   * <p>This property enables us to use the centroid of a collection of vectors to represent the
+   * collection. For Euclidean and inner product score functions, the centroid is the point that
+   * minimizes the sum of distances from all the points (thus maximizing the score).
+   *
+   * <p>sum((c0 - v)^2) = n * c0^2 - 2 * c0 * sum(v) + sum(v^2) taking derivative w.r.t. c0 and
+   * setting to 0 we get sum(v) = n * c0; i.e. c0 (the centroid) is the place that minimizes the sum
+   * of (l2) distances from the vectors (thus maximizing the euclidean score function).
+   *
+   * <p>to maximize dot-product over unit vectors, note that: sum(dot(c0, v)) = dot(c0, sum(v))
+   * which is maximized, again, when c0 = sum(v) / n. For max inner product score, vectors may not
+   * be unit vectors. In this case there is no maximum, but since all colinear vectors of whatever
+   * scale will generate the same partition for these angular scores, we are free to choose any
+   * scale and ignore the normalization factor.
+   *
+   */
+
+  /** Minimum problem size that will result in tasks being split. */
+  private static final int FORK_THRESHOLD = 8192;
+
+  /**
+   * Limits how many incremental updates we do before initiating a full recalculation. Some wasted
+   * work is done when this is exceeded, but more is saved when it is not. Setting this to zero
+   * prevents any incremental updates from being done, instead the centroids are fully recalculated
+   * for each iteration. We're not able to make it very big since too much numerical error
+   * accumulates, which seems to be around 50, thus resulting in suboptimal reordering. It's not
+   * clear how helpful this is though; measurements vary.
+   */
+  private static final int MAX_CENTROID_UPDATES = 0;
+
+  private final String partitionField;
+
+  /** Constructor. */
+  public BpVectorReorderer(String partitionField) {
+    setMinPartitionSize(DEFAULT_MIN_PARTITION_SIZE);
+    setMaxIters(DEFAULT_MAX_ITERS);
+    // 10% of the available heap size by default
+    setRAMBudgetMB(Runtime.getRuntime().totalMemory() / 1024d / 1024d / 10d);
+    this.partitionField = partitionField;
+  }
+
+  private static class PerThreadState {
+
+    final FloatVectorValues vectors;
+    final float[] leftCentroid;
+    final float[] rightCentroid;
+    final float[] scratch;
+
+    PerThreadState(FloatVectorValues vectors) {
+      try {
+        this.vectors = vectors.copy();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+      leftCentroid = new float[vectors.dimension()];
+      rightCentroid = new float[leftCentroid.length];
+      scratch = new float[leftCentroid.length];
+    }
+  }
+
+  private static class DocMap extends Sorter.DocMap {
+
+    private final int[] newToOld;
+    private final int[] oldToNew;
+
+    public DocMap(int[] newToOld) {
+      this.newToOld = newToOld;
+      oldToNew = new int[newToOld.length];
+      for (int i = 0; i < newToOld.length; ++i) {
+        oldToNew[newToOld[i]] = i;
+      }
+    }
+
+    @Override
+    public int size() {
+      return newToOld.length;
+    }
+
+    @Override
+    public int oldToNew(int docID) {
+      return oldToNew[docID];
+    }
+
+    @Override
+    public int newToOld(int docID) {
+      return newToOld[docID];
+    }
+  }
+
+  private abstract class BaseRecursiveAction extends RecursiveAction {
+
+    protected final TaskExecutor executor;
+    protected final int depth;
+
+    BaseRecursiveAction(TaskExecutor executor, int depth) {
+      this.executor = executor;
+      this.depth = depth;
+    }
+
+    protected final boolean shouldFork(int problemSize, int totalProblemSize) {
+      if (executor == null) {
+        return false;
+      }
+      if (getSurplusQueuedTaskCount() > 3) {
+        // Fork tasks if this worker doesn't have more queued work than other workers
+        // See javadocs of #getSurplusQueuedTaskCount for more details
+        return false;
+      }
+      if (problemSize == totalProblemSize) {
+        // Sometimes fork regardless of the problem size to make sure that unit tests also exercise
+        // forking
+        return true;
+      }
+      return problemSize > FORK_THRESHOLD;
+    }
+  }
+
+  private class ReorderTask extends BaseRecursiveAction {
+
+    private final VectorSimilarityFunction vectorScore;
+    // the ids assigned to this task, a sub-range of all the ids
+    private final IntsRef ids;
+    // the biases for the ids - a number < 0 when the doc goes left and > 0 for right
+    private final float[] biases;
+    private final CloseableThreadLocal<PerThreadState> threadLocal;
+
+    ReorderTask(
+        IntsRef ids,
+        float[] biases,
+        CloseableThreadLocal<PerThreadState> threadLocal,
+        TaskExecutor executor,
+        int depth,
+        VectorSimilarityFunction vectorScore) {
+      super(executor, depth);
+      this.ids = ids;
+      this.biases = biases;
+      this.threadLocal = threadLocal;
+      this.vectorScore = vectorScore;
+    }
+
+    @Override
+    protected void compute() {
+      if (depth > 0) {
+        Arrays.sort(ids.ints, ids.offset, ids.offset + ids.length);
+      } else {
+        assert sorted(ids);
+      }
+
+      int halfLength = ids.length >>> 1;
+      if (halfLength < minPartitionSize) {
+        return;
+      }
+
+      // split the ids in half
+      IntsRef left = new IntsRef(ids.ints, ids.offset, halfLength);
+      IntsRef right = new IntsRef(ids.ints, ids.offset + halfLength, ids.length - halfLength);
+
+      PerThreadState state = threadLocal.get();
+      FloatVectorValues vectors = state.vectors;
+      float[] leftCentroid = state.leftCentroid;
+      float[] rightCentroid = state.rightCentroid;
+      float[] scratch = state.scratch;
+
+      try {
+        computeCentroid(left, vectors, leftCentroid, vectorScore);
+        computeCentroid(right, vectors, rightCentroid, vectorScore);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+
+      for (int iter = 0; iter < maxIters; ++iter) {
+        int moved;
+        try {
+          moved = shuffle(vectors, ids, right.offset, leftCentroid, rightCentroid, scratch, biases);
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+        if (moved == 0) {
+          break;
+        }
+        if (moved > MAX_CENTROID_UPDATES) {
+          // if we swapped too many times we don't use the relative calculation because it
+          // introduces too much error
+          try {
+            computeCentroid(left, vectors, leftCentroid, vectorScore);
+            computeCentroid(right, vectors, rightCentroid, vectorScore);
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        }
+      }
+
+      // It is fine for all tasks to share the same docs / biases array since they all work on
+      // different slices of the array at a given point in time.
+      ReorderTask leftTask =
+          new ReorderTask(left, biases, threadLocal, executor, depth + 1, vectorScore);
+      ReorderTask rightTask =
+          new ReorderTask(right, biases, threadLocal, executor, depth + 1, vectorScore);
+
+      if (shouldFork(ids.length, ids.ints.length)) {
+        invokeAll(leftTask, rightTask);
+      } else {
+        leftTask.compute();
+        rightTask.compute();
+      }
+    }
+
+    static void computeCentroid(
+        IntsRef ids,
+        FloatVectorValues vectors,
+        float[] centroid,
+        VectorSimilarityFunction vectorSimilarity)
+        throws IOException {
+      Arrays.fill(centroid, 0);
+      for (int i = ids.offset; i < ids.offset + ids.length; i++) {
+        VectorUtil.add(centroid, vectors.vectorValue(ids.ints[i]));
+      }
+      switch (vectorSimilarity) {
+        case EUCLIDEAN, MAXIMUM_INNER_PRODUCT -> vectorScalarMul(1 / (float) ids.length, centroid);
+        case DOT_PRODUCT, COSINE ->
+            vectorScalarMul(
+                1 / (float) Math.sqrt(VectorUtil.dotProduct(centroid, centroid)), centroid);
+      }
+      ;
+    }
+
+    /** Shuffle IDs across both partitions so that each partition is closer to its centroid. */
+    private int shuffle(
+        FloatVectorValues vectors,
+        IntsRef ids,
+        int midPoint,
+        float[] leftCentroid,
+        float[] rightCentroid,
+        float[] scratch,
+        float[] biases)
+        throws IOException {
+
+      // Computing biases is typically a bottleneck, because each iteration needs to iterate over
+      // all postings to recompute biases, and the total number of postings is usually one order of
+      // magnitude or more than the number of docs. So we try to parallelize it.

Review Comment:
   I had overlooked the number of dimensions, good point.
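
To make the centroid argument in the class comment above concrete, here is a minimal standalone sketch in plain Java (no Lucene dependencies; the class and helper names CentroidSketch, sumSquaredDistance and sumDot are invented for illustration and are not part of this PR). It numerically checks the two claims quoted above: the mean minimizes sum((c0 - v)^2), and, because sum(dot(c0, v)) = dot(c0, sum(v)), the vector sum normalized to unit length maximizes the summed dot product over unit-norm candidates.

import java.util.Random;

/** Numeric sanity check of the centroid argument; illustrative only, not PR code. */
public class CentroidSketch {

  public static void main(String[] args) {
    Random random = new Random(42);
    int n = 100;
    int dim = 4;
    float[][] vectors = new float[n][dim];
    for (float[] v : vectors) {
      for (int j = 0; j < dim; j++) {
        v[j] = random.nextFloat() - 0.5f;
      }
    }

    // Centroid = mean of the vectors, as computed for EUCLIDEAN in computeCentroid above.
    float[] mean = new float[dim];
    for (float[] v : vectors) {
      for (int j = 0; j < dim; j++) {
        mean[j] += v[j] / n;
      }
    }

    // Claim 1: the mean minimizes sum((c0 - v)^2); any perturbed point does worse.
    float[] perturbed = mean.clone();
    perturbed[0] += 0.1f;
    System.out.println("sum sq dist to mean      = " + sumSquaredDistance(mean, vectors));
    System.out.println("sum sq dist to perturbed = " + sumSquaredDistance(perturbed, vectors));

    // Claim 2: sum(dot(c0, v)) = dot(c0, sum(v)), so among unit-norm candidates the
    // normalized vector sum maximizes the summed dot product.
    float[] normalizedSum = normalize(scale(mean, n)); // sum(v) = n * mean
    float[] other = normalize(perturbed);
    System.out.println("sum dot with normalized sum = " + sumDot(normalizedSum, vectors));
    System.out.println("sum dot with other unit vec = " + sumDot(other, vectors));
  }

  static float sumSquaredDistance(float[] c, float[][] vectors) {
    float total = 0;
    for (float[] v : vectors) {
      for (int j = 0; j < c.length; j++) {
        float d = c[j] - v[j];
        total += d * d;
      }
    }
    return total;
  }

  static float sumDot(float[] c, float[][] vectors) {
    float total = 0;
    for (float[] v : vectors) {
      for (int j = 0; j < c.length; j++) {
        total += c[j] * v[j];
      }
    }
    return total;
  }

  static float[] scale(float[] v, float f) {
    float[] out = v.clone();
    for (int j = 0; j < out.length; j++) {
      out[j] *= f;
    }
    return out;
  }

  static float[] normalize(float[] v) {
    float norm = 0;
    for (float x : v) {
      norm += x * x;
    }
    return scale(v, 1f / (float) Math.sqrt(norm));
  }
}

Running this should print a smaller squared-distance sum for the mean than for the perturbed point, and a larger dot-product sum for the normalized vector sum than for the other unit vector, matching the derivation in the class comment.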