aokolnychyi commented on code in PR #9841:
URL: https://github.com/apache/iceberg/pull/9841#discussion_r1585438330


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########
@@ -32,23 +32,27 @@
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
   private final int batchSize;
+  private final ParquetReaderType parquetReaderType;

Review Comment:
   Instead of passing this variable, let's create `BatchReadConf` and pass a reference to it here. I'd consider adding a `batchReadConf()` method to `SparkReadConf` that returns an object with the following fields (if you want, we can add a builder for `BatchReadConf` as well).
   
   ```
   class BatchReadConf {
     int orcBatchSize() {...}
     ParquetReaderType parquetReaderType() {...}
     int parquetBatchSize() {...}
   }
   ```
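   
   A fuller sketch of such a value class, in case it helps (field names mirror the accessors above; moving validation into the constructor is only a suggestion):
   
   ```
   class BatchReadConf {
     private final int orcBatchSize;
     private final ParquetReaderType parquetReaderType;
     private final int parquetBatchSize;

     BatchReadConf(int orcBatchSize, ParquetReaderType parquetReaderType, int parquetBatchSize) {
       // centralizing validation here means consumers no longer check sizes themselves
       Preconditions.checkArgument(orcBatchSize > 0, "Invalid ORC batch size: %s", orcBatchSize);
       Preconditions.checkArgument(parquetBatchSize > 0, "Invalid Parquet batch size: %s", parquetBatchSize);
       this.orcBatchSize = orcBatchSize;
       this.parquetReaderType = parquetReaderType;
       this.parquetBatchSize = parquetBatchSize;
     }

     int orcBatchSize() {
       return orcBatchSize;
     }

     ParquetReaderType parquetReaderType() {
       return parquetReaderType;
     }

     int parquetBatchSize() {
       return parquetBatchSize;
     }
   }
   ```
   
   In `SparkReadConf`, `batchReadConf()` would then just assemble this object from the existing properties.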



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -74,48 +71,23 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
     return columnarBatch;
   }
 
-  private class ColumnBatchLoader {
-    private final int numRowsToRead;
-    // the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when
-    // there is no deletes
-    private int[] rowIdMapping;
-    // the array to indicate if a row is deleted or not, it is null when there is no "_deleted"
-    // metadata column
-    private boolean[] isDeleted;
+  private class ColumnBatchLoader extends BaseColumnBatchLoader {
 
     ColumnBatchLoader(int numRowsToRead) {
-      Preconditions.checkArgument(
-          numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
-      this.numRowsToRead = numRowsToRead;
-      if (hasIsDeletedColumn) {
-        isDeleted = new boolean[numRowsToRead];
-      }
+      super(numRowsToRead, hasIsDeletedColumn, deletes, rowStartPosInBatch);
     }
 
-    ColumnarBatch loadDataToColumnBatch() {
+    @Override
+    public ColumnarBatch loadDataToColumnBatch() {
       int numRowsUndeleted = initRowIdMapping();
-
       ColumnVector[] arrowColumnVectors = readDataToColumnVectors();
-
-      ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
-      newColumnarBatch.setNumRows(numRowsUndeleted);
-
-      if (hasEqDeletes()) {
-        applyEqDelete(newColumnarBatch);
-      }
-
-      if (hasIsDeletedColumn && rowIdMapping != null) {
-        // reset the row id mapping array, so that it doesn't filter out the deleted rows
-        for (int i = 0; i < numRowsToRead; i++) {
-          rowIdMapping[i] = i;
-        }
-        newColumnarBatch.setNumRows(numRowsToRead);
-      }
-
+      ColumnarBatch newColumnarBatch =

Review Comment:
   Minor: Redundant local var?
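   
   For example (a sketch only; the right-hand side of the assignment is cut off in this diff, so `buildColumnarBatch` below is a hypothetical stand-in for whatever expression follows the `=`):
   
   ```
   @Override
   public ColumnarBatch loadDataToColumnBatch() {
     int numRowsUndeleted = initRowIdMapping();
     ColumnVector[] arrowColumnVectors = readDataToColumnVectors();
     // return the result directly instead of binding it to a local first
     return buildColumnarBatch(arrowColumnVectors, numRowsUndeleted);
   }
   ```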



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java:
##########
@@ -115,11 +115,11 @@ private String[][] computePreferredLocations() {
   public PartitionReaderFactory createReaderFactory() {
     if (useParquetBatchReads()) {
       int batchSize = readConf.parquetBatchSize();
-      return new SparkColumnarReaderFactory(batchSize);
+      return new SparkColumnarReaderFactory(batchSize, readConf.parquetReaderType());

Review Comment:
   This is where we will call `readConf.batchReadConf()` or whatever we want to call it.
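   
   Sketch, assuming the `batchReadConf()` accessor suggested above:
   
   ```
   if (useParquetBatchReads()) {
     return new SparkColumnarReaderFactory(readConf.batchReadConf());
   }
   ```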



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/BaseColumnBatchLoader.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.Iterator;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+@SuppressWarnings("checkstyle:VisibilityModifier")

Review Comment:
   We would want to structure this a bit differently. Let me think more.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/comet/CometColumnReader.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized.comet;

Review Comment:
   Question: Is there a benefit to having these changes in a separate package? I think the naming separates these classes enough. If we keep them in the same package as the built-in Parquet readers, we will avoid the need to make some classes public.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java:
##########
@@ -27,6 +27,11 @@ private SparkSQLProperties() {}
   // Controls whether vectorized reads are enabled
   public static final String VECTORIZATION_ENABLED = "spark.sql.iceberg.vectorization.enabled";
 
+  // Controls which parquet reader to use for vectorization: either Iceberg's parquet reader or

Review Comment:
   I think just stating `Controls which Parquet reader to use` should be enough. We may use that beyond vectorized reads in the future.
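   
   I.e. something like the following (the property key here is only illustrative; keep whatever key the PR already defines):
   
   ```
   // Controls which Parquet reader to use
   public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type";
   ```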



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ParquetReaderType.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+public enum ParquetReaderType {

Review Comment:
   Does this have to be public?
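   
   For reference, package-private would look like the sketch below (the constant names are assumptions based on this PR). That said, `BaseBatchReader` in `org.apache.iceberg.spark.source` imports this enum, so it can only drop `public` if those usages move into the same package.
   
   ```
   // package-private: visible only within org.apache.iceberg.spark
   enum ParquetReaderType {
     ICEBERG,
     COMET
   }
   ```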



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########
@@ -32,23 +32,27 @@
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
   private final int batchSize;
+  private final ParquetReaderType parquetReaderType;

Review Comment:
   Then we will have a reference to `BatchReadConf` in `BaseBatchReader` as opposed to having separate `batchSize`, `parquetReaderType`, and future fields we will have to add.
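   
   Roughly (a sketch; the other constructor args and the `super(...)` call are elided):
   
   ```
   abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
     private final BatchReadConf conf;

     BaseBatchReader(/* existing args, */ BatchReadConf conf) {
       this.conf = conf;
     }

     // the ORC/Parquet reader factory methods then read conf.orcBatchSize(),
     // conf.parquetReaderType(), and conf.parquetBatchSize() as needed
   }
   ```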
   
   



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java:
##########
@@ -28,10 +29,12 @@
 
 class SparkColumnarReaderFactory implements PartitionReaderFactory {
   private final int batchSize;
+  private final ParquetReaderType parquetReaderType;
 
-  SparkColumnarReaderFactory(int batchSize) {
+  SparkColumnarReaderFactory(int batchSize, ParquetReaderType readType) {

Review Comment:
   This one will also get a reference to `BatchReadConf` as opposed to separate variables. The batch size validation can be moved to `BatchReadConf` itself.
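   
   Sketch of the factory side (assuming `BatchReadConf` validates its fields on construction, as sketched earlier):
   
   ```
   private final BatchReadConf conf;

   SparkColumnarReaderFactory(BatchReadConf conf) {
     // no batch size checks needed here anymore
     this.conf = conf;
   }
   ```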



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java:
##########
@@ -49,7 +52,9 @@ public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputP
     SparkInputPartition partition = (SparkInputPartition) inputPartition;
 
     if (partition.allTasksOfType(FileScanTask.class)) {
-      return new BatchDataReader(partition, batchSize);
+      BatchDataReader batchDataReader =
+          new BatchDataReader(partition, batchSize, parquetReaderType);
+      return batchDataReader;

Review Comment:
   Minor: Redundant local var; just return?
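   
   I.e.:
   
   ```
   if (partition.allTasksOfType(FileScanTask.class)) {
     return new BatchDataReader(partition, batchSize, parquetReaderType);
   }
   ```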


