benwtrent commented on code in PR #12421:
URL: https://github.com/apache/lucene/pull/12421#discussion_r1261778701


##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.GrowableBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.ThreadInterruptedException;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level 
overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public class ConcurrentHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during 
a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ExplicitThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ExplicitThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+  private final ExplicitThreadLocal<NeighborQueue> beamCandidates;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check 
comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+
+  /** This is the "native" factory for ConcurrentHnswGraphBuilder. */
+  public static <T> ConcurrentHnswGraphBuilder<T> create(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    return new ConcurrentHnswGraphBuilder<>(
+        vectors, vectorEncoding, similarityFunction, M, beamWidth);
+  }
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them 
by their dense
+   * ordinals, using the given hyperparameter settings, and returns the 
resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - 
must provide a
+   *     different view over those vectors than the one used to add via 
addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of 
connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest 
neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ExplicitThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new GrowableBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are 
first
+    this.scratchNeighbors =
+        ExplicitThreadLocal.withInitial(() -> new 
NeighborArray(Math.max(beamWidth, M + 1), false));
+    this.beamCandidates =
+        ExplicitThreadLocal.withInitial(() -> new NeighborQueue(beamWidth, 
false));
+  }
+
+  private abstract static class ExplicitThreadLocal<U> {
+    private final ConcurrentHashMap<Long, U> map = new ConcurrentHashMap<>();
+
+    public U get() {
+      return map.computeIfAbsent(Thread.currentThread().getId(), k -> 
initialValue());
+    }
+
+    protected abstract U initialValue();
+
+    public static <U> ExplicitThreadLocal<U> withInitial(Supplier<U> 
initialValue) {
+      return new ExplicitThreadLocal<U>() {
+        @Override
+        protected U initialValue() {
+          return initialValue.get();
+        }
+      };
+    }
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link 
RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while 
avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors 
graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per 
core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained 
control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, boolean autoParallel) throws 
IOException {
+    ExecutorService es;
+    int threadCount;
+    if (autoParallel) {
+      threadCount = Runtime.getRuntime().availableProcessors();
+      es =
+          Executors.newFixedThreadPool(
+              threadCount, new NamedThreadFactory("Concurrent HNSW builder"));
+    } else {
+      threadCount = 1;
+      es = Executors.newSingleThreadExecutor(new 
NamedThreadFactory("Concurrent HNSW builder"));
+    }
+
+    Future<ConcurrentOnHeapHnswGraph> f = buildAsync(vectorsToAdd, es, 
threadCount);
+    try {
+      return f.get();
+    } catch (InterruptedException e) {
+      throw new ThreadInterruptedException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e);
+    } finally {
+      es.shutdown();
+    }
+  }
+
+  public ConcurrentOnHeapHnswGraph build(RandomAccessVectorValues<T> 
vectorsToAdd)
+      throws IOException {
+    return build(vectorsToAdd, true);
+  }
+
+  /**
+   * Bring-your-own ExecutorService graph builder.
+   *
+   * <p>Reads all the vectors from two copies of a {@link 
RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while 
avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors 
graph. Must be an
+   *     independent accessor for the vectors
+   * @param pool The ExecutorService to use. Must be an instance of 
ThreadPoolExecutor.
+   * @param concurrentTasks the number of tasks to submit in parallel.
+   */
+  public Future<ConcurrentOnHeapHnswGraph> buildAsync(
+      RandomAccessVectorValues<T> vectorsToAdd, ExecutorService pool, int 
concurrentTasks) {
+    if (vectorsToAdd == this.vectors) {
+      throw new IllegalArgumentException(
+          "Vectors to build must be independent of the source of vectors 
provided to HnswGraphBuilder()");
+    }
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "build graph from " + 
vectorsToAdd.size() + " vectors");
+    }
+    return addVectors(vectorsToAdd, pool, concurrentTasks);
+  }
+
+  // the goal here is to keep all the ExecutorService threads busy, but not to 
create potentially
+  // millions of futures by naively throwing everything at submit at once.  
So, we use
+  // a semaphore to wait until a thread is free before adding a new task.
+  private Future<ConcurrentOnHeapHnswGraph> addVectors(
+      RandomAccessVectorValues<T> vectorsToAdd, ExecutorService pool, int 
concurrentTasks) {
+    Semaphore semaphore = new Semaphore(concurrentTasks);
+    Set<Integer> inFlight = ConcurrentHashMap.newKeySet();
+    AtomicReference<Throwable> asyncException = new AtomicReference<>(null);
+
+    for (int i = 0; i < vectorsToAdd.size(); i++) {
+      final int node = i; // copy for closure
+      try {
+        semaphore.acquire();
+        inFlight.add(node);
+        pool.submit(
+            () -> {
+              try {
+                addGraphNode(node, vectorsToAdd);

Review Comment:
   I don't see how this works. VectorsToAdd is a random access vector class. 
Looking at the implementations, none of them are threadsafe and reuse arrays 
between calls.
   Is there a missing update to random access vectors? If not, you probably 
need one per thread.



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphSearcher.java:
##########
@@ -311,7 +369,6 @@ void searchLevel(
       graphSeek(graph, level, topCandidateNode);
       int friendOrd;
       while ((friendOrd = graphNextNeighbor(graph)) != NO_MORE_DOCS) {
-        assert friendOrd < size : "friendOrd=" + friendOrd + "; size=" + size;

Review Comment:
   We shouldn't remove a valid assertion. Searcher has long had a valid 
assumption that the graph isn't being updated. 
   
   If we are breaking these assumptions, we should have a different searcher 
class that better handles these new constraints.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to