aokolnychyi commented on code in PR #9841:
URL: https://github.com/apache/iceberg/pull/9841#discussion_r1906083743


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.Iterator;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class ColumnarBatchUtil {
+
+  private ColumnarBatchUtil() {}
+
+  /**
+   * Applies deletes to a given {@link ColumnarBatch}. This method calculates rowIdMapping based on
+   * deletes and then update the rowIdMapping on each of the ColumnVector in the ColumnarBatch.
+   *
+   * @param columnarBatch The columnar batch to which the deletes are to be applied.
+   * @param deletes The delete filter containing delete information.
+   * @param isDeleted An array indicating whether the row in the batch is deleted.
+   * @param numRowsToRead The number of rows to be read and processed from the columnar batch.
+   * @param rowStartPosInBatch The starting position of the row in the batch.
+   * @param hasIsDeletedColumn Indicates whether the columnar batch includes _deleted column.
+   */
+  public static void applyDeletesToColumnarBatch(

Review Comment:
   I think we inherited the existing loading logic, which is too complicated. First, we mix the `isDeleted` and `rowIdMapping` cases. Second, we create the `ColumnarBatch` before we have all column vectors (e.g. the `isDeleted` array).
   
   What if we add methods like these to the util class instead? Only one of them will be needed in a query, right? We either mark records as removed or hide them.
   
   ```
   public static Pair<int[], Integer> buildRowIdMapping(
       ColumnVector[] vectors,
       DeleteFilter<InternalRow> deletes,
       long rowStartPosInBatch,
       int batchSize) {
     if (deletes == null) {
       return null;
     }
   
     PositionDeleteIndex deletedPositions = deletes.deletedRowPositions();
     Predicate<InternalRow> eqDeleteFilter = deletes.eqDeletedRowFilter();
     ColumnarBatchRow row = new ColumnarBatchRow(vectors);
     int[] rowIdMapping = new int[batchSize];
     int liveRowId = 0;
   
     for (int rowId = 0; rowId < batchSize; rowId++) {
       long pos = rowStartPosInBatch + rowId;
       row.rowId = rowId;
       if (isDeleted(pos, row, deletedPositions, eqDeleteFilter)) {
         deletes.incrementDeleteCount();
       } else {
         rowIdMapping[liveRowId] = rowId;
         liveRowId++;
       }
     }
   
     return liveRowId == batchSize ? null : Pair.of(rowIdMapping, liveRowId);
   }
   
   public static boolean[] buildIsDeleted(
       ColumnVector[] vectors,
       DeleteFilter<InternalRow> deletes,
       long rowStartPosInBatch,
       int batchSize) {
     boolean[] isDeleted = new boolean[batchSize];
   
     if (deletes == null) {
       return isDeleted;
     }
   
     PositionDeleteIndex deletedPositions = deletes.deletedRowPositions();
     Predicate<InternalRow> eqDeleteFilter = deletes.eqDeletedRowFilter();
     ColumnarBatchRow row = new ColumnarBatchRow(vectors);
   
     for (int rowId = 0; rowId < batchSize; rowId++) {
       long pos = rowStartPosInBatch + rowId;
       row.rowId = rowId;
       isDeleted[rowId] = isDeleted(pos, row, deletedPositions, eqDeleteFilter);
     }
   
     return isDeleted;
   }
   
   // use separate if statements to reduce the chance of speculative execution for equality tests
   private static boolean isDeleted(
       long pos,
       InternalRow row,
       PositionDeleteIndex deletedPositions,
       Predicate<InternalRow> eqDeleteFilter) {
     if (deletedPositions != null && deletedPositions.isDeleted(pos)) {
       return true;
     }
   
     if (!eqDeleteFilter.test(row)) {
       return true;
     }
   
     return false;
   }
   ```
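
To make the difference between the two outputs concrete, here is a stand-alone sketch that runs without Iceberg or Spark on the classpath. It is only an illustration under assumptions: `DeleteMaskSketch`, `Mapping`, and the `LongPredicate` / `IntPredicate` parameters are hypothetical stand-ins for `PositionDeleteIndex`, the equality delete filter, and `Pair<int[], Integer>`. None of them are part of the proposed API.

```java
import java.util.Arrays;
import java.util.function.IntPredicate;
import java.util.function.LongPredicate;

/** Stand-alone illustration; every name here is hypothetical, not Iceberg or Spark API. */
final class DeleteMaskSketch {

  /** Stand-in for Pair<int[], Integer>: compacted row ids plus the number of live rows. */
  static final class Mapping {
    final int[] rowIds;
    final int liveRows;

    Mapping(int[] rowIds, int liveRows) {
      this.rowIds = rowIds;
      this.liveRows = liveRows;
    }
  }

  // posDeleted: true if the file position is position-deleted (stand-in for PositionDeleteIndex).
  // eqLive: true if the row at this batch-local id survives equality deletes.
  static Mapping buildRowIdMapping(
      LongPredicate posDeleted, IntPredicate eqLive, long rowStartPosInBatch, int batchSize) {
    int[] rowIdMapping = new int[batchSize];
    int liveRowId = 0;

    for (int rowId = 0; rowId < batchSize; rowId++) {
      if (!isDeleted(rowStartPosInBatch + rowId, rowId, posDeleted, eqLive)) {
        rowIdMapping[liveRowId] = rowId;
        liveRowId++;
      }
    }

    // null signals "nothing was deleted, no mapping needed"
    return liveRowId == batchSize ? null : new Mapping(rowIdMapping, liveRowId);
  }

  static boolean[] buildIsDeleted(
      LongPredicate posDeleted, IntPredicate eqLive, long rowStartPosInBatch, int batchSize) {
    boolean[] flags = new boolean[batchSize];

    for (int rowId = 0; rowId < batchSize; rowId++) {
      flags[rowId] = isDeleted(rowStartPosInBatch + rowId, rowId, posDeleted, eqLive);
    }

    return flags;
  }

  private static boolean isDeleted(
      long pos, int rowId, LongPredicate posDeleted, IntPredicate eqLive) {
    if (posDeleted.test(pos)) {
      return true;
    }

    return !eqLive.test(rowId);
  }

  public static void main(String[] args) {
    // 5 rows starting at file position 100; position 101 is position-deleted,
    // and batch-local row 3 is removed by an equality delete.
    LongPredicate posDeleted = pos -> pos == 101L;
    IntPredicate eqLive = rowId -> rowId != 3;

    Mapping mapping = buildRowIdMapping(posDeleted, eqLive, 100L, 5);
    // prints [0, 2, 4]
    System.out.println(Arrays.toString(Arrays.copyOf(mapping.rowIds, mapping.liveRows)));
    // prints [false, true, false, true, false]
    System.out.println(Arrays.toString(buildIsDeleted(posDeleted, eqLive, 100L, 5)));
  }
}
```

The mapping hides deleted rows by listing only the surviving physical row ids, while the flag array keeps all rows and marks the deleted ones. A query should only need one of the two.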
   
   Then our loading logic can look like:
   - Initialize the vector array.
   - Load all data vectors (leaving metadata vectors as null).
   - If you need to discard deleted records, call `buildRowIdMapping` and either wrap loaded data vectors into other vectors or mutate them in place via `setRowIdMapping`.
   - If you need to mark deleted records, call `buildIsDeleted` to compute the flags.
   - Load all metadata vectors (we will have the `is_deleted` array fully populated now).
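
The ordering of the steps above can be sketched roughly as follows. This is a simplified model, not the actual reader code: `LoadOrderSketch`, `Batch`, and `load` are hypothetical names, columns are plain `int[]` arrays instead of `ColumnVector`s, a single `IntPredicate` stands in for the delete filter, and copying the live rows stands in for wrapping vectors or calling `setRowIdMapping`.

```java
import java.util.Arrays;
import java.util.function.IntPredicate;

/** Simplified model of the proposed loading order; all names are hypothetical. */
final class LoadOrderSketch {

  /** Stand-in for ColumnarBatch: created only after every vector is in place. */
  static final class Batch {
    final Object[] vectors; // int[] data columns, then an optional boolean[] is_deleted column
    final int numRows;

    Batch(Object[] vectors, int numRows) {
      this.vectors = vectors;
      this.numRows = numRows;
    }
  }

  static Batch load(int[][] dataColumns, IntPredicate rowDeleted, boolean markDeleted) {
    int batchSize = dataColumns.length == 0 ? 0 : dataColumns[0].length;

    // 1. Initialize the vector array (one extra slot if the is_deleted column is projected).
    Object[] vectors = new Object[dataColumns.length + (markDeleted ? 1 : 0)];

    // 2. Load all data vectors; the metadata slot stays null for now.
    for (int i = 0; i < dataColumns.length; i++) {
      vectors[i] = dataColumns[i];
    }

    int numRows = batchSize;

    if (!markDeleted) {
      // 3a. Discard deleted records: build the row id mapping, then compact each data column.
      int[] rowIdMapping = new int[batchSize];
      int live = 0;
      for (int rowId = 0; rowId < batchSize; rowId++) {
        if (!rowDeleted.test(rowId)) {
          rowIdMapping[live++] = rowId;
        }
      }

      if (live < batchSize) {
        for (int i = 0; i < dataColumns.length; i++) {
          int[] compacted = new int[live];
          for (int j = 0; j < live; j++) {
            compacted[j] = dataColumns[i][rowIdMapping[j]];
          }
          vectors[i] = compacted;
        }
        numRows = live;
      }
    } else {
      // 3b. Mark deleted records: compute the flags, then
      // 4. load the metadata vector, which is fully populated at this point.
      boolean[] isDeleted = new boolean[batchSize];
      for (int rowId = 0; rowId < batchSize; rowId++) {
        isDeleted[rowId] = rowDeleted.test(rowId);
      }
      vectors[dataColumns.length] = isDeleted;
    }

    // 5. Only now create the batch, with all vectors available.
    return new Batch(vectors, numRows);
  }

  public static void main(String[] args) {
    int[][] data = {{10, 20, 30, 40}};
    IntPredicate deleted = rowId -> rowId == 1;

    Batch hidden = load(data, deleted, false);
    System.out.println(Arrays.toString((int[]) hidden.vectors[0])); // prints [10, 30, 40]

    Batch marked = load(data, deleted, true);
    // prints [false, true, false, false]
    System.out.println(Arrays.toString((boolean[]) marked.vectors[1]));
  }
}
```

The point of the sketch is the ordering: the batch object is built last, so it never exists without its `is_deleted` vector, and the two delete-handling paths do not share state.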
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

