msokolov commented on code in PR #12254:
URL: https://github.com/apache/lucene/pull/12254#discussion_r1187517239


##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentOnHeapHnswGraph.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
+
+/**
+ * An {@link HnswGraph} that offers concurrent access; for typical graphs you 
will get significant
+ * speedups in construction and searching as you add threads.
+ *
+ * <p>To search this graph, you should use a View obtained from {@link 
#getView()} to perform `seek`
+ * and `nextNeighbor` operations. For convenience, you can use these methods 
directly on the graph
+ * instance, which will give you a ThreadLocal View, but you can call 
`getView` directly if you need
+ * more control, e.g. for performing a second search in the same thread while 
the first is still in
+ * progress.
+ */
+public final class ConcurrentOnHeapHnswGraph extends HnswGraph implements 
Accountable {
+  private final AtomicReference<NodeAtLevel>
+      entryPoint; // the current graph entry node on the top level. -1 if not 
set
+
+  // views for compatibility with HnswGraph interface; prefer creating views 
explicitly
+  private final ThreadLocal<ConcurrentHnswGraphView> views =

Review Comment:
   I really do think we need to remove these convenience methods based on 
ThreadLocals. We don't ever expect this to be used by the codec and we don't 
need to support any other use case. It seems to be driven by conformance to the 
pre-existing HnswGraph API, but this isn't required and results in a lot of 
extra complexity here (new interface abstractions, generics, etc) that obscures 
the basic idea for reviewers trying to understand what's going on here. The 
concurrent algorithm is a complex powerful new contribution - let's not clutter 
the PR with unrelated stuff.



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentOnHeapHnswGraph.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
+
+/**
+ * An {@link HnswGraph} that offers concurrent access; for typical graphs you 
will get significant
+ * speedups in construction and searching as you add threads.
+ *
+ * <p>To search this graph, you should use a View obtained from {@link 
#getView()} to perform `seek`
+ * and `nextNeighbor` operations. For convenience, you can use these methods 
directly on the graph
+ * instance, which will give you a ThreadLocal View, but you can call 
`getView` directly if you need
+ * more control, e.g. for performing a second search in the same thread while 
the first is still in
+ * progress.
+ */
+public final class ConcurrentOnHeapHnswGraph extends HnswGraph implements 
Accountable {
+  private final AtomicReference<NodeAtLevel>
+      entryPoint; // the current graph entry node on the top level. -1 if not 
set
+
+  // views for compatibility with HnswGraph interface; prefer creating views 
explicitly
+  private final ThreadLocal<ConcurrentHnswGraphView> views =
+      ThreadLocal.withInitial(ConcurrentHnswGraphView::new);
+
+  // Unlike OnHeapHnswGraph (OHHG), we use the same data structure for Level 0 
and higher node
+  // lists,
+  // a ConcurrentHashMap.  While the ArrayList used for L0 in OHHG is faster 
for single-threaded
+  // workloads, it imposes an unacceptable contention burden for concurrent 
workloads.
+  private final ConcurrentMap<Integer, ConcurrentMap<Integer, 
ConcurrentNeighborSet>> graphLevels;
+
+  // Neighbours' size on upper levels (nsize) and level 0 (nsize0)
+  private final int nsize;
+  private final int nsize0;
+
+  ConcurrentOnHeapHnswGraph(int M) {
+    this.entryPoint =
+        new AtomicReference<>(
+            new NodeAtLevel(0, -1)); // Entry node should be negative until a 
node is added
+    this.nsize = M;
+    this.nsize0 = 2 * M;
+
+    this.graphLevels = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns the neighbors connected to the given node.
+   *
+   * @param level level of the graph
+   * @param node the node whose neighbors are returned, represented as an 
ordinal on the level 0.
+   */
+  public ConcurrentNeighborSet getNeighbors(int level, int node) {
+    return graphLevels.get(level).get(node);
+  }
+
+  @Override
+  public synchronized int size() {
+    return graphLevels.get(0).size(); // all nodes are located on the 0th level
+  }
+
+  @Override
+  public void addNode(int level, int node) {
+    if (level >= graphLevels.size()) {
+      for (int i = graphLevels.size(); i <= level; i++) {
+        graphLevels.putIfAbsent(i, new ConcurrentHashMap<>());
+      }
+    }
+
+    graphLevels.get(level).put(node, new 
ConcurrentNeighborSet(connectionsOnLevel(level)));

Review Comment:
   How do we know two threads aren't adding the same node at once? I guess 
there must be something that guarantees that, let's at least add a comment here 
referencing what it is



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentOnHeapHnswGraph.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
+
+/**
+ * An {@link HnswGraph} that offers concurrent access; for typical graphs you 
will get significant
+ * speedups in construction and searching as you add threads.
+ *
+ * <p>To search this graph, you should use a View obtained from {@link 
#getView()} to perform `seek`
+ * and `nextNeighbor` operations. For convenience, you can use these methods 
directly on the graph
+ * instance, which will give you a ThreadLocal View, but you can call 
`getView` directly if you need
+ * more control, e.g. for performing a second search in the same thread while 
the first is still in
+ * progress.
+ */
+public final class ConcurrentOnHeapHnswGraph extends HnswGraph implements 
Accountable {
+  private final AtomicReference<NodeAtLevel>
+      entryPoint; // the current graph entry node on the top level. -1 if not 
set
+
+  // views for compatibility with HnswGraph interface; prefer creating views 
explicitly
+  private final ThreadLocal<ConcurrentHnswGraphView> views =
+      ThreadLocal.withInitial(ConcurrentHnswGraphView::new);
+
+  // Unlike OnHeapHnswGraph (OHHG), we use the same data structure for Level 0 
and higher node
+  // lists,
+  // a ConcurrentHashMap.  While the ArrayList used for L0 in OHHG is faster 
for single-threaded
+  // workloads, it imposes an unacceptable contention burden for concurrent 
workloads.
+  private final ConcurrentMap<Integer, ConcurrentMap<Integer, 
ConcurrentNeighborSet>> graphLevels;
+
+  // Neighbours' size on upper levels (nsize) and level 0 (nsize0)
+  private final int nsize;
+  private final int nsize0;
+
+  ConcurrentOnHeapHnswGraph(int M) {
+    this.entryPoint =
+        new AtomicReference<>(
+            new NodeAtLevel(0, -1)); // Entry node should be negative until a 
node is added
+    this.nsize = M;
+    this.nsize0 = 2 * M;
+
+    this.graphLevels = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns the neighbors connected to the given node.
+   *
+   * @param level level of the graph
+   * @param node the node whose neighbors are returned, represented as an 
ordinal on the level 0.
+   */
+  public ConcurrentNeighborSet getNeighbors(int level, int node) {
+    return graphLevels.get(level).get(node);
+  }
+
+  @Override
+  public synchronized int size() {
+    return graphLevels.get(0).size(); // all nodes are located on the 0th level
+  }
+
+  @Override
+  public void addNode(int level, int node) {
+    if (level >= graphLevels.size()) {
+      for (int i = graphLevels.size(); i <= level; i++) {
+        graphLevels.putIfAbsent(i, new ConcurrentHashMap<>());
+      }
+    }
+
+    graphLevels.get(level).put(node, new 
ConcurrentNeighborSet(connectionsOnLevel(level)));
+  }
+
+  /**
+   * must be called after addNode to a level > 0
+   *
+   * <p>we don't do this as part of addNode itself, since it may not yet have 
been added to all the
+   * levels
+   */
+  void maybeUpdateEntryNode(int level, int node) {
+    while (true) {
+      NodeAtLevel oldEntry = entryPoint.get();
+      if (oldEntry.node >= 0 && oldEntry.level >= level) {
+        break;
+      }
+      entryPoint.compareAndSet(oldEntry, new NodeAtLevel(level, node));
+    }
+  }
+
+  private int connectionsOnLevel(int level) {
+    return level == 0 ? nsize0 : nsize;
+  }
+
+  @Override

Review Comment:
   Please remove, at least for now. I think we will find a better way to 
integrate with the codec that doesn't require conforming to this API



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.AtomicBitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level 
overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements 
IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during 
a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check 
comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them 
by their dense
+   * ordinals, using the given hyperparameter settings, and returns the 
resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - 
must provide a
+   *     different view over those vectors than the one used to add via 
addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of 
connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest 
neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new FixedBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are 
first
+    scratchNeighbors =
+        ThreadLocal.withInitial(() -> new NeighborArray(Math.max(beamWidth, M 
+ 1), false));
+    this.initializedNodes = new AtomicBitSet(vectors.size());
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link 
RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while 
avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors 
graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per 
core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained 
control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, boolean autoParallel) throws 
IOException {
+    if (autoParallel) {
+      return build(
+          vectorsToAdd,
+          Executors.newFixedThreadPool(
+              Runtime.getRuntime().availableProcessors(),
+              new NamedThreadFactory("Concurrent HNSW builder")));
+    } else {
+      return build(
+          vectorsToAdd,
+          Executors.newSingleThreadExecutor(new NamedThreadFactory("Concurrent 
HNSW builder")));
+    }
+  }
+
+  @Override
+  public ConcurrentOnHeapHnswGraph build(RandomAccessVectorValues<T> 
vectorsToAdd)
+      throws IOException {
+    return build(vectorsToAdd, true);
+  }
+
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, ExecutorService pool) {
+    if (vectorsToAdd == this.vectors) {
+      throw new IllegalArgumentException(
+          "Vectors to build must be independent of the source of vectors 
provided to HnswGraphBuilder()");
+    }
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "build graph from " + 
vectorsToAdd.size() + " vectors");
+    }
+    if (!(pool instanceof ThreadPoolExecutor)) {

Review Comment:
   Can we declare the argment to be of type ThreadPoolExecutor and let the 
compiler handle this? 



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.AtomicBitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level 
overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements 
IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during 
a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check 
comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them 
by their dense
+   * ordinals, using the given hyperparameter settings, and returns the 
resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - 
must provide a
+   *     different view over those vectors than the one used to add via 
addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of 
connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest 
neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new FixedBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are 
first
+    scratchNeighbors =
+        ThreadLocal.withInitial(() -> new NeighborArray(Math.max(beamWidth, M 
+ 1), false));
+    this.initializedNodes = new AtomicBitSet(vectors.size());
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link 
RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while 
avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors 
graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per 
core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained 
control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, boolean autoParallel) throws 
IOException {
+    if (autoParallel) {
+      return build(
+          vectorsToAdd,
+          Executors.newFixedThreadPool(
+              Runtime.getRuntime().availableProcessors(),
+              new NamedThreadFactory("Concurrent HNSW builder")));
+    } else {
+      return build(
+          vectorsToAdd,
+          Executors.newSingleThreadExecutor(new NamedThreadFactory("Concurrent 
HNSW builder")));
+    }
+  }
+
+  @Override
+  public ConcurrentOnHeapHnswGraph build(RandomAccessVectorValues<T> 
vectorsToAdd)
+      throws IOException {
+    return build(vectorsToAdd, true);
+  }
+
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, ExecutorService pool) {
+    if (vectorsToAdd == this.vectors) {
+      throw new IllegalArgumentException(
+          "Vectors to build must be independent of the source of vectors 
provided to HnswGraphBuilder()");
+    }
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "build graph from " + 
vectorsToAdd.size() + " vectors");
+    }
+    if (!(pool instanceof ThreadPoolExecutor)) {
+      throw new IllegalArgumentException("ExecutorService must be a 
ThreadPoolExecutor");
+    }
+    addVectors(vectorsToAdd, (ThreadPoolExecutor) pool);
+    return hnsw;
+  }
+
+  // the goal here is to keep all the ExecutorService threads busy, but not to 
create potentially
+  // millions of futures by naively throwing everything at submit at once.  
So, we use
+  // a semaphore to wait until a thread is free before adding a new task.
+  private void addVectors(RandomAccessVectorValues<T> vectorsToAdd, 
ThreadPoolExecutor pool) {
+    Semaphore semaphore = new Semaphore(pool.getMaximumPoolSize());
+
+    for (int i = 0; i < vectorsToAdd.size(); i++) {
+      final int node = i; // copy for closure

Review Comment:
   I see - here is where we guarantee that each node is added once. They might 
be added out of sequence now, but that's OK



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.AtomicBitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level 
overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements 
IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during 
a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check 
comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them 
by their dense
+   * ordinals, using the given hyperparameter settings, and returns the 
resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - 
must provide a
+   *     different view over those vectors than the one used to add via 
addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of 
connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest 
neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =

Review Comment:
   Instead of relying on ThreadLocal can we manage this thread -> searcher 
association explicitly? EG in addVectors can we create a collection of 
per-thread builder objects (like the views we have on the graph) and then 
manage them ourselves? could even be a map of threadid -> builder-view. In this 
project we've had issues with GC not capturing ThreadLocal storage, eg see 
https://markmail.org/thread/fbklvo4tkdd5d5u5. If the storage is managed locally 
as a reference from the builder, it should get cleaned up properly.



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.AtomicBitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level 
overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements 
IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during 
a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check 
comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them 
by their dense
+   * ordinals, using the given hyperparameter settings, and returns the 
resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - 
must provide a
+   *     different view over those vectors than the one used to add via 
addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of 
connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest 
neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new FixedBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are 
first
+    scratchNeighbors =
+        ThreadLocal.withInitial(() -> new NeighborArray(Math.max(beamWidth, M 
+ 1), false));
+    this.initializedNodes = new AtomicBitSet(vectors.size());
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link 
RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while 
avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors 
graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per 
core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained 
control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, boolean autoParallel) throws 
IOException {
+    if (autoParallel) {
+      return build(
+          vectorsToAdd,
+          Executors.newFixedThreadPool(
+              Runtime.getRuntime().availableProcessors(),
+              new NamedThreadFactory("Concurrent HNSW builder")));
+    } else {
+      return build(
+          vectorsToAdd,
+          Executors.newSingleThreadExecutor(new NamedThreadFactory("Concurrent 
HNSW builder")));
+    }
+  }
+
+  @Override
+  public ConcurrentOnHeapHnswGraph build(RandomAccessVectorValues<T> 
vectorsToAdd)
+      throws IOException {
+    return build(vectorsToAdd, true);
+  }
+
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, ExecutorService pool) {
+    if (vectorsToAdd == this.vectors) {
+      throw new IllegalArgumentException(
+          "Vectors to build must be independent of the source of vectors 
provided to HnswGraphBuilder()");
+    }
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "build graph from " + 
vectorsToAdd.size() + " vectors");
+    }
+    if (!(pool instanceof ThreadPoolExecutor)) {
+      throw new IllegalArgumentException("ExecutorService must be a 
ThreadPoolExecutor");
+    }
+    addVectors(vectorsToAdd, (ThreadPoolExecutor) pool);
+    return hnsw;
+  }
+
+  // the goal here is to keep all the ExecutorService threads busy, but not to 
create potentially
+  // millions of futures by naively throwing everything at submit at once.  
So, we use
+  // a semaphore to wait until a thread is free before adding a new task.
+  private void addVectors(RandomAccessVectorValues<T> vectorsToAdd, 
ThreadPoolExecutor pool) {
+    Semaphore semaphore = new Semaphore(pool.getMaximumPoolSize());
+
+    for (int i = 0; i < vectorsToAdd.size(); i++) {
+      final int node = i; // copy for closure
+      try {
+        semaphore.acquire();
+        pool.submit(
+            () -> {
+              try {
+                addGraphNode(node, vectorsToAdd);
+              } catch (IOException e) {
+                throw new RuntimeException(e);

Review Comment:
   We need to find a way to propagate these exceptions to the caller so we can 
re-throw. Also, UncheckedIOException is usually preferable for wrapping 
IOExceptions.



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.AtomicBitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level 
overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements 
IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during 
a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check 
comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them 
by their dense
+   * ordinals, using the given hyperparameter settings, and returns the 
resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - 
must provide a
+   *     different view over those vectors than the one used to add via 
addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of 
connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest 
neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new FixedBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are 
first
+    scratchNeighbors =
+        ThreadLocal.withInitial(() -> new NeighborArray(Math.max(beamWidth, M 
+ 1), false));
+    this.initializedNodes = new AtomicBitSet(vectors.size());
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link 
RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while 
avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors 
graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per 
core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained 
control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(

Review Comment:
   Let's remove this method. It can always be implemented by some sugar-wrapper 
that creates the appropriate kind of ThreadPool



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.AtomicBitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level 
overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements 
IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during 
a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check 
comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them 
by their dense
+   * ordinals, using the given hyperparameter settings, and returns the 
resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - 
must provide a
+   *     different view over those vectors than the one used to add via 
addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of 
connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest 
neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new FixedBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are 
first
+    scratchNeighbors =
+        ThreadLocal.withInitial(() -> new NeighborArray(Math.max(beamWidth, M 
+ 1), false));
+    this.initializedNodes = new AtomicBitSet(vectors.size());
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link 
RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while 
avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors 
graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per 
core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained 
control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, boolean autoParallel) throws 
IOException {
+    if (autoParallel) {
+      return build(
+          vectorsToAdd,
+          Executors.newFixedThreadPool(
+              Runtime.getRuntime().availableProcessors(),
+              new NamedThreadFactory("Concurrent HNSW builder")));
+    } else {
+      return build(
+          vectorsToAdd,
+          Executors.newSingleThreadExecutor(new NamedThreadFactory("Concurrent 
HNSW builder")));
+    }
+  }
+
+  @Override
+  public ConcurrentOnHeapHnswGraph build(RandomAccessVectorValues<T> 
vectorsToAdd)
+      throws IOException {
+    return build(vectorsToAdd, true);
+  }
+
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, ExecutorService pool) {
+    if (vectorsToAdd == this.vectors) {
+      throw new IllegalArgumentException(
+          "Vectors to build must be independent of the source of vectors 
provided to HnswGraphBuilder()");
+    }
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "build graph from " + 
vectorsToAdd.size() + " vectors");
+    }
+    if (!(pool instanceof ThreadPoolExecutor)) {
+      throw new IllegalArgumentException("ExecutorService must be a 
ThreadPoolExecutor");
+    }
+    addVectors(vectorsToAdd, (ThreadPoolExecutor) pool);
+    return hnsw;
+  }
+
+  // the goal here is to keep all the ExecutorService threads busy, but not to 
create potentially
+  // millions of futures by naively throwing everything at submit at once.  
So, we use
+  // a semaphore to wait until a thread is free before adding a new task.
+  private void addVectors(RandomAccessVectorValues<T> vectorsToAdd, 
ThreadPoolExecutor pool) {
+    Semaphore semaphore = new Semaphore(pool.getMaximumPoolSize());
+
+    for (int i = 0; i < vectorsToAdd.size(); i++) {
+      final int node = i; // copy for closure
+      try {
+        semaphore.acquire();
+        pool.submit(
+            () -> {
+              try {
+                addGraphNode(node, vectorsToAdd);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              } finally {
+                semaphore.release();
+              }
+            });
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);

Review Comment:
   Lucene generally uses IOException for most low-level failures and doesn't 
have any mechanism for handling uncaught unchecked exceptions, so we should 
convert this to an IOException and add throws to all the methods here that need 
it.



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.AtomicBitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level 
overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements 
IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during 
a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check 
comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them 
by their dense
+   * ordinals, using the given hyperparameter settings, and returns the 
resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - 
must provide a
+   *     different view over those vectors than the one used to add via 
addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of 
connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest 
neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new FixedBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are 
first
+    scratchNeighbors =
+        ThreadLocal.withInitial(() -> new NeighborArray(Math.max(beamWidth, M 
+ 1), false));
+    this.initializedNodes = new AtomicBitSet(vectors.size());
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link 
RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while 
avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors 
graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per 
core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained 
control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, boolean autoParallel) throws 
IOException {
+    if (autoParallel) {
+      return build(
+          vectorsToAdd,
+          Executors.newFixedThreadPool(
+              Runtime.getRuntime().availableProcessors(),
+              new NamedThreadFactory("Concurrent HNSW builder")));
+    } else {
+      return build(
+          vectorsToAdd,
+          Executors.newSingleThreadExecutor(new NamedThreadFactory("Concurrent 
HNSW builder")));
+    }
+  }
+
+  @Override
+  public ConcurrentOnHeapHnswGraph build(RandomAccessVectorValues<T> 
vectorsToAdd)
+      throws IOException {
+    return build(vectorsToAdd, true);
+  }
+
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, ExecutorService pool) {
+    if (vectorsToAdd == this.vectors) {
+      throw new IllegalArgumentException(
+          "Vectors to build must be independent of the source of vectors 
provided to HnswGraphBuilder()");
+    }
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "build graph from " + 
vectorsToAdd.size() + " vectors");
+    }
+    if (!(pool instanceof ThreadPoolExecutor)) {
+      throw new IllegalArgumentException("ExecutorService must be a 
ThreadPoolExecutor");
+    }
+    addVectors(vectorsToAdd, (ThreadPoolExecutor) pool);
+    return hnsw;
+  }
+
+  // the goal here is to keep all the ExecutorService threads busy, but not to 
create potentially
+  // millions of futures by naively throwing everything at submit at once.  
So, we use
+  // a semaphore to wait until a thread is free before adding a new task.
+  private void addVectors(RandomAccessVectorValues<T> vectorsToAdd, 
ThreadPoolExecutor pool) {
+    Semaphore semaphore = new Semaphore(pool.getMaximumPoolSize());
+
+    for (int i = 0; i < vectorsToAdd.size(); i++) {
+      final int node = i; // copy for closure
+      try {
+        semaphore.acquire();
+        pool.submit(
+            () -> {
+              try {
+                addGraphNode(node, vectorsToAdd);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              } finally {
+                semaphore.release();
+              }
+            });
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    // Wait for any remaining tasks to complete
+    pool.shutdown();
+    try {

Review Comment:
   Perhaps we could return a Future and let the caller handle the details of 
how long to wait and managing the ThreadExecutor that *they supplied to us*. We 
don't want to be closing the pool here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to