Re: [PR] Initial impl of MMapDirectory for Java 22 [lucene]
uschindler commented on PR #12706: URL: https://github.com/apache/lucene/pull/12706#issuecomment-1774030650 I updated the Jenkins jobs running mmap tests to use this branch: https://jenkins.thetaphi.de/job/Lucene-MMAPv2-Linux/, https://jenkins.thetaphi.de/job/Lucene-MMAPv2-Windows/
Re: [PR] Improve handling of NullPointerException in MMapDirectory's IndexInputs (check the "closed" condition) [lucene]
uschindler merged PR #12705: URL: https://github.com/apache/lucene/pull/12705
Re: [I] Enable recursive graph bisection out of the box? [lucene]
gf2121 commented on issue #12665: URL: https://github.com/apache/lucene/issues/12665#issuecomment-1774050262 > essentially calling OfflineSorter on all postings FYI, I came up with some ideas to optimize this sort before, hoping they are helpful :)
1. If we use a stable sorter, we only need to compare docIds, because termIds are already in order.
2. If we take maxDoc into consideration, we can save one round of reordering when `maxDoc < (1 << 24)`.
3. We could even use a purely offline radix sorter to sort the whole file, since, based on points 1 and 2, all we need is 3 or 4 reorder passes.
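A minimal standalone sketch of point 1 above (illustrative only, not the Lucene OfflineSorter): if (termId, docId) records arrive already ordered by termId, a stable sort keyed on docId alone ends up in (docId, termId) order, because ties on docId keep their original termId order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class StableDocIdSortSketch {
  // Hypothetical record type standing in for one postings entry.
  record Posting(int termId, int docId) {}

  public static void main(String[] args) {
    // Input is ordered by termId (and by docId within each term).
    List<Posting> postings = new ArrayList<>(List.of(
        new Posting(0, 1), new Posting(0, 7),
        new Posting(1, 1), new Posting(1, 3),
        new Posting(2, 3), new Posting(2, 7)));
    // List.sort is stable (TimSort), so comparing only docId is enough
    // to end up sorted by (docId, termId).
    postings.sort(Comparator.comparingInt(Posting::docId));
    postings.forEach(p -> System.out.println("doc " + p.docId() + " -> term " + p.termId()));
  }
}
```

Point 2 presumably refers to byte-at-a-time radix sorting: when `maxDoc < (1 << 24)` a docId fits in three bytes, so one reorder pass can be skipped.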
Re: [I] Specialize arc store for continuous label in FST [lucene]
gf2121 closed issue #12701: Specialize arc store for continuous label in FST URL: https://github.com/apache/lucene/issues/12701
Re: [PR] Improve handling of NullPointerException in MMapDirectory's IndexInputs (check the "closed" condition) [lucene]
uschindler commented on PR #12705: URL: https://github.com/apache/lucene/pull/12705#issuecomment-1774091447 IllegalStateException cannot happen in that code, only on access to memory segments closed by other threads. NPE was a special case, as it can happen more easily. IllegalStateException has a clear meaning and cannot happen unless you call a method that is documented to throw it.
Re: [PR] Improve handling of NullPointerException in MMapDirectory's IndexInputs (check the "closed" condition) [lucene]
uschindler commented on PR #12705: URL: https://github.com/apache/lucene/pull/12705#issuecomment-1774092282 > If that's the case, it seems fine, although a bit fragile to maintain? I argued during the long journey of Panama Foreign to have a specific subclass of IllegalStateException for the case of unmapping a MemorySegment from another thread. It is too late. You could look at the exception message, but that's even more fragile due to localization.
Re: [PR] Concurrent HNSW Merge [lucene]
msokolov commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367900707 ## lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java: ## @@ -635,17 +667,31 @@ private static DocsWithFieldSet writeVectorData( return docsWithField; } + private HnswGraphMerger createGraphMerger( + FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) { +if (exec != null) { + return new ConcurrentHnswMerger( + fieldInfo, scorerSupplier, M, beamWidth, exec, numMergeWorkers); +} +return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth); + } + @Override public void close() throws IOException { IOUtils.close(meta, vectorData, vectorIndex); +if (exec != null) { + exec.shutdownNow(); +} +System.out.println( +"Total contention time: " + NeighborArray.contentionTime.get() / 100 + " ms"); Review Comment: curious what this showed - can you publish some accounting? ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. + */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; Review Comment: maybe comment this? I guess it's the number of vectors we handle sequentially in each task? ## lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java: ## @@ -26,17 +26,39 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.KnnFieldVectorsWriter; import org.apache.lucene.codecs.KnnVectorsWriter; -import org.apache.lucene.index.*; Review Comment: I guess spotless likes these *-imports?? 
surprising ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.ut
Re: [PR] Improve handling of NullPointerException in MMapDirectory's IndexInputs (check the "closed" condition) [lucene]
uschindler commented on PR #12705: URL: https://github.com/apache/lucene/pull/12705#issuecomment-1774105554 > > If that's the case, it seems fine, although a bit fragile to maintain? > > I argued during the long journey of Panama Foreign to have a specific subclass of IllegalStateException for the case of unmapping a MemorySegment from another thread. It is too late. You could look into exception message but that's even more fragile due to localization. I think, in contrast to the signalling NPE, the IllegalStateException does not hurt if it appears as the cause of the AlreadyClosedException. The error message also tells you that the segment was closed by another thread, which might be helpful. I will think about it and possibly make a new PR that improves this for MemorySegmentIndexInput. I can also figure out whether there is a check on MemorySegments for whether they are still reachable. I think they added something in recent versions. It may not work in Java 19, but later versions can be improved. Uwe
Re: [PR] Improve handling of NullPointerException in MMapDirectory's IndexInputs (check the "closed" condition) [lucene]
uschindler commented on PR #12705: URL: https://github.com/apache/lucene/pull/12705#issuecomment-1774106700 You can call `segment.scope().isAlive()` to figure out if the scope is still alive. This works for Java 20+. The Java 19 version can't use this. I will possibly create a new PR to handle this and also rethrow the IllegalStateException as done here for NPE.
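A minimal sketch of the check described above, assuming Java 20+ where `MemorySegment.scope()` exposes `isAlive()`; this is illustrative only and not the actual `MemorySegmentIndexInput` code:

```java
import java.lang.foreign.MemorySegment;
import org.apache.lucene.store.AlreadyClosedException;

final class ScopeAliveCheckSketch {
  // If the segment's scope is no longer alive, the IllegalStateException came
  // from a concurrent close, so surface it as AlreadyClosedException; otherwise
  // keep the original exception untouched.
  static RuntimeException handleIllegalState(MemorySegment segment, IllegalStateException e) {
    if (segment.scope().isAlive() == false) {
      return new AlreadyClosedException("Already closed: this IndexInput's memory is unmapped", e);
    }
    return e;
  }
}
```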
[PR] MMapDirectory with MemorySegment: Confirm that scope/session is no longer alive before throwing AlreadyClosedException [lucene]
uschindler opened a new pull request, #12707: URL: https://github.com/apache/lucene/pull/12707 Followup on #12705: With memory segments we get an IllegalStateException. Instead of always rewriting it to AlreadyClosedException, we first confirm that the segment scope (session in Java 19) is no longer alive.
Re: [PR] Improve handling of NullPointerException in MMapDirectory's IndexInputs (check the "closed" condition) [lucene]
uschindler commented on PR #12705: URL: https://github.com/apache/lucene/pull/12705#issuecomment-1774116253 I improved the IllegalStateException handling in #12707 in the same way, by confirming the state of the segment's scope (Java 20+) / session (Java 19). @msokolov: Please have a quick look before I merge it.
Re: [PR] MMapDirectory with MemorySegment: Confirm that scope/session is no longer alive before throwing AlreadyClosedException [lucene]
uschindler merged PR #12707: URL: https://github.com/apache/lucene/pull/12707
Re: [I] Improve hash mixing in FST's double-barrel LRU hash [lucene]
dweiss commented on issue #12704: URL: https://github.com/apache/lucene/issues/12704#issuecomment-1774156970 I borrowed that constant in BitMixer from Sebastiano Vigna, I believe. Here is a nice overview of its origin/rationale: https://softwareengineering.stackexchange.com/questions/402542/where-do-magic-hashing-constants-like-0x9e3779b9-and-0x9e3779b1-come-from I can only confirm that a good hash redistribution function, along with linear probing, gives very good results in most hash/index redistribution problems I've seen.
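For context on the constant mentioned above: 0x9e3779b9 is the 32-bit (and 0x9e3779b97f4a7c15 the 64-bit) truncation of 2^32/φ resp. 2^64/φ, where φ is the golden ratio, which is what makes multiplicative ("Fibonacci") hashing spread structured keys well. A small illustrative sketch of that style of mixing feeding a power-of-two table for linear probing; this is not the actual BitMixer or Lucene FST hash code:

```java
// Illustrative sketch of golden-ratio (Fibonacci) hashing, not Lucene code.
final class FibonacciHashSketch {
  // floor(2^64 / phi), phi = golden ratio.
  private static final long GOLDEN_RATIO_64 = 0x9e3779b97f4a7c15L;

  // Multiply by the golden-ratio constant and keep the top bits: nearby or
  // structured keys land far apart, which keeps linear-probing runs short.
  static int slot(long key, int log2TableSize) {
    return (int) ((key * GOLDEN_RATIO_64) >>> (64 - log2TableSize));
  }

  public static void main(String[] args) {
    // Sequential keys map to well-spread slots in a 1024-entry table.
    for (long key = 0; key < 4; key++) {
      System.out.println(key + " -> " + slot(key, 10));
    }
  }
}
```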
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367952944 ## lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java: ## @@ -50,13 +72,24 @@ public final class Lucene95HnswVectorsWriter extends KnnVectorsWriter { private final IndexOutput meta, vectorData, vectorIndex; private final int M; private final int beamWidth; + private final int numMergeWorkers; + private final ExecutorService exec; private final List> fields = new ArrayList<>(); private boolean finished; - Lucene95HnswVectorsWriter(SegmentWriteState state, int M, int beamWidth) throws IOException { + Lucene95HnswVectorsWriter(SegmentWriteState state, int M, int beamWidth, int numMergeWorkers) + throws IOException { this.M = M; this.beamWidth = beamWidth; +this.numMergeWorkers = numMergeWorkers; +if (numMergeWorkers <= 1) { + exec = null; +} else { + exec = + Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("hnsw-merge")); Review Comment: Yeah you're right. BTW I'm thinking whether it will be better to have a big threadpool at the Format class and pass it in to every writer? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367953194 ## lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java: ## @@ -635,17 +667,31 @@ private static DocsWithFieldSet writeVectorData( return docsWithField; } + private HnswGraphMerger createGraphMerger( + FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) { +if (exec != null) { + return new ConcurrentHnswMerger( + fieldInfo, scorerSupplier, M, beamWidth, exec, numMergeWorkers); +} +return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth); + } + @Override public void close() throws IOException { IOUtils.close(meta, vectorData, vectorIndex); +if (exec != null) { + exec.shutdownNow(); +} +System.out.println( +"Total contention time: " + NeighborArray.contentionTime.get() / 100 + " ms"); Review Comment: Let me post one to the description. It's normally around 5000 ms for a 100k-doc merge using 8 threads (single-thread time roughly 95000 ms, multithreaded wall time roughly 17000 ms).
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367953519 ## lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java: ## @@ -26,17 +26,39 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.KnnFieldVectorsWriter; import org.apache.lucene.codecs.KnnVectorsWriter; -import org.apache.lucene.index.*; Review Comment: LOL I always see some import adjustment after `gradlew tidy` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367953845 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. + */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; + + private final ExecutorService exec; + private final ConcurrentMergeWorker[] workers; + private InfoStream infoStream = InfoStream.getDefault(); Review Comment: It's set by calling `setInfoStream`, see `IncrementalHnswGraphMerger#merge()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367953931 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. + */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; + + private final ExecutorService exec; + private final ConcurrentMergeWorker[] workers; + private InfoStream infoStream = InfoStream.getDefault(); + + public HnswConcurrentMergeBuilder( + ExecutorService exec, + int numWorker, + RandomVectorScorerSupplier scorerSupplier, + int M, + int beamWidth, + OnHeapHnswGraph hnsw, + BitSet initializedNodes) + throws IOException { +this.exec = exec; +AtomicInteger workProgress = new AtomicInteger(0); +workers = new ConcurrentMergeWorker[numWorker]; +for (int i = 0; i < numWorker; i++) { + workers[i] = + new ConcurrentMergeWorker( + scorerSupplier.copy(), + M, + beamWidth, + HnswGraphBuilder.randSeed, Review Comment: Yeah I'm not sure what's the best way, but I guess it does not matter that much... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367955218 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. 
+ */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; + + private final ExecutorService exec; + private final ConcurrentMergeWorker[] workers; + private InfoStream infoStream = InfoStream.getDefault(); + + public HnswConcurrentMergeBuilder( + ExecutorService exec, + int numWorker, + RandomVectorScorerSupplier scorerSupplier, + int M, + int beamWidth, + OnHeapHnswGraph hnsw, + BitSet initializedNodes) + throws IOException { +this.exec = exec; +AtomicInteger workProgress = new AtomicInteger(0); +workers = new ConcurrentMergeWorker[numWorker]; +for (int i = 0; i < numWorker; i++) { + workers[i] = + new ConcurrentMergeWorker( + scorerSupplier.copy(), + M, + beamWidth, + HnswGraphBuilder.randSeed, + hnsw, + initializedNodes, + workProgress); +} + } + + @Override + public OnHeapHnswGraph build(int maxOrd) throws IOException { +if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message( + HNSW_COMPONENT, + "build graph from " + maxOrd + " vectors, with " + workers.length + " workers"); +} +List> futures = new ArrayList<>(); +for (int i = 0; i < workers.length; i++) { + int finalI = i; + futures.add( + exec.submit( + () -> { +try { + workers[finalI].run(maxOrd); +} catch (IOException e) { + throw new RuntimeException(e); +} + })); +} +Throwable exc = null; +for (Future future : futures) { + try { +future.get(); + } catch (InterruptedException e) { +var newException = new ThreadInterruptedException(e); +if (exc == null) { + exc = newException; +} else { + exc.addSuppressed(newException); +} + } catch (ExecutionException e) { +if (exc == null) { + exc = e.getCause(); +} else { + exc.addSuppressed(e.getCause()); +} + } +} +if (exc != null) { + // The error handling was copied from TaskExecutor. should we just use TaskExecutor instead? + throw IOUtils.rethrowAlways(exc); +} +return workers[0].getGraph(); + } + + @Override + public void addGraphNode(int node) throws IOException { +throw new UnsupportedOperationException("This builder is for merge only"); + } + + @Override + public void setInfoStream(InfoStream infoStream) { +this.infoStream = infoStream; +for (IHnswGraphBuilder worker : workers) { + worker.setInfoStream(infoStream); +} + } + + @Override + public OnHeapHnswGraph getGraph() { +return workers[0].getGraph(); + } + + private static final class ConcurrentMergeWorker extends HnswGraphBuilder { + +private final AtomicInteger workProgress; +private final BitSet initializedNodes; + +private ConcurrentM
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367955272 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. 
+ */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; + + private final ExecutorService exec; + private final ConcurrentMergeWorker[] workers; + private InfoStream infoStream = InfoStream.getDefault(); + + public HnswConcurrentMergeBuilder( + ExecutorService exec, + int numWorker, + RandomVectorScorerSupplier scorerSupplier, + int M, + int beamWidth, + OnHeapHnswGraph hnsw, + BitSet initializedNodes) + throws IOException { +this.exec = exec; +AtomicInteger workProgress = new AtomicInteger(0); +workers = new ConcurrentMergeWorker[numWorker]; +for (int i = 0; i < numWorker; i++) { + workers[i] = + new ConcurrentMergeWorker( + scorerSupplier.copy(), + M, + beamWidth, + HnswGraphBuilder.randSeed, + hnsw, + initializedNodes, + workProgress); +} + } + + @Override + public OnHeapHnswGraph build(int maxOrd) throws IOException { +if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message( + HNSW_COMPONENT, + "build graph from " + maxOrd + " vectors, with " + workers.length + " workers"); +} +List> futures = new ArrayList<>(); +for (int i = 0; i < workers.length; i++) { + int finalI = i; + futures.add( + exec.submit( + () -> { +try { + workers[finalI].run(maxOrd); +} catch (IOException e) { + throw new RuntimeException(e); +} + })); +} +Throwable exc = null; +for (Future future : futures) { + try { +future.get(); + } catch (InterruptedException e) { +var newException = new ThreadInterruptedException(e); +if (exc == null) { + exc = newException; +} else { + exc.addSuppressed(newException); +} + } catch (ExecutionException e) { +if (exc == null) { + exc = e.getCause(); +} else { + exc.addSuppressed(e.getCause()); +} + } +} +if (exc != null) { + // The error handling was copied from TaskExecutor. should we just use TaskExecutor instead? + throw IOUtils.rethrowAlways(exc); +} +return workers[0].getGraph(); + } + + @Override + public void addGraphNode(int node) throws IOException { +throw new UnsupportedOperationException("This builder is for merge only"); + } + + @Override + public void setInfoStream(InfoStream infoStream) { +this.infoStream = infoStream; +for (IHnswGraphBuilder worker : workers) { + worker.setInfoStream(infoStream); +} + } + + @Override + public OnHeapHnswGraph getGraph() { +return workers[0].getGraph(); + } + + private static final class ConcurrentMergeWorker extends HnswGraphBuilder { + +private final AtomicInteger workProgress; +private final BitSet initializedNodes; + +private ConcurrentM
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367955515 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ## @@ -33,7 +33,7 @@ * Builder for HNSW graph. See {@link HnswGraph} for a gloss on the algorithm and the meaning of the * hyper-parameters. */ -public class HnswGraphBuilder { +public class HnswGraphBuilder implements IHnswGraphBuilder { Review Comment: Yes, I noticed that. I use `IHnswGraphBuilder` just to avoid changing the name of `HnswGraphBuilder`; otherwise it would create a few more diff files, making the review harder. I can for sure change the class name accordingly after the code is fully reviewed and approved :)
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367955644 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; -for (int node = 0; node < maxOrd; node++) { +if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); +} +//System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + +// initializedNodes.size()); +for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 1 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } +//System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + +// hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) Review Comment: It was previous code that's causing the warning. I'll remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367956157 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; -for (int node = 0; node < maxOrd; node++) { +if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); +} +//System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + +// initializedNodes.size()); +for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 1 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } +//System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + +// hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void addVectors(int maxOrd) throws IOException { +addVectors(0, maxOrd); } - /** Inserts a doc with vector value to the graph */ + @Override public void addGraphNode(int node) throws IOException { +/* +Note: this implementation is thread safe when graph size is fixed (e.g. when merging) +The process of adding a node is roughly: +1. Add the node to all level from top to the bottom, but do not connect it to any other node, + nor try to promote itself to an entry node before the connection is done +2. Do the search from top to bottom, remember all the possible neighbours on each level the node + is on. +3. Add the neighbor to the node from bottom to top level, when adding the neighbour, + we always add all the outgoing links first before adding incoming link such that + when a search visiting this node, it can always find a way out +4. If the node has level that is less or equal to graph level, then we're done here. + If the node has level larger than graph level, then we need to promote the node + as the entry node. If, while we add the node to the graph, the entry node has changed + (which means the graph level has changed as well), we need to reinsert the node + to the newly introduced levels (repeating step 2,3 for new levels) and again try to + promote the node to entry node. 
+*/ RandomVectorScorer scorer = scorerSupplier.scorer(node); final int nodeLevel = getRandomGraphLevel(ml, random); -int curMaxLevel = hnsw.numLevels() - 1; - -// If entrynode is -1, then this should finish without adding neighbors -if (hnsw.entryNode() == -1) { - for (int level = nodeLevel; level >= 0; level--) { -hnsw.addNode(level, node); - } +// first add nodes to all levels +for (int level = nodeLevel; level >= 0; level--) { + hnsw.addNode(level, node); +} +// then promote itself as entry node if entry node is not set +if (hnsw.trySetNewEntryNode(node, nodeLevel)) { return; } -int[] eps = new int[] {hnsw.entryNode()}; +// if the entry node is already set, then we have to do all connections first before we can +// promote ourselves as entry node +// do connections from bottom up Review Comment: Ah this comment was misplaced, sorry for the confusion, I will update it :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367956372 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; -for (int node = 0; node < maxOrd; node++) { +if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); +} +//System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + +// initializedNodes.size()); +for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 1 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } +//System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + +// hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void addVectors(int maxOrd) throws IOException { +addVectors(0, maxOrd); } - /** Inserts a doc with vector value to the graph */ + @Override public void addGraphNode(int node) throws IOException { +/* +Note: this implementation is thread safe when graph size is fixed (e.g. when merging) +The process of adding a node is roughly: +1. Add the node to all level from top to the bottom, but do not connect it to any other node, + nor try to promote itself to an entry node before the connection is done +2. Do the search from top to bottom, remember all the possible neighbours on each level the node + is on. +3. Add the neighbor to the node from bottom to top level, when adding the neighbour, + we always add all the outgoing links first before adding incoming link such that + when a search visiting this node, it can always find a way out +4. If the node has level that is less or equal to graph level, then we're done here. + If the node has level larger than graph level, then we need to promote the node + as the entry node. If, while we add the node to the graph, the entry node has changed + (which means the graph level has changed as well), we need to reinsert the node + to the newly introduced levels (repeating step 2,3 for new levels) and again try to + promote the node to entry node. 
+*/ RandomVectorScorer scorer = scorerSupplier.scorer(node); final int nodeLevel = getRandomGraphLevel(ml, random); -int curMaxLevel = hnsw.numLevels() - 1; - -// If entrynode is -1, then this should finish without adding neighbors -if (hnsw.entryNode() == -1) { - for (int level = nodeLevel; level >= 0; level--) { -hnsw.addNode(level, node); - } +// first add nodes to all levels +for (int level = nodeLevel; level >= 0; level--) { + hnsw.addNode(level, node); +} +// then promote itself as entry node if entry node is not set +if (hnsw.trySetNewEntryNode(node, nodeLevel)) { return; } -int[] eps = new int[] {hnsw.entryNode()}; +// if the entry node is already set, then we have to do all connections first before we can +// promote ourselves as entry node +// do connections from bottom up +int lowestUnsetLevel = 0; +int curMaxLevel; +do { + curMaxLevel = hnsw.numLevels() - 1; + // NOTE: the entry node and max level may not be paired, but because we get the level first + // we ensure that the entry node we get later will always exist on the curMaxLevel + int[] eps = new int[] {hnsw.entryNode()}; + // for levels > nodeLevel search with topk = 1 + GraphBuilderKnnCollector candidates = entryCandidates; + for (int level = curMaxLevel; level > nodeLevel; level--) { +candidates.clear(); +graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); +eps = new int[] {candidates.popNode()}; Review Comment: good catch, let me change it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apach
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367956726 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ## @@ -151,61 +159,124 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return hnsw; } - /** Set info-stream to output debugging information * */ + @Override public void setInfoStream(InfoStream infoStream) { this.infoStream = infoStream; } + @Override public OnHeapHnswGraph getGraph() { return hnsw; } - private void addVectors(int maxOrd) throws IOException { + protected void addVectors(int minOrd, int maxOrd) throws IOException { long start = System.nanoTime(), t = start; -for (int node = 0; node < maxOrd; node++) { +if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")"); +} +//System.out.println("addVectors [" + minOrd + " " + maxOrd + ") initialized.size=" + +// initializedNodes.size()); +for (int node = minOrd; node < maxOrd; node++) { + // System.out.println("add node " + node + " t=" + Thread.currentThread().getName()); addGraphNode(node); + // System.out.println("entry node " + hnsw.entryNode()); + // System.out.println("node " + node + " nbrs.size()=" + hnsw.getNeighbors(0, node).size()); if ((node % 1 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) { t = printGraphBuildStatus(node, start, t); } } +//System.out.println("addVectors [" + minOrd + " " + maxOrd + ") done + graph.size=" + +// hnsw.size()); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void addVectors(int maxOrd) throws IOException { +addVectors(0, maxOrd); } - /** Inserts a doc with vector value to the graph */ + @Override public void addGraphNode(int node) throws IOException { +/* +Note: this implementation is thread safe when graph size is fixed (e.g. when merging) +The process of adding a node is roughly: +1. Add the node to all level from top to the bottom, but do not connect it to any other node, + nor try to promote itself to an entry node before the connection is done +2. Do the search from top to bottom, remember all the possible neighbours on each level the node + is on. +3. Add the neighbor to the node from bottom to top level, when adding the neighbour, + we always add all the outgoing links first before adding incoming link such that + when a search visiting this node, it can always find a way out +4. If the node has level that is less or equal to graph level, then we're done here. + If the node has level larger than graph level, then we need to promote the node + as the entry node. If, while we add the node to the graph, the entry node has changed + (which means the graph level has changed as well), we need to reinsert the node + to the newly introduced levels (repeating step 2,3 for new levels) and again try to + promote the node to entry node. 
+*/ RandomVectorScorer scorer = scorerSupplier.scorer(node); final int nodeLevel = getRandomGraphLevel(ml, random); -int curMaxLevel = hnsw.numLevels() - 1; - -// If entrynode is -1, then this should finish without adding neighbors -if (hnsw.entryNode() == -1) { - for (int level = nodeLevel; level >= 0; level--) { -hnsw.addNode(level, node); - } +// first add nodes to all levels +for (int level = nodeLevel; level >= 0; level--) { + hnsw.addNode(level, node); +} +// then promote itself as entry node if entry node is not set +if (hnsw.trySetNewEntryNode(node, nodeLevel)) { return; } -int[] eps = new int[] {hnsw.entryNode()}; +// if the entry node is already set, then we have to do all connections first before we can +// promote ourselves as entry node +// do connections from bottom up +int lowestUnsetLevel = 0; +int curMaxLevel; +do { + curMaxLevel = hnsw.numLevels() - 1; + // NOTE: the entry node and max level may not be paired, but because we get the level first + // we ensure that the entry node we get later will always exist on the curMaxLevel + int[] eps = new int[] {hnsw.entryNode()}; + // for levels > nodeLevel search with topk = 1 + GraphBuilderKnnCollector candidates = entryCandidates; + for (int level = curMaxLevel; level > nodeLevel; level--) { +candidates.clear(); +graphSearcher.searchLevel(candidates, scorer, level, eps, hnsw, null); +eps = new int[] {candidates.popNode()}; + } -// if a node introduces new levels to the graph, add this new node on new levels -for (int level = nodeLevel; level > curMaxLevel; level--) { - hnsw.addNode(level, node); -} + // for levels <= nodeLevel search with topk = beamWidth, and add
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367956886 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ## @@ -221,34 +292,39 @@ private long printGraphBuildStatus(int node, long start, long t) { return now; } - private void addDiverseNeighbors(int level, int node, GraphBuilderKnnCollector candidates) - throws IOException { + private void addDiverseNeighbors(int level, int node, NeighborArray scratch) throws IOException { /* For each of the beamWidth nearest candidates (going from best to worst), select it only if it * is closer to target than it is to any of the already-selected neighbors (ie selected in this method, * since the node is new and has no prior neighbors). */ NeighborArray neighbors = hnsw.getNeighbors(level, node); assert neighbors.size() == 0; // new node -popToScratch(candidates); int maxConnOnLevel = level == 0 ? M * 2 : M; -selectAndLinkDiverse(neighbors, scratch, maxConnOnLevel); +boolean[] mask = selectAndLinkDiverse(neighbors, scratch, maxConnOnLevel); // Link the selected nodes to the new node, and the new node to the selected nodes (again // applying diversity heuristic) -int size = neighbors.size(); -for (int i = 0; i < size; i++) { - int nbr = neighbors.node[i]; +for (int i = 0; i < scratch.size(); i++) { + if (mask[i] == false) { +continue; + } + int nbr = scratch.node[i]; NeighborArray nbrsOfNbr = hnsw.getNeighbors(level, nbr); - nbrsOfNbr.addOutOfOrder(node, neighbors.score[i]); + long start = System.nanoTime(); + nbrsOfNbr.rwlock.writeLock().lock(); + NeighborArray.contentionTime.addAndGet(System.nanoTime() - start); + nbrsOfNbr.addOutOfOrder(node, scratch.score[i]); if (nbrsOfNbr.size() > maxConnOnLevel) { int indexToRemove = findWorstNonDiverse(nbrsOfNbr, nbr); nbrsOfNbr.removeIndex(indexToRemove); } + nbrsOfNbr.rwlock.writeLock().unlock(); Review Comment: Yes, thanks for catching that, this is the earlier code that I forgot to improve. Let me change both places -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
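For readers following along: the hunk above guards each NeighborArray with a per-array ReentrantReadWriteLock while the neighbors of the newly linked node are updated. A standalone sketch of that pattern, with the usual try/finally so the write lock is released even if the update throws; names here are illustrative, not the actual Lucene NeighborArray API:

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch only: one write-locked update of a shared neighbor list.
final class LockedNeighborListSketch {
  private final ReadWriteLock rwlock = new ReentrantReadWriteLock();

  void updateNeighbors(Runnable update) {
    rwlock.writeLock().lock();
    try {
      // e.g. add the new node out of order, then evict the worst
      // non-diverse neighbor if the list exceeds maxConn.
      update.run();
    } finally {
      // Released in finally so a failing update cannot leave the lock held.
      rwlock.writeLock().unlock();
    }
  }
}
```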
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367957707 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ## @@ -221,34 +292,39 @@ private long printGraphBuildStatus(int node, long start, long t) { return now; } - private void addDiverseNeighbors(int level, int node, GraphBuilderKnnCollector candidates) - throws IOException { + private void addDiverseNeighbors(int level, int node, NeighborArray scratch) throws IOException { /* For each of the beamWidth nearest candidates (going from best to worst), select it only if it * is closer to target than it is to any of the already-selected neighbors (ie selected in this method, * since the node is new and has no prior neighbors). */ NeighborArray neighbors = hnsw.getNeighbors(level, node); assert neighbors.size() == 0; // new node -popToScratch(candidates); int maxConnOnLevel = level == 0 ? M * 2 : M; -selectAndLinkDiverse(neighbors, scratch, maxConnOnLevel); +boolean[] mask = selectAndLinkDiverse(neighbors, scratch, maxConnOnLevel); // Link the selected nodes to the new node, and the new node to the selected nodes (again // applying diversity heuristic) -int size = neighbors.size(); -for (int i = 0; i < size; i++) { - int nbr = neighbors.node[i]; +for (int i = 0; i < scratch.size(); i++) { + if (mask[i] == false) { Review Comment: Sure, I added both javadoc and some more comments in `addDiverseNeighbors` method to explain why we need it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367958357 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphMerger.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import java.io.IOException; +import org.apache.lucene.codecs.KnnVectorsReader; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.InfoStream; + +/** + * Abstraction of merging multiple sealed graph into one on heap graph Review Comment: Yeah I'm not sure what's the word here, I guess I just want to say "merge from multiple on-disk graph to one on heap graph"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367958754 ## lucene/core/src/test/org/apache/lucene/util/hnsw/HnswGraphTestCase.java: ## @@ -709,6 +710,7 @@ public void testHnswGraphBuilderInvalid() throws IOException { IllegalArgumentException.class, () -> HnswGraphBuilder.create(scorerSupplier, 10, 0, 0)); } + @Ignore Review Comment: Ah, this is something I need some help with. I think the real problem here is this line: ``` ramUsed(hnsw) ``` It cannot evaluate how much RAM is used by the read-write lock attached to each NeighborArray, so it throws an error however I do the estimation... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
msokolov commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367971257 ## lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java: ## @@ -635,17 +667,31 @@ private static DocsWithFieldSet writeVectorData( return docsWithField; } + private HnswGraphMerger createGraphMerger( + FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) { +if (exec != null) { + return new ConcurrentHnswMerger( + fieldInfo, scorerSupplier, M, beamWidth, exec, numMergeWorkers); +} +return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth); + } + @Override public void close() throws IOException { IOUtils.close(meta, vectorData, vectorIndex); +if (exec != null) { + exec.shutdownNow(); +} +System.out.println( +"Total contention time: " + NeighborArray.contentionTime.get() / 100 + " ms"); Review Comment: Oooh, that's quite a bit! Almost 1/3 of the time spent waiting for locks! I wonder how it would be w/2 threads or 4, and then again with 1M docs. In any case the speedup is great and we can still run the CPU on other tasks while we're waiting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
msokolov commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367971976 ## lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java: ## @@ -50,13 +72,24 @@ public final class Lucene95HnswVectorsWriter extends KnnVectorsWriter { private final IndexOutput meta, vectorData, vectorIndex; private final int M; private final int beamWidth; + private final int numMergeWorkers; + private final ExecutorService exec; private final List> fields = new ArrayList<>(); private boolean finished; - Lucene95HnswVectorsWriter(SegmentWriteState state, int M, int beamWidth) throws IOException { + Lucene95HnswVectorsWriter(SegmentWriteState state, int M, int beamWidth, int numMergeWorkers) + throws IOException { this.M = M; this.beamWidth = beamWidth; +this.numMergeWorkers = numMergeWorkers; +if (numMergeWorkers <= 1) { + exec = null; +} else { + exec = + Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("hnsw-merge")); Review Comment: So I *think* that these merge operations are triggered by a single background merge thread? But we shouldn't rely on that and should provide global control over resources dedicated to this, so something like that (thread pool in Format) makes sense. However, conceptually the Format doesn't seem right to me - as a user, I would want to be able to control this from IndexWriterConfig along with other merge-related parameters in IndexWriter like buffer size and number of docs before flush. Possibly we could have other concurrent processes running during background merge (like merging different formats in parallel) and we don't want to have separate thread pools for all of those. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
msokolov commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367972354 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. 
+ */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; + + private final ExecutorService exec; + private final ConcurrentMergeWorker[] workers; + private InfoStream infoStream = InfoStream.getDefault(); + + public HnswConcurrentMergeBuilder( + ExecutorService exec, + int numWorker, + RandomVectorScorerSupplier scorerSupplier, + int M, + int beamWidth, + OnHeapHnswGraph hnsw, + BitSet initializedNodes) + throws IOException { +this.exec = exec; +AtomicInteger workProgress = new AtomicInteger(0); +workers = new ConcurrentMergeWorker[numWorker]; +for (int i = 0; i < numWorker; i++) { + workers[i] = + new ConcurrentMergeWorker( + scorerSupplier.copy(), + M, + beamWidth, + HnswGraphBuilder.randSeed, + hnsw, + initializedNodes, + workProgress); +} + } + + @Override + public OnHeapHnswGraph build(int maxOrd) throws IOException { +if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message( + HNSW_COMPONENT, + "build graph from " + maxOrd + " vectors, with " + workers.length + " workers"); +} +List> futures = new ArrayList<>(); +for (int i = 0; i < workers.length; i++) { + int finalI = i; + futures.add( + exec.submit( + () -> { +try { + workers[finalI].run(maxOrd); +} catch (IOException e) { + throw new RuntimeException(e); +} + })); +} +Throwable exc = null; +for (Future future : futures) { + try { +future.get(); + } catch (InterruptedException e) { +var newException = new ThreadInterruptedException(e); +if (exc == null) { + exc = newException; +} else { + exc.addSuppressed(newException); +} + } catch (ExecutionException e) { +if (exc == null) { + exc = e.getCause(); +} else { + exc.addSuppressed(e.getCause()); +} + } +} +if (exc != null) { + // The error handling was copied from TaskExecutor. should we just use TaskExecutor instead? + throw IOUtils.rethrowAlways(exc); +} +return workers[0].getGraph(); + } + + @Override + public void addGraphNode(int node) throws IOException { +throw new UnsupportedOperationException("This builder is for merge only"); + } + + @Override + public void setInfoStream(InfoStream infoStream) { +this.infoStream = infoStream; +for (IHnswGraphBuilder worker : workers) { + worker.setInfoStream(infoStream); +} + } + + @Override + public OnHeapHnswGraph getGraph() { +return workers[0].getGraph(); + } + + private static final class ConcurrentMergeWorker extends HnswGraphBuilder { + +private final AtomicInteger workProgress; +private final BitSet initializedNodes; + +private Concurre
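The javadoc above says the workers "will pick the work in batches" through the shared `workProgress` counter. A minimal standalone sketch of that batch-claiming pattern (hypothetical names; not the PR's exact worker code):

```java
import java.util.concurrent.atomic.AtomicInteger;

final class BatchClaimSketch {
  private static final int BATCH_SIZE = 2048;
  private final AtomicInteger workProgress = new AtomicInteger(0);

  /** Each worker thread calls this in a loop; it returns once all nodes are claimed. */
  void run(int maxOrd) {
    while (true) {
      // Atomically claim the next contiguous batch of node ids, so no two workers
      // ever try to insert the same node.
      int start = workProgress.getAndAdd(BATCH_SIZE);
      if (start >= maxOrd) {
        return; // every node has been claimed by some worker
      }
      int end = Math.min(maxOrd, start + BATCH_SIZE);
      for (int node = start; node < end; node++) {
        addGraphNode(node); // placeholder for the real insertion into the shared graph
      }
    }
  }

  private void addGraphNode(int node) {
    // In the real builder this links `node` into the shared OnHeapHnswGraph.
  }
}
```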
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367981481 ## lucene/core/src/test/org/apache/lucene/util/hnsw/HnswGraphTestCase.java: ## @@ -709,6 +710,7 @@ public void testHnswGraphBuilderInvalid() throws IOException { IllegalArgumentException.class, () -> HnswGraphBuilder.create(scorerSupplier, 10, 0, 0)); } + @Ignore Review Comment: Oh I guess that's the solution, the error I saw was: ``` java.lang.RuntimeException: Can't access field 'readerLock' of class 'java.util.concurrent.locks.ReentrantReadWriteLock' for RAM estimation. at __randomizedtesting.SeedInfo.seed([9EFF02730D10D00B:5BBB6633369B07BC]:0) at org.apache.lucene.tests.util.RamUsageTester.lambda$createCacheEntry$3(RamUsageTester.java:345) at java.base/java.security.AccessController.doPrivileged(AccessController.java:318) at org.apache.lucene.tests.util.RamUsageTester.createCacheEntry(RamUsageTester.java:322) at org.apache.lucene.tests.util.RamUsageTester.handleOther(RamUsageTester.java:214) at org.apache.lucene.tests.util.RamUsageTester.measureObjectSize(RamUsageTester.java:137) at org.apache.lucene.tests.util.RamUsageTester.ramUsed(RamUsageTester.java:88) ``` Let me try to add lock to that ignore list -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367982181 ## lucene/core/src/test/org/apache/lucene/util/hnsw/HnswGraphTestCase.java: ## @@ -709,6 +710,7 @@ public void testHnswGraphBuilderInvalid() throws IOException { IllegalArgumentException.class, () -> HnswGraphBuilder.create(scorerSupplier, 10, 0, 0)); } + @Ignore Review Comment: Yeah after I added the lock and the AtomicReference to the ignore list it works :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
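The kind of "ignore list" described here, sketched as standalone code rather than the actual RamUsageTester change: classes whose internals reflection cannot (and need not) walk are counted at a fixed size instead of being descended into.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class RamEstimationIgnoreListSketch {
  // Classes accounted for with a fixed size instead of reflective field walking,
  // which avoids the "Can't access field 'readerLock'" failure shown above.
  private static final Set<Class<?>> IGNORED =
      Set.<Class<?>>of(ReentrantReadWriteLock.class, AtomicReference.class);

  static boolean shouldDescendInto(Object o) {
    return o != null && !IGNORED.contains(o.getClass());
  }
}
```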
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367984246 ## lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java: ## @@ -50,13 +72,24 @@ public final class Lucene95HnswVectorsWriter extends KnnVectorsWriter { private final IndexOutput meta, vectorData, vectorIndex; private final int M; private final int beamWidth; + private final int numMergeWorkers; + private final ExecutorService exec; private final List> fields = new ArrayList<>(); private boolean finished; - Lucene95HnswVectorsWriter(SegmentWriteState state, int M, int beamWidth) throws IOException { + Lucene95HnswVectorsWriter(SegmentWriteState state, int M, int beamWidth, int numMergeWorkers) + throws IOException { this.M = M; this.beamWidth = beamWidth; +this.numMergeWorkers = numMergeWorkers; +if (numMergeWorkers <= 1) { + exec = null; +} else { + exec = + Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("hnsw-merge")); Review Comment: But this setting should only affect HNSW merges, not necessarily all other (potential future) KNN merges? So putting it on IWC might be a bit too general; maybe putting it into the format is OK? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
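If the knob stays on the format, wiring it up would look roughly like the sketch below. This is an assumption-laden illustration: it presumes this PR gives `Lucene95HnswVectorsFormat` a constructor taking `(M, beamWidth, numMergeWorkers)`, and the values shown are placeholders.

```java
import org.apache.lucene.codecs.KnnVectorsFormat;
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.apache.lucene.codecs.lucene95.Lucene95HnswVectorsFormat;
import org.apache.lucene.index.IndexWriterConfig;

class HnswMergeWorkersConfigSketch {
  static IndexWriterConfig newConfig() {
    // Assumed constructor shape: (M, beamWidth, numMergeWorkers) -- per-format, not per-IndexWriter.
    KnnVectorsFormat hnswFormat = new Lucene95HnswVectorsFormat(16, 100, 8);
    return new IndexWriterConfig()
        .setCodec(
            new Lucene95Codec() {
              @Override
              public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
                return hnswFormat;
              }
            });
  }
}
```

Putting the setting on `IndexWriterConfig` instead would let a single pool be shared by all merge work, which is the trade-off being discussed in this thread.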
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367984559 ## lucene/core/src/java/org/apache/lucene/codecs/lucene95/Lucene95HnswVectorsWriter.java: ## @@ -635,17 +667,31 @@ private static DocsWithFieldSet writeVectorData( return docsWithField; } + private HnswGraphMerger createGraphMerger( + FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) { +if (exec != null) { + return new ConcurrentHnswMerger( + fieldInfo, scorerSupplier, M, beamWidth, exec, numMergeWorkers); +} +return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth); + } + @Override public void close() throws IOException { IOUtils.close(meta, vectorData, vectorIndex); +if (exec != null) { + exec.shutdownNow(); +} +System.out.println( +"Total contention time: " + NeighborArray.contentionTime.get() / 100 + " ms"); Review Comment:
> Almost 1/3 of the time spent waiting for locks!

I don't think so? The 5000 ms is CPU time and the 17k ms is wall time, so roughly, if we divide the 5000 ms by 8, it's about 4~5% of total wall time spent waiting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
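Spelling out that estimate with the numbers quoted in this thread, under the assumption that the lock-wait CPU time is spread evenly across 8 merge workers:

```java
class ContentionMathSketch {
  public static void main(String[] args) {
    double contentionCpuMs = 5000;  // CPU time spent waiting for locks, summed over threads
    double wallTimeMs = 17_000;     // total merge wall time
    int workers = 8;                // merge workers sharing that wait
    double contentionWallMs = contentionCpuMs / workers;  // ~625 ms of wall time
    // Prints roughly 3.7%, i.e. about 4% of total wall time rather than 1/3.
    System.out.printf("~%.1f%% of wall time%n", 100 * contentionWallMs / wallTimeMs);
  }
}
```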
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367986199 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. 
+ */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; + + private final ExecutorService exec; + private final ConcurrentMergeWorker[] workers; + private InfoStream infoStream = InfoStream.getDefault(); + + public HnswConcurrentMergeBuilder( + ExecutorService exec, + int numWorker, + RandomVectorScorerSupplier scorerSupplier, + int M, + int beamWidth, + OnHeapHnswGraph hnsw, + BitSet initializedNodes) + throws IOException { +this.exec = exec; +AtomicInteger workProgress = new AtomicInteger(0); +workers = new ConcurrentMergeWorker[numWorker]; +for (int i = 0; i < numWorker; i++) { + workers[i] = + new ConcurrentMergeWorker( + scorerSupplier.copy(), + M, + beamWidth, + HnswGraphBuilder.randSeed, + hnsw, + initializedNodes, + workProgress); +} + } + + @Override + public OnHeapHnswGraph build(int maxOrd) throws IOException { +if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message( + HNSW_COMPONENT, + "build graph from " + maxOrd + " vectors, with " + workers.length + " workers"); +} +List> futures = new ArrayList<>(); +for (int i = 0; i < workers.length; i++) { + int finalI = i; + futures.add( + exec.submit( + () -> { +try { + workers[finalI].run(maxOrd); +} catch (IOException e) { + throw new RuntimeException(e); +} + })); +} +Throwable exc = null; +for (Future future : futures) { + try { +future.get(); + } catch (InterruptedException e) { +var newException = new ThreadInterruptedException(e); +if (exc == null) { + exc = newException; +} else { + exc.addSuppressed(newException); +} + } catch (ExecutionException e) { +if (exc == null) { + exc = e.getCause(); +} else { + exc.addSuppressed(e.getCause()); +} + } +} +if (exc != null) { + // The error handling was copied from TaskExecutor. should we just use TaskExecutor instead? + throw IOUtils.rethrowAlways(exc); +} +return workers[0].getGraph(); + } + + @Override + public void addGraphNode(int node) throws IOException { +throw new UnsupportedOperationException("This builder is for merge only"); + } + + @Override + public void setInfoStream(InfoStream infoStream) { +this.infoStream = infoStream; +for (IHnswGraphBuilder worker : workers) { + worker.setInfoStream(infoStream); +} + } + + @Override + public OnHeapHnswGraph getGraph() { +return workers[0].getGraph(); + } + + private static final class ConcurrentMergeWorker extends HnswGraphBuilder { + +private final AtomicInteger workProgress; +private final BitSet initializedNodes; + +private ConcurrentM
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367986490 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java: ## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.util.hnsw; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.apache.lucene.util.hnsw.HnswGraphBuilder.HNSW_COMPONENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** + * A graph builder that manages multiple workers, it only supports adding the whole graph all at + * once. It will spawn a thread for each worker and the workers will pick the work in batches. 
+ */ +public class HnswConcurrentMergeBuilder implements IHnswGraphBuilder { + + private static final int BATCH_SIZE = 2048; + + private final ExecutorService exec; + private final ConcurrentMergeWorker[] workers; + private InfoStream infoStream = InfoStream.getDefault(); + + public HnswConcurrentMergeBuilder( + ExecutorService exec, + int numWorker, + RandomVectorScorerSupplier scorerSupplier, + int M, + int beamWidth, + OnHeapHnswGraph hnsw, + BitSet initializedNodes) + throws IOException { +this.exec = exec; +AtomicInteger workProgress = new AtomicInteger(0); +workers = new ConcurrentMergeWorker[numWorker]; +for (int i = 0; i < numWorker; i++) { + workers[i] = + new ConcurrentMergeWorker( + scorerSupplier.copy(), + M, + beamWidth, + HnswGraphBuilder.randSeed, + hnsw, + initializedNodes, + workProgress); +} + } + + @Override + public OnHeapHnswGraph build(int maxOrd) throws IOException { +if (infoStream.isEnabled(HNSW_COMPONENT)) { + infoStream.message( + HNSW_COMPONENT, + "build graph from " + maxOrd + " vectors, with " + workers.length + " workers"); +} +List> futures = new ArrayList<>(); +for (int i = 0; i < workers.length; i++) { + int finalI = i; + futures.add( + exec.submit( + () -> { +try { + workers[finalI].run(maxOrd); +} catch (IOException e) { + throw new RuntimeException(e); +} + })); +} +Throwable exc = null; +for (Future future : futures) { + try { +future.get(); + } catch (InterruptedException e) { +var newException = new ThreadInterruptedException(e); +if (exc == null) { + exc = newException; +} else { + exc.addSuppressed(newException); +} + } catch (ExecutionException e) { +if (exc == null) { + exc = e.getCause(); +} else { + exc.addSuppressed(e.getCause()); +} + } +} +if (exc != null) { + // The error handling was copied from TaskExecutor. should we just use TaskExecutor instead? + throw IOUtils.rethrowAlways(exc); +} +return workers[0].getGraph(); + } + + @Override + public void addGraphNode(int node) throws IOException { +throw new UnsupportedOperationException("This builder is for merge only"); + } + + @Override + public void setInfoStream(InfoStream infoStream) { +this.infoStream = infoStream; +for (IHnswGraphBuilder worker : workers) { + worker.setInfoStream(infoStream); +} + } + + @Override + public OnHeapHnswGraph getGraph() { +return workers[0].getGraph(); + } + + private static final class ConcurrentMergeWorker extends HnswGraphBuilder { + +private final AtomicInteger workProgress; +private final BitSet initializedNodes; + +private ConcurrentM
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on code in PR #12660: URL: https://github.com/apache/lucene/pull/12660#discussion_r1367986729 ## lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java: ## @@ -33,7 +33,7 @@ * Builder for HNSW graph. See {@link HnswGraph} for a gloss on the algorithm and the meaning of the * hyper-parameters. */ -public class HnswGraphBuilder { +public class HnswGraphBuilder implements IHnswGraphBuilder { Review Comment: I'm not sure... But anyway, let me keep the name and change it after the other parts are reviewed? I'll add a `nocommit`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
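For context, the rough shape of the extracted interface can be read off the methods `HnswConcurrentMergeBuilder` overrides earlier in this thread; this is only an inferred sketch, and both the method set and the name (`IHnswGraphBuilder` or otherwise) may change before merge.

```java
import java.io.IOException;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.hnsw.OnHeapHnswGraph;

/** Sketch of the builder abstraction shared by the sequential and concurrent builders. */
public interface IHnswGraphBuilder {
  /** Builds the graph for all ordinals in [0, maxOrd). */
  OnHeapHnswGraph build(int maxOrd) throws IOException;

  /** Adds a single node; the concurrent merge builder rejects this operation. */
  void addGraphNode(int node) throws IOException;

  void setInfoStream(InfoStream infoStream);

  OnHeapHnswGraph getGraph();
}
```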
Re: [I] Don't provide two ways to build an FST [lucene]
cavorite commented on issue #12695: URL: https://github.com/apache/lucene/issues/12695#issuecomment-1774229048 I'd be willing to work on this issue (as a way to get more familiar with Lucene's internal code base). First, I'd like to check that I'm understanding the work needed. So far, it seems that the refactoring is relatively straightforward: replace the invocations of the `FSTCompiler` constructor by creating an instance of `Builder` and then calling the `build()` method. For instance, replacing the line: ``` FSTCompiler fstCompiler = new FSTCompiler<>(FST.INPUT_TYPE.BYTE4, outputs); ``` with this one: ``` FSTCompiler fstCompiler = new FSTCompiler.Builder<>(FST.INPUT_TYPE.BYTE4, outputs).build(); ``` There are about 40 usages of the constructor in the repository (including modules other than `core`), so we need to replace the usages in all of them and also mark the constructor as deprecated (so that it can be deleted in an upcoming major release). Would that be all that's necessary for this issue? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
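A slightly fuller sketch of that migration, hedged: it assumes the 9.x `FSTCompiler` API (`Builder#build`, `FSTCompiler#add`, `FSTCompiler#compile`) and uses `PositiveIntOutputs` purely as an example output type; inputs must be added in sorted order.

```java
import java.io.IOException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IntsRefBuilder;
import org.apache.lucene.util.fst.FST;
import org.apache.lucene.util.fst.FSTCompiler;
import org.apache.lucene.util.fst.PositiveIntOutputs;
import org.apache.lucene.util.fst.Util;

class FstBuilderMigrationSketch {
  static FST<Long> buildTinyFst() throws IOException {
    PositiveIntOutputs outputs = PositiveIntOutputs.getSingleton();
    // Old: new FSTCompiler<>(FST.INPUT_TYPE.BYTE1, outputs)
    // New: go through the Builder, which is also where any non-default options would be set.
    FSTCompiler<Long> fstCompiler =
        new FSTCompiler.Builder<>(FST.INPUT_TYPE.BYTE1, outputs).build();
    IntsRefBuilder scratch = new IntsRefBuilder();
    fstCompiler.add(Util.toIntsRef(new BytesRef("cat"), scratch), 5L);
    fstCompiler.add(Util.toIntsRef(new BytesRef("dog"), scratch), 7L);
    return fstCompiler.compile();
  }
}
```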
Re: [I] Don't provide two ways to build an FST [lucene]
mikemccand commented on issue #12695: URL: https://github.com/apache/lucene/issues/12695#issuecomment-1774232380 Yes that's exactly the idea! Thank you @cavorite for tackling this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [I] Adding option to codec to disable patching in Lucene's PFOR encoding [lucene]
mikemccand commented on issue #12696: URL: https://github.com/apache/lucene/issues/12696#issuecomment-1774236604
> Should we just do more tests and start writing indexes without patching? Only a 4 percent disk savings? It is a lot of complexity, especially to vectorize. A runtime option is more expensive because then we have to make sure indexes encoded both ways can be read, it only adds more complexity imo

+1 to remove patching entirely! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [PR] Concurrent HNSW Merge [lucene]
zhaih commented on PR #12660: URL: https://github.com/apache/lucene/pull/12660#issuecomment-1774469691
> I would be curious to see the contention times and also understand how this changes CPU usage vs. single-threaded.

@msokolov as for CPU usage, I just tested with 1M docs, and on my laptop I can see that during forceMerge it constantly ranges from 790~810%, while if a merge triggers while adding documents, the CPU can go to 900%. I would like to see how it scales to, say, 20 or even 40 cores, but unfortunately this laptop with 10 cores (including 2 efficiency cores) is the only one I have for now... BTW the forceMerge of the 1M index took `362125 ms`, while the contention time for that merge was `81847 ms` in CPU time, so on average `10230 ms` wall time and thus `~2.8%` of total time. And the array copy time was `12151 ms` in CPU time, so on average `1519 ms` wall time and thus `~0.4%` of total time. So you're right: compared with the 100k-document result, the contention overhead goes down as the graph gets bigger. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org
Re: [I] Enable recursive graph bisection out of the box? [lucene]
jpountz commented on issue #12665: URL: https://github.com/apache/lucene/issues/12665#issuecomment-1774523421 These sound like great ideas! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org