MarcusSorealheis commented on code in PR #12254: URL: https://github.com/apache/lucene/pull/12254#discussion_r1182114316
########## lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentOnHeapHnswGraph.java: ########## @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; + +/** + * An {@link HnswGraph} that offers concurrent access; for typical graphs you will get significant + * speedups in construction and searching as you add threads. + * + * <p>To search this graph, you should use a View obtained from {@link #getView()} to perform `seek` + * and `nextNeighbor` operations. For convenience, you can use these methods directly on the graph + * instance, which will give you a ThreadLocal View, but you can call `getView` directly if you need + * more control, e.g. for performing a second search in the same thread while the first is still in + * progress. 
+ */ +public final class ConcurrentOnHeapHnswGraph extends HnswGraph implements Accountable { + private final AtomicReference<NodeAtLevel> + entryPoint; // the current graph entry node on the top level. -1 if not set + + // views for compatibility with HnswGraph interface; prefer creating views explicitly + private final ThreadLocal<ConcurrentHnswGraphView> views = + ThreadLocal.withInitial(ConcurrentHnswGraphView::new); + + // Unlike OnHeapHnswGraph (OHHG), we use the same data structure for Level 0 and higher node + // lists, + // a ConcurrentHashMap. While the ArrayList used for L0 in OHHG is faster for single-threaded + // workloads, it imposes an unacceptable contention burden for concurrent workloads. + private final ConcurrentMap<Integer, ConcurrentMap<Integer, ConcurrentNeighborSet>> graphLevels; + + // Neighbours' size on upper levels (nsize) and level 0 (nsize0) + private final int nsize; + private final int nsize0; + + ConcurrentOnHeapHnswGraph(int M) { + this.entryPoint = + new AtomicReference<>( + new NodeAtLevel(0, -1)); // Entry node should be negative until a node is added + this.nsize = M; + this.nsize0 = 2 * M; + + this.graphLevels = new ConcurrentHashMap<>(); + } + + /** + * Returns the neighbors connected to the given node. + * + * @param level level of the graph + * @param node the node whose neighbors are returned, represented as an ordinal on the level 0. 
+ */ + public ConcurrentNeighborSet getNeighbors(int level, int node) { + return graphLevels.get(level).get(node); + } + + @Override + public synchronized int size() { + return graphLevels.get(0).size(); // all nodes are located on the 0th level + } + + @Override + public void addNode(int level, int node) { + if (level >= graphLevels.size()) { + for (int i = graphLevels.size(); i <= level; i++) { + graphLevels.putIfAbsent(i, new ConcurrentHashMap<>()); + } + } + + graphLevels.get(level).put(node, new ConcurrentNeighborSet(connectionsOnLevel(level))); + } + + /** + * must be called after addNode to a level > 0 + * + * <p>we don't do this as part of addNode itself, since it may not yet have been added to all the + * levels + */ + void maybeUpdateEntryNode(int level, int node) { + while (true) { + NodeAtLevel oldEntry = entryPoint.get(); + if (oldEntry.node >= 0 && oldEntry.level >= level) { + break; + } + entryPoint.compareAndSet(oldEntry, new NodeAtLevel(level, node)); + } + } + + private int connectionsOnLevel(int level) { + return level == 0 ? nsize0 : nsize; + } + + @Override + public void seek(int level, int target) throws IOException { + views.get().seek(level, target); + } + + @Override + public int nextNeighbor() throws IOException { + return views.get().nextNeighbor(); + } + + /** + * @return the current number of levels in the graph where nodes have been added and we have a + * valid entry point. 
+ */ + @Override + public int numLevels() { + return entryPoint.get().level + 1; + } + + /** + * Returns the graph's current entry node on the top level shown as ordinals of the nodes on 0th + * level + * + * @return the graph's current entry node on the top level + */ + @Override + public int entryNode() { + return entryPoint.get().node; + } + + @Override + public NodesIterator getNodesOnLevel(int level) { + if (level == 0) { + return new ArrayNodesIterator(size()); + } else { + return new CollectionNodesIterator(graphLevels.get(level).keySet()); + } + } + + @Override + public long ramBytesUsed() { + // skip list used by Neighbor Set + long cskmNodesBytes = 3L * RamUsageEstimator.NUM_BYTES_OBJECT_REF; // K, V, index + long cskmIndexBytes = 3L * RamUsageEstimator.NUM_BYTES_OBJECT_REF; // node, down, right + long cskmBytes = + RamUsageEstimator.NUM_BYTES_OBJECT_REF // head + + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + + Runtime.getRuntime().availableProcessors() * Long.BYTES // longadder cells + + 4L * RamUsageEstimator.NUM_BYTES_OBJECT_REF; // internal view refs + long neighborSetBytes = + cskmBytes + + RamUsageEstimator.NUM_BYTES_OBJECT_REF // skiplist -> map reference + + Integer.BYTES + + RamUsageEstimator.NUM_BYTES_OBJECT_REF + + Integer.BYTES; // CNS fields + + // a CHM Node contains an int hash and a Node reference, as well as K and V references. + long chmNodeBytes = 3L * RamUsageEstimator.NUM_BYTES_OBJECT_REF + Integer.BYTES; + float chmLoadFactor = 0.75f; // this is hardcoded inside ConcurrentHashMap + // CHM has a striped counter Cell implementation, we expect at most one per core + long chmCounters = + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + + Runtime.getRuntime().availableProcessors() + * (RamUsageEstimator.NUM_BYTES_OBJECT_REF + Long.BYTES); + + long total = 0; + for (int l = 0; l <= entryPoint.get().level; l++) { + long numNodesOnLevel = graphLevels.get(l).size(); + + // we represent the graph structure with a concurrent hash map. 
+ // we expect there to be nodesOnLevel / levelLoadFactor Nodes in its internal table. + // there is also an entrySet reference, 3 ints, and a float for internal use. + int nodeCount = (int) (numNodesOnLevel / chmLoadFactor); + long chmSize = Review Comment: Nit: I also wonder whether these multiline assignments should be extracted into private methods to make the code easier to follow. They were probably not split out into separate methods because they are already quite explicit and easy enough to follow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org