quux00 commented on code in PR #12523:
URL: https://github.com/apache/lucene/pull/12523#discussion_r1340643119


##########
lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java:
##########
@@ -267,11 +266,130 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
             return slices.toArray(new LeafSlice[0]);
           }
         };
-    searcher.search(new MatchAllDocsQuery(), 10);
+    TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
+    assertTrue(topDocs.totalHits.value > 0);
     if (leaves.size() <= 1) {
       assertEquals(0, numExecutions.get());
     } else {
       assertEquals(leaves.size(), numExecutions.get());
     }
   }
+
+  /**
+   * Tests that when IndexSearcher runs concurrent searches on multiple slices, if any Exception is
+   * thrown by one of the slice tasks, it will not return until all tasks have completed.
+   *
+   * <p>Without a larger refactoring of the Lucene IndexSearcher and/or TaskExecutor there isn't a
+   * clean deterministic way to test this. This test is probabilistic, relying on short sleeps in
+   * the tasks that do not throw an Exception.
+   */
+  public void testMultipleSegmentsOnTheExecutorWithException() {
+    List<LeafReaderContext> leaves = reader.leaves();
+    int fixedThreads = leaves.size() == 1 ? 1 : leaves.size() / 2;
+
+    ExecutorService fixedThreadPoolExecutor =
+        Executors.newFixedThreadPool(fixedThreads, new NamedThreadFactory("concurrent-slices"));
+
+    IndexSearcher searcher =
+        new IndexSearcher(reader, fixedThreadPoolExecutor) {
+          @Override
+          protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+            ArrayList<LeafSlice> slices = new ArrayList<>();
+            for (LeafReaderContext ctx : leaves) {
+              slices.add(new LeafSlice(Arrays.asList(ctx)));
+            }
+            return slices.toArray(new LeafSlice[0]);
+          }
+        };
+
+    try {
+      AtomicInteger callsToScorer = new AtomicInteger(0);
+      int numExceptions = leaves.size() == 1 ? 1 : RandomizedTest.randomIntBetween(1, 2);
+      MatchAllOrThrowExceptionQuery query =
+          new MatchAllOrThrowExceptionQuery(numExceptions, callsToScorer);
+      RuntimeException exc = expectThrows(RuntimeException.class, () -> searcher.search(query, 10));
+      // if the TaskExecutor didn't wait for all tasks to finish, this assert would frequently fail
+      assertEquals(leaves.size(), callsToScorer.get());
+      assertThat(
+          exc.getMessage(), Matchers.containsString("MatchAllOrThrowExceptionQuery Exception"));
+    } finally {
+      TestUtil.shutdownExecutorService(fixedThreadPoolExecutor);
+    }
+  }
+
+  private static class MatchAllOrThrowExceptionQuery extends Query {
+
+    private final AtomicInteger numExceptionsToThrow;
+    private final Query delegate;
+    private final AtomicInteger callsToScorer;
+
+    /**
+     * Throws an Exception out of the {@code scorer} method the first {@code numExceptions} times it
+     * is called. Otherwise, it delegates all calls to the MatchAllDocsQuery.
+     *
+     * @param numExceptions number of exceptions to throw from scorer method
+     * @param callsToScorer where to record the number of times the {@code scorer} method has been
+     *     called
+     */
+    public MatchAllOrThrowExceptionQuery(int numExceptions, AtomicInteger callsToScorer) {
+      this.numExceptionsToThrow = new AtomicInteger(numExceptions);
+      this.callsToScorer = callsToScorer;
+      this.delegate = new MatchAllDocsQuery();
+    }
+
+    @Override
+    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
+        throws IOException {
+      Weight matchAllWeight = delegate.createWeight(searcher, scoreMode, boost);
+
+      return new Weight(delegate) {
+        @Override
+        public boolean isCacheable(LeafReaderContext ctx) {
+          return matchAllWeight.isCacheable(ctx);
+        }
+
+        @Override
+        public Explanation explain(LeafReaderContext context, int doc) throws IOException {
+          return matchAllWeight.explain(context, doc);
+        }
+
+        @Override
+        public Scorer scorer(LeafReaderContext context) throws IOException {
+          if (numExceptionsToThrow.getAndDecrement() > 0) {
+            callsToScorer.getAndIncrement();
+            throw new RuntimeException("MatchAllOrThrowExceptionQuery Exception");
+          } else {
+            // A small sleep before incrementing the callsToScorer counter gives the task with the
+            // Exception time to throw. If TaskExecutor.invokeAll does not wait until all tasks have
+            // finished, the callsToScorer counter will then usually not match the total number of
+            // tasks (usually rather than always, because a race condition makes it probabilistic).
+            RandomizedTest.sleep(25);

Review Comment:
   I removed the sleep and added a CountDownLatch: the `scorer` calls that do NOT throw now wait on the latch until all of the exceptions have been thrown, and only then increment the `callsToScorer` counter.
   
   With the original implementation of `TaskExecutor#invokeAll` this test fails about 80% of the time on my machine (I ran it about 100 times), whereas it always passes with the new impl of `invokeAll`. I don't see how to make it fail fully deterministically with the old `invokeAll`, since that requires the "Exception case" scorers to always throw _before_ the non-Exception-case scorers finish. You could add a second latch that gates the non-Exception-case scorers and that the test releases only after it has caught the Exception from `invokeAll`. That would deterministically fail with the old version of `invokeAll`, but it would never pass with the new code: the new `invokeAll` waits for those gated scorers before throwing, so the test would hang indefinitely.
   
   Let me know if you see a clean way to make it deterministic with both impls of `invokeAll`.
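To illustrate the latch ordering described in this comment outside of Lucene, here is a minimal plain-Java sketch. It is not the PR's test: `LatchGatedTasksSketch` and `runGatedTasks` are hypothetical names, and the JDK's `ExecutorService#invokeAll`, which already waits for every task to complete, stands in for the new `TaskExecutor#invokeAll`. As in `MatchAllOrThrowExceptionQuery`, whether a task throws is decided when it starts running (the first `numExceptions` tasks to run throw), so the throwing tasks never queue behind blocked ones in a small fixed pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchGatedTasksSketch {

  /**
   * Hypothetical helper: runs numTasks tasks on a pool of poolSize threads. The first
   * numExceptions tasks to start throw; every other task waits on a latch until all of those
   * exceptions have been thrown, and only then increments the shared counter.
   * Returns the final counter value.
   */
  static int runGatedTasks(int numTasks, int numExceptions, int poolSize) {
    AtomicInteger exceptionsLeft = new AtomicInteger(numExceptions);
    AtomicInteger calls = new AtomicInteger(0);
    CountDownLatch allThrown = new CountDownLatch(numExceptions);
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      List<Callable<Integer>> tasks = new ArrayList<>();
      for (int i = 0; i < numTasks; i++) {
        tasks.add(
            () -> {
              // Decided at run time, not submission time: a throwing task never blocks,
              // so it cannot be starved by waiting tasks occupying every pool thread.
              if (exceptionsLeft.getAndDecrement() > 0) {
                calls.incrementAndGet();
                allThrown.countDown();
                throw new RuntimeException("gated task exception");
              }
              allThrown.await(); // blocks until every throwing task has counted down
              return calls.incrementAndGet();
            });
      }
      // ExecutorService#invokeAll returns only once every task has completed.
      List<Future<Integer>> futures = pool.invokeAll(tasks);
      int failures = 0;
      for (Future<Integer> future : futures) {
        try {
          future.get();
        } catch (ExecutionException e) {
          failures++;
        }
      }
      if (failures != Math.min(numExceptions, numTasks)) {
        throw new AssertionError("unexpected failure count: " + failures);
      }
      return calls.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
    }
  }
}
```

Every task, throwing or not, increments the counter exactly once, so for example `runGatedTasks(8, 2, 4)` should return 8. The latch only fixes the order (all exceptions first); it is the wait-for-all behavior of `invokeAll` that makes the final count observable as complete.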



##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -50,16 +51,21 @@ final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) {
     for (Runnable task : tasks) {
       executor.execute(task);
     }
+
+    RuntimeException exc = null;
     final List<T> results = new ArrayList<>();
     for (Future<T> future : tasks) {
       try {
         results.add(future.get());
       } catch (InterruptedException e) {
-        throw new ThreadInterruptedException(e);
+        exc = new ThreadInterruptedException(e);
       } catch (ExecutionException e) {
-        throw new RuntimeException(e.getCause());
+        exc = new RuntimeException(e.getCause());

Review Comment:
   I added this change and the tests are passing.
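The hunk above ends before showing how `exc` is eventually rethrown, so the following is only a plain-Java sketch of the wait-for-all pattern under that assumption, not the PR's code. `WaitForAllSketch`, `firstOrSuppressed` and `demo` are hypothetical names; a plain `RuntimeException` stands in for Lucene's `ThreadInterruptedException`; and keeping the first failure while attaching later ones as suppressed is a design choice of this sketch (the lines shown simply reassign `exc`).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class WaitForAllSketch {

  /**
   * Hypothetical stand-in for TaskExecutor#invokeAll: runs every task, waits on every future even
   * after one has failed, and rethrows only once all tasks have completed.
   */
  static <T> List<T> invokeAll(Executor executor, Collection<RunnableFuture<T>> tasks) {
    for (Runnable task : tasks) {
      executor.execute(task);
    }
    RuntimeException exc = null;
    final List<T> results = new ArrayList<>();
    for (RunnableFuture<T> future : tasks) {
      try {
        results.add(future.get());
      } catch (InterruptedException e) {
        // Plain RuntimeException stands in for Lucene's ThreadInterruptedException.
        exc = firstOrSuppressed(exc, new RuntimeException(e));
      } catch (ExecutionException e) {
        exc = firstOrSuppressed(exc, new RuntimeException(e.getCause()));
      }
    }
    if (exc != null) {
      throw exc;
    }
    return results;
  }

  /** Keeps the first failure and attaches any later ones to it as suppressed exceptions. */
  private static RuntimeException firstOrSuppressed(RuntimeException first, RuntimeException next) {
    if (first == null) {
      return next;
    }
    first.addSuppressed(next);
    return first;
  }

  /** One task fails immediately; three others sleep briefly, then count themselves as done. */
  static int demo() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AtomicInteger completed = new AtomicInteger();
    List<RunnableFuture<Integer>> tasks = new ArrayList<>();
    tasks.add(
        new FutureTask<Integer>(
            () -> {
              throw new IllegalStateException("boom");
            }));
    for (int i = 0; i < 3; i++) {
      tasks.add(
          new FutureTask<Integer>(
              () -> {
                Thread.sleep(50);
                return completed.incrementAndGet();
              }));
    }
    try {
      invokeAll(pool, tasks);
      throw new AssertionError("expected invokeAll to throw");
    } catch (RuntimeException e) {
      // By the time the failure surfaces, every other task has already finished.
      return completed.get();
    } finally {
      pool.shutdown();
    }
  }
}
```

Because every future is drained before anything is rethrown, `demo()` returns 3 deterministically: the three sleeping tasks have all finished by the time the failure from the first task propagates, which is the property the new test asserts via `callsToScorer`.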



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

