javanna commented on code in PR #15936:
URL: https://github.com/apache/lucene/pull/15936#discussion_r3371738091


##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/TopGroups.java:
##########
@@ -290,4 +296,151 @@ public static <T> TopGroups<T> merge(
           totalMaxScore);
     }
   }
+
+  private record MergedBlockGroup(Object[] topValues, int shardIndex, int 
groupIndex) {}
+
+  private static class GroupComparator implements Comparator<MergedBlockGroup> 
{
+    @SuppressWarnings("rawtypes")
+    public final FieldComparator[] comparators;
+
+    public final int[] reversed;
+
+    @SuppressWarnings({"rawtypes"})
+    public GroupComparator(Sort groupSort) {
+      final SortField[] sortFields = groupSort.getSort();
+      comparators = new FieldComparator[sortFields.length];
+      reversed = new int[sortFields.length];
+      for (int compIDX = 0; compIDX < sortFields.length; compIDX++) {
+        final SortField sortField = sortFields[compIDX];
+        comparators[compIDX] = sortField.getComparator(1, Pruning.NONE);
+        reversed[compIDX] = sortField.getReverse() ? -1 : 1;
+      }
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked"})
+    public int compare(MergedBlockGroup group, MergedBlockGroup other) {
+      if (group == other) {
+        return 0;
+      }
+      final Object[] groupValues = group.topValues;
+      final Object[] otherValues = other.topValues;
+      for (int compIDX = 0; compIDX < comparators.length; compIDX++) {
+        final int c =
+            reversed[compIDX]
+                * comparators[compIDX].compareValues(groupValues[compIDX], 
otherValues[compIDX]);
+        if (c != 0) {
+          return c;
+        }
+      }
+
+      assert group.shardIndex != other.shardIndex;
+      return group.shardIndex - other.shardIndex;
+    }
+  }
+
+  /**
+   * Merge TopGroups that are partitioned into blocks per shard. This method 
assumes that within
+   * each shard, the groups are sorted according to the groupSort.
+   *
+   * @param shardGroups list of TopGroups, one per shard.
+   * @param groupSort The {@link Sort} used to sort the groups. The top sorted 
document within each
+   *     * group according to groupSort, determines how that group sorts 
against other groups. This

Review Comment:
   Could you fix the indentation here and the javadoc format?



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/TopGroups.java:
##########
@@ -290,4 +296,151 @@ public static <T> TopGroups<T> merge(
           totalMaxScore);
     }
   }
+
+  private record MergedBlockGroup(Object[] topValues, int shardIndex, int 
groupIndex) {}
+
+  private static class GroupComparator implements Comparator<MergedBlockGroup> 
{
+    @SuppressWarnings("rawtypes")
+    public final FieldComparator[] comparators;
+
+    public final int[] reversed;

Review Comment:
   do these fields need to be public?



##########
lucene/grouping/src/test/org/apache/lucene/search/grouping/AbstractGroupingTestCase.java:
##########
@@ -16,22 +16,141 @@
  */
 package org.apache.lucene.search.grouping;
 
+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexReaderContext;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.analysis.MockAnalyzer;
 import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 
 /** Base class for grouping related tests. */
 // TODO (MvG) : The grouping tests contain a lot of code duplication. Try to 
move the common code to
 // this class..
 public abstract class AbstractGroupingTestCase extends LuceneTestCase {
 
+  protected static ExecutorService testExecutor;
+
+  @BeforeClass
+  public static void setUpExecutor() {
+    testExecutor = Executors.newFixedThreadPool(2, new 
NamedThreadFactory("GroupingTest"));
+  }
+
+  @AfterClass
+  public static void tearDownExecutor() {
+    TestUtil.shutdownExecutorService(testExecutor);
+    testExecutor = null;
+  }
+
+  /**
+   * Creates an IndexSearcher with random concurrency. For intra-segment 
concurrency, slices are
+   * aligned on block boundaries using the provided lastDocPerGroup query so 
that no doc block is
+   * ever split across slices.
+   */
+  protected IndexSearcher newIndexSearcher(IndexReader reader, Query 
lastDocPerGroupQuery)

Review Comment:
   I think I see why you had to add the following code to support intra-segment 
concurrency. I wonder if this is necessary as part of this PR. I am not sure we 
should have this copy of the slices method and custom randomizations around 
concurrency applied. Could we instead use the `newSearcher` method and provide 
`INTER_SEGMENT` to it as the concurrency mode, so that intra-segment is not tested, 
and add a comment that intra-segment requires more work? We have this in a few 
other places already.



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollectorManager.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.Weight;
+
+/**
+ * A {@link CollectorManager} for {@link BlockGroupingCollector} that merges 
results from multiple
+ * collectors into a single {@link TopGroups}. This is intended for use with 
concurrent search,
+ * where each slice is searched by a separate {@link BlockGroupingCollector}.
+ *
+ * <p>Documents must be indexed as blocks using {@link
+ * org.apache.lucene.index.IndexWriter#addDocuments 
IndexWriter.addDocuments()} or {@link
+ * org.apache.lucene.index.IndexWriter#updateDocuments 
IndexWriter.updateDocuments()}.
+ *
+ * <p><b>NOTE</b>: All documents in a group block must be processed by the 
same {@link
+ * BlockGroupingCollector} instance. This means that the {@link
+ * org.apache.lucene.search.IndexSearcher}'s slices must not split a segment 
in a way that places
+ * documents from the same block into different slices. The default {@link
+ * org.apache.lucene.search.IndexSearcher#slices} implementation 
(inter-segment only) satisfies this
+ * constraint. If intra-segment concurrency is desired, the caller must 
override {@link
+ * org.apache.lucene.search.IndexSearcher#slices} to ensure each doc block 
falls entirely within one
+ * slice.
+ *
+ * <p>See {@link BlockGroupingCollector} for more details.
+ *
+ * <p>Example usage:
+ *
+ * <pre class="prettyprint">
+ * IndexSearcher searcher = ...; // your IndexSearcher
+ * Query lastDocPerGroupQuery = new TermQuery(new Term("groupEnd", "true"));
+ * Weight lastDocPerGroup = searcher.createWeight(
+ *     searcher.rewrite(lastDocPerGroupQuery), ScoreMode.COMPLETE_NO_SCORES, 
1);
+ *
+ * BlockGroupingCollectorManager&lt;BytesRef&gt; manager = new 
BlockGroupingCollectorManager&lt;&gt;(
+ *     Sort.RELEVANCE,   // groupSort
+ *     0,                // groupOffset
+ *     10,               // topNGroups
+ *     true,             // needsScores
+ *     lastDocPerGroup,
+ *     Sort.RELEVANCE,   // withinGroupSort
+ *     0,                // withinGroupOffset
+ *     5);               // maxDocsPerGroup
+ *
+ * TopGroups&lt;BytesRef&gt; result = searcher.search(query, manager);
+ * </pre>
+ *
+ * @lucene.experimental
+ */
+public class BlockGroupingCollectorManager<T>
+    implements CollectorManager<BlockGroupingCollector, TopGroups<T>> {
+
+  private final Sort groupSort;
+  private final int groupOffset;
+  private final int topNGroups;
+  private final boolean needsScores;
+  private final Weight lastDocPerGroup;
+
+  private final Sort withinGroupSort;
+  private final int withinGroupOffset;
+  private final int maxDocsPerGroup;
+
+  /**
+   * Creates a new BlockGroupingCollectorManager.
+   *
+   * @param groupSort the sort used to rank groups
+   * @param groupOffset the offset into the groups to start returning from
+   * @param topNGroups the number of top groups to collect
+   * @param needsScores whether scores are needed (must be true if groupSort 
or withinGroupSort uses
+   *     scores)
+   * @param lastDocPerGroup a {@link Weight} that matches the last document in 
each group block
+   * @param withinGroupSort the sort used to rank documents within each group
+   * @param withinGroupOffset the offset into each group's documents to start 
returning from
+   * @param maxDocsPerGroup the maximum number of documents to return per group
+   */
+  public BlockGroupingCollectorManager(
+      Sort groupSort,
+      int groupOffset,
+      int topNGroups,
+      boolean needsScores,
+      Weight lastDocPerGroup,
+      Sort withinGroupSort,
+      int withinGroupOffset,
+      int maxDocsPerGroup) {
+    if (groupOffset < 0) {
+      throw new IllegalArgumentException("groupOffset must be >= 0 (got " + 
groupOffset + ")");
+    }
+
+    if (topNGroups < 1) {
+      throw new IllegalArgumentException("topNGroups must be >= 1 (got " + 
topNGroups + ")");
+    }
+
+    if (withinGroupOffset < 0) {
+      throw new IllegalArgumentException(
+          "withinGroupOffset must be >= 0 (got " + withinGroupOffset + ")");
+    }
+
+    if (maxDocsPerGroup < 1) {
+      throw new IllegalArgumentException(
+          "maxDocsPerGroup must be >= 1 (got " + maxDocsPerGroup + ")");
+    }
+
+    if (withinGroupSort.getSort().length == 0) {

Review Comment:
   Sorry, my bad: I think this type of validation for the two Sort instances is 
redundant. `Sort` already does this in its constructor. We can remove it then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to