javanna commented on code in PR #15936:
URL: https://github.com/apache/lucene/pull/15936#discussion_r3266083896


##########
lucene/grouping/src/test/org/apache/lucene/search/grouping/TestBlockGrouping.java:
##########
@@ -77,6 +77,64 @@ public void testSimple() throws IOException {
     shard.close();
   }
 
+  public void testShardedBlockGrouping() throws IOException {

Review Comment:
   Are we testing with search concurrency enabled, both in this test and in the existing tests?



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollectorManager.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.Weight;
+
+/**
+ * A {@link CollectorManager} for {@link BlockGroupingCollector} that merges 
results from multiple
+ * collectors into a single {@link TopGroups}. This is intended for use with 
concurrent search,
+ * where each segment is searched by a separate {@link BlockGroupingCollector}.
+ *
+ * <p>Documents must be indexed as blocks using {@link
+ * org.apache.lucene.index.IndexWriter#addDocuments 
IndexWriter.addDocuments()} or {@link
+ * org.apache.lucene.index.IndexWriter#updateDocuments 
IndexWriter.updateDocuments()}.
+ *
+ * <p>See {@link BlockGroupingCollector} for more details.
+ *
+ * @lucene.experimental
+ */
+public class BlockGroupingCollectorManager
+    implements CollectorManager<BlockGroupingCollector, TopGroups<?>> {
+
+  private final Sort groupSort;
+  private final int topNGroups;
+  private final boolean needsScores;
+  private final Weight lastDocPerGroup;
+
+  private final Sort withinGroupSort;
+  private final int groupOffset;
+  private final int withinGroupOffset;
+  private final int maxDocsPerGroup;
+
+  private final List<BlockGroupingCollector> collectors;
+
+  public BlockGroupingCollectorManager(
+      Sort groupSort,
+      int topNGroups,
+      boolean needsScores,
+      Weight lastDocPerGroup,
+      Sort withinGroupSort,
+      int groupOffset,
+      int withinGroupOffset,
+      int maxDocsPerGroup) {
+    this.groupSort = groupSort;
+    this.topNGroups = topNGroups;
+    this.needsScores = needsScores;
+    this.lastDocPerGroup = lastDocPerGroup;
+    this.collectors = new ArrayList<>();
+    this.withinGroupSort = withinGroupSort;
+    this.groupOffset = groupOffset;
+    this.withinGroupOffset = withinGroupOffset;
+    this.maxDocsPerGroup = maxDocsPerGroup;
+  }
+
+  @Override
+  public BlockGroupingCollector newCollector() throws IOException {
+    BlockGroupingCollector collector =
+        new BlockGroupingCollector(groupSort, topNGroups, needsScores, 
lastDocPerGroup);
+    collectors.add(collector);
+    return collector;
+  }
+
+  @Override
+  public TopGroups<?> reduce(Collection<BlockGroupingCollector> collectors) 
throws IOException {
+    if (collectors.isEmpty()) {
+      return null;
+    }
+
+    if (collectors.size() == 1) {
+      return collectors
+          .iterator()
+          .next()
+          .getTopGroups(withinGroupSort, groupOffset, withinGroupOffset, 
maxDocsPerGroup);
+    }

Review Comment:
   Avoid returning null above and remove this conditional? Is it necessary?



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/TopGroups.java:
##########
@@ -280,4 +287,123 @@ public static <T> TopGroups<T> merge(
           totalMaxScore);
     }
   }
+
+  private record MergedBlockGroup(Object[] topValues, int shardIndex, int 
groupIndex) {}
+
+  private static class GroupComparator implements Comparator<MergedBlockGroup> 
{
+    @SuppressWarnings("rawtypes")
+    public final FieldComparator[] comparators;
+
+    public final int[] reversed;
+
+    @SuppressWarnings({"rawtypes"})
+    public GroupComparator(Sort groupSort) {
+      final SortField[] sortFields = groupSort.getSort();
+      comparators = new FieldComparator[sortFields.length];
+      reversed = new int[sortFields.length];
+      for (int compIDX = 0; compIDX < sortFields.length; compIDX++) {
+        final SortField sortField = sortFields[compIDX];
+        comparators[compIDX] = sortField.getComparator(1, Pruning.NONE);
+        reversed[compIDX] = sortField.getReverse() ? -1 : 1;
+      }
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked"})
+    public int compare(MergedBlockGroup group, MergedBlockGroup other) {
+      if (group == other) {
+        return 0;
+      }
+      final Object[] groupValues = group.topValues;
+      final Object[] otherValues = other.topValues;
+      for (int compIDX = 0; compIDX < comparators.length; compIDX++) {
+        final int c =
+            reversed[compIDX]
+                * comparators[compIDX].compareValues(groupValues[compIDX], 
otherValues[compIDX]);
+        if (c != 0) {
+          return c;
+        }
+      }
+
+      assert group.shardIndex != other.shardIndex;
+      return group.shardIndex - other.shardIndex;
+    }
+  }
+
+  /**
+   * Merge TopGroups that are partitioned into blocks per shard. This method 
assumes that within
+   * each shard, the groups are sorted according to the groupSort.
+   *
+   * @param shardGroups list of TopGroups, one per shard.
+   * @param groupSort The {@link Sort} used to sort the groups. The top sorted 
document within each
+   *     * group according to groupSort, determines how that group sorts 
against other groups. This
+   *     * must be non-null, ie, if you want to groupSort by relevance use 
Sort.RELEVANCE.
+   * @param groupOffset Which group to start from.
+   * @param topNGroups How many top groups to keep.
+   * @return TopGroups instance or null if there are no groups.
+   */
+  public static TopGroups<?> mergeBlockGroups(

Review Comment:
   We could use generics to our advantage here, to enforce consistency.



##########
lucene/CHANGES.txt:
##########
@@ -115,6 +115,8 @@ Improvements
 
 * GITHUB#15225: Improve package documentation for org.apache.lucene.util. 
(Syed Mohammad Saad)
 
+* GITHUB#15936: Introduce BlockGroupingCollectorManager to parallelize search 
when using BlockGroupingCollector. (Binlong Gao)

Review Comment:
   Move this to 10.5? I think it makes sense to backport it.



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/TopGroups.java:
##########
@@ -280,4 +287,123 @@ public static <T> TopGroups<T> merge(
           totalMaxScore);
     }
   }
+
+  private record MergedBlockGroup(Object[] topValues, int shardIndex, int 
groupIndex) {}
+
+  private static class GroupComparator implements Comparator<MergedBlockGroup> 
{
+    @SuppressWarnings("rawtypes")
+    public final FieldComparator[] comparators;
+
+    public final int[] reversed;
+
+    @SuppressWarnings({"rawtypes"})
+    public GroupComparator(Sort groupSort) {
+      final SortField[] sortFields = groupSort.getSort();
+      comparators = new FieldComparator[sortFields.length];
+      reversed = new int[sortFields.length];
+      for (int compIDX = 0; compIDX < sortFields.length; compIDX++) {
+        final SortField sortField = sortFields[compIDX];
+        comparators[compIDX] = sortField.getComparator(1, Pruning.NONE);
+        reversed[compIDX] = sortField.getReverse() ? -1 : 1;
+      }
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked"})
+    public int compare(MergedBlockGroup group, MergedBlockGroup other) {
+      if (group == other) {
+        return 0;
+      }
+      final Object[] groupValues = group.topValues;
+      final Object[] otherValues = other.topValues;
+      for (int compIDX = 0; compIDX < comparators.length; compIDX++) {
+        final int c =
+            reversed[compIDX]
+                * comparators[compIDX].compareValues(groupValues[compIDX], 
otherValues[compIDX]);
+        if (c != 0) {
+          return c;
+        }
+      }
+
+      assert group.shardIndex != other.shardIndex;
+      return group.shardIndex - other.shardIndex;
+    }
+  }
+
+  /**
+   * Merge TopGroups that are partitioned into blocks per shard. This method 
assumes that within
+   * each shard, the groups are sorted according to the groupSort.
+   *
+   * @param shardGroups list of TopGroups, one per shard.
+   * @param groupSort The {@link Sort} used to sort the groups. The top sorted 
document within each
+   *     * group according to groupSort, determines how that group sorts 
against other groups. This
+   *     * must be non-null, ie, if you want to groupSort by relevance use 
Sort.RELEVANCE.
+   * @param groupOffset Which group to start from.
+   * @param topNGroups How many top groups to keep.
+   * @return TopGroups instance or null if there are no groups.
+   */
+  public static TopGroups<?> mergeBlockGroups(
+      List<TopGroups<?>> shardGroups, Sort groupSort, int groupOffset, int 
topNGroups) {
+    if (shardGroups.isEmpty()) {
+      return null;
+    }
+
+    int totalGroupCount = 0;
+    int totalHitCount = 0;
+    int totalGroupedHitCount = 0;
+    for (TopGroups<?> sg : shardGroups) {
+      totalGroupCount += sg.totalGroupCount;
+      totalHitCount += sg.totalHitCount;
+    }
+
+    // k-way merge
+    GroupComparator groupComp = new GroupComparator(groupSort);
+    NavigableSet<MergedBlockGroup> queue = new TreeSet<>(groupComp);
+
+    // init queue
+    for (int idx = 0; idx < shardGroups.size(); idx++) {
+      GroupDocs<?> firstGroupDocs = shardGroups.get(idx).groups[0];
+      queue.add(new MergedBlockGroup(firstGroupDocs.groupSortValues(), idx, 
0));
+    }
+
+    float maxScore = 
shardGroups.get(queue.first().shardIndex).groups[0].maxScore();

Review Comment:
   Is the maxScore only read once from the first group?



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollectorManager.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.Weight;
+
+/**
+ * A {@link CollectorManager} for {@link BlockGroupingCollector} that merges 
results from multiple
+ * collectors into a single {@link TopGroups}. This is intended for use with 
concurrent search,
+ * where each segment is searched by a separate {@link BlockGroupingCollector}.
+ *
+ * <p>Documents must be indexed as blocks using {@link
+ * org.apache.lucene.index.IndexWriter#addDocuments 
IndexWriter.addDocuments()} or {@link
+ * org.apache.lucene.index.IndexWriter#updateDocuments 
IndexWriter.updateDocuments()}.
+ *
+ * <p>See {@link BlockGroupingCollector} for more details.
+ *
+ * @lucene.experimental
+ */
+public class BlockGroupingCollectorManager
+    implements CollectorManager<BlockGroupingCollector, TopGroups<?>> {
+
+  private final Sort groupSort;
+  private final int topNGroups;
+  private final boolean needsScores;
+  private final Weight lastDocPerGroup;
+
+  private final Sort withinGroupSort;
+  private final int groupOffset;
+  private final int withinGroupOffset;
+  private final int maxDocsPerGroup;
+
+  private final List<BlockGroupingCollector> collectors;

Review Comment:
   Can we remove this list? I don't think that tracking the collectors as they 
are created brings any value.



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollectorManager.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.Weight;
+
+/**
+ * A {@link CollectorManager} for {@link BlockGroupingCollector} that merges 
results from multiple
+ * collectors into a single {@link TopGroups}. This is intended for use with 
concurrent search,
+ * where each segment is searched by a separate {@link BlockGroupingCollector}.
+ *
+ * <p>Documents must be indexed as blocks using {@link
+ * org.apache.lucene.index.IndexWriter#addDocuments 
IndexWriter.addDocuments()} or {@link
+ * org.apache.lucene.index.IndexWriter#updateDocuments 
IndexWriter.updateDocuments()}.
+ *
+ * <p>See {@link BlockGroupingCollector} for more details.
+ *
+ * @lucene.experimental
+ */
+public class BlockGroupingCollectorManager

Review Comment:
   Could we give the manager a generic type parameter T that is the same as the 
one used by TopGroups?



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollectorManager.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.Weight;
+
+/**
+ * A {@link CollectorManager} for {@link BlockGroupingCollector} that merges 
results from multiple
+ * collectors into a single {@link TopGroups}. This is intended for use with 
concurrent search,
+ * where each segment is searched by a separate {@link BlockGroupingCollector}.
+ *
+ * <p>Documents must be indexed as blocks using {@link
+ * org.apache.lucene.index.IndexWriter#addDocuments 
IndexWriter.addDocuments()} or {@link
+ * org.apache.lucene.index.IndexWriter#updateDocuments 
IndexWriter.updateDocuments()}.
+ *
+ * <p>See {@link BlockGroupingCollector} for more details.
+ *
+ * @lucene.experimental
+ */
+public class BlockGroupingCollectorManager
+    implements CollectorManager<BlockGroupingCollector, TopGroups<?>> {
+
+  private final Sort groupSort;
+  private final int topNGroups;
+  private final boolean needsScores;
+  private final Weight lastDocPerGroup;
+
+  private final Sort withinGroupSort;
+  private final int groupOffset;
+  private final int withinGroupOffset;
+  private final int maxDocsPerGroup;
+
+  private final List<BlockGroupingCollector> collectors;
+
+  public BlockGroupingCollectorManager(
+      Sort groupSort,
+      int topNGroups,
+      boolean needsScores,
+      Weight lastDocPerGroup,
+      Sort withinGroupSort,
+      int groupOffset,
+      int withinGroupOffset,
+      int maxDocsPerGroup) {
+    this.groupSort = groupSort;
+    this.topNGroups = topNGroups;
+    this.needsScores = needsScores;
+    this.lastDocPerGroup = lastDocPerGroup;
+    this.collectors = new ArrayList<>();
+    this.withinGroupSort = withinGroupSort;
+    this.groupOffset = groupOffset;
+    this.withinGroupOffset = withinGroupOffset;
+    this.maxDocsPerGroup = maxDocsPerGroup;
+  }
+
+  @Override
+  public BlockGroupingCollector newCollector() throws IOException {
+    BlockGroupingCollector collector =
+        new BlockGroupingCollector(groupSort, topNGroups, needsScores, 
lastDocPerGroup);
+    collectors.add(collector);
+    return collector;
+  }
+
+  @Override
+  public TopGroups<?> reduce(Collection<BlockGroupingCollector> collectors) 
throws IOException {
+    if (collectors.isEmpty()) {
+      return null;
+    }
+
+    if (collectors.size() == 1) {
+      return collectors
+          .iterator()
+          .next()
+          .getTopGroups(withinGroupSort, groupOffset, withinGroupOffset, 
maxDocsPerGroup);
+    }
+
+    // Merge results from multiple collectors
+    List<TopGroups<?>> shardGroupsList = new ArrayList<>();
+    for (BlockGroupingCollector collector : collectors) {
+      TopGroups<?> topGroups =
+          collector.getTopGroups(withinGroupSort, 0, withinGroupOffset, 
maxDocsPerGroup);
+      if (topGroups != null) {
+        shardGroupsList.add(topGroups);
+      }
+    }
+
+    if (shardGroupsList.isEmpty()) {
+      return null;

Review Comment:
   Avoid returning null?



##########
lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollectorManager.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.lucene.search.CollectorManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.Weight;
+
+/**
+ * A {@link CollectorManager} for {@link BlockGroupingCollector} that merges 
results from multiple
+ * collectors into a single {@link TopGroups}. This is intended for use with 
concurrent search,
+ * where each segment is searched by a separate {@link BlockGroupingCollector}.
+ *
+ * <p>Documents must be indexed as blocks using {@link
+ * org.apache.lucene.index.IndexWriter#addDocuments 
IndexWriter.addDocuments()} or {@link
+ * org.apache.lucene.index.IndexWriter#updateDocuments 
IndexWriter.updateDocuments()}.
+ *
+ * <p>See {@link BlockGroupingCollector} for more details.
+ *
+ * @lucene.experimental
+ */
+public class BlockGroupingCollectorManager
+    implements CollectorManager<BlockGroupingCollector, TopGroups<?>> {
+
+  private final Sort groupSort;
+  private final int topNGroups;
+  private final boolean needsScores;
+  private final Weight lastDocPerGroup;
+
+  private final Sort withinGroupSort;
+  private final int groupOffset;
+  private final int withinGroupOffset;
+  private final int maxDocsPerGroup;
+
+  private final List<BlockGroupingCollector> collectors;
+
+  public BlockGroupingCollectorManager(
+      Sort groupSort,
+      int topNGroups,
+      boolean needsScores,
+      Weight lastDocPerGroup,
+      Sort withinGroupSort,
+      int groupOffset,
+      int withinGroupOffset,
+      int maxDocsPerGroup) {
+    this.groupSort = groupSort;
+    this.topNGroups = topNGroups;
+    this.needsScores = needsScores;
+    this.lastDocPerGroup = lastDocPerGroup;
+    this.collectors = new ArrayList<>();
+    this.withinGroupSort = withinGroupSort;
+    this.groupOffset = groupOffset;
+    this.withinGroupOffset = withinGroupOffset;
+    this.maxDocsPerGroup = maxDocsPerGroup;
+  }
+
+  @Override
+  public BlockGroupingCollector newCollector() throws IOException {
+    BlockGroupingCollector collector =
+        new BlockGroupingCollector(groupSort, topNGroups, needsScores, 
lastDocPerGroup);
+    collectors.add(collector);
+    return collector;
+  }
+
+  @Override
+  public TopGroups<?> reduce(Collection<BlockGroupingCollector> collectors) 
throws IOException {
+    if (collectors.isEmpty()) {
+      return null;
+    }
+
+    if (collectors.size() == 1) {
+      return collectors
+          .iterator()
+          .next()
+          .getTopGroups(withinGroupSort, groupOffset, withinGroupOffset, 
maxDocsPerGroup);
+    }
+
+    // Merge results from multiple collectors
+    List<TopGroups<?>> shardGroupsList = new ArrayList<>();
+    for (BlockGroupingCollector collector : collectors) {
+      TopGroups<?> topGroups =
+          collector.getTopGroups(withinGroupSort, 0, withinGroupOffset, 
maxDocsPerGroup);
+      if (topGroups != null) {

Review Comment:
   Is this a real-life scenario? And what about top groups with an empty groups 
array?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to