aokolnychyi commented on code in PR #9841:
URL: https://github.com/apache/iceberg/pull/9841#discussion_r1585438330
##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########

@@ -32,23 +32,27 @@ import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.sql.vectorized.ColumnarBatch;

 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
   private final int batchSize;
+  private final ParquetReaderType parquetReaderType;

Review Comment:
   Instead of passing this variable, let's create `BatchReadConf` and pass a reference to it here. I'd consider adding a `batchReadConf()` method to `SparkReadConf` with the following fields (if you want, we can add a builder for `BatchReadConf` as well).
   ```
   class BatchReadConf {
     int orcBatchSize() {...}
     ParquetReaderType parquetReaderType() {...}
     int parquetBatchSize() {...}
   }
   ```

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########

@@ -74,48 +71,23 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
     return columnarBatch;
   }

-  private class ColumnBatchLoader {
-    private final int numRowsToRead;
-    // the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when
-    // there is no deletes
-    private int[] rowIdMapping;
-    // the array to indicate if a row is deleted or not, it is null when there is no "_deleted"
-    // metadata column
-    private boolean[] isDeleted;
+  private class ColumnBatchLoader extends BaseColumnBatchLoader {

     ColumnBatchLoader(int numRowsToRead) {
-      Preconditions.checkArgument(
-          numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
-      this.numRowsToRead = numRowsToRead;
-      if (hasIsDeletedColumn) {
-        isDeleted = new boolean[numRowsToRead];
-      }
+      super(numRowsToRead, hasIsDeletedColumn, deletes, rowStartPosInBatch);
     }

-    ColumnarBatch loadDataToColumnBatch() {
+    @Override
+    public ColumnarBatch loadDataToColumnBatch() {
       int numRowsUndeleted = initRowIdMapping();
-      ColumnVector[] arrowColumnVectors = readDataToColumnVectors();
-
-      ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
-      newColumnarBatch.setNumRows(numRowsUndeleted);
-
-      if (hasEqDeletes()) {
-        applyEqDelete(newColumnarBatch);
-      }
-
-      if (hasIsDeletedColumn && rowIdMapping != null) {
-        // reset the row id mapping array, so that it doesn't filter out the deleted rows
-        for (int i = 0; i < numRowsToRead; i++) {
-          rowIdMapping[i] = i;
-        }
-        newColumnarBatch.setNumRows(numRowsToRead);
-      }
-
+      ColumnarBatch newColumnarBatch =

Review Comment:
   Minor: Redundant local var?

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java:
##########

@@ -115,11 +115,11 @@ private String[][] computePreferredLocations() {
   public PartitionReaderFactory createReaderFactory() {
     if (useParquetBatchReads()) {
       int batchSize = readConf.parquetBatchSize();
-      return new SparkColumnarReaderFactory(batchSize);
+      return new SparkColumnarReaderFactory(batchSize, readConf.parquetReaderType());

Review Comment:
   This is where we will call `readConf.batchReadConf()` or whatever we want to call it.
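For reference, here is a minimal sketch of what the suggested `BatchReadConf` could look like with the optional builder. The field set comes from the comment above; the builder shape, the `Serializable` marker, and the placement of batch size validation in the constructor are assumptions, not the final API.

```
package org.apache.iceberg.spark;

import java.io.Serializable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

// Hypothetical sketch of the suggested BatchReadConf; the accessors come from
// the review comment, while the builder and validation placement are assumptions.
public class BatchReadConf implements Serializable {

  private final int orcBatchSize;
  private final int parquetBatchSize;
  private final ParquetReaderType parquetReaderType;

  private BatchReadConf(int orcBatchSize, int parquetBatchSize, ParquetReaderType parquetReaderType) {
    // centralizing batch size validation here removes it from the reader factories
    Preconditions.checkArgument(orcBatchSize > 0, "Invalid ORC batch size: %s", orcBatchSize);
    Preconditions.checkArgument(parquetBatchSize > 0, "Invalid Parquet batch size: %s", parquetBatchSize);
    this.orcBatchSize = orcBatchSize;
    this.parquetBatchSize = parquetBatchSize;
    this.parquetReaderType = parquetReaderType;
  }

  public int orcBatchSize() {
    return orcBatchSize;
  }

  public int parquetBatchSize() {
    return parquetBatchSize;
  }

  public ParquetReaderType parquetReaderType() {
    return parquetReaderType;
  }

  public static Builder builder() {
    return new Builder();
  }

  public static class Builder {
    private int orcBatchSize;
    private int parquetBatchSize;
    private ParquetReaderType parquetReaderType;

    public Builder orcBatchSize(int newOrcBatchSize) {
      this.orcBatchSize = newOrcBatchSize;
      return this;
    }

    public Builder parquetBatchSize(int newParquetBatchSize) {
      this.parquetBatchSize = newParquetBatchSize;
      return this;
    }

    public Builder parquetReaderType(ParquetReaderType newParquetReaderType) {
      this.parquetReaderType = newParquetReaderType;
      return this;
    }

    public BatchReadConf build() {
      return new BatchReadConf(orcBatchSize, parquetBatchSize, parquetReaderType);
    }
  }
}
```

`SparkReadConf.batchReadConf()` could then assemble this once from the existing `parquetBatchSize()` and `orcBatchSize()` getters and hand a single object to `SparkColumnarReaderFactory`.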
##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/BaseColumnBatchLoader.java:
##########

@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.Iterator;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+@SuppressWarnings("checkstyle:VisibilityModifier")

Review Comment:
   We would want to structure this a bit differently. Let me think more.

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/comet/CometColumnReader.java:
##########

@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized.comet;

Review Comment:
   Question: Is there a benefit in having these changes as a separate package? I think the naming separates these classes enough. If we keep them in the same package as the built-in Parquet readers, we will avoid the need to make some classes public.

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java:
##########

@@ -27,6 +27,11 @@ private SparkSQLProperties() {}
   // Controls whether vectorized reads are enabled
   public static final String VECTORIZATION_ENABLED = "spark.sql.iceberg.vectorization.enabled";

+  // Controls which parquet reader to use for vectorization: either Iceberg's parquet reader or

Review Comment:
   I think just stating `Controls which Parquet reader to use` should be enough. We may use that beyond vectorized reads in the future.
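To illustrate the suggested rewording, a sketch of how the property could read afterwards. Only the shortened doc comment comes from the review; the enum values, constant name, property key, and default below are hypothetical, since the diff above truncates them.

```
package org.apache.iceberg.spark;

// Hypothetical reconstruction: the enum values are assumptions based on the
// truncated "either Iceberg's parquet reader or ..." comment in the diff above.
enum ParquetReaderType {
  ICEBERG, // Iceberg's built-in vectorized Parquet reader
  COMET // hypothetical alternative reader implementation
}

class SparkSQLProperties {

  private SparkSQLProperties() {}

  // Controls which Parquet reader to use
  // (constant name, key, and default are hypothetical placeholders)
  public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type";
  public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.ICEBERG;
}
```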
##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/ParquetReaderType.java:
##########

@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+public enum ParquetReaderType {

Review Comment:
   Does this have to be public?

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########

@@ -32,23 +32,27 @@ import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.sql.vectorized.ColumnarBatch;

 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
   private final int batchSize;
+  private final ParquetReaderType parquetReaderType;

Review Comment:
   Then we will have a single `BatchReadConf` reference in `BaseBatchReader` instead of separate `batchSize`, `parquetReaderType`, and whatever fields we would have to add in the future.

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java:
##########

@@ -28,10 +29,12 @@
 class SparkColumnarReaderFactory implements PartitionReaderFactory {
   private final int batchSize;
+  private final ParquetReaderType parquetReaderType;

-  SparkColumnarReaderFactory(int batchSize) {
+  SparkColumnarReaderFactory(int batchSize, ParquetReaderType readType) {

Review Comment:
   This one will also get a reference to `BatchReadConf` instead of separate variables. The batch size validation can be moved into `BatchReadConf` itself.

##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java:
##########

@@ -49,7 +52,9 @@ public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputP
     SparkInputPartition partition = (SparkInputPartition) inputPartition;

     if (partition.allTasksOfType(FileScanTask.class)) {
-      return new BatchDataReader(partition, batchSize);
+      BatchDataReader batchDataReader =
+          new BatchDataReader(partition, batchSize, parquetReaderType);
+      return batchDataReader;

Review Comment:
   Minor: Redundant local var, just return?
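Pulling these comments together, a rough sketch of how `SparkColumnarReaderFactory` could end up looking once it takes the assumed `BatchReadConf` sketched earlier: one conf reference, validation moved out of the factory, and the redundant local removed. The `BatchDataReader(partition, conf)` constructor and the exception fallbacks are assumptions, not the PR's final code.

```
package org.apache.iceberg.spark.source;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.spark.BatchReadConf;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class SparkColumnarReaderFactory implements PartitionReaderFactory {

  // one conf reference replaces the separate batchSize and parquetReaderType fields
  private final BatchReadConf conf;

  SparkColumnarReaderFactory(BatchReadConf conf) {
    // no batch size check here; BatchReadConf validates on construction
    this.conf = conf;
  }

  @Override
  public PartitionReader<InternalRow> createReader(InputPartition inputPartition) {
    // assumed fallback: this factory only serves columnar reads
    throw new UnsupportedOperationException("Row-based reads are not supported");
  }

  @Override
  public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputPartition) {
    SparkInputPartition partition = (SparkInputPartition) inputPartition;

    if (partition.allTasksOfType(FileScanTask.class)) {
      // return directly instead of going through a local variable
      return new BatchDataReader(partition, conf);
    }

    // assumed fallback for unsupported task types
    throw new UnsupportedOperationException("Unsupported task group for columnar reads");
  }

  @Override
  public boolean supportsColumnarReads(InputPartition inputPartition) {
    return true;
  }
}
```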
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org