and124578963 opened a new issue, #4590:
URL: https://github.com/apache/datafusion-comet/issues/4590

   ### Describe the bug
   
   Comet's native Iceberg scan can return duplicate rows when Iceberg `FileScanTask` byte ranges split a Parquet file that contains a single row group.
   The issue appears when:
   - the Iceberg table has a Parquet data file with one row group;
   - the file is planned into multiple byte-range scan tasks, e.g. because `split-size` is smaller than the row group size;
   - Comet uses `CometIcebergNativeScanExec`.
   
   Expected behavior: the row group should be read by exactly one split.
   Actual behavior: the same row group is read by multiple split tasks, so matching rows are returned multiple times.
   Vanilla Spark/Iceberg does not duplicate rows.
   
   ### Steps to reproduce
   
   Minimal Scala regression test:
   
   ```scala
   test("native Iceberg scan does not duplicate a row group split by byte range") {
     assume(icebergAvailable, "Iceberg not available in classpath")
     withTempIcebergDir { warehouseDir =>
       withSQLConf(
         "spark.sql.catalog.split_cat" -> "org.apache.iceberg.spark.SparkCatalog",
         "spark.sql.catalog.split_cat.type" -> "hadoop",
         "spark.sql.catalog.split_cat.warehouse" -> warehouseDir.getAbsolutePath,
         CometConf.COMET_ENABLED.key -> "true",
         CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
         val dataPath = s"${warehouseDir.getAbsolutePath}/single_row_group_parquet"
         spark
           .sql("SELECT CAST(0 AS INT) AS id, repeat('x', 1024) AS payload")
           .coalesce(1)
           .write
           .mode("overwrite")
           .parquet(dataPath)
         spark.sql("""
           CREATE TABLE split_cat.db.single_row_group_split (
             id INT,
             payload STRING
           ) USING iceberg
         """)
         val parquetFiles = new java.io.File(dataPath)
           .listFiles()
           .filter(file => file.getName.startsWith("part-") && file.getName.endsWith(".parquet"))
         assert(parquetFiles.length == 1)
         val sourceParquetFile = parquetFiles.head
         val catalog = spark.sessionState.catalogManager.catalog("split_cat")
         val ident =
           org.apache.spark.sql.connector.catalog.Identifier.of(Array("db"), "single_row_group_split")
         val table = catalog
           .asInstanceOf[org.apache.iceberg.spark.SparkCatalog]
           .loadTable(ident)
           .asInstanceOf[org.apache.iceberg.spark.source.SparkTable]
           .table()
         val dataFile = org.apache.iceberg.DataFiles
           .builder(table.spec())
           .withPath(sourceParquetFile.getAbsolutePath)
           .withFormat(org.apache.iceberg.FileFormat.PARQUET)
           .withFileSizeInBytes(sourceParquetFile.length())
           .withRecordCount(1)
           .build()
         table.newAppend().appendFile(dataFile).commit()
         val df = spark.read
           .format("iceberg")
           .option("split-size", "64")
           .option("file-open-cost", "64")
           .load("split_cat.db.single_row_group_split")
           .where("id = 0")
           .select("id")
         val rows = df.collect()
         assert(rows.length == 1, s"Expected 1 row, got ${rows.length}: ${rows.mkString(", ")}")
       }
     }
   }
   ```
   
   ### Actual behavior
   
   The query returns the same row multiple times.
   
   Example failure:
   ```text
     Expected 1 row, got 4: [0], [0], [0], [0]
   ```
   
   
   ### Expected behavior
   
   The query should return exactly one row:
   ```text
     [0]
   ```
   
   ### Additional context
   
   The same Parquet row group appears to be selected by multiple byte-range tasks.
   This happens because the native iceberg-rust reader selects a row group whenever its byte range overlaps the split's byte range:
   
   ```text
     row_group_start < split_end && split_start < row_group_end
   ```
   
   For a single row group split into N byte ranges, all N ranges overlap the row group, so all N tasks read and emit it.
   
   parquet-java, and therefore vanilla Spark, avoids this by assigning each row group to exactly one split using row-group midpoint ownership. Its split filtering keeps a row group only when the split's range contains the row group's midpoint:
   
   ```java
   long midPoint = startIndex + totalSize / 2;
   if (filter.contains(midPoint)) {
     newRowGroups.add(rowGroup);
   }
   ```
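   
   To make the difference concrete, here is a minimal, self-contained Java sketch that counts how many splits would select one row group under each rule. It is not code from iceberg-rust or parquet-java: the class and method names are hypothetical, the byte offsets are made up for illustration, and the half-open `[start, end)` split range is an assumption.
   
   ```java
   // Hypothetical sketch: names and numbers are illustrative, not taken from any library.
   final class RowGroupSplitSketch {
   
       // Overlap rule described above: true for every split that touches the row group.
       static boolean overlaps(long rgStart, long rgEnd, long splitStart, long splitEnd) {
           return rgStart < splitEnd && splitStart < rgEnd;
       }
   
       // Midpoint rule: true only for the split whose range contains the row group's midpoint.
       // Assumes a half-open split range [splitStart, splitEnd).
       static boolean ownsMidpoint(long rgStart, long rgSize, long splitStart, long splitEnd) {
           long midPoint = rgStart + rgSize / 2;
           return splitStart <= midPoint && midPoint < splitEnd;
       }
   
       // Cuts [0, fileLength) into consecutive splits of splitSize bytes and counts
       // how many of them would select the row group under the chosen rule.
       static int countSelectingSplits(
               long rgStart, long rgSize, long fileLength, long splitSize, boolean useMidpoint) {
           int count = 0;
           for (long start = 0; start < fileLength; start += splitSize) {
               long end = Math.min(start + splitSize, fileLength);
               boolean selected = useMidpoint
                       ? ownsMidpoint(rgStart, rgSize, start, end)
                       : overlaps(rgStart, rgStart + rgSize, start, end);
               if (selected) {
                   count++;
               }
           }
           return count;
       }
   
       public static void main(String[] args) {
           // Made-up layout: one row group occupying bytes [4, 204) of a 256-byte file,
           // planned with a 64-byte split size, i.e. four splits.
           System.out.println("overlap rule:  " + countSelectingSplits(4, 200, 256, 64, false));
           System.out.println("midpoint rule: " + countSelectingSplits(4, 200, 256, 64, true));
       }
   }
   ```
   
   With these made-up numbers the overlap rule selects the row group in all four splits, while the midpoint rule selects it in exactly one (the split covering bytes 64 to 128, which contains the midpoint 104).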
   
   
   ### Environment
   
   - Comet version: current local 0.16.0-SNAPSHOT / 0.16.0.001
   - Spark: 3.5.8
   - Iceberg catalog: Hadoop catalog
   - Native Iceberg scan enabled:
   ```text
     spark.comet.scan.icebergNative.enabled=true
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

