msokolov commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367900707
########## lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java: ########## @@ -635,17 +667,31 @@ private static DocsWithFieldSet writeVectorData( return docsWithField; } + private HnswGraphMerger createGraphMerger( + FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) { + if (exec != null) { + return new ConcurrentHnswMerger( + fieldInfo, scorerSupplier, M, beamWidth, exec, numMergeWorkers); + } + return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth); + } + @Override public void close() throws IOException { IOUtils.close(meta, vectorData, vectorIndex); + if (exec != null) { + exec.shutdownNow(); + } + System.out.println( + "Total contention time: " + NeighborArray.contentionTime.get() / 1000000 + " ms"); Review Comment: curious what this showed - can you publish some accounting? ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. + */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; Review Comment: maybe comment this? I guess it's the number of vectors we handle sequentially in each task? ########## lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java: ########## @@ -26,17 +26,39 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.KnnFieldVectorsWriter; import org.apache.lucene.codecs.KnnVectorsWriter; -import org.apache.lucene.index.*; Review Comment: I guess spotless likes these *-imports?? surprising ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. 
+ */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; + + private final ExecutorService exec; + private final ConcurrentMergeWorker[] workers; + private InfoStream infoStream = InfoStream.getDefault(); + + public HnswConcurrentMergeBuilder( + ExecutorService exec, + int numWorker, + RandomVectorScorerSupplier scorerSupplier, + int M, + int beamWidth, + OnHeapHnswGraph hnsw, + BitSet initializedNodes) + throws IOException { + this.exec = exec; + AtomicInteger workProgress = new AtomicInteger(0); + workers = new ConcurrentMergeWorker[numWorker]; + for (int i = 0; i < numWorker; i++) { + workers[i] = + new ConcurrentMergeWorker( + scorerSupplier.copy(), + M, + beamWidth, + HnswGraphBuilder.randSeed, + hnsw, + initializedNodes, + workProgress); + } + } + + @Override + public OnHeapHnswGraph build(int maxOrd) throws IOException { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message( + HNSW_COMPONENT, + "build graph from " + maxOrd + " vectors, with " + workers.length + " workers"); + } + List<Future<?>> futures = new ArrayList<>(); + for (int i = 0; i < workers.length; i++) { + int finalI = i; + futures.add( + exec.submit( + () -> { + try { + workers[finalI].run(maxOrd); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + Throwable exc = null; + for (Future<?> future : futures) { + try { + future.get(); + } catch (InterruptedException e) { + var newException = new ThreadInterruptedException(e); + if (exc == null) { + exc = newException; + } else { + exc.addSuppressed(newException); + } + } catch (ExecutionException e) { + if (exc == null) { + exc = e.getCause(); + } else { + exc.addSuppressed(e.getCause()); + } + } + } + if (exc != null) { + // The error handling was copied from TaskExecutor. should we just use TaskExecutor instead? 
+ throw IOUtils.rethrowAlways(exc); + } + return workers[0].getGraph(); + } + + @Override + public void addGraphNode(int node) throws IOException { + throw new UnsupportedOperationException("This builder is for merge only"); + } + + @Override + public void setInfoStream(InfoStream infoStream) { + this.infoStream = infoStream; + for (IHnswGraphBuilder worker : workers) { + worker.setInfoStream(infoStream); + } + } + + @Override + public OnHeapHnswGraph getGraph() { + return workers[0].getGraph(); + } + + private static final class ConcurrentMergeWorker extends HnswGraphBuilder { + + private final AtomicInteger workProgress; + private final BitSet initializedNodes; + + private ConcurrentMergeWorker( + RandomVectorScorerSupplier scorerSupplier, + int M, + int beamWidth, + long seed, + OnHeapHnswGraph hnsw, + BitSet initializedNodes, + AtomicInteger workProgress) + throws IOException { + super( + scorerSupplier, + M, + beamWidth, + seed, + hnsw, + new MergeSearcher( + new NeighborQueue(beamWidth, true), new FixedBitSet(hnsw.maxNodeId() + 1))); + this.workProgress = workProgress; + this.initializedNodes = initializedNodes; + } + + private void run(int maxOrd) throws IOException { Review Comment: This seems a bit complex - wouldn't it be simpler to divide the ords "up front" in the control loop? 
########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; - for (int node = 0; node < maxOrd; node++) { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); + } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + + // initializedNodes.size()); + for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + + // hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void addVectors(int maxOrd) throws IOException { + addVectors(0, maxOrd); } - /** Inserts a doc with vector value to the graph */ + @Override public void addGraphNode(int node) throws IOException { + /* + Note: this implementation is thread safe when graph size is fixed (e.g. when merging) + The process of adding a node is roughly: + 1. Add the node to all level from top to the bottom, but do not connect it to any other node, + nor try to promote itself to an entry node before the connection is done + 2. 
Do the search from top to bottom, remember all the possible neighbours on each level the node + is on. + 3. Add the neighbor to the node from bottom to top level, when adding the neighbour, + we always add all the outgoing links first before adding incoming link such that + when a search visiting this node, it can always find a way out + 4. If the node has level that is less or equal to graph level, then we're done here. + If the node has level larger than graph level, then we need to promote the node + as the entry node. If, while we add the node to the graph, the entry node has changed + (which means the graph level has changed as well), we need to reinsert the node + to the newly introduced levels (repeating step 2,3 for new levels) and again try to + promote the node to entry node. + */ RandomVectorScorer scorer = scorerSupplier.scorer(node); final int nodeLevel = getRandomGraphLevel(ml, random); - int curMaxLevel = hnsw.numLevels() - 1; - - // If entrynode is -1, then this should finish without adding neighbors - if (hnsw.entryNode() == -1) { - for (int level = nodeLevel; level >= 0; level--) { - hnsw.addNode(level, node); - } + // first add nodes to all levels + for (int level = nodeLevel; level >= 0; level--) { + hnsw.addNode(level, node); + } + // then promote itself as entry node if entry node is not set + if (hnsw.trySetNewEntryNode(node, nodeLevel)) { return; } - int[] eps = new int[] {hnsw.entryNode()}; + // if the entry node is already set, then we have to do all connections first before we can + // promote ourselves as entry node + // do connections from bottom up + int lowestUnsetLevel = 0; + int curMaxLevel; + do { + curMaxLevel = hnsw.numLevels() - 1; + // NOTE: the entry node and max level may not be paired, but because we get the level first + // we ensure that the entry node we get later will always exist on the curMaxLevel + int[] eps = new int[] {hnsw.entryNode()}; + // for levels > nodeLevel search with topk = 1 + GraphBuilderKnnCollector 
candidates = entryCandidates; + for (int level = curMaxLevel; level > nodeLevel; level--) { + candidates.clear(); + graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); + eps = new int[] {candidates.popNode()}; + } - // if a node introduces new levels to the graph, add this new node on new levels - for (int level = nodeLevel; level > curMaxLevel; level--) { - hnsw.addNode(level, node); - } + // for levels <= nodeLevel search with topk = beamWidth, and add connections + candidates = beamCandidates; + NeighborArray[] scratchPerLevel = + new NeighborArray[Math.min(nodeLevel, curMaxLevel) - lowestUnsetLevel + 1]; + for (int i = scratchPerLevel.length - 1; i >= 0; i--) { + int level = i + lowestUnsetLevel; + candidates.clear(); + graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); + eps = candidates.popUntilNearestKNodes(); + scratchPerLevel[i] = new NeighborArray(Math.max(beamCandidates.k(), M + 1), false); + popToScratch(candidates, scratchPerLevel[i]); + } - // for levels > nodeLevel search with topk = 1 - GraphBuilderKnnCollector candidates = entryCandidates; - for (int level = curMaxLevel; level > nodeLevel; level--) { - candidates.clear(); - graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); - eps = new int[] {candidates.popNode()}; - } - // for levels <= nodeLevel search with topk = beamWidth, and add connections - candidates = beamCandidates; - for (int level = Math.min(nodeLevel, curMaxLevel); level >= 0; level--) { - candidates.clear(); - graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); - eps = candidates.popUntilNearestKNodes(); - hnsw.addNode(level, node); - addDiverseNeighbors(level, node, candidates); - } + for (int i = 0; i < scratchPerLevel.length; i++) { Review Comment: hmm well we searched from top down and then add the nodes from bottom up! 
########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphMerger.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import java.io.IOException; +import org.apache.lucene.codecs.KnnVectorsReader; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.InfoStream; + +/** + * Abstraction of merging multiple sealed graph into one on heap graph Review Comment: I'm not sure what "sealed" means here - I guess these are accessed read-only, but that's always the case with merge operations. 
########## lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java: ########## @@ -50,13 +72,24 @@ public final class Lucene95HnswVectorsWriter extends KnnVectorsWriter { private final IndexOutput meta, vectorData, vectorIndex; private final int M; private final int beamWidth; + private final int numMergeWorkers; + private final ExecutorService exec; private final List<FieldWriter<?>> fields = new ArrayList<>(); private boolean finished; - Lucene95HnswVectorsWriter(SegmentWriteState state, int M, int beamWidth) throws IOException { + Lucene95HnswVectorsWriter(SegmentWriteState state, int M, int beamWidth, int numMergeWorkers) + throws IOException { this.M = M; this.beamWidth = beamWidth; + this.numMergeWorkers = numMergeWorkers; + if (numMergeWorkers <= 1) { + exec = null; + } else { + exec = + Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("hnsw-merge")); Review Comment: shouldn't this be allocating `numMergeWorkers` threads here? ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. + */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; + + private final ExecutorService exec; + private final ConcurrentMergeWorker[] workers; + private InfoStream infoStream = InfoStream.getDefault(); + + public HnswConcurrentMergeBuilder( + ExecutorService exec, + int numWorker, + RandomVectorScorerSupplier scorerSupplier, + int M, + int beamWidth, + OnHeapHnswGraph hnsw, + BitSet initializedNodes) + throws IOException { + this.exec = exec; + AtomicInteger workProgress = new AtomicInteger(0); + workers = new ConcurrentMergeWorker[numWorker]; + for (int i = 0; i < numWorker; i++) { + workers[i] = + new ConcurrentMergeWorker( + scorerSupplier.copy(), + M, + beamWidth, + HnswGraphBuilder.randSeed, Review Comment: I wonder if we ought to generate different seeds for each worker although probably it makes no difference. In any case we are not going to produce a deterministic graph. 
########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. 
+ */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; + + private final ExecutorService exec; + private final ConcurrentMergeWorker[] workers; + private InfoStream infoStream = InfoStream.getDefault(); Review Comment: should this be the segmentState.infoStream? I'm a little unclear about how InfoStream works ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; - for (int node = 0; node < maxOrd; node++) { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); + } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + + // initializedNodes.size()); + for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + + // hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void addVectors(int maxOrd) throws IOException { + addVectors(0, maxOrd); } - /** Inserts a doc with vector value to the graph */ + 
+ @Override public void addGraphNode(int node) throws IOException { + /* + Note: this implementation is thread safe when graph size is fixed (e.g. when merging) + The process of adding a node is roughly: + 1. Add the node to all level from top to the bottom, but do not connect it to any other node, + nor try to promote itself to an entry node before the connection is done + 2. Do the search from top to bottom, remember all the possible neighbours on each level the node + is on. + 3. Add the neighbor to the node from bottom to top level, when adding the neighbour, + we always add all the outgoing links first before adding incoming link such that + when a search visiting this node, it can always find a way out + 4. If the node has level that is less or equal to graph level, then we're done here. + If the node has level larger than graph level, then we need to promote the node + as the entry node. If, while we add the node to the graph, the entry node has changed + (which means the graph level has changed as well), we need to reinsert the node + to the newly introduced levels (repeating step 2,3 for new levels) and again try to + promote the node to entry node. + */ Review Comment: Thanks for the comments, this is a big help to understanding ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -33,7 +33,7 @@ * Builder for HNSW graph. See {@link HnswGraph} for a gloss on the algorithm and the meaning of the * hyper-parameters. */ -public class HnswGraphBuilder { +public class HnswGraphBuilder implements IHnswGraphBuilder { Review Comment: I'll just note that we don't seem to use the `IClassName` naming convention for interfaces elsewhere in this code base. I think I'd prefer to use `HnswGraphBuilder` for the interface name and rename this class to `SequentialHnswMerger` or so. I'm also unsure whether an interface is the right choice or we should use an abstract base class.
It used to be that an abstract base class was more flexible, since you can add methods to it in a backwards-compatible way. Although maybe with default methods that is no longer a concern? ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches.
+ */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; + + private final ExecutorService exec; + private final ConcurrentMergeWorker[] workers; + private InfoStream infoStream = InfoStream.getDefault(); + + public HnswConcurrentMergeBuilder( + ExecutorService exec, + int numWorker, + RandomVectorScorerSupplier scorerSupplier, + int M, + int beamWidth, + OnHeapHnswGraph hnsw, + BitSet initializedNodes) + throws IOException { + this.exec = exec; + AtomicInteger workProgress = new AtomicInteger(0); + workers = new ConcurrentMergeWorker[numWorker]; + for (int i = 0; i < numWorker; i++) { + workers[i] = + new ConcurrentMergeWorker( + scorerSupplier.copy(), + M, + beamWidth, + HnswGraphBuilder.randSeed, + hnsw, + initializedNodes, + workProgress); + } + } + + @Override + public OnHeapHnswGraph build(int maxOrd) throws IOException { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message( + HNSW_COMPONENT, + "build graph from " + maxOrd + " vectors, with " + workers.length + " workers"); + } + List<Future<?>> futures = new ArrayList<>(); + for (int i = 0; i < workers.length; i++) { + int finalI = i; + futures.add( + exec.submit( + () -> { + try { + workers[finalI].run(maxOrd); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + Throwable exc = null; + for (Future<?> future : futures) { + try { + future.get(); + } catch (InterruptedException e) { + var newException = new ThreadInterruptedException(e); + if (exc == null) { + exc = newException; + } else { + exc.addSuppressed(newException); + } + } catch (ExecutionException e) { + if (exc == null) { + exc = e.getCause(); + } else { + exc.addSuppressed(e.getCause()); + } + } + } + if (exc != null) { + // The error handling was copied from TaskExecutor. should we just use TaskExecutor instead? 
+ throw IOUtils.rethrowAlways(exc); + } + return workers[0].getGraph(); + } + + @Override + public void addGraphNode(int node) throws IOException { + throw new UnsupportedOperationException("This builder is for merge only"); + } + + @Override + public void setInfoStream(InfoStream infoStream) { + this.infoStream = infoStream; + for (IHnswGraphBuilder worker : workers) { + worker.setInfoStream(infoStream); + } + } + + @Override + public OnHeapHnswGraph getGraph() { + return workers[0].getGraph(); + } + + private static final class ConcurrentMergeWorker extends HnswGraphBuilder { + + private final AtomicInteger workProgress; + private final BitSet initializedNodes; + + private ConcurrentMergeWorker( + RandomVectorScorerSupplier scorerSupplier, + int M, + int beamWidth, + long seed, + OnHeapHnswGraph hnsw, + BitSet initializedNodes, + AtomicInteger workProgress) + throws IOException { + super( + scorerSupplier, + M, + beamWidth, + seed, + hnsw, + new MergeSearcher( + new NeighborQueue(beamWidth, true), new FixedBitSet(hnsw.maxNodeId() + 1))); + this.workProgress = workProgress; + this.initializedNodes = initializedNodes; + } + + private void run(int maxOrd) throws IOException { + int start = getStartPos(maxOrd); + int end; + while (start != -1) { + end = Math.min(maxOrd, start + BATCH_SIZE); + addVectors(start, end); + start = getStartPos(maxOrd); + } + } + + private int getStartPos(int maxOrd) { + int start = workProgress.get(); + while (start < maxOrd) { + if (workProgress.compareAndSet(start, start + BATCH_SIZE)) { + break; + } else { + assert workProgress.get() > start; + start = workProgress.get(); + } + } + if (start < maxOrd) { + return start; + } else { + return -1; + } + } + + @Override + public void addGraphNode(int node) throws IOException { + if (initializedNodes != null && initializedNodes.get(node)) { + return; + } + super.addGraphNode(node); + } + } + + /** + * This searcher will obtain the lock and make a copy of neighborArray when seeking the graph 
such + * that concurrent modification of the graph will not impact the search + */ + private static class MergeSearcher extends HnswGraphSearcher { + private int[] nodeBuffer; + private int upto; + private int size; + + private MergeSearcher(NeighborQueue candidates, BitSet visited) { + super(candidates, visited); + } + + @Override + void graphSeek(HnswGraph graph, int level, int targetNode) { + NeighborArray neighborArray = ((OnHeapHnswGraph) graph).getNeighbors(level, targetNode); + long start = System.nanoTime(); + neighborArray.rwlock.readLock().lock(); + NeighborArray.contentionTime.addAndGet(System.nanoTime() - start); + if (nodeBuffer == null || nodeBuffer.length < neighborArray.size()) { + nodeBuffer = new int[neighborArray.size()]; + } + size = neighborArray.size(); + if (size >= 0) System.arraycopy(neighborArray.node, 0, nodeBuffer, 0, size); + neighborArray.rwlock.readLock().unlock(); Review Comment: it could be interesting to measure array copy time as well? I suspect it's not that great, but if it is comparable to contention time we might consider a different tradeoff. I'm also curious to know how contention varies with graph size - I suspect it gets proportionally less as the graph gets bigger ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. 
+ */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; + + private final ExecutorService exec; + private final ConcurrentMergeWorker[] workers; + private InfoStream infoStream = InfoStream.getDefault(); + + public HnswConcurrentMergeBuilder( + ExecutorService exec, + int numWorker, + RandomVectorScorerSupplier scorerSupplier, + int M, + int beamWidth, + OnHeapHnswGraph hnsw, + BitSet initializedNodes) + throws IOException { + this.exec = exec; + AtomicInteger workProgress = new AtomicInteger(0); + workers = new ConcurrentMergeWorker[numWorker]; + for (int i = 0; i < numWorker; i++) { + workers[i] = + new ConcurrentMergeWorker( + scorerSupplier.copy(), + M, + beamWidth, + HnswGraphBuilder.randSeed, + hnsw, + initializedNodes, + workProgress); + } + } + + @Override + public OnHeapHnswGraph build(int maxOrd) throws IOException { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message( + HNSW_COMPONENT, + "build graph from " + maxOrd + " vectors, with " + workers.length + " workers"); + } + List<Future<?>> futures = new ArrayList<>(); + for (int i = 0; i < workers.length; i++) { + int finalI = i; + futures.add( + exec.submit( + () -> { + try { + workers[finalI].run(maxOrd); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + Throwable exc = null; + for (Future<?> future : futures) { + try { + future.get(); + } catch (InterruptedException e) { + var newException = new ThreadInterruptedException(e); + if (exc == null) { + exc = newException; + } else { + exc.addSuppressed(newException); + } + } catch (ExecutionException e) { + if (exc == null) { + exc = e.getCause(); + } else { + exc.addSuppressed(e.getCause()); + } + } + } + if (exc != null) { + // The error handling was copied from TaskExecutor. should we just use TaskExecutor instead? 
+ throw IOUtils.rethrowAlways(exc); + } + return workers[0].getGraph(); + } + + @Override + public void addGraphNode(int node) throws IOException { + throw new UnsupportedOperationException("This builder is for merge only"); Review Comment: Maybe someday this will prove useful? But +100 to focusing on merging; that is where most of the pain is. ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; - for (int node = 0; node < maxOrd; node++) { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); + } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + + // initializedNodes.size()); + for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + + // hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) Review Comment: Is this causing warnings? Is there a generic method in here?
########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; - for (int node = 0; node < maxOrd; node++) { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); + } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + + // initializedNodes.size()); + for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + + // hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void addVectors(int maxOrd) throws IOException { + addVectors(0, maxOrd); } - /** Inserts a doc with vector value to the graph */ + @Override public void addGraphNode(int node) throws IOException { + /* + Note: this implementation is thread safe when graph size is fixed (e.g. when merging) + The process of adding a node is roughly: + 1. Add the node to all level from top to the bottom, but do not connect it to any other node, + nor try to promote itself to an entry node before the connection is done + 2. 
Do the search from top to bottom, remember all the possible neighbours on each level the node Review Comment: In comments below we say "bottom up" but I think this top down is more in line with the impl since we start at the higher numbered level and finish with level 0 ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; - for (int node = 0; node < maxOrd; node++) { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); + } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + + // initializedNodes.size()); + for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + + // hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void addVectors(int maxOrd) throws IOException { + addVectors(0, maxOrd); } - /** Inserts a doc with vector value to the graph */ + @Override public void addGraphNode(int node) throws IOException { + /* + Note: this implementation is thread 
safe when graph size is fixed (e.g. when merging) + The process of adding a node is roughly: + 1. Add the node to all level from top to the bottom, but do not connect it to any other node, + nor try to promote itself to an entry node before the connection is done + 2. Do the search from top to bottom, remember all the possible neighbours on each level the node + is on. + 3. Add the neighbor to the node from bottom to top level, when adding the neighbour, + we always add all the outgoing links first before adding incoming link such that + when a search visiting this node, it can always find a way out Review Comment: grammar: "when a search visits" ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; - for (int node = 0; node < maxOrd; node++) { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); + } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + + // initializedNodes.size()); + for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } + // 
System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + + // hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void addVectors(int maxOrd) throws IOException { + addVectors(0, maxOrd); } - /** Inserts a doc with vector value to the graph */ + @Override public void addGraphNode(int node) throws IOException { + /* + Note: this implementation is thread safe when graph size is fixed (e.g. when merging) + The process of adding a node is roughly: + 1. Add the node to all level from top to the bottom, but do not connect it to any other node, + nor try to promote itself to an entry node before the connection is done + 2. Do the search from top to bottom, remember all the possible neighbours on each level the node + is on. + 3. Add the neighbor to the node from bottom to top level, when adding the neighbour, + we always add all the outgoing links first before adding incoming link such that + when a search visiting this node, it can always find a way out + 4. If the node has level that is less or equal to graph level, then we're done here. + If the node has level larger than graph level, then we need to promote the node + as the entry node. If, while we add the node to the graph, the entry node has changed + (which means the graph level has changed as well), we need to reinsert the node + to the newly introduced levels (repeating step 2,3 for new levels) and again try to + promote the node to entry node. 
+ */ RandomVectorScorer scorer = scorerSupplier.scorer(node); final int nodeLevel = getRandomGraphLevel(ml, random); - int curMaxLevel = hnsw.numLevels() - 1; - - // If entrynode is -1, then this should finish without adding neighbors - if (hnsw.entryNode() == -1) { - for (int level = nodeLevel; level >= 0; level--) { - hnsw.addNode(level, node); - } + // first add nodes to all levels + for (int level = nodeLevel; level >= 0; level--) { + hnsw.addNode(level, node); + } + // then promote itself as entry node if entry node is not set + if (hnsw.trySetNewEntryNode(node, nodeLevel)) { return; } - int[] eps = new int[] {hnsw.entryNode()}; + // if the entry node is already set, then we have to do all connections first before we can + // promote ourselves as entry node + // do connections from bottom up Review Comment: we call this top down in the comments above, and I think we usually think of level 0 as the bottom? ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -221,34 +292,39 @@ private long printGraphBuildStatus(int node, long start, long t) { return now; } - private void addDiverseNeighbors(int level, int node, GraphBuilderKnnCollector candidates) - throws IOException { + private void addDiverseNeighbors(int level, int node, NeighborArray scratch) throws IOException { /* For each of the beamWidth nearest candidates (going from best to worst), select it only if it * is closer to target than it is to any of the already-selected neighbors (ie selected in this method, * since the node is new and has no prior neighbors). */ NeighborArray neighbors = hnsw.getNeighbors(level, node); assert neighbors.size() == 0; // new node - popToScratch(candidates); int maxConnOnLevel = level == 0 ? 
M * 2 : M; - selectAndLinkDiverse(neighbors, scratch, maxConnOnLevel); + boolean[] mask = selectAndLinkDiverse(neighbors, scratch, maxConnOnLevel); // Link the selected nodes to the new node, and the new node to the selected nodes (again // applying diversity heuristic) - int size = neighbors.size(); - for (int i = 0; i < size; i++) { - int nbr = neighbors.node[i]; + for (int i = 0; i < scratch.size(); i++) { + if (mask[i] == false) { Review Comment: could you add a comment explaining the need for mask? Are we changing the linking criteria here, or is this somehow to do with making this threadsafe? ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; - for (int node = 0; node < maxOrd; node++) { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); + } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + + // initializedNodes.size()); + for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + 
graph.size=" + + // hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void addVectors(int maxOrd) throws IOException { + addVectors(0, maxOrd); } - /** Inserts a doc with vector value to the graph */ + @Override public void addGraphNode(int node) throws IOException { + /* + Note: this implementation is thread safe when graph size is fixed (e.g. when merging) + The process of adding a node is roughly: + 1. Add the node to all level from top to the bottom, but do not connect it to any other node, + nor try to promote itself to an entry node before the connection is done Review Comment: (unless the graph is empty and this is the first node) ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; - for (int node = 0; node < maxOrd; node++) { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); + } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + + // initializedNodes.size()); + for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } + // 
System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + + // hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void addVectors(int maxOrd) throws IOException { + addVectors(0, maxOrd); } - /** Inserts a doc with vector value to the graph */ + @Override public void addGraphNode(int node) throws IOException { + /* + Note: this implementation is thread safe when graph size is fixed (e.g. when merging) + The process of adding a node is roughly: + 1. Add the node to all level from top to the bottom, but do not connect it to any other node, + nor try to promote itself to an entry node before the connection is done + 2. Do the search from top to bottom, remember all the possible neighbours on each level the node + is on. + 3. Add the neighbor to the node from bottom to top level, when adding the neighbour, + we always add all the outgoing links first before adding incoming link such that + when a search visiting this node, it can always find a way out + 4. If the node has level that is less or equal to graph level, then we're done here. + If the node has level larger than graph level, then we need to promote the node + as the entry node. If, while we add the node to the graph, the entry node has changed + (which means the graph level has changed as well), we need to reinsert the node + to the newly introduced levels (repeating step 2,3 for new levels) and again try to + promote the node to entry node. 
+ */ RandomVectorScorer scorer = scorerSupplier.scorer(node); final int nodeLevel = getRandomGraphLevel(ml, random); - int curMaxLevel = hnsw.numLevels() - 1; - - // If entrynode is -1, then this should finish without adding neighbors - if (hnsw.entryNode() == -1) { - for (int level = nodeLevel; level >= 0; level--) { - hnsw.addNode(level, node); - } + // first add nodes to all levels + for (int level = nodeLevel; level >= 0; level--) { + hnsw.addNode(level, node); + } + // then promote itself as entry node if entry node is not set + if (hnsw.trySetNewEntryNode(node, nodeLevel)) { return; } - int[] eps = new int[] {hnsw.entryNode()}; + // if the entry node is already set, then we have to do all connections first before we can + // promote ourselves as entry node + // do connections from bottom up + int lowestUnsetLevel = 0; + int curMaxLevel; + do { + curMaxLevel = hnsw.numLevels() - 1; + // NOTE: the entry node and max level may not be paired, but because we get the level first + // we ensure that the entry node we get later will always exist on the curMaxLevel + int[] eps = new int[] {hnsw.entryNode()}; + // for levels > nodeLevel search with topk = 1 + GraphBuilderKnnCollector candidates = entryCandidates; + for (int level = curMaxLevel; level > nodeLevel; level--) { + candidates.clear(); + graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); + eps = new int[] {candidates.popNode()}; + } - // if a node introduces new levels to the graph, add this new node on new levels - for (int level = nodeLevel; level > curMaxLevel; level--) { - hnsw.addNode(level, node); - } + // for levels <= nodeLevel search with topk = beamWidth, and add connections + candidates = beamCandidates; + NeighborArray[] scratchPerLevel = + new NeighborArray[Math.min(nodeLevel, curMaxLevel) - lowestUnsetLevel + 1]; + for (int i = scratchPerLevel.length - 1; i >= 0; i--) { + int level = i + lowestUnsetLevel; + candidates.clear(); + graphSearcher.searchLevel(candidates, scorer, 
level, eps, hnsw, null); + eps = candidates.popUntilNearestKNodes(); + scratchPerLevel[i] = new NeighborArray(Math.max(beamCandidates.k(), M + 1), false); + popToScratch(candidates, scratchPerLevel[i]); + } - // for levels > nodeLevel search with topk = 1 - GraphBuilderKnnCollector candidates = entryCandidates; - for (int level = curMaxLevel; level > nodeLevel; level--) { - candidates.clear(); - graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); - eps = new int[] {candidates.popNode()}; - } - // for levels <= nodeLevel search with topk = beamWidth, and add connections - candidates = beamCandidates; - for (int level = Math.min(nodeLevel, curMaxLevel); level >= 0; level--) { - candidates.clear(); - graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); - eps = candidates.popUntilNearestKNodes(); - hnsw.addNode(level, node); - addDiverseNeighbors(level, node, candidates); - } + for (int i = 0; i < scratchPerLevel.length; i++) { + addDiverseNeighbors(i + lowestUnsetLevel, node, scratchPerLevel[i]); + } + lowestUnsetLevel = scratchPerLevel.length + lowestUnsetLevel; Review Comment: `+=`? 
########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; - for (int node = 0; node < maxOrd; node++) { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); + } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + + // initializedNodes.size()); + for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + + // hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void addVectors(int maxOrd) throws IOException { + addVectors(0, maxOrd); } - /** Inserts a doc with vector value to the graph */ + @Override public void addGraphNode(int node) throws IOException { + /* + Note: this implementation is thread safe when graph size is fixed (e.g. when merging) + The process of adding a node is roughly: + 1. Add the node to all level from top to the bottom, but do not connect it to any other node, + nor try to promote itself to an entry node before the connection is done + 2. 
Do the search from top to bottom, remember all the possible neighbours on each level the node + is on. + 3. Add the neighbor to the node from bottom to top level, when adding the neighbour, + we always add all the outgoing links first before adding incoming link such that + when a search visiting this node, it can always find a way out + 4. If the node has level that is less or equal to graph level, then we're done here. + If the node has level larger than graph level, then we need to promote the node + as the entry node. If, while we add the node to the graph, the entry node has changed + (which means the graph level has changed as well), we need to reinsert the node + to the newly introduced levels (repeating step 2,3 for new levels) and again try to + promote the node to entry node. + */ RandomVectorScorer scorer = scorerSupplier.scorer(node); final int nodeLevel = getRandomGraphLevel(ml, random); - int curMaxLevel = hnsw.numLevels() - 1; - - // If entrynode is -1, then this should finish without adding neighbors - if (hnsw.entryNode() == -1) { - for (int level = nodeLevel; level >= 0; level--) { - hnsw.addNode(level, node); - } + // first add nodes to all levels + for (int level = nodeLevel; level >= 0; level--) { + hnsw.addNode(level, node); + } + // then promote itself as entry node if entry node is not set + if (hnsw.trySetNewEntryNode(node, nodeLevel)) { return; } - int[] eps = new int[] {hnsw.entryNode()}; + // if the entry node is already set, then we have to do all connections first before we can + // promote ourselves as entry node + // do connections from bottom up + int lowestUnsetLevel = 0; + int curMaxLevel; + do { + curMaxLevel = hnsw.numLevels() - 1; + // NOTE: the entry node and max level may not be paired, but because we get the level first + // we ensure that the entry node we get later will always exist on the curMaxLevel + int[] eps = new int[] {hnsw.entryNode()}; + // for levels > nodeLevel search with topk = 1 + GraphBuilderKnnCollector 
candidates = entryCandidates; + for (int level = curMaxLevel; level > nodeLevel; level--) { + candidates.clear(); + graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); + eps = new int[] {candidates.popNode()}; + } - // if a node introduces new levels to the graph, add this new node on new levels - for (int level = nodeLevel; level > curMaxLevel; level--) { - hnsw.addNode(level, node); - } + // for levels <= nodeLevel search with topk = beamWidth, and add connections + candidates = beamCandidates; + NeighborArray[] scratchPerLevel = + new NeighborArray[Math.min(nodeLevel, curMaxLevel) - lowestUnsetLevel + 1]; + for (int i = scratchPerLevel.length - 1; i >= 0; i--) { + int level = i + lowestUnsetLevel; + candidates.clear(); + graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); + eps = candidates.popUntilNearestKNodes(); + scratchPerLevel[i] = new NeighborArray(Math.max(beamCandidates.k(), M + 1), false); + popToScratch(candidates, scratchPerLevel[i]); + } - // for levels > nodeLevel search with topk = 1 - GraphBuilderKnnCollector candidates = entryCandidates; - for (int level = curMaxLevel; level > nodeLevel; level--) { - candidates.clear(); - graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); - eps = new int[] {candidates.popNode()}; - } - // for levels <= nodeLevel search with topk = beamWidth, and add connections - candidates = beamCandidates; - for (int level = Math.min(nodeLevel, curMaxLevel); level >= 0; level--) { - candidates.clear(); - graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); - eps = candidates.popUntilNearestKNodes(); - hnsw.addNode(level, node); - addDiverseNeighbors(level, node, candidates); - } + for (int i = 0; i < scratchPerLevel.length; i++) { + addDiverseNeighbors(i + lowestUnsetLevel, node, scratchPerLevel[i]); + } + lowestUnsetLevel = scratchPerLevel.length + lowestUnsetLevel; + assert lowestUnsetLevel == Math.min(nodeLevel, curMaxLevel) + 1; + if 
(lowestUnsetLevel > nodeLevel) { + return; + } + assert lowestUnsetLevel == curMaxLevel + 1 && nodeLevel > curMaxLevel; + if (hnsw.tryPromoteNewEntryNode(node, nodeLevel, curMaxLevel)) { + return; + } + if (hnsw.numLevels() == curMaxLevel + 1) { + throw new IllegalStateException( Review Comment: this is a sanity check and should really never happen, right? ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -221,34 +292,39 @@ private long printGraphBuildStatus(int node, long start, long t) { return now; } - private void addDiverseNeighbors(int level, int node, GraphBuilderKnnCollector candidates) - throws IOException { + private void addDiverseNeighbors(int level, int node, NeighborArray scratch) throws IOException { /* For each of the beamWidth nearest candidates (going from best to worst), select it only if it * is closer to target than it is to any of the already-selected neighbors (ie selected in this method, * since the node is new and has no prior neighbors). */ NeighborArray neighbors = hnsw.getNeighbors(level, node); assert neighbors.size() == 0; // new node - popToScratch(candidates); int maxConnOnLevel = level == 0 ? 
+ M * 2 : M; - selectAndLinkDiverse(neighbors, scratch, maxConnOnLevel); + boolean[] mask = selectAndLinkDiverse(neighbors, scratch, maxConnOnLevel); // Link the selected nodes to the new node, and the new node to the selected nodes (again // applying diversity heuristic) - int size = neighbors.size(); - for (int i = 0; i < size; i++) { - int nbr = neighbors.node[i]; + for (int i = 0; i < scratch.size(); i++) { + if (mask[i] == false) { + continue; + } + int nbr = scratch.node[i]; NeighborArray nbrsOfNbr = hnsw.getNeighbors(level, nbr); - nbrsOfNbr.addOutOfOrder(node, neighbors.score[i]); + long start = System.nanoTime(); + nbrsOfNbr.rwlock.writeLock().lock(); + NeighborArray.contentionTime.addAndGet(System.nanoTime() - start); + nbrsOfNbr.addOutOfOrder(node, scratch.score[i]); if (nbrsOfNbr.size() > maxConnOnLevel) { int indexToRemove = findWorstNonDiverse(nbrsOfNbr, nbr); nbrsOfNbr.removeIndex(indexToRemove); } + nbrsOfNbr.rwlock.writeLock().unlock(); Review Comment: We should use try/finally to release the lock because, at least in theory, findWorstNonDiverse can throw an IOException, and then we would never release the lock. We should probably check the read locks similarly. ########## lucene/core/src/java/org/apache/lucene/util/hnsw/OnHeapHnswGraph.java: ########## @@ -185,7 +187,41 @@ public int numLevels() { */ @Override public int entryNode() { - return entryNode; + return entryNode.get().node; + } + + /** + * Try to set the entry node if the graph does not have one + * + * @return True if the entry node is set to the provided node.
False if the entry node is already Review Comment: grammar nit: "if the entry node already exists" ########## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ########## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; - for (int node = 0; node < maxOrd; node++) { + if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); + } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + + // initializedNodes.size()); + for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } + // System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + + // hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void addVectors(int maxOrd) throws IOException { + addVectors(0, maxOrd); } - /** Inserts a doc with vector value to the graph */ + @Override public void addGraphNode(int node) throws IOException { + /* + Note: this implementation is thread safe when graph size is fixed (e.g. when merging) + The process of adding a node is roughly: + 1. 
Add the node to all level from top to the bottom, but do not connect it to any other node, + nor try to promote itself to an entry node before the connection is done + 2. Do the search from top to bottom, remember all the possible neighbours on each level the node + is on. + 3. Add the neighbor to the node from bottom to top level, when adding the neighbour, + we always add all the outgoing links first before adding incoming link such that + when a search visiting this node, it can always find a way out + 4. If the node has level that is less or equal to graph level, then we're done here. + If the node has level larger than graph level, then we need to promote the node + as the entry node. If, while we add the node to the graph, the entry node has changed + (which means the graph level has changed as well), we need to reinsert the node + to the newly introduced levels (repeating step 2,3 for new levels) and again try to + promote the node to entry node. + */ RandomVectorScorer scorer = scorerSupplier.scorer(node); final int nodeLevel = getRandomGraphLevel(ml, random); - int curMaxLevel = hnsw.numLevels() - 1; - - // If entrynode is -1, then this should finish without adding neighbors - if (hnsw.entryNode() == -1) { - for (int level = nodeLevel; level >= 0; level--) { - hnsw.addNode(level, node); - } + // first add nodes to all levels + for (int level = nodeLevel; level >= 0; level--) { + hnsw.addNode(level, node); + } + // then promote itself as entry node if entry node is not set + if (hnsw.trySetNewEntryNode(node, nodeLevel)) { return; } - int[] eps = new int[] {hnsw.entryNode()}; + // if the entry node is already set, then we have to do all connections first before we can + // promote ourselves as entry node + // do connections from bottom up + int lowestUnsetLevel = 0; + int curMaxLevel; + do { + curMaxLevel = hnsw.numLevels() - 1; + // NOTE: the entry node and max level may not be paired, but because we get the level first + // we ensure that the entry node we 
get later will always exist on the curMaxLevel + int[] eps = new int[] {hnsw.entryNode()}; + // for levels > nodeLevel search with topk = 1 + GraphBuilderKnnCollector candidates = entryCandidates; + for (int level = curMaxLevel; level > nodeLevel; level--) { + candidates.clear(); + graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); + eps = new int[] {candidates.popNode()}; Review Comment: why do we create a `new int[]` instead of updating the one we have? This irked me in the existing code as well ########## lucene/core/src/test/org/apache/lucene/util/hnsw/HnswGraphTestCase.java: ########## @@ -709,6 +710,7 @@ public void testHnswGraphBuilderInvalid() throws IOException { IllegalArgumentException.class, () -> HnswGraphBuilder.create(scorerSupplier, 10, 0, 0)); } + @Ignore Review Comment: what's the plan for this? ########## lucene/core/src/java/org/apache/lucene/util/hnsw/IncrementalHnswGraphMerger.java: ########## @@ -32,22 +32,23 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.InfoStream; /** * This selects the biggest Hnsw graph from the provided merge state and initializes a new * HnswGraphBuilder with that graph as a starting point. * * @lucene.experimental */ -public class IncrementalHnswGraphMerger { +public class IncrementalHnswGraphMerger implements HnswGraphMerger { - private KnnVectorsReader initReader; - private MergeState.DocMap initDocMap; - private int initGraphSize; - private final FieldInfo fieldInfo; - private final RandomVectorScorerSupplier scorerSupplier; - private final int M; - private final int beamWidth; + protected KnnVectorsReader initReader; + protected MergeState.DocMap initDocMap; + protected int initGraphSize; + protected final FieldInfo fieldInfo; Review Comment: nit: can we move the final ones before the mutable ones? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org