aokolnychyi commented on code in PR #11933:
URL: https://github.com/apache/iceberg/pull/11933#discussion_r1924082052


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.Arrays;
+import java.util.function.Predicate;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.ColumnarBatchRow;
+
+public class ColumnarBatchUtil {
+
+  private ColumnarBatchUtil() {}
+
+  /**
+   * Builds a row id mapping for a batch that skips deleted rows.
+   *
+   * <p>Example: position deletes 2 and 6 applied to a batch with 8 rows in total.
+   *
+   * <pre>
+   * [0,1,2,3,4,5,6,7] -- original status of the row id mapping array
+   * [0,1,3,4,5,7,-,-] -- after applying position deletes [number of records set to 6]
+   * </pre>
+   *
+   * <p>Entries past the returned live row count are not meaningful. Every row found to be deleted
+   * also increments the delete count on {@code deletes}.
+   *
+   * @param vectors the array of ColumnVectors for the batch
+   * @param deletes the delete filter containing delete information, may be null
+   * @param rowStartPosInBatch the position of the first row of the batch; the in-batch row index
+   *     is added to it to obtain the position checked against the position deletes
+   * @param batchSize the number of rows in the batch
+   * @return the mapping array and the new number of rows in the batch, or null if {@code deletes}
+   *     is null or no row is deleted
+   */
+  public static Pair<int[], Integer> buildRowIdMapping(
+      ColumnVector[] vectors,
+      DeleteFilter<InternalRow> deletes,
+      long rowStartPosInBatch,
+      int batchSize) {
+    if (deletes == null) {
+      return null;
+    }
+
+    PositionDeleteIndex posDeletes = deletes.deletedRowPositions();
+    Predicate<InternalRow> eqFilter = deletes.eqDeletedRowFilter();
+    ColumnarBatchRow currentRow = new ColumnarBatchRow(vectors);
+    int[] liveRowIds = new int[batchSize];
+    int liveCount = 0;
+
+    for (int idx = 0; idx < batchSize; idx++) {
+      // the reusable row view must point at the current row before the equality check runs
+      currentRow.rowId = idx;
+      if (isDeleted(rowStartPosInBatch + idx, currentRow, posDeletes, eqFilter)) {
+        deletes.incrementDeleteCount();
+        continue;
+      }
+
+      liveRowIds[liveCount] = idx;
+      liveCount += 1;
+    }
+
+    if (liveCount == batchSize) {
+      // nothing was deleted, so callers do not need a mapping at all
+      return null;
+    }
+
+    return Pair.of(liveRowIds, liveCount);
+  }
+
+  /**
+   * Builds an array that flags, for each row in a batch, whether the row is deleted.
+   *
+   * <p>Each row is checked against the position deletes and the equality delete predicate of
+   * {@code deletes}. A deleted row is flagged in the result and increments the delete count on
+   * {@code deletes}; no row is removed from the batch by this method.
+   *
+   * @param vectors the array of ColumnVectors for the batch
+   * @param deletes the delete filter describing which rows are deleted, may be null
+   * @param rowStartPosInBatch the position of the first row of the batch; the in-batch row index
+   *     is added to it to obtain the position checked against the position deletes
+   * @param batchSize the number of rows in the current batch
+   * @return an array of {@code batchSize} booleans, true where the row is deleted; all false if
+   *     {@code deletes} is null
+   */
+  public static boolean[] buildIsDeleted(
+      ColumnVector[] vectors,
+      DeleteFilter<InternalRow> deletes,
+      long rowStartPosInBatch,
+      int batchSize) {
+    boolean[] deletedFlags = new boolean[batchSize];
+
+    if (deletes != null) {
+      PositionDeleteIndex posDeletes = deletes.deletedRowPositions();
+      Predicate<InternalRow> eqFilter = deletes.eqDeletedRowFilter();
+      ColumnarBatchRow currentRow = new ColumnarBatchRow(vectors);
+
+      for (int idx = 0; idx < batchSize; idx++) {
+        // the reusable row view must point at the current row before the equality check runs
+        currentRow.rowId = idx;
+        boolean deleted = isDeleted(rowStartPosInBatch + idx, currentRow, posDeletes, eqFilter);
+        if (deleted) {
+          deletes.incrementDeleteCount();
+          deletedFlags[idx] = true;
+        }
+      }
+    }
+
+    return deletedFlags;
+  }
+
+  /**
+   * Checks whether a single row is deleted by a position delete or an equality delete.
+   *
+   * @param pos the row position looked up in {@code deletedPositions}
+   * @param row the row handed to {@code eqDeleteFilter}
+   * @param deletedPositions the position delete index; the position check is skipped if null
+   * @param eqDeleteFilter the equality delete predicate; the equality check is skipped if null,
+   *     and a row that fails the predicate is treated as deleted
+   * @return true if the position is marked deleted or the row fails the equality predicate
+   */
+  private static boolean isDeleted(
+      long pos,
+      InternalRow row,
+      PositionDeleteIndex deletedPositions,
+      Predicate<InternalRow> eqDeleteFilter) {
+    // use separate if statements to reduce the chance of speculative execution for equality tests
+    if (deletedPositions != null && deletedPositions.isDeleted(pos)) {
+      return true;
+    }
+
+    if (eqDeleteFilter != null && !eqDeleteFilter.test(row)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Removes extra columns added for processing equality delete filters that 
are not part of the
+   * final query output.
+   *
+   * <p>During query execution, additional columns may be included in the 
schema to evaluate
+   * equality delete filters. For example, if the table schema contains 
columns C1, C2, C3, C4, and
+   * C5, and the query is 'SELECT C5 FROM table' while equality delete filters 
are applied on C3 and
+   * C4, the processing schema includes C5, C3, and C4. These extra columns 
(C3 and C4) are needed
+   * to identify rows to delete but are not included in the final result.
+   *
+   * <p>This method removes these extra columns from the end of {@code 
arrowColumnVectors}, ensuring
+   * only the expected columns remain.
+   *
+   * @param deletes The delete filter containing delete information.
+   * @param arrowColumnVectors the array of column vectors representing query 
result data
+   * @param columnarBatch the original {@code ColumnarBatch} containing query 
results
+   * @return a new {@code ColumnarBatch} with extra columns removed, or the 
original batch if no
+   *     extra columns were found
+   */
+  public static ColumnarBatch removeExtraColumns(
+      DeleteFilter<InternalRow> deletes,
+      ColumnVector[] arrowColumnVectors,

Review Comment:
   Actually, can we discard columns prior to constructing the batch now? It 
should be possible in the new logic, right? If so, this method can accept 
`ColumnVector[] columns` instead of `ColumnarBatch`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to