aokolnychyi commented on code in PR #9841:
URL: https://github.com/apache/iceberg/pull/9841#discussion_r1575416837


##########
api/src/main/java/org/apache/iceberg/ReaderType.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+public enum ReaderType {

Review Comment:
   I don't think this belongs in the `api` module. If we can use the Comet readers without Spark, then this enum should go in the `org.apache.iceberg.parquet` package in the `data` module. If Comet can be used only with Spark, then it should go into the Spark module and be called `ParquetReaderType`.



##########
build.gradle:
##########
@@ -45,6 +45,7 @@ buildscript {
   }
 }
 
+String sparkMajorVersion = '3.4'

Review Comment:
   I hope we can soon have a snapshot for Comet jar independent of Spark to 
clean up deps here.
   We can't have `parquet` module depend on a jar with any Spark deps.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java:
##########
@@ -51,22 +53,37 @@ public class VectorizedSparkParquetReaders {
 
   private VectorizedSparkParquetReaders() {}
 
-  public static ColumnarBatchReader buildReader(
+  public static VectorizedReader buildReader(

Review Comment:
   Why change this? Why not return `ColumnarBatchReader` here and 
`CometColumnarBatchReader` below?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/BaseColumnBatchLoader.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.Iterator;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+@SuppressWarnings("checkstyle:VisibilityModifier")

Review Comment:
   These changes would require a bit more time to review. I'll do that 
tomorrow. I think we would want to restructure the original implementation a 
bit. Not a concern for now.



##########
spark/v3.4/build.gradle:
##########
@@ -70,8 +70,11 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
       exclude group: 'io.netty', module: 'netty-buffer'
       exclude group: 'io.netty', module: 'netty-common'
       exclude group: 'org.roaringbitmap'
+      exclude group: 'org.apache.comet'
     }
 
+    compileOnly "org.apache.comet:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.1.0-SNAPSHOT"

Review Comment:
   +1 for exploring that.



##########
gradle.properties:
##########
@@ -20,8 +20,8 @@ systemProp.defaultFlinkVersions=1.18
 systemProp.knownFlinkVersions=1.16,1.17,1.18
 systemProp.defaultHiveVersions=2
 systemProp.knownHiveVersions=2,3
-systemProp.defaultSparkVersions=3.5

Review Comment:
   These changes should be reverted in the final PR.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -353,4 +354,12 @@ private boolean executorCacheLocalityEnabledInternal() {
        .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_LOCALITY_ENABLED_DEFAULT)
         .parse();
   }
+
+  public ReaderType getReaderType() {

Review Comment:
   Iceberg doesn't use `get` prefixes in method names.
   I also think it is better to call it `parquetReaderType()`.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java:
##########
@@ -27,6 +28,10 @@ private SparkSQLProperties() {}
   // Controls whether vectorized reads are enabled
  public static final String VECTORIZATION_ENABLED = "spark.sql.iceberg.vectorization.enabled";
 
+  // Controls whether which reader to use for vectorization

Review Comment:
   I don't think the comment is accurate.
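   Perhaps something like `// Controls which Parquet reader implementation to use for vectorized reads`?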



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java:
##########
@@ -196,6 +201,33 @@ private Duration toDuration(String time) {
     }
   }
 
+  class ReadTypeConfParser extends ConfParser<ReadTypeConfParser, ReaderType> {

Review Comment:
   Instead of adding a new `ConfParser`, we previously used `stringConf()` to parse the string value and then converted that string to an enum. Take a look at the `dataPlanningMode()` method in `SparkReadConf`.
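   
   For reference, a `stringConf()`-based version would look roughly like this (a sketch: the constant names assume the `PARQUET_READER_TYPE` rename suggested further down, plus an assumed `PARQUET_READER_TYPE_DEFAULT`, and `ReaderType.fromName` is assumed to exist):
   
   ```
   public ReaderType parquetReaderType() {
     String typeName =
         confParser
             .stringConf()
             .sessionConf(SparkSQLProperties.PARQUET_READER_TYPE)
             .defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT)
             .parse();
     return ReaderType.fromName(typeName);
   }
   ```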
   
   If you believe a dedicated conf parser will help readability, we can add a 
generic parser for enums.
   
   ```
   class EnumConfParser<T> extends ConfParser<EnumConfParser<T>, T> {
     private final Function<String, T> toEnum;
     private T defaultValue;
   
     EnumConfParser(Function<String, T> toEnum) {
       this.toEnum = toEnum;
     }
   
     @Override
     protected EnumConfParser<T> self() {
       return this;
     }
   
     public EnumConfParser<T> defaultValue(T value) {
       this.defaultValue = value;
       return self();
     }
   
     public T parse() {
      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
       return parse(toEnum, defaultValue);
     }
   
     public T parseOptional() {
       return parse(toEnum, defaultValue);
     }
   }
   ```
   
   You would call it like this:
   
   ```
   public ReaderType parquetReaderType() {
     return confParser
         .enumConf(ReaderType::fromName)
         .sessionConf(SparkSQLProperties.PARQUET_READER_TYPE)
         .defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT)
         .parse();
   }
   ```
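   
   (For this to work, `SparkConfParser` would need a new `enumConf(Function<String, T>)` factory method alongside the existing `stringConf()`, `intConf()`, and friends.)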



##########
spark/v3.4/build.gradle:
##########
@@ -70,8 +70,11 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
       exclude group: 'io.netty', module: 'netty-buffer'
       exclude group: 'io.netty', module: 'netty-common'
       exclude group: 'org.roaringbitmap'
+      exclude group: 'org.apache.comet'

Review Comment:
   We don't anticipate that Spark will bundle Comet, do we?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java:
##########
@@ -27,6 +28,10 @@ private SparkSQLProperties() {}
   // Controls whether vectorized reads are enabled
  public static final String VECTORIZATION_ENABLED = "spark.sql.iceberg.vectorization.enabled";
 
+  // Controls whether which reader to use for vectorization
+  public static final String READER_TYPE = "spark.sql.iceberg.parquet.reader-type";

Review Comment:
   Shall we call it `PARQUET_READER_TYPE`?
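   That is, keeping the property key from this PR:
   
   ```
   public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type";
   ```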



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########
@@ -86,9 +92,15 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
         .project(requiredSchema)
         .split(start, length)
         .createBatchedReaderFunc(
-            fileSchema ->
-                VectorizedSparkParquetReaders.buildReader(
-                    requiredSchema, fileSchema, idToConstant, deleteFilter))
+            fileSchema -> {
+              if (this.readerType == ReaderType.COMET) {

Review Comment:
   We only use `this.` when setting values, not when accessing them.
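   i.e. just `if (readerType == ReaderType.COMET) {` here.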



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java:
##########
@@ -49,7 +52,9 @@ public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputP
     SparkInputPartition partition = (SparkInputPartition) inputPartition;
 
     if (partition.allTasksOfType(FileScanTask.class)) {
-      return new BatchDataReader(partition, batchSize);
+      BatchDataReader batchDataReader = new BatchDataReader(partition, batchSize);
+      batchDataReader.setReadType(readType);

Review Comment:
   I feel like it should be passed to `BatchDataReader`'s constructor. 
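   
   Something like this (a sketch; it assumes `BatchDataReader` gains a reader-type constructor parameter):
   
   ```
   if (partition.allTasksOfType(FileScanTask.class)) {
     return new BatchDataReader(partition, batchSize, readType);
   }
   ```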



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java:
##########
@@ -115,11 +115,11 @@ private String[][] computePreferredLocations() {
   public PartitionReaderFactory createReaderFactory() {
     if (useParquetBatchReads()) {
       int batchSize = readConf.parquetBatchSize();
-      return new SparkColumnarReaderFactory(batchSize);
+      return new SparkColumnarReaderFactory(batchSize, readConf.getReaderType());

Review Comment:
   We will need to think about whether a new class holding the batch size, reader type, and any future variables makes sense. Let me think about this and come back tomorrow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
