msokolov commented on code in PR #12660:
URL: https://github.com/apache/lucene/pull/12660#discussion_r1367900707


##########
lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java:
##########
@@ -635,17 +667,31 @@ private static DocsWithFieldSet writeVectorData(
     return docsWithField;
   }
 
+  private HnswGraphMerger createGraphMerger(
+      FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) {
+    if (exec != null) {
+      return new ConcurrentHnswMerger(
+          fieldInfo, scorerSupplier, M, beamWidth, exec, numMergeWorkers);
+    }
+    return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth);
+  }
+
   @Override
   public void close() throws IOException {
     IOUtils.close(meta, vectorData, vectorIndex);
+    if (exec != null) {
+      exec.shutdownNow();
+    }
+    System.out.println(
+        "Total contention time: " + NeighborArray.contentionTime.get() / 1000000 + " ms");

Review Comment:
   curious what this showed - can you publish some accounting?



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.util.hnsw;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.lucene.util.BitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * A graph builder that manages multiple workers, it only supports adding the whole graph all at
+ * once. It will spawn a thread for each worker and the workers will pick the work in batches.
+ */
+public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder {
+
+  private static final int BATCH_SIZE = 2048;

Review Comment:
   maybe comment this? I guess it's the number of vectors we handle 
sequentially in each task?
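
   For reference, a minimal sketch of how I read the batch size being used: each worker repeatedly claims the next `BATCH_SIZE` ordinals from a shared counter and inserts them sequentially. `BatchClaimer`, `claim` and `end` are hypothetical names for illustration, and `getAndAdd` stands in for the compare-and-set loop in the PR's `getStartPos`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration only; not a class from the PR. */
final class BatchClaimer {
  private final AtomicInteger nextStart = new AtomicInteger(0);
  private final int batchSize;

  BatchClaimer(int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
  }

  /** Claims the next batch and returns its first ordinal, or -1 once maxOrd is reached. */
  int claim(int maxOrd) {
    // getAndAdd hands every caller a distinct start, so no two workers share a batch.
    int start = nextStart.getAndAdd(batchSize);
    // A negative start means the int counter overflowed; treat that as exhausted.
    return (start >= 0 && start < maxOrd) ? start : -1;
  }

  /** Exclusive end of the batch that begins at start; the last batch may be short. */
  int end(int start, int maxOrd) {
    return (int) Math.min((long) start + batchSize, maxOrd);
  }
}
```

   If that reading is right, a one-line comment on the constant saying "number of vectors each worker claims and inserts per batch" would probably be enough.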



##########
lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java:
##########
@@ -26,17 +26,39 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.KnnFieldVectorsWriter;
 import org.apache.lucene.codecs.KnnVectorsWriter;
-import org.apache.lucene.index.*;

Review Comment:
   I guess spotless likes these *-imports?? surprising



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java:
##########
@@ -0,0 +1,234 @@
+package org.apache.lucene.util.hnsw;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.lucene.util.BitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * A graph builder that manages multiple workers, it only supports adding the whole graph all at
+ * once. It will spawn a thread for each worker and the workers will pick the work in batches.
+ */
+public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder {
+
+  private static final int BATCH_SIZE = 2048;
+
+  private final ExecutorService exec;
+  private final ConcurrentMergeWorker[] workers;
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  public HnswConcurrentMergeBuilder(
+      ExecutorService exec,
+      int numWorker,
+      RandomVectorScorerSupplier scorerSupplier,
+      int M,
+      int beamWidth,
+      OnHeapHnswGraph hnsw,
+      BitSet initializedNodes)
+      throws IOException {
+    this.exec = exec;
+    AtomicInteger workProgress = new AtomicInteger(0);
+    workers = new ConcurrentMergeWorker[numWorker];
+    for (int i = 0; i < numWorker; i++) {
+      workers[i] =
+          new ConcurrentMergeWorker(
+              scorerSupplier.copy(),
+              M,
+              beamWidth,
+              HnswGraphBuilder.randSeed,
+              hnsw,
+              initializedNodes,
+              workProgress);
+    }
+  }
+
+  @Override
+  public OnHeapHnswGraph build(int maxOrd) throws IOException {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(
+          HNSW_COMPONENT,
+          "build graph from " + maxOrd + " vectors, with " + workers.length + " workers");
+    }
+    List<Future<?>> futures = new ArrayList<>();
+    for (int i = 0; i < workers.length; i++) {
+      int finalI = i;
+      futures.add(
+          exec.submit(
+              () -> {
+                try {
+                  workers[finalI].run(maxOrd);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              }));
+    }
+    Throwable exc = null;
+    for (Future<?> future : futures) {
+      try {
+        future.get();
+      } catch (InterruptedException e) {
+        var newException = new ThreadInterruptedException(e);
+        if (exc == null) {
+          exc = newException;
+        } else {
+          exc.addSuppressed(newException);
+        }
+      } catch (ExecutionException e) {
+        if (exc == null) {
+          exc = e.getCause();
+        } else {
+          exc.addSuppressed(e.getCause());
+        }
+      }
+    }
+    if (exc != null) {
+      // The error handling was copied from TaskExecutor. should we just use TaskExecutor instead?
+      throw IOUtils.rethrowAlways(exc);
+    }
+    return workers[0].getGraph();
+  }
+
+  @Override
+  public void addGraphNode(int node) throws IOException {
+    throw new UnsupportedOperationException("This builder is for merge only");
+  }
+
+  @Override
+  public void setInfoStream(InfoStream infoStream) {
+    this.infoStream = infoStream;
+    for (IHnswGraphBuilder worker : workers) {
+      worker.setInfoStream(infoStream);
+    }
+  }
+
+  @Override
+  public OnHeapHnswGraph getGraph() {
+    return workers[0].getGraph();
+  }
+
+  private static final class ConcurrentMergeWorker extends HnswGraphBuilder {
+
+    private final AtomicInteger workProgress;
+    private final BitSet initializedNodes;
+
+    private ConcurrentMergeWorker(
+        RandomVectorScorerSupplier scorerSupplier,
+        int M,
+        int beamWidth,
+        long seed,
+        OnHeapHnswGraph hnsw,
+        BitSet initializedNodes,
+        AtomicInteger workProgress)
+        throws IOException {
+      super(
+          scorerSupplier,
+          M,
+          beamWidth,
+          seed,
+          hnsw,
+          new MergeSearcher(
+              new NeighborQueue(beamWidth, true), new FixedBitSet(hnsw.maxNodeId() + 1)));
+      this.workProgress = workProgress;
+      this.initializedNodes = initializedNodes;
+    }
+
+    private void run(int maxOrd) throws IOException {

Review Comment:
   This seems a bit complex - wouldn't it be simpler to divide the ords "up 
front" in the control loop? 
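
   To make the suggestion concrete, here is a rough sketch of dividing the ordinals up front, under the assumption that one task per range is acceptable. `UpFrontBatches`, `split`, `runAll` and `RangeTask` are hypothetical names, not anything in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical illustration only; not a class from the PR. */
final class UpFrontBatches {

  /** Work to perform on the half-open ordinal range [start, end). */
  interface RangeTask {
    void run(int start, int end);
  }

  /** Splits [0, maxOrd) into {start, end} pairs covering at most batchSize ordinals each. */
  static List<int[]> split(int maxOrd, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<int[]> ranges = new ArrayList<>();
    int start = 0;
    while (start < maxOrd) {
      // Compute the end in long arithmetic so start + batchSize cannot overflow.
      int end = (int) Math.min((long) start + batchSize, maxOrd);
      ranges.add(new int[] {start, end});
      start = end;
    }
    return ranges;
  }

  /** Control loop: submits one task per range, then waits for all of them. */
  static void runAll(ExecutorService exec, List<int[]> ranges, RangeTask task) {
    List<Future<?>> futures = new ArrayList<>();
    for (int[] range : ranges) {
      futures.add(exec.submit(() -> task.run(range[0], range[1])));
    }
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      } catch (ExecutionException e) {
        throw new IllegalStateException(e.getCause());
      }
    }
  }
}
```

   The executor's queue then does the load balancing that the shared counter does today. One caveat: each worker in the PR owns its own scorer copy and searcher, so per-range tasks would presumably need to borrow a worker rather than share one.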



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException {
     return hnsw;
   }
 
-  /** Set info-stream to output debugging information * */
+  @Override
   public void setInfoStream(InfoStream infoStream) {
     this.infoStream = infoStream;
   }
 
+  @Override
   public OnHeapHnswGraph getGraph() {
     return hnsw;
   }
 
-  private void addVectors(int maxOrd) throws IOException {
+  protected void addVectors(int minOrd, int maxOrd) throws IOException {
     long start = System.nanoTime(), t = start;
-    for (int node = 0; node < maxOrd; node++) {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")");
+    }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" +
+    // initializedNodes.size());
+    for (int node = minOrd; node < maxOrd; node++) {
+      // System.out.println("add node " + node + " t=" + Thread.currentThread().getName());
       addGraphNode(node);
+      // System.out.println("entry node " + hnsw.entryNode());
+      // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size());
       if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) {
         t = printGraphBuildStatus(node, start, t);
       }
     }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" +
+    // hnsw.size());
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void addVectors(int maxOrd) throws IOException {
+    addVectors(0, maxOrd);
   }
 
-  /** Inserts a doc with vector value to the graph */
+  @Override
   public void addGraphNode(int node) throws IOException {
+    /*
+    Note: this implementation is thread safe when graph size is fixed (e.g. when merging)
+    The process of adding a node is roughly:
+    1. Add the node to all level from top to the bottom, but do not connect it to any other node,
+       nor try to promote itself to an entry node before the connection is done
+    2. Do the search from top to bottom, remember all the possible neighbours on each level the node
+       is on.
+    3. Add the neighbor to the node from bottom to top level, when adding the neighbour,
+       we always add all the outgoing links first before adding incoming link such that
+       when a search visiting this node, it can always find a way out
+    4. If the node has level that is less or equal to graph level, then we're done here.
+       If the node has level larger than graph level, then we need to promote the node
+       as the entry node. If, while we add the node to the graph, the entry node has changed
+       (which means the graph level has changed as well), we need to reinsert the node
+       to the newly introduced levels (repeating step 2,3 for new levels) and again try to
+       promote the node to entry node.
+    */
     RandomVectorScorer scorer = scorerSupplier.scorer(node);
     final int nodeLevel = getRandomGraphLevel(ml, random);
-    int curMaxLevel = hnsw.numLevels() - 1;
-
-    // If entrynode is -1, then this should finish without adding neighbors
-    if (hnsw.entryNode() == -1) {
-      for (int level = nodeLevel; level >= 0; level--) {
-        hnsw.addNode(level, node);
-      }
+    // first add nodes to all levels
+    for (int level = nodeLevel; level >= 0; level--) {
+      hnsw.addNode(level, node);
+    }
+    // then promote itself as entry node if entry node is not set
+    if (hnsw.trySetNewEntryNode(node, nodeLevel)) {
       return;
     }
-    int[] eps = new int[] {hnsw.entryNode()};
+    // if the entry node is already set, then we have to do all connections first before we can
+    // promote ourselves as entry node
+    // do connections from bottom up
+    int lowestUnsetLevel = 0;
+    int curMaxLevel;
+    do {
+      curMaxLevel = hnsw.numLevels() - 1;
+      // NOTE: the entry node and max level may not be paired, but because we get the level first
+      // we ensure that the entry node we get later will always exist on the curMaxLevel
+      int[] eps = new int[] {hnsw.entryNode()};
+      // for levels > nodeLevel search with topk = 1
+      GraphBuilderKnnCollector candidates = entryCandidates;
+      for (int level = curMaxLevel; level > nodeLevel; level--) {
+        candidates.clear();
+        graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
+        eps = new int[] {candidates.popNode()};
+      }
 
-    // if a node introduces new levels to the graph, add this new node on new levels
-    for (int level = nodeLevel; level > curMaxLevel; level--) {
-      hnsw.addNode(level, node);
-    }
+      // for levels <= nodeLevel search with topk = beamWidth, and add connections
+      candidates = beamCandidates;
+      NeighborArray[] scratchPerLevel =
+          new NeighborArray[Math.min(nodeLevel, curMaxLevel) - lowestUnsetLevel + 1];
+      for (int i = scratchPerLevel.length - 1; i >= 0; i--) {
+        int level = i + lowestUnsetLevel;
+        candidates.clear();
+        graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
+        eps = candidates.popUntilNearestKNodes();
+        scratchPerLevel[i] = new NeighborArray(Math.max(beamCandidates.k(), M + 1), false);
+        popToScratch(candidates, scratchPerLevel[i]);
+      }
 
-    // for levels > nodeLevel search with topk = 1
-    GraphBuilderKnnCollector candidates = entryCandidates;
-    for (int level = curMaxLevel; level > nodeLevel; level--) {
-      candidates.clear();
-      graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
-      eps = new int[] {candidates.popNode()};
-    }
-    // for levels <= nodeLevel search with topk = beamWidth, and add connections
-    candidates = beamCandidates;
-    for (int level = Math.min(nodeLevel, curMaxLevel); level >= 0; level--) {
-      candidates.clear();
-      graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
-      eps = candidates.popUntilNearestKNodes();
-      hnsw.addNode(level, node);
-      addDiverseNeighbors(level, node, candidates);
-    }
+      for (int i = 0; i < scratchPerLevel.length; i++) {

Review Comment:
   hmm well we searched from top down and then add the nodes from bottom up!
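
   To spell out the ordering I mean, a tiny sketch of which levels the two loops visit. It assumes the second loop maps index `i` to level `i + lowestUnsetLevel` the same way the first one does, which is not visible in this hunk. `LevelOrderSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not a class from the PR. */
final class LevelOrderSketch {

  /** Number of levels that get a scratch NeighborArray, as computed in the diff. */
  static int scratchLength(int nodeLevel, int curMaxLevel, int lowestUnsetLevel) {
    return Math.min(nodeLevel, curMaxLevel) - lowestUnsetLevel + 1;
  }

  /** Search loop: i runs from length - 1 down to 0, so levels are visited top down. */
  static List<Integer> searchOrder(int nodeLevel, int curMaxLevel, int lowestUnsetLevel) {
    int length = scratchLength(nodeLevel, curMaxLevel, lowestUnsetLevel);
    List<Integer> levels = new ArrayList<>();
    for (int i = length - 1; i >= 0; i--) {
      levels.add(i + lowestUnsetLevel);
    }
    return levels;
  }

  /** Link loop: i runs from 0 up to length - 1, so levels are connected bottom up. */
  static List<Integer> linkOrder(int nodeLevel, int curMaxLevel, int lowestUnsetLevel) {
    int length = scratchLength(nodeLevel, curMaxLevel, lowestUnsetLevel);
    List<Integer> levels = new ArrayList<>();
    for (int i = 0; i < length; i++) {
      levels.add(i + lowestUnsetLevel);
    }
    return levels;
  }
}
```

   With `nodeLevel = 2`, `curMaxLevel = 4` and `lowestUnsetLevel = 0`, the search visits levels 2, 1, 0 and the linking then goes 0, 1, 2.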



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphMerger.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.util.hnsw;
+
+import java.io.IOException;
+import org.apache.lucene.codecs.KnnVectorsReader;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.InfoStream;
+
+/**
+ * Abstraction of merging multiple sealed graph into one on heap graph

Review Comment:
   I'm not sure what "sealed" means here - I guess these are accessed 
read-only, but that's always the case with merge operations.



##########
lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java:
##########
@@ -50,13 +72,24 @@ public final class Lucene95HnswVectorsWriter extends KnnVectorsWriter {
   private final IndexOutput meta, vectorData, vectorIndex;
   private final int M;
   private final int beamWidth;
+  private final int numMergeWorkers;
+  private final ExecutorService exec;
 
   private final List<FieldWriter<?>> fields = new ArrayList<>();
   private boolean finished;
 
-  Lucene95HnswVectorsWriter(SegmentWriteState state, int M, int beamWidth) throws IOException {
+  Lucene95HnswVectorsWriter(SegmentWriteState state, int M, int beamWidth, int numMergeWorkers)
+      throws IOException {
     this.M = M;
     this.beamWidth = beamWidth;
+    this.numMergeWorkers = numMergeWorkers;
+    if (numMergeWorkers <= 1) {
+      exec = null;
+    } else {
+      exec =
+          Executors.newFixedThreadPool(
+              Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("hnsw-merge"));

Review Comment:
   shouldn't this be allocating `numMergeWorkers` threads here?
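
   Roughly what I had in mind, as a standalone sketch: size the pool from `numMergeWorkers` rather than from the core count. `MergePoolSketch` and `newMergePool` are hypothetical names, and the plain `ThreadFactory` lambda only stands in for the `NamedThreadFactory` used in the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration only; not a class from the PR. */
final class MergePoolSketch {

  /** Returns null for a single worker (sequential merge), as the PR does. */
  static ExecutorService newMergePool(int numMergeWorkers) {
    if (numMergeWorkers <= 1) {
      return null;
    }
    AtomicInteger counter = new AtomicInteger();
    // Stand-in for a named thread factory so the threads are recognizable in dumps.
    ThreadFactory factory =
        runnable -> {
          Thread thread = new Thread(runnable, "hnsw-merge-" + counter.incrementAndGet());
          thread.setDaemon(true);
          return thread;
        };
    // One thread per merge worker, independent of how many cores the machine has.
    return Executors.newFixedThreadPool(numMergeWorkers, factory);
  }
}
```

   Otherwise a machine with fewer cores than `numMergeWorkers` leaves some workers queued behind others, and one with more cores holds idle threads.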



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java:
##########
@@ -0,0 +1,234 @@
+package org.apache.lucene.util.hnsw;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.lucene.util.BitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * A graph builder that manages multiple workers, it only supports adding the whole graph all at
+ * once. It will spawn a thread for each worker and the workers will pick the work in batches.
+ */
+public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder {
+
+  private static final int BATCH_SIZE = 2048;
+
+  private final ExecutorService exec;
+  private final ConcurrentMergeWorker[] workers;
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  public HnswConcurrentMergeBuilder(
+      ExecutorService exec,
+      int numWorker,
+      RandomVectorScorerSupplier scorerSupplier,
+      int M,
+      int beamWidth,
+      OnHeapHnswGraph hnsw,
+      BitSet initializedNodes)
+      throws IOException {
+    this.exec = exec;
+    AtomicInteger workProgress = new AtomicInteger(0);
+    workers = new ConcurrentMergeWorker[numWorker];
+    for (int i = 0; i < numWorker; i++) {
+      workers[i] =
+          new ConcurrentMergeWorker(
+              scorerSupplier.copy(),
+              M,
+              beamWidth,
+              HnswGraphBuilder.randSeed,

Review Comment:
   I wonder if we ought to generate different seeds for each worker although 
probably it makes no difference. In any case we are not going to produce a 
deterministic graph.
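
   If we did want distinct seeds, one simple option (an assumption on my part, not something the PR does) is to derive them from the base seed with a seeding generator. `WorkerSeeds` and `derive` are hypothetical names.

```java
import java.util.Random;

/** Hypothetical illustration only; not a class from the PR. */
final class WorkerSeeds {

  /** Derives one seed per worker from a single base seed, reproducibly. */
  static long[] derive(long baseSeed, int numWorkers) {
    if (numWorkers < 0) {
      throw new IllegalArgumentException("numWorkers must not be negative");
    }
    // The same base seed always yields the same sequence of worker seeds.
    Random seeder = new Random(baseSeed);
    long[] seeds = new long[numWorkers];
    for (int i = 0; i < numWorkers; i++) {
      seeds[i] = seeder.nextLong();
    }
    return seeds;
  }
}
```

   This only makes each worker's stream of random levels differ. The graph stays nondeterministic either way, since thread scheduling decides the insertion order.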



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java:
##########
@@ -0,0 +1,234 @@
+package org.apache.lucene.util.hnsw;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.lucene.util.BitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * A graph builder that manages multiple workers, it only supports adding the whole graph all at
+ * once. It will spawn a thread for each worker and the workers will pick the work in batches.
+ */
+public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder {
+
+  private static final int BATCH_SIZE = 2048;
+
+  private final ExecutorService exec;
+  private final ConcurrentMergeWorker[] workers;
+  private InfoStream infoStream = InfoStream.getDefault();

Review Comment:
   should this be the segmentState.infoStream? I'm a little unclear about how 
InfoStream works



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException {
     return hnsw;
   }
 
-  /** Set info-stream to output debugging information * */
+  @Override
   public void setInfoStream(InfoStream infoStream) {
     this.infoStream = infoStream;
   }
 
+  @Override
   public OnHeapHnswGraph getGraph() {
     return hnsw;
   }
 
-  private void addVectors(int maxOrd) throws IOException {
+  protected void addVectors(int minOrd, int maxOrd) throws IOException {
     long start = System.nanoTime(), t = start;
-    for (int node = 0; node < maxOrd; node++) {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")");
+    }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" +
+    // initializedNodes.size());
+    for (int node = minOrd; node < maxOrd; node++) {
+      // System.out.println("add node " + node + " t=" + Thread.currentThread().getName());
       addGraphNode(node);
+      // System.out.println("entry node " + hnsw.entryNode());
+      // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size());
       if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) {
         t = printGraphBuildStatus(node, start, t);
       }
     }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" +
+    // hnsw.size());
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void addVectors(int maxOrd) throws IOException {
+    addVectors(0, maxOrd);
   }
 
-  /** Inserts a doc with vector value to the graph */
+  @Override
   public void addGraphNode(int node) throws IOException {
+    /*
+    Note: this implementation is thread safe when graph size is fixed (e.g. when merging)
+    The process of adding a node is roughly:
+    1. Add the node to all level from top to the bottom, but do not connect it to any other node,
+       nor try to promote itself to an entry node before the connection is done
+    2. Do the search from top to bottom, remember all the possible neighbours on each level the node
+       is on.
+    3. Add the neighbor to the node from bottom to top level, when adding the neighbour,
+       we always add all the outgoing links first before adding incoming link such that
+       when a search visiting this node, it can always find a way out
+    4. If the node has level that is less or equal to graph level, then we're done here.
+       If the node has level larger than graph level, then we need to promote the node
+       as the entry node. If, while we add the node to the graph, the entry node has changed
+       (which means the graph level has changed as well), we need to reinsert the node
+       to the newly introduced levels (repeating step 2,3 for new levels) and again try to
+       promote the node to entry node.
+    */

Review Comment:
   Thanks for the comments, this is a big help to understanding



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -33,7 +33,7 @@
 * Builder for HNSW graph. See {@link HnswGraph} for a gloss on the algorithm and the meaning of the
  * hyper-parameters.
  */
-public class HnswGraphBuilder {
+public class HnswGraphBuilder implements IHnswGraphBuilder {

Review Comment:
   I'll just note that we don't seem to use the `IClassName` naming convention 
for interfaces elsewhere in this code base. I think I'd prefer to use 
`HnswGraphBuilder` for the interface name and rename this class to 
`SequentialHnswMerger` or so. I'm also unsure whether an interface is the right 
choice or whether we should use an abstract base class. It used to be that an 
abstract base class was more flexible, since you could add methods to it in a 
backwards-compatible way. Although maybe with default methods that is no longer 
a concern?
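
   A small sketch of the default-method point, with made-up names (`SketchGraphBuilder` and friends are hypothetical, not the PR's types): a method added to an interface later, with a default body, does not break existing implementors.

```java
/** Hypothetical interface for illustration; not the PR's builder interface. */
interface SketchGraphBuilder {

  /** The original contract every implementor had to provide. */
  int build(int maxOrd);

  /** Added later: the default body means existing implementors need no changes. */
  default String describe() {
    return "graph builder";
  }
}

/** Written before describe() existed; still satisfies the interface unchanged. */
final class SketchSequentialBuilder implements SketchGraphBuilder {
  @Override
  public int build(int maxOrd) {
    return maxOrd;
  }
}

/** A newer implementor that chooses to override the default. */
final class SketchConcurrentBuilder implements SketchGraphBuilder {
  @Override
  public int build(int maxOrd) {
    return maxOrd;
  }

  @Override
  public String describe() {
    return "concurrent graph builder";
  }
}
```

   What an abstract base class still offers over this is shared fields and constructors, so the choice probably comes down to whether the builders need common state.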



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java:
##########
@@ -0,0 +1,234 @@
+package org.apache.lucene.util.hnsw;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.lucene.util.BitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * A graph builder that manages multiple workers, it only supports adding the whole graph all at
+ * once. It will spawn a thread for each worker and the workers will pick the work in batches.
+ */
+public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder {
+
+  private static final int BATCH_SIZE = 2048;
+
+  private final ExecutorService exec;
+  private final ConcurrentMergeWorker[] workers;
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  public HnswConcurrentMergeBuilder(
+      ExecutorService exec,
+      int numWorker,
+      RandomVectorScorerSupplier scorerSupplier,
+      int M,
+      int beamWidth,
+      OnHeapHnswGraph hnsw,
+      BitSet initializedNodes)
+      throws IOException {
+    this.exec = exec;
+    AtomicInteger workProgress = new AtomicInteger(0);
+    workers = new ConcurrentMergeWorker[numWorker];
+    for (int i = 0; i < numWorker; i++) {
+      workers[i] =
+          new ConcurrentMergeWorker(
+              scorerSupplier.copy(),
+              M,
+              beamWidth,
+              HnswGraphBuilder.randSeed,
+              hnsw,
+              initializedNodes,
+              workProgress);
+    }
+  }
+
+  @Override
+  public OnHeapHnswGraph build(int maxOrd) throws IOException {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(
+          HNSW_COMPONENT,
+          "build graph from " + maxOrd + " vectors, with " + workers.length + " workers");
+    }
+    List<Future<?>> futures = new ArrayList<>();
+    for (int i = 0; i < workers.length; i++) {
+      int finalI = i;
+      futures.add(
+          exec.submit(
+              () -> {
+                try {
+                  workers[finalI].run(maxOrd);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              }));
+    }
+    Throwable exc = null;
+    for (Future<?> future : futures) {
+      try {
+        future.get();
+      } catch (InterruptedException e) {
+        var newException = new ThreadInterruptedException(e);
+        if (exc == null) {
+          exc = newException;
+        } else {
+          exc.addSuppressed(newException);
+        }
+      } catch (ExecutionException e) {
+        if (exc == null) {
+          exc = e.getCause();
+        } else {
+          exc.addSuppressed(e.getCause());
+        }
+      }
+    }
+    if (exc != null) {
+      // The error handling was copied from TaskExecutor. should we just use TaskExecutor instead?
+      throw IOUtils.rethrowAlways(exc);
+    }
+    return workers[0].getGraph();
+  }
+
+  @Override
+  public void addGraphNode(int node) throws IOException {
+    throw new UnsupportedOperationException("This builder is for merge only");
+  }
+
+  @Override
+  public void setInfoStream(InfoStream infoStream) {
+    this.infoStream = infoStream;
+    for (IHnswGraphBuilder worker : workers) {
+      worker.setInfoStream(infoStream);
+    }
+  }
+
+  @Override
+  public OnHeapHnswGraph getGraph() {
+    return workers[0].getGraph();
+  }
+
+  private static final class ConcurrentMergeWorker extends HnswGraphBuilder {
+
+    private final AtomicInteger workProgress;
+    private final BitSet initializedNodes;
+
+    private ConcurrentMergeWorker(
+        RandomVectorScorerSupplier scorerSupplier,
+        int M,
+        int beamWidth,
+        long seed,
+        OnHeapHnswGraph hnsw,
+        BitSet initializedNodes,
+        AtomicInteger workProgress)
+        throws IOException {
+      super(
+          scorerSupplier,
+          M,
+          beamWidth,
+          seed,
+          hnsw,
+          new MergeSearcher(
+              new NeighborQueue(beamWidth, true), new 
FixedBitSet(hnsw.maxNodeId() + 1)));
+      this.workProgress = workProgress;
+      this.initializedNodes = initializedNodes;
+    }
+
+    private void run(int maxOrd) throws IOException {
+      int start = getStartPos(maxOrd);
+      int end;
+      while (start != -1) {
+        end = Math.min(maxOrd, start + BATCH_SIZE);
+        addVectors(start, end);
+        start = getStartPos(maxOrd);
+      }
+    }
+
+    private int getStartPos(int maxOrd) {
+      int start = workProgress.get();
+      while (start < maxOrd) {
+        if (workProgress.compareAndSet(start, start + BATCH_SIZE)) {
+          break;
+        } else {
+          assert workProgress.get() > start;
+          start = workProgress.get();
+        }
+      }
+      if (start < maxOrd) {
+        return start;
+      } else {
+        return -1;
+      }
+    }
+
+    @Override
+    public void addGraphNode(int node) throws IOException {
+      if (initializedNodes != null && initializedNodes.get(node)) {
+        return;
+      }
+      super.addGraphNode(node);
+    }
+  }
+
+  /**
+   * This searcher will obtain the lock and make a copy of neighborArray when 
seeking the graph such
+   * that concurrent modification of the graph will not impact the search
+   */
+  private static class MergeSearcher extends HnswGraphSearcher {
+    private int[] nodeBuffer;
+    private int upto;
+    private int size;
+
+    private MergeSearcher(NeighborQueue candidates, BitSet visited) {
+      super(candidates, visited);
+    }
+
+    @Override
+    void graphSeek(HnswGraph graph, int level, int targetNode) {
+      NeighborArray neighborArray = ((OnHeapHnswGraph) 
graph).getNeighbors(level, targetNode);
+      long start = System.nanoTime();
+      neighborArray.rwlock.readLock().lock();
+      NeighborArray.contentionTime.addAndGet(System.nanoTime() - start);
+      if (nodeBuffer == null || nodeBuffer.length < neighborArray.size()) {
+        nodeBuffer = new int[neighborArray.size()];
+      }
+      size = neighborArray.size();
+      if (size >= 0) System.arraycopy(neighborArray.node, 0, nodeBuffer, 0, 
size);
+      neighborArray.rwlock.readLock().unlock();

Review Comment:
   It could be interesting to measure array copy time as well. I suspect it's 
not that large, but if it is comparable to contention time we might consider a 
different tradeoff. I'm also curious to know how contention varies with graph 
size; I suspect it gets proportionally smaller as the graph gets bigger.
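
   A minimal sketch of what that accounting could look like, timing the lock wait and the copy separately. Everything here is hypothetical and stand-alone (`SeekTimingSketch`, `lockWaitNanos`, `copyNanos`, and the fixed `nodes` array are illustration only, not the PR's `NeighborArray` or `MergeSearcher`):

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-alone sketch; not the PR's NeighborArray or MergeSearcher. */
public class SeekTimingSketch {
  // LongAdder keeps the counters themselves from becoming a contention point.
  static final LongAdder lockWaitNanos = new LongAdder();
  static final LongAdder copyNanos = new LongAdder();

  static final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
  // Stand-in for a node's neighbor list.
  static final int[] nodes = {3, 1, 4, 1, 5, 9, 2, 6};

  /** Copies the shared array under the read lock, timing the wait and the copy separately. */
  static int[] snapshot() {
    long t0 = System.nanoTime();
    rwlock.readLock().lock();
    long t1 = System.nanoTime();
    try {
      lockWaitNanos.add(t1 - t0);
      int[] copy = new int[nodes.length];
      System.arraycopy(nodes, 0, copy, 0, nodes.length);
      copyNanos.add(System.nanoTime() - t1);
      return copy;
    } finally {
      // Unlock in finally so an exception during the copy cannot leak the lock.
      rwlock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    for (int i = 0; i < 100_000; i++) {
      snapshot();
    }
    System.out.println("lock wait:  " + lockWaitNanos.sum() / 1_000_000 + " ms");
    System.out.println("array copy: " + copyNanos.sum() / 1_000_000 + " ms");
  }
}
```

   Note that `System.nanoTime()` has its own overhead, so for arrays this small the two numbers are only a rough comparison.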



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.util.hnsw;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.lucene.util.BitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * A graph builder that manages multiple workers, it only supports adding the 
whole graph all at
+ * once. It will spawn a thread for each worker and the workers will pick the 
work in batches.
+ */
+public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder {
+
+  private static final int BATCH_SIZE = 2048;
+
+  private final ExecutorService exec;
+  private final ConcurrentMergeWorker[] workers;
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  public HnswConcurrentMergeBuilder(
+      ExecutorService exec,
+      int numWorker,
+      RandomVectorScorerSupplier scorerSupplier,
+      int M,
+      int beamWidth,
+      OnHeapHnswGraph hnsw,
+      BitSet initializedNodes)
+      throws IOException {
+    this.exec = exec;
+    AtomicInteger workProgress = new AtomicInteger(0);
+    workers = new ConcurrentMergeWorker[numWorker];
+    for (int i = 0; i < numWorker; i++) {
+      workers[i] =
+          new ConcurrentMergeWorker(
+              scorerSupplier.copy(),
+              M,
+              beamWidth,
+              HnswGraphBuilder.randSeed,
+              hnsw,
+              initializedNodes,
+              workProgress);
+    }
+  }
+
+  @Override
+  public OnHeapHnswGraph build(int maxOrd) throws IOException {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(
+          HNSW_COMPONENT,
+          "build graph from " + maxOrd + " vectors, with " + workers.length + 
" workers");
+    }
+    List<Future<?>> futures = new ArrayList<>();
+    for (int i = 0; i < workers.length; i++) {
+      int finalI = i;
+      futures.add(
+          exec.submit(
+              () -> {
+                try {
+                  workers[finalI].run(maxOrd);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              }));
+    }
+    Throwable exc = null;
+    for (Future<?> future : futures) {
+      try {
+        future.get();
+      } catch (InterruptedException e) {
+        var newException = new ThreadInterruptedException(e);
+        if (exc == null) {
+          exc = newException;
+        } else {
+          exc.addSuppressed(newException);
+        }
+      } catch (ExecutionException e) {
+        if (exc == null) {
+          exc = e.getCause();
+        } else {
+          exc.addSuppressed(e.getCause());
+        }
+      }
+    }
+    if (exc != null) {
+      // The error handling was copied from TaskExecutor. should we just use 
TaskExecutor instead?
+      throw IOUtils.rethrowAlways(exc);
+    }
+    return workers[0].getGraph();
+  }
+
+  @Override
+  public void addGraphNode(int node) throws IOException {
+    throw new UnsupportedOperationException("This builder is for merge only");

Review Comment:
   Maybe someday this proves useful? But +100 to focusing on merging, since that 
is where the most pain is.



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws 
IOException {
     return hnsw;
   }
 
-  /** Set info-stream to output debugging information * */
+  @Override
   public void setInfoStream(InfoStream infoStream) {
     this.infoStream = infoStream;
   }
 
+  @Override
   public OnHeapHnswGraph getGraph() {
     return hnsw;
   }
 
-  private void addVectors(int maxOrd) throws IOException {
+  protected void addVectors(int minOrd, int maxOrd) throws IOException {
     long start = System.nanoTime(), t = start;
-    for (int node = 0; node < maxOrd; node++) {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + 
maxOrd + ")");
+    }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") 
initialized.size=" +
+    // initializedNodes.size());
+    for (int node = minOrd; node < maxOrd; node++) {
+      // System.out.println("add node " + node + " t=" + 
Thread.currentThread().getName());
       addGraphNode(node);
+      // System.out.println("entry node " + hnsw.entryNode());
+      // System.out.println("node " + node + " nbrs.size()=" + 
hnsw.getNeighbors(0, node).size());
       if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) {
         t = printGraphBuildStatus(node, start, t);
       }
     }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done 
+ graph.size=" +
+    // hnsw.size());
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})

Review Comment:
   is this causing warnings? Is there a generic method in here?



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws 
IOException {
     return hnsw;
   }
 
-  /** Set info-stream to output debugging information * */
+  @Override
   public void setInfoStream(InfoStream infoStream) {
     this.infoStream = infoStream;
   }
 
+  @Override
   public OnHeapHnswGraph getGraph() {
     return hnsw;
   }
 
-  private void addVectors(int maxOrd) throws IOException {
+  protected void addVectors(int minOrd, int maxOrd) throws IOException {
     long start = System.nanoTime(), t = start;
-    for (int node = 0; node < maxOrd; node++) {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + 
maxOrd + ")");
+    }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") 
initialized.size=" +
+    // initializedNodes.size());
+    for (int node = minOrd; node < maxOrd; node++) {
+      // System.out.println("add node " + node + " t=" + 
Thread.currentThread().getName());
       addGraphNode(node);
+      // System.out.println("entry node " + hnsw.entryNode());
+      // System.out.println("node " + node + " nbrs.size()=" + 
hnsw.getNeighbors(0, node).size());
       if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) {
         t = printGraphBuildStatus(node, start, t);
       }
     }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done 
+ graph.size=" +
+    // hnsw.size());
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void addVectors(int maxOrd) throws IOException {
+    addVectors(0, maxOrd);
   }
 
-  /** Inserts a doc with vector value to the graph */
+  @Override
   public void addGraphNode(int node) throws IOException {
+    /*
+    Note: this implementation is thread safe when graph size is fixed (e.g. 
when merging)
+    The process of adding a node is roughly:
+    1. Add the node to all level from top to the bottom, but do not connect it 
to any other node,
+       nor try to promote itself to an entry node before the connection is done
+    2. Do the search from top to bottom, remember all the possible neighbours 
on each level the node

Review Comment:
   In comments below we say "bottom up", but I think "top down" here is more in 
line with the impl, since we start at the higher-numbered level and finish with 
level 0.



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws 
IOException {
     return hnsw;
   }
 
-  /** Set info-stream to output debugging information * */
+  @Override
   public void setInfoStream(InfoStream infoStream) {
     this.infoStream = infoStream;
   }
 
+  @Override
   public OnHeapHnswGraph getGraph() {
     return hnsw;
   }
 
-  private void addVectors(int maxOrd) throws IOException {
+  protected void addVectors(int minOrd, int maxOrd) throws IOException {
     long start = System.nanoTime(), t = start;
-    for (int node = 0; node < maxOrd; node++) {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + 
maxOrd + ")");
+    }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") 
initialized.size=" +
+    // initializedNodes.size());
+    for (int node = minOrd; node < maxOrd; node++) {
+      // System.out.println("add node " + node + " t=" + 
Thread.currentThread().getName());
       addGraphNode(node);
+      // System.out.println("entry node " + hnsw.entryNode());
+      // System.out.println("node " + node + " nbrs.size()=" + 
hnsw.getNeighbors(0, node).size());
       if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) {
         t = printGraphBuildStatus(node, start, t);
       }
     }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done 
+ graph.size=" +
+    // hnsw.size());
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void addVectors(int maxOrd) throws IOException {
+    addVectors(0, maxOrd);
   }
 
-  /** Inserts a doc with vector value to the graph */
+  @Override
   public void addGraphNode(int node) throws IOException {
+    /*
+    Note: this implementation is thread safe when graph size is fixed (e.g. 
when merging)
+    The process of adding a node is roughly:
+    1. Add the node to all level from top to the bottom, but do not connect it 
to any other node,
+       nor try to promote itself to an entry node before the connection is done
+    2. Do the search from top to bottom, remember all the possible neighbours 
on each level the node
+       is on.
+    3. Add the neighbor to the node from bottom to top level, when adding the 
neighbour,
+       we always add all the outgoing links first before adding incoming link 
such that
+       when a search visiting this node, it can always find a way out

Review Comment:
   grammar: "when a search visits"



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws 
IOException {
     return hnsw;
   }
 
-  /** Set info-stream to output debugging information * */
+  @Override
   public void setInfoStream(InfoStream infoStream) {
     this.infoStream = infoStream;
   }
 
+  @Override
   public OnHeapHnswGraph getGraph() {
     return hnsw;
   }
 
-  private void addVectors(int maxOrd) throws IOException {
+  protected void addVectors(int minOrd, int maxOrd) throws IOException {
     long start = System.nanoTime(), t = start;
-    for (int node = 0; node < maxOrd; node++) {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + 
maxOrd + ")");
+    }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") 
initialized.size=" +
+    // initializedNodes.size());
+    for (int node = minOrd; node < maxOrd; node++) {
+      // System.out.println("add node " + node + " t=" + 
Thread.currentThread().getName());
       addGraphNode(node);
+      // System.out.println("entry node " + hnsw.entryNode());
+      // System.out.println("node " + node + " nbrs.size()=" + 
hnsw.getNeighbors(0, node).size());
       if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) {
         t = printGraphBuildStatus(node, start, t);
       }
     }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done 
+ graph.size=" +
+    // hnsw.size());
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void addVectors(int maxOrd) throws IOException {
+    addVectors(0, maxOrd);
   }
 
-  /** Inserts a doc with vector value to the graph */
+  @Override
   public void addGraphNode(int node) throws IOException {
+    /*
+    Note: this implementation is thread safe when graph size is fixed (e.g. 
when merging)
+    The process of adding a node is roughly:
+    1. Add the node to all level from top to the bottom, but do not connect it 
to any other node,
+       nor try to promote itself to an entry node before the connection is done
+    2. Do the search from top to bottom, remember all the possible neighbours 
on each level the node
+       is on.
+    3. Add the neighbor to the node from bottom to top level, when adding the 
neighbour,
+       we always add all the outgoing links first before adding incoming link 
such that
+       when a search visiting this node, it can always find a way out
+    4. If the node has level that is less or equal to graph level, then we're 
done here.
+       If the node has level larger than graph level, then we need to promote 
the node
+       as the entry node. If, while we add the node to the graph, the entry 
node has changed
+       (which means the graph level has changed as well), we need to reinsert 
the node
+       to the newly introduced levels (repeating step 2,3 for new levels) and 
again try to
+       promote the node to entry node.
+    */
     RandomVectorScorer scorer = scorerSupplier.scorer(node);
     final int nodeLevel = getRandomGraphLevel(ml, random);
-    int curMaxLevel = hnsw.numLevels() - 1;
-
-    // If entrynode is -1, then this should finish without adding neighbors
-    if (hnsw.entryNode() == -1) {
-      for (int level = nodeLevel; level >= 0; level--) {
-        hnsw.addNode(level, node);
-      }
+    // first add nodes to all levels
+    for (int level = nodeLevel; level >= 0; level--) {
+      hnsw.addNode(level, node);
+    }
+    // then promote itself as entry node if entry node is not set
+    if (hnsw.trySetNewEntryNode(node, nodeLevel)) {
       return;
     }
-    int[] eps = new int[] {hnsw.entryNode()};
+    // if the entry node is already set, then we have to do all connections 
first before we can
+    // promote ourselves as entry node
+    // do connections from bottom up

Review Comment:
   we call this top down in the comments above, and I think we usually think of 
level 0 as the bottom?
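
   To make the naming concrete, here is a toy sketch of the two directions, assuming the convention that level 0 is the bottom. `LevelOrderSketch` and its methods are hypothetical and only record the order in which levels are visited:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of level ordering; not Lucene code. */
public class LevelOrderSketch {

  /** Top down: start at the node's highest level and finish at level 0. */
  static List<Integer> topDown(int nodeLevel) {
    List<Integer> order = new ArrayList<>();
    for (int level = nodeLevel; level >= 0; level--) {
      order.add(level);
    }
    return order;
  }

  /** Bottom up: start at level 0 and finish at the node's highest level. */
  static List<Integer> bottomUp(int nodeLevel) {
    List<Integer> order = new ArrayList<>();
    for (int level = 0; level <= nodeLevel; level++) {
      order.add(level);
    }
    return order;
  }

  public static void main(String[] args) {
    System.out.println("top down:  " + topDown(3)); // [3, 2, 1, 0]
    System.out.println("bottom up: " + bottomUp(3)); // [0, 1, 2, 3]
  }
}
```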



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -221,34 +292,39 @@ private long printGraphBuildStatus(int node, long start, 
long t) {
     return now;
   }
 
-  private void addDiverseNeighbors(int level, int node, 
GraphBuilderKnnCollector candidates)
-      throws IOException {
+  private void addDiverseNeighbors(int level, int node, NeighborArray scratch) 
throws IOException {
     /* For each of the beamWidth nearest candidates (going from best to 
worst), select it only if it
      * is closer to target than it is to any of the already-selected neighbors 
(ie selected in this method,
      * since the node is new and has no prior neighbors).
      */
     NeighborArray neighbors = hnsw.getNeighbors(level, node);
     assert neighbors.size() == 0; // new node
-    popToScratch(candidates);
     int maxConnOnLevel = level == 0 ? M * 2 : M;
-    selectAndLinkDiverse(neighbors, scratch, maxConnOnLevel);
+    boolean[] mask = selectAndLinkDiverse(neighbors, scratch, maxConnOnLevel);
 
     // Link the selected nodes to the new node, and the new node to the 
selected nodes (again
     // applying diversity heuristic)
-    int size = neighbors.size();
-    for (int i = 0; i < size; i++) {
-      int nbr = neighbors.node[i];
+    for (int i = 0; i < scratch.size(); i++) {
+      if (mask[i] == false) {

Review Comment:
   could you add a comment explaining the need for mask? Are we changing the 
linking criteria here, or is this somehow to do with making this threadsafe?



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws 
IOException {
     return hnsw;
   }
 
-  /** Set info-stream to output debugging information * */
+  @Override
   public void setInfoStream(InfoStream infoStream) {
     this.infoStream = infoStream;
   }
 
+  @Override
   public OnHeapHnswGraph getGraph() {
     return hnsw;
   }
 
-  private void addVectors(int maxOrd) throws IOException {
+  protected void addVectors(int minOrd, int maxOrd) throws IOException {
     long start = System.nanoTime(), t = start;
-    for (int node = 0; node < maxOrd; node++) {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + 
maxOrd + ")");
+    }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") 
initialized.size=" +
+    // initializedNodes.size());
+    for (int node = minOrd; node < maxOrd; node++) {
+      // System.out.println("add node " + node + " t=" + 
Thread.currentThread().getName());
       addGraphNode(node);
+      // System.out.println("entry node " + hnsw.entryNode());
+      // System.out.println("node " + node + " nbrs.size()=" + 
hnsw.getNeighbors(0, node).size());
       if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) {
         t = printGraphBuildStatus(node, start, t);
       }
     }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done 
+ graph.size=" +
+    // hnsw.size());
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void addVectors(int maxOrd) throws IOException {
+    addVectors(0, maxOrd);
   }
 
-  /** Inserts a doc with vector value to the graph */
+  @Override
   public void addGraphNode(int node) throws IOException {
+    /*
+    Note: this implementation is thread safe when graph size is fixed (e.g. 
when merging)
+    The process of adding a node is roughly:
+    1. Add the node to all level from top to the bottom, but do not connect it 
to any other node,
+       nor try to promote itself to an entry node before the connection is done

Review Comment:
   (unless the graph is empty and this is the first node)
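
   A simplified sketch of that exception, assuming the entry node is tracked with a `-1` sentinel as in the removed `hnsw.entryNode() == -1` check. `FirstEntryNodeSketch` and `trySetFirstEntryNode` are hypothetical names covering only the empty-graph case; this is not the PR's `trySetNewEntryNode`:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for entry-node bookkeeping; not Lucene code. */
public class FirstEntryNodeSketch {
  // -1 means "no entry node yet".
  private final AtomicInteger entryNode = new AtomicInteger(-1);

  /** Returns true only for the single caller that installs the first entry node. */
  boolean trySetFirstEntryNode(int node) {
    return entryNode.compareAndSet(-1, node);
  }

  int entryNode() {
    return entryNode.get();
  }

  public static void main(String[] args) {
    FirstEntryNodeSketch graph = new FirstEntryNodeSketch();
    // The graph is empty, so the first node is promoted without any connections.
    System.out.println(graph.trySetFirstEntryNode(7)); // true
    // Later nodes lose the race and must connect before any promotion.
    System.out.println(graph.trySetFirstEntryNode(9)); // false
    System.out.println(graph.entryNode()); // 7
  }
}
```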



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws 
IOException {
     return hnsw;
   }
 
-  /** Set info-stream to output debugging information * */
+  @Override
   public void setInfoStream(InfoStream infoStream) {
     this.infoStream = infoStream;
   }
 
+  @Override
   public OnHeapHnswGraph getGraph() {
     return hnsw;
   }
 
-  private void addVectors(int maxOrd) throws IOException {
+  protected void addVectors(int minOrd, int maxOrd) throws IOException {
     long start = System.nanoTime(), t = start;
-    for (int node = 0; node < maxOrd; node++) {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + 
maxOrd + ")");
+    }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") 
initialized.size=" +
+    // initializedNodes.size());
+    for (int node = minOrd; node < maxOrd; node++) {
+      // System.out.println("add node " + node + " t=" + 
Thread.currentThread().getName());
       addGraphNode(node);
+      // System.out.println("entry node " + hnsw.entryNode());
+      // System.out.println("node " + node + " nbrs.size()=" + 
hnsw.getNeighbors(0, node).size());
       if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) {
         t = printGraphBuildStatus(node, start, t);
       }
     }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done 
+ graph.size=" +
+    // hnsw.size());
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void addVectors(int maxOrd) throws IOException {
+    addVectors(0, maxOrd);
   }
 
-  /** Inserts a doc with vector value to the graph */
+  @Override
   public void addGraphNode(int node) throws IOException {
+    /*
+    Note: this implementation is thread safe when graph size is fixed (e.g. 
when merging)
+    The process of adding a node is roughly:
+    1. Add the node to all level from top to the bottom, but do not connect it 
to any other node,
+       nor try to promote itself to an entry node before the connection is done
+    2. Do the search from top to bottom, remember all the possible neighbours 
on each level the node
+       is on.
+    3. Add the neighbor to the node from bottom to top level, when adding the 
neighbour,
+       we always add all the outgoing links first before adding incoming link 
such that
+       when a search visiting this node, it can always find a way out
+    4. If the node has level that is less or equal to graph level, then we're 
done here.
+       If the node has level larger than graph level, then we need to promote 
the node
+       as the entry node. If, while we add the node to the graph, the entry 
node has changed
+       (which means the graph level has changed as well), we need to reinsert 
the node
+       to the newly introduced levels (repeating step 2,3 for new levels) and 
again try to
+       promote the node to entry node.
+    */
     RandomVectorScorer scorer = scorerSupplier.scorer(node);
     final int nodeLevel = getRandomGraphLevel(ml, random);
-    int curMaxLevel = hnsw.numLevels() - 1;
-
-    // If entrynode is -1, then this should finish without adding neighbors
-    if (hnsw.entryNode() == -1) {
-      for (int level = nodeLevel; level >= 0; level--) {
-        hnsw.addNode(level, node);
-      }
+    // first add nodes to all levels
+    for (int level = nodeLevel; level >= 0; level--) {
+      hnsw.addNode(level, node);
+    }
+    // then promote itself as entry node if entry node is not set
+    if (hnsw.trySetNewEntryNode(node, nodeLevel)) {
       return;
     }
-    int[] eps = new int[] {hnsw.entryNode()};
+    // if the entry node is already set, then we have to do all connections 
first before we can
+    // promote ourselves as entry node
+    // do connections from bottom up
+    int lowestUnsetLevel = 0;
+    int curMaxLevel;
+    do {
+      curMaxLevel = hnsw.numLevels() - 1;
+      // NOTE: the entry node and max level may not be paired, but because we 
get the level first
+      // we ensure that the entry node we get later will always exist on the 
curMaxLevel
+      int[] eps = new int[] {hnsw.entryNode()};
+      // for levels > nodeLevel search with topk = 1
+      GraphBuilderKnnCollector candidates = entryCandidates;
+      for (int level = curMaxLevel; level > nodeLevel; level--) {
+        candidates.clear();
+        graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
+        eps = new int[] {candidates.popNode()};
+      }
 
-    // if a node introduces new levels to the graph, add this new node on new 
levels
-    for (int level = nodeLevel; level > curMaxLevel; level--) {
-      hnsw.addNode(level, node);
-    }
+      // for levels <= nodeLevel search with topk = beamWidth, and add 
connections
+      candidates = beamCandidates;
+      NeighborArray[] scratchPerLevel =
+          new NeighborArray[Math.min(nodeLevel, curMaxLevel) - 
lowestUnsetLevel + 1];
+      for (int i = scratchPerLevel.length - 1; i >= 0; i--) {
+        int level = i + lowestUnsetLevel;
+        candidates.clear();
+        graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
+        eps = candidates.popUntilNearestKNodes();
+        scratchPerLevel[i] = new NeighborArray(Math.max(beamCandidates.k(), M 
+ 1), false);
+        popToScratch(candidates, scratchPerLevel[i]);
+      }
 
-    // for levels > nodeLevel search with topk = 1
-    GraphBuilderKnnCollector candidates = entryCandidates;
-    for (int level = curMaxLevel; level > nodeLevel; level--) {
-      candidates.clear();
-      graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
-      eps = new int[] {candidates.popNode()};
-    }
-    // for levels <= nodeLevel search with topk = beamWidth, and add 
connections
-    candidates = beamCandidates;
-    for (int level = Math.min(nodeLevel, curMaxLevel); level >= 0; level--) {
-      candidates.clear();
-      graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
-      eps = candidates.popUntilNearestKNodes();
-      hnsw.addNode(level, node);
-      addDiverseNeighbors(level, node, candidates);
-    }
+      for (int i = 0; i < scratchPerLevel.length; i++) {
+        addDiverseNeighbors(i + lowestUnsetLevel, node, scratchPerLevel[i]);
+      }
+      lowestUnsetLevel = scratchPerLevel.length + lowestUnsetLevel;

Review Comment:
   `+=`?



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws 
IOException {
     return hnsw;
   }
 
-  /** Set info-stream to output debugging information * */
+  @Override
   public void setInfoStream(InfoStream infoStream) {
     this.infoStream = infoStream;
   }
 
+  @Override
   public OnHeapHnswGraph getGraph() {
     return hnsw;
   }
 
-  private void addVectors(int maxOrd) throws IOException {
+  protected void addVectors(int minOrd, int maxOrd) throws IOException {
     long start = System.nanoTime(), t = start;
-    for (int node = 0; node < maxOrd; node++) {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + 
maxOrd + ")");
+    }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") 
initialized.size=" +
+    // initializedNodes.size());
+    for (int node = minOrd; node < maxOrd; node++) {
+      // System.out.println("add node " + node + " t=" + 
Thread.currentThread().getName());
       addGraphNode(node);
+      // System.out.println("entry node " + hnsw.entryNode());
+      // System.out.println("node " + node + " nbrs.size()=" + 
hnsw.getNeighbors(0, node).size());
       if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) {
         t = printGraphBuildStatus(node, start, t);
       }
     }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done 
+ graph.size=" +
+    // hnsw.size());
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void addVectors(int maxOrd) throws IOException {
+    addVectors(0, maxOrd);
   }
 
-  /** Inserts a doc with vector value to the graph */
+  @Override
   public void addGraphNode(int node) throws IOException {
+    /*
+    Note: this implementation is thread safe when graph size is fixed (e.g. 
when merging)
+    The process of adding a node is roughly:
+    1. Add the node to all level from top to the bottom, but do not connect it 
to any other node,
+       nor try to promote itself to an entry node before the connection is done
+    2. Do the search from top to bottom, remember all the possible neighbours 
on each level the node
+       is on.
+    3. Add the neighbor to the node from bottom to top level, when adding the 
neighbour,
+       we always add all the outgoing links first before adding incoming link 
such that
+       when a search visiting this node, it can always find a way out
+    4. If the node has level that is less or equal to graph level, then we're 
done here.
+       If the node has level larger than graph level, then we need to promote 
the node
+       as the entry node. If, while we add the node to the graph, the entry 
node has changed
+       (which means the graph level has changed as well), we need to reinsert 
the node
+       to the newly introduced levels (repeating step 2,3 for new levels) and 
again try to
+       promote the node to entry node.
+    */
     RandomVectorScorer scorer = scorerSupplier.scorer(node);
     final int nodeLevel = getRandomGraphLevel(ml, random);
-    int curMaxLevel = hnsw.numLevels() - 1;
-
-    // If entrynode is -1, then this should finish without adding neighbors
-    if (hnsw.entryNode() == -1) {
-      for (int level = nodeLevel; level >= 0; level--) {
-        hnsw.addNode(level, node);
-      }
+    // first add nodes to all levels
+    for (int level = nodeLevel; level >= 0; level--) {
+      hnsw.addNode(level, node);
+    }
+    // then promote itself as entry node if entry node is not set
+    if (hnsw.trySetNewEntryNode(node, nodeLevel)) {
       return;
     }
-    int[] eps = new int[] {hnsw.entryNode()};
+    // if the entry node is already set, then we have to do all connections first before we can
+    // promote ourselves as entry node
+    // do connections from bottom up
+    int lowestUnsetLevel = 0;
+    int curMaxLevel;
+    do {
+      curMaxLevel = hnsw.numLevels() - 1;
+      // NOTE: the entry node and max level may not be paired, but because we get the level first
+      // we ensure that the entry node we get later will always exist on the curMaxLevel
+      int[] eps = new int[] {hnsw.entryNode()};
+      // for levels > nodeLevel search with topk = 1
+      GraphBuilderKnnCollector candidates = entryCandidates;
+      for (int level = curMaxLevel; level > nodeLevel; level--) {
+        candidates.clear();
+        graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
+        eps = new int[] {candidates.popNode()};
+      }
 
-    // if a node introduces new levels to the graph, add this new node on new levels
-    for (int level = nodeLevel; level > curMaxLevel; level--) {
-      hnsw.addNode(level, node);
-    }
+      // for levels <= nodeLevel search with topk = beamWidth, and add connections
+      candidates = beamCandidates;
+      NeighborArray[] scratchPerLevel =
+          new NeighborArray[Math.min(nodeLevel, curMaxLevel) - lowestUnsetLevel + 1];
+      for (int i = scratchPerLevel.length - 1; i >= 0; i--) {
+        int level = i + lowestUnsetLevel;
+        candidates.clear();
+        graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
+        eps = candidates.popUntilNearestKNodes();
+        scratchPerLevel[i] = new NeighborArray(Math.max(beamCandidates.k(), M + 1), false);
+        popToScratch(candidates, scratchPerLevel[i]);
+      }
 
-    // for levels > nodeLevel search with topk = 1
-    GraphBuilderKnnCollector candidates = entryCandidates;
-    for (int level = curMaxLevel; level > nodeLevel; level--) {
-      candidates.clear();
-      graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
-      eps = new int[] {candidates.popNode()};
-    }
-    // for levels <= nodeLevel search with topk = beamWidth, and add connections
-    candidates = beamCandidates;
-    for (int level = Math.min(nodeLevel, curMaxLevel); level >= 0; level--) {
-      candidates.clear();
-      graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
-      eps = candidates.popUntilNearestKNodes();
-      hnsw.addNode(level, node);
-      addDiverseNeighbors(level, node, candidates);
-    }
+      for (int i = 0; i < scratchPerLevel.length; i++) {
+        addDiverseNeighbors(i + lowestUnsetLevel, node, scratchPerLevel[i]);
+      }
+      lowestUnsetLevel = scratchPerLevel.length + lowestUnsetLevel;
+      assert lowestUnsetLevel == Math.min(nodeLevel, curMaxLevel) + 1;
+      if (lowestUnsetLevel > nodeLevel) {
+        return;
+      }
+      assert lowestUnsetLevel == curMaxLevel + 1 && nodeLevel > curMaxLevel;
+      if (hnsw.tryPromoteNewEntryNode(node, nodeLevel, curMaxLevel)) {
+        return;
+      }
+      if (hnsw.numLevels() == curMaxLevel + 1) {
+        throw new IllegalStateException(

Review Comment:
   this is a sanity check and should really never happen, right?



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -221,34 +292,39 @@ private long printGraphBuildStatus(int node, long start, long t) {
     return now;
   }
 
-  private void addDiverseNeighbors(int level, int node, GraphBuilderKnnCollector candidates)
-      throws IOException {
+  private void addDiverseNeighbors(int level, int node, NeighborArray scratch) throws IOException {
     /* For each of the beamWidth nearest candidates (going from best to worst), select it only if it
      * is closer to target than it is to any of the already-selected neighbors (ie selected in this method,
      * since the node is new and has no prior neighbors).
      */
     NeighborArray neighbors = hnsw.getNeighbors(level, node);
     assert neighbors.size() == 0; // new node
-    popToScratch(candidates);
     int maxConnOnLevel = level == 0 ? M * 2 : M;
-    selectAndLinkDiverse(neighbors, scratch, maxConnOnLevel);
+    boolean[] mask = selectAndLinkDiverse(neighbors, scratch, maxConnOnLevel);
 
     // Link the selected nodes to the new node, and the new node to the selected nodes (again
     // applying diversity heuristic)
-    int size = neighbors.size();
-    for (int i = 0; i < size; i++) {
-      int nbr = neighbors.node[i];
+    for (int i = 0; i < scratch.size(); i++) {
+      if (mask[i] == false) {
+        continue;
+      }
+      int nbr = scratch.node[i];
       NeighborArray nbrsOfNbr = hnsw.getNeighbors(level, nbr);
-      nbrsOfNbr.addOutOfOrder(node, neighbors.score[i]);
+      long start = System.nanoTime();
+      nbrsOfNbr.rwlock.writeLock().lock();
+      NeighborArray.contentionTime.addAndGet(System.nanoTime() - start);
+      nbrsOfNbr.addOutOfOrder(node, scratch.score[i]);
       if (nbrsOfNbr.size() > maxConnOnLevel) {
         int indexToRemove = findWorstNonDiverse(nbrsOfNbr, nbr);
         nbrsOfNbr.removeIndex(indexToRemove);
       }
+      nbrsOfNbr.rwlock.writeLock().unlock();

Review Comment:
   We should use try/finally to release the lock because, in theory at least,
   `findWorstNonDiverse` can throw an `IOException`, and we could end up never
   releasing the lock. We should probably check the read locks similarly.
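
   A minimal sketch of the try/finally pattern suggested above. `GuardedNeighbors` and `addNeighbor` are hypothetical stand-ins, not the PR's `NeighborArray` API; the only assumption carried over from the diff is a `ReentrantReadWriteLock` field named `rwlock`. The simulated `IOException` plays the role of a failure inside `findWorstNonDiverse`.

```java
import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockReleaseSketch {
  /** Hypothetical stand-in for a neighbor list guarded by a read/write lock. */
  static final class GuardedNeighbors {
    final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
    int size;
  }

  /** Updates the list under the write lock; the lock is released even if the update throws. */
  static void addNeighbor(GuardedNeighbors nbrs, boolean failDuringUpdate) throws IOException {
    nbrs.rwlock.writeLock().lock();
    try {
      nbrs.size++;
      if (failDuringUpdate) {
        // Simulates an I/O failure while pruning the worst non-diverse neighbor.
        throw new IOException("simulated failure while pruning neighbors");
      }
    } finally {
      // Runs on both the normal and the exceptional path.
      nbrs.rwlock.writeLock().unlock();
    }
  }

  /** Returns true if the write lock is free again after a failed update. */
  static boolean lockReleasedAfterFailure() {
    GuardedNeighbors nbrs = new GuardedNeighbors();
    try {
      addNeighbor(nbrs, true);
    } catch (IOException expected) {
      // The failure is intentional; only the lock state matters here.
    }
    return !nbrs.rwlock.isWriteLocked();
  }
}
```

   The same shape would apply to the read-lock call sites: acquire immediately before `try`, release in `finally`.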



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/OnHeapHnswGraph.java:
##########
@@ -185,7 +187,41 @@ public int numLevels() {
    */
   @Override
   public int entryNode() {
-    return entryNode;
+    return entryNode.get().node;
+  }
+
+  /**
+   * Try to set the entry node if the graph does not have one
+   *
+   * @return True if the entry node is set to the provided node. False if the entry node is already

Review Comment:
   grammar nit: "if the entry node already exists"



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java:
##########
@@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException {
     return hnsw;
   }
 
-  /** Set info-stream to output debugging information * */
+  @Override
   public void setInfoStream(InfoStream infoStream) {
     this.infoStream = infoStream;
   }
 
+  @Override
   public OnHeapHnswGraph getGraph() {
     return hnsw;
   }
 
-  private void addVectors(int maxOrd) throws IOException {
+  protected void addVectors(int minOrd, int maxOrd) throws IOException {
     long start = System.nanoTime(), t = start;
-    for (int node = 0; node < maxOrd; node++) {
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")");
+    }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" +
+    // initializedNodes.size());
+    for (int node = minOrd; node < maxOrd; node++) {
+      // System.out.println("add node " + node + " t=" + Thread.currentThread().getName());
       addGraphNode(node);
+      // System.out.println("entry node " + hnsw.entryNode());
+      // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size());
       if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) {
         t = printGraphBuildStatus(node, start, t);
       }
     }
+    //    System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" +
+    // hnsw.size());
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void addVectors(int maxOrd) throws IOException {
+    addVectors(0, maxOrd);
   }
 
-  /** Inserts a doc with vector value to the graph */
+  @Override
   public void addGraphNode(int node) throws IOException {
+    /*
+    Note: this implementation is thread safe when graph size is fixed (e.g. when merging)
+    The process of adding a node is roughly:
+    1. Add the node to all level from top to the bottom, but do not connect it to any other node,
+       nor try to promote itself to an entry node before the connection is done
+    2. Do the search from top to bottom, remember all the possible neighbours on each level the node
+       is on.
+    3. Add the neighbor to the node from bottom to top level, when adding the neighbour,
+       we always add all the outgoing links first before adding incoming link such that
+       when a search visiting this node, it can always find a way out
+    4. If the node has level that is less or equal to graph level, then we're done here.
+       If the node has level larger than graph level, then we need to promote the node
+       as the entry node. If, while we add the node to the graph, the entry node has changed
+       (which means the graph level has changed as well), we need to reinsert the node
+       to the newly introduced levels (repeating step 2,3 for new levels) and again try to
+       promote the node to entry node.
+    */
     RandomVectorScorer scorer = scorerSupplier.scorer(node);
     final int nodeLevel = getRandomGraphLevel(ml, random);
-    int curMaxLevel = hnsw.numLevels() - 1;
-
-    // If entrynode is -1, then this should finish without adding neighbors
-    if (hnsw.entryNode() == -1) {
-      for (int level = nodeLevel; level >= 0; level--) {
-        hnsw.addNode(level, node);
-      }
+    // first add nodes to all levels
+    for (int level = nodeLevel; level >= 0; level--) {
+      hnsw.addNode(level, node);
+    }
+    // then promote itself as entry node if entry node is not set
+    if (hnsw.trySetNewEntryNode(node, nodeLevel)) {
       return;
     }
-    int[] eps = new int[] {hnsw.entryNode()};
+    // if the entry node is already set, then we have to do all connections first before we can
+    // promote ourselves as entry node
+    // do connections from bottom up
+    int lowestUnsetLevel = 0;
+    int curMaxLevel;
+    do {
+      curMaxLevel = hnsw.numLevels() - 1;
+      // NOTE: the entry node and max level may not be paired, but because we get the level first
+      // we ensure that the entry node we get later will always exist on the curMaxLevel
+      int[] eps = new int[] {hnsw.entryNode()};
+      // for levels > nodeLevel search with topk = 1
+      GraphBuilderKnnCollector candidates = entryCandidates;
+      for (int level = curMaxLevel; level > nodeLevel; level--) {
+        candidates.clear();
+        graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null);
+        eps = new int[] {candidates.popNode()};

Review Comment:
   Why do we create a `new int[]` instead of updating the one we have? This
   irked me in the existing code as well.
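
   A rough sketch of what updating the array in place could look like for the single-entry-point descent. `descend` and the `searchLevel` callback are hypothetical stand-ins for the `graphSearcher.searchLevel(...)` / `candidates.popNode()` pair, not Lucene APIs. This only covers the levels above `nodeLevel`; the later loop assigns `eps` from `popUntilNearestKNodes()`, which already returns an array.

```java
import java.util.function.IntBinaryOperator;

public class EntryPointReuseSketch {
  /**
   * Walks from topLevel down to (but not including) nodeLevel, reusing one
   * single-element array for the entry point instead of allocating per level.
   * searchLevel is a hypothetical callback: (level, currentEntry) -> best node on that level.
   */
  static int descend(int entryNode, int topLevel, int nodeLevel, IntBinaryOperator searchLevel) {
    int[] eps = {entryNode}; // allocated once, before the loop
    for (int level = topLevel; level > nodeLevel; level--) {
      eps[0] = searchLevel.applyAsInt(level, eps[0]); // overwrite in place
    }
    return eps[0];
  }
}
```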



##########
lucene/core/src/test/org/apache/lucene/util/hnsw/HnswGraphTestCase.java:
##########
@@ -709,6 +710,7 @@ public void testHnswGraphBuilderInvalid() throws IOException {
         IllegalArgumentException.class, () -> HnswGraphBuilder.create(scorerSupplier, 10, 0, 0));
   }
 
+  @Ignore

Review Comment:
   what's the plan for this?



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/IncrementalHnswGraphMerger.java:
##########
@@ -32,22 +32,23 @@
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.CollectionUtil;
 import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
 
 /**
  * This selects the biggest Hnsw graph from the provided merge state and initializes a new
  * HnswGraphBuilder with that graph as a starting point.
  *
  * @lucene.experimental
  */
-public class IncrementalHnswGraphMerger {
+public class IncrementalHnswGraphMerger implements HnswGraphMerger {
 
-  private KnnVectorsReader initReader;
-  private MergeState.DocMap initDocMap;
-  private int initGraphSize;
-  private final FieldInfo fieldInfo;
-  private final RandomVectorScorerSupplier scorerSupplier;
-  private final int M;
-  private final int beamWidth;
+  protected KnnVectorsReader initReader;
+  protected MergeState.DocMap initDocMap;
+  protected int initGraphSize;
+  protected final FieldInfo fieldInfo;

Review Comment:
   nit: can we move the final ones before the mutable ones?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

