gsmiller commented on code in PR #15823:
URL: https://github.com/apache/lucene/pull/15823#discussion_r3010445924


##########
lucene/core/src/java/org/apache/lucene/util/PriorityQueue.java:
##########
@@ -174,6 +177,38 @@ public void addAll(Collection<T> elements) {
     }
   }
 
+  /**
+   * Adds all elements of the stream into the queue. This method should be preferred over calling
+   * {@link #add(Object)} in loop if all elements are known in advance as it builds queue faster.
+   *
+   * <p>If one needs to map or filter element in the iteration of elements in this method, call this
+   * method with elements wrapped by {@link Stream#map(Function)} or {@link
+   * Stream#filter(Predicate)}, etc. In these cases, this method should be preferred over calling
+   * {@link #addAll(Collection)}.
+   *
+   * <p>If one tries to add more objects than the maxSize passed in the constructor, an {@link
+   * ArrayIndexOutOfBoundsException} is thrown. Which may result in parts of elements added into the
+   * queue, but the heap is still stay in correct state. In this case, if caller wants to readd or
+   * {@link #updateTop(Object)} with remaining elements, it should use a new stream, and use {@link
+   * Stream#skip(long)} to skip consumed elements with the delta size of queue.
+   */
+  public void addAll(Stream<T> elements) {
+    // Heap with size S always takes first S elements of the array,
+    // and thus it's safe to fill array further - no actual non-sentinel value will be overwritten.
+    try {
+      elements.forEachOrdered(
+          element -> {
+            this.heap[size + 1] = element;
+            this.size++;

Review Comment:
   Thanks for using `forEachOrdered` here to be more technically correct. This
does make me wonder if we have a potential concurrency bug lurking, though. If
the caller passes a parallel stream, I think we have a concurrency bug in how
we read-then-update `this.size`, right? I'm not a stream expert, so maybe not?
But I think so? If so, let's add javadoc? Or maybe we chain a call to
`#sequential`, like `elements.sequential().forEachOrdered`?
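
A minimal sketch of the `#sequential` option, assuming a simplified 1-based min-heap of `Integer`s. `SketchIntHeap`, `siftDown`, and the bottom-up heapify in the `finally` block are hypothetical stand-ins, not the code in this PR. For reference, the `Stream#forEachOrdered` Javadoc already states that performing the action for one element happens-before performing it for subsequent elements (although each action may run on whatever thread the library chooses), so the read-then-update of `size` may already be safe; `sequential()` makes that obvious by switching the whole pipeline, including any upstream `map` or `filter`, to sequential execution.

```java
import java.util.stream.Stream;

/** Hypothetical, simplified 1-based min-heap; NOT Lucene's PriorityQueue. */
final class SketchIntHeap {
  private final Integer[] heap; // heap[0] is unused; elements live in heap[1..size]
  private int size;

  SketchIntHeap(int maxSize) {
    heap = new Integer[maxSize + 1];
  }

  int size() {
    return size;
  }

  Integer top() {
    return size == 0 ? null : heap[1];
  }

  /** Appends every element, then restores the heap property bottom-up. */
  void addAll(Stream<Integer> elements) {
    try {
      // sequential() puts the entire pipeline in sequential mode, so this
      // read-then-update of `size` never runs concurrently, even if the caller
      // handed us a parallel stream.
      elements.sequential().forEachOrdered(
          element -> {
            heap[size + 1] = element; // throws ArrayIndexOutOfBoundsException when full
            size++; // only reached if the write above succeeded
          });
    } finally {
      // Hypothetical Floyd heapify: runs even after an overflow, so whatever
      // was appended still ends up as a valid heap.
      for (int i = size >>> 1; i >= 1; i--) {
        siftDown(i);
      }
    }
  }

  private void siftDown(int i) {
    Integer node = heap[i];
    int child = i << 1;
    while (child <= size) {
      // Pick the smaller of the two children.
      if (child + 1 <= size && heap[child + 1] < heap[child]) {
        child++;
      }
      if (heap[child] >= node) {
        break;
      }
      heap[i] = heap[child];
      i = child;
      child = i << 1;
    }
    heap[i] = node;
  }
}
```

The trade-off is that a caller's parallel `map` loses its parallelism as well, so documenting the sequential contract in the Javadoc is the lighter-weight alternative.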



##########
lucene/core/src/test/org/apache/lucene/util/TestPriorityQueue.java:
##########
@@ -159,6 +160,57 @@ public void testInsertWithOverflow() {
     assertThat(pq.top(), equalTo(2));
   }
 
+  public void testAddAllWithStream() {
+    PriorityQueue<Integer> pq = new IntegerQueue(6);
+    List<String> elements = new ArrayList<>();
+    for (String s : Arrays.asList("a", "b", "c", "d", "e", "f")) {
+      if (random().nextBoolean()) {
+        elements.addFirst(s);
+      } else {
+        elements.addLast(s);
+      }
+    }
+
+    pq.addAll(elements.stream().map(String::hashCode));
+    assertEquals("a".hashCode(), pq.top().intValue());
+  }
+
+  public void testAddAllWithStreamNotFitIntoQueue() {
+    PriorityQueue<Integer> pq = new IntegerQueue(10);
+    List<String> elements = new ArrayList<>();
+    for (String s : Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")) {
+      if (random().nextBoolean()) {
+        elements.addFirst(s);
+      } else {
+        elements.addLast(s);
+      }
+    }
+
+    assertThrows(
+        "Cannot add 11 elements to a queue with remaining capacity: 10",
+        ArrayIndexOutOfBoundsException.class,
+        () -> pq.addAll(elements.stream().map(String::hashCode)));
+    // Partly added.
+    assertEquals(10, pq.size());
+    assertHeap(pq);
+  }
+
+  private void assertHeap(PriorityQueue<Integer> pq) {
+    // TODO: Maybe change getHeapArray return type T[].
+    Object[] heapArray = pq.getHeapArray();
+    // The loop goes down to 1 as heap is 1-based not 0-based.
+    for (int i = (heapArray.length >>> 1); i >= 1; i--) {
+      int left = i << 1;
+      int right = left + 1;
+      if (right < heapArray.length) {
+        assert (Integer) heapArray[i] <= (Integer) heapArray[right];

Review Comment:
   nit: This test will silently pass without checking anything if asserts are
disabled in the JVM. Let's use the idiomatic JUnit assert methods instead. I'd
have this assertHeap method return a boolean and then use assertTrue from the
calling method.
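
A sketch of the boolean-returning variant, assuming the 1-based layout noted in the test (index 0 unused). `HeapAsserts.isMinHeap` is a hypothetical name. It bounds the loop by the queue's size rather than the array length, so unused trailing slots (which may be `null` in a partly filled queue) are never unboxed.

```java
/** Hypothetical helper illustrating the review suggestion; not the PR's code. */
final class HeapAsserts {
  /**
   * Returns true if heap[1..size] satisfies the min-heap property, i.e. every parent is <= each of
   * its children. Index 0 is ignored because the heap is 1-based.
   */
  static boolean isMinHeap(Object[] heap, int size) {
    // Only indices 1..size/2 have children; for them the left child always exists.
    for (int parent = 1; parent <= size >>> 1; parent++) {
      int left = parent << 1;
      int right = left + 1;
      int p = (Integer) heap[parent];
      if (p > (Integer) heap[left]) {
        return false;
      }
      if (right <= size && p > (Integer) heap[right]) {
        return false;
      }
    }
    return true;
  }
}
```

In the test this would then read `assertTrue(HeapAsserts.isMinHeap(pq.getHeapArray(), pq.size()));`, which fails whether or not `-ea` is set.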



##########
lucene/core/src/test/org/apache/lucene/util/TestPriorityQueue.java:
##########
+    // Partly added.
+    assertEquals(10, pq.size());
+    assertHeap(pq);

Review Comment:
   This approach works, but you could also pop elements off the heap in a loop
and assert that they come out in order. It's nice that you avoid mutating the
heap here with your check, but since it's the last thing we're doing in a unit
test, it might be simpler to just pop stuff off the heap and check rather than
writing custom heap traversal logic? I don't have strong opinions, I suppose.
Take it or leave it :)
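
A minimal sketch of the pop-and-check alternative. To stay self-contained it uses `java.util.PriorityQueue` (`isEmpty`/`poll`) as a stand-in; with Lucene's queue the same loop would use `pq.size() > 0` and `pq.pop()`. `DrainAsserts.drainsInAscendingOrder` is a hypothetical name.

```java
import java.util.PriorityQueue;

/** Hypothetical helper; java.util.PriorityQueue stands in for Lucene's queue. */
final class DrainAsserts {
  /**
   * Pops every element and returns true if they came out in non-decreasing order. This empties
   * the queue, which is acceptable as the final check of a test.
   */
  static boolean drainsInAscendingOrder(PriorityQueue<Integer> pq) {
    Integer previous = null;
    while (!pq.isEmpty()) {
      Integer current = pq.poll();
      if (previous != null && current < previous) {
        return false; // an element smaller than its predecessor means a broken heap
      }
      previous = current;
    }
    return true;
  }
}
```

Because it empties the queue, it only fits as the last step of a test, which is exactly the situation in `testAddAllWithStreamNotFitIntoQueue`.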



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
