laskoviymishka commented on code in PR #16052:
URL: https://github.com/apache/iceberg/pull/16052#discussion_r3272095669


##########
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Coalesces consecutive position deletes into range inserts on a {@link PositionDeleteIndex}. */
+public final class PositionDeleteRangeConsumer {

Review Comment:
   this is a V2-only optimization: V3 DVs go through 
`PositionDeleteIndex.deserialize` and don't touch this code, and the V3 spec 
asks writers to merge old position delete files into DVs during maintenance.
   
   i'd want a sentence in the class doc naming the workload this is meant to 
help; otherwise, in a year or two it'll be hard to know whether to keep tuning 
it or let it bit-rot alongside V2 position deletes. wdyt?



##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -177,7 +177,7 @@ private static PositionDeleteIndex toPositionIndex(
       CloseableIterable<Long> posDeletes, List<DeleteFile> files) {
     try (CloseableIterable<Long> deletes = posDeletes) {
      PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex(files);
-      deletes.forEach(positionDeleteIndex::delete);
+      PositionDeleteRangeConsumer.forEach(deletes, positionDeleteIndex);

Review Comment:
   i'm a little confused about why the sibling `toPositionIndexes` is left 
untouched in this PR. It's what `BaseDeleteLoader.readPosDeletes` calls for the 
cached multi-file path, so it's arguably the more heavily trafficked of the two.
   
   either the benchmark wins translate to real workloads, in which case that 
loop wants the same treatment, or they don't, in which case I'm not sure why we 
want it on this path. Thoughts?



##########
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java:
##########
@@ -0,0 +1,208 @@
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Coalesces consecutive position deletes into range inserts on a {@link PositionDeleteIndex}. */
+public final class PositionDeleteRangeConsumer {
+
+  /**
+   * Batch size for {@link #forEach}. Sized to fit comfortably in L1 (512 bytes). Smaller buffers
+   * miss the bulk-path branch elision; larger buffers add allocation cost without improving the
+   * inner-loop throughput (the {@code acceptAll} body is the same regardless of slice length).
+   */
+  private static final int FOREACH_BATCH_SIZE = 64;
+
+  private PositionDeleteRangeConsumer() {}
+
+  /**
+   * Drains {@code positions} into a {@link RangeAccumulator} and flushes.
+   *
+   * <p>Boxed positions are buffered into a small primitive slice and then handed to {@link
+   * RangeAccumulator#acceptAll(long[], int, int)}, which keeps the sniff/escape state machine out
+   * of the inner loop. Compared to per-element {@link RangeAccumulator#accept(long)}, this gives a
+   * ~12% reduction in run-to-run time on dense inputs and -- more importantly -- removes the JIT
+   * inlining sensitivity that produces a 2 ms standard deviation on the per-element path.
+   */
+  public static void forEach(Iterable<Long> positions, PositionDeleteIndex target) {
+    RangeAccumulator acc = new RangeAccumulator(target);
+    long[] buffer = new long[FOREACH_BATCH_SIZE];
+    int filled = 0;
+    for (Long pos : positions) {
+      buffer[filled++] = pos;
+      if (filled == FOREACH_BATCH_SIZE) {
+        acc.acceptAll(buffer, 0, FOREACH_BATCH_SIZE);
+        filled = 0;
+      }
+    }
+    if (filled > 0) {
+      acc.acceptAll(buffer, 0, filled);
+    }
+    acc.flush();
+  }
+
+  /**
+   * Coalesces consecutive positions into range deletes on the target index. The first {@value
+   * #SNIFF_SIZE} positions are inspected; if more than {@value #BOUNDARY_THRESHOLD_PERCENT}% cross
+   * gaps, the accumulator falls back to per-position deletes for the rest of its life.
+   *
+   * <p>Single-threaded; one instance per target index. Callers that already have positions in a
+   * primitive {@code long[]} should call {@link #acceptAll(long[], int, int)} directly -- the bulk
+   * path keeps the state-machine dispatch out of the inner loop. {@link #accept(long)} exists for
+   * truly streaming callers that do not buffer; {@link PositionDeleteRangeConsumer#forEach} is the
+   * standard entry for boxed iterables and handles its own small primitive batching internally.
+   */
+  public static final class RangeAccumulator {
+
+    private static final int SNIFF_SIZE = 256;
+    private static final int BOUNDARY_THRESHOLD_PERCENT = 30;
+
+    private final PositionDeleteIndex target;
+    private boolean hasRun;
+    private long rangeStart;
+    private long lastPosition;
+
+    private int processed;
+    private int boundaries;
+    private boolean escaped;
+
+    public RangeAccumulator(PositionDeleteIndex target) {
+      Preconditions.checkArgument(target != null, "Invalid target index: null");
+      this.target = target;
+    }
+
+    public void accept(long pos) {
+      if (escaped) {
+        target.delete(pos);
+        return;
+      }
+      if (!hasRun) {
+        initRun(pos);
+        return;
+      }
+      coalesceSniff(pos);
+      if (processed == SNIFF_SIZE && shouldEscape()) {
+        enterEscape();
+      }
+    }
+
+    /**
+     * Bulk variant of {@link #accept(long)}. Runs the entire sniff/coalesce loop inside this method
+     * so the per-element work in steady state is identical to the original tight inline loop -- one
+     * gap-check branch and one position store, with no per-call frame. The small private helpers
+     * are inlined by HotSpot on the hot path.
+     */
+    public void acceptAll(long[] positions, int from, int to) {
+      Preconditions.checkArgument(positions != null, "Invalid positions array: null");
+      Preconditions.checkPositionIndexes(from, to, positions.length);
+      if (from >= to) {
+        return;
+      }
+
+      int cursor = from;
+
+      if (escaped) {
+        drainEscaped(positions, cursor, to);
+        return;
+      }
+
+      if (!hasRun) {
+        initRun(positions[cursor++]);
+      }
+
+      while (cursor < to && processed < SNIFF_SIZE) {
+        coalesceSniff(positions[cursor++]);
+      }
+
+      if (processed == SNIFF_SIZE && shouldEscape()) {
+        enterEscape();
+        drainEscaped(positions, cursor, to);
+        return;
+      }
+
+      while (cursor < to) {
+        coalesce(positions[cursor++]);
+      }
+    }
+
+    /** Emits the active run, if any. The escape decision is sticky across flushes. */
+    public void flush() {
+      if (hasRun) {
+        emit();
+        hasRun = false;
+      }
+    }
+
+    /** Starts a new active run anchored at {@code first}. */
+    private void initRun(long first) {
+      rangeStart = first;
+      lastPosition = first;
+      hasRun = true;
+      processed = 1;
+    }
+
+    /** Extends the active run with {@code pos} during sniffing; counts gaps to inform escape. */
+    private void coalesceSniff(long pos) {
+      if (pos - lastPosition != 1) {
+        boundaries++;
+        emit();
+        rangeStart = pos;
+      }
+      lastPosition = pos;
+      processed++;
+    }
+
+    /** Extends the active run with {@code pos} after sniffing has decided not to escape. */
+    private void coalesce(long pos) {
+      if (pos - lastPosition != 1) {
+        emit();
+        rangeStart = pos;
+      }
+      lastPosition = pos;
+    }
+
+    /** True if the sniffed prefix has too many gaps to make coalescing worthwhile. */
+    private boolean shouldEscape() {
+      return boundaries * 100 > (SNIFF_SIZE - 1) * BOUNDARY_THRESHOLD_PERCENT;

Review Comment:
   position delete files are spec-sorted by `(file_path, pos)`, so within a 
single file the stream is monotonic. At that point a greedy `pos == lastEnd + 1` 
check gets us coalescing without the sniff window, boundary counter, or escape 
mode.
   
   did you measure that shape? my intuition is that the 2% sparse regression is 
mostly the cost of the sniff itself, not per-element branching. If that holds, 
we keep the dense win and drop the two magic constants. wdyt?



##########
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java:
##########
@@ -0,0 +1,208 @@
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Coalesces consecutive position deletes into range inserts on a {@link PositionDeleteIndex}. */
+public final class PositionDeleteRangeConsumer {

Review Comment:
   is this meant as a public API? the only caller is in-package, and the 
`accept`/`acceptAll`/`flush` shape plus the sniff-stickiness contract feel like 
internals to me.
   
   i'd make this and `RangeAccumulator` package-private; that keeps our hands 
free if the design shifts based on the questions above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
