jasonf20 opened a new issue, #9227: URL: https://github.com/apache/iceberg/issues/9227
### Apache Iceberg version

1.4.2 (latest release)

### Query engine

Athena

### Please describe the bug 🐞

Since the following PR: https://github.com/apache/iceberg/pull/6335, `FastAppend` and subclasses of `MergingSnapshotProducer` will skip newly added data files during manual retries. This happens because the [cached value is set to an empty list instead of null](https://github.com/ConeyLiu/iceberg/blob/73d4abbc863d7fe5a0076b09c662a18569c92d4b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L920) during `cleanUncommittedAppends`, so when `newDataFilesAsManifests` is called during the retry, the manifest-writing logic is skipped and no data files are returned. The result can be partially applied changes if the user manually retries commits.

For example, the following code will produce a rewrite that applies the deletes but does not add the new file:

```scala
import java.util
import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.iceberg.aws.glue.GlueCatalog
import org.apache.iceberg.catalog._
import org.apache.iceberg.data.GenericRecord
import org.apache.iceberg.data.parquet.GenericParquetWriter
import org.apache.iceberg.parquet.Parquet
import org.apache.iceberg.types.Types
import org.apache.iceberg.{DataFile, PartitionSpec, Schema, Table, data}

object TestRewriteCommits {

  def main(args: Array[String]): Unit = {
    val catalog = new GlueCatalog()
    catalog.initialize("iceberg", Map.empty[String, String].asJava)

    val schema = new Schema(
      Types.NestedField.required(1, "id", Types.StringType.get())
    )

    val tableName = "temp4"
    val tableId = TableIdentifier.of("prod_iceberg", tableName)
    val basePath = s"s3://s3-bucket-path/ice/tables/${tableName}/"

    val tableProperties: util.Map[String, String] = Map(
      "format-version" -> "2",
      "commit.retry.num-retries" -> "0" // turn off retries for more control during testing
    ).asJava

    if (!catalog.tableExists(tableId)) {
      catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), basePath,
        tableProperties)
    }

    val table = catalog.loadTable(tableId)

    val addedFiles = (1 to 2).map(i => {
      val file: DataFile = writeFile(basePath, table)
      val append = table.newAppend()
      append.appendFile(file)
      append.commit()
      file
    })

    val transaction = table.newTransaction()
    val rewrite = transaction.newRewrite()
    addedFiles.foreach(rewrite.deleteFile)
    rewrite.addFile(writeFile(basePath, table))
    rewrite.commit()

    try {
      // Make sure this commit fails. (I failed it by breaking at
      // glue.updateTable(updateTableRequest.build()) and changing the table from Athena.)
      transaction.commitTransaction()
    } catch {
      case e: Throwable =>
        // This retry will run successfully, but the result will not contain
        // the data file added during the rewrite.
        transaction.commitTransaction()
    }
  }

  private def writeFile(basePath: String, table: Table) = {
    val writer = Parquet.writeData(
        table.io().newOutputFile(basePath + UUID.randomUUID().toString + ".parquet"))
      .forTable(table)
      .overwrite(true)
      .createWriterFunc(GenericParquetWriter.buildWriter)
      .build[data.Record]()
    writer.write(Iterable(GenericRecord.create(table.schema()).copy("id", "1")).asJava)
    writer.close()
    writer.toDataFile
  }
}
```

I think this can be fixed either by setting the cached value back to null, as it was before, or by forbidding calling `commit` more than once.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
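The caching behavior described in the issue can be sketched in isolation. This is a minimal, simplified model with hypothetical names, not the actual `MergingSnapshotProducer` code: a null cache means "not yet written", so clearing it to an empty list instead of null makes a retry believe there is nothing to regenerate.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the cache-then-skip pattern:
// once the cache is non-null, regeneration is skipped on later calls.
class SnapshotProducerSketch {
    private final List<String> newDataFiles = new ArrayList<>();
    private List<String> cachedNewDataManifests = null;

    void addFile(String f) { newDataFiles.add(f); }

    List<String> newDataFilesAsManifests() {
        if (cachedNewDataManifests == null) {
            // stands in for actually writing manifest files
            cachedNewDataManifests = new ArrayList<>(newDataFiles);
        }
        return cachedNewDataManifests;
    }

    // Buggy cleanup: leaves an empty, non-null cache behind, so a retry
    // sees a "valid" cache and returns no manifests.
    void cleanUncommittedBuggy() { cachedNewDataManifests = new ArrayList<>(); }

    // Proposed fix: reset to null so the next attempt rebuilds the manifests.
    void cleanUncommittedFixed() { cachedNewDataManifests = null; }
}

public class RetryCacheDemo {
    public static void main(String[] args) {
        SnapshotProducerSketch buggy = new SnapshotProducerSketch();
        buggy.addFile("file-1.parquet");
        buggy.newDataFilesAsManifests();   // first commit attempt
        buggy.cleanUncommittedBuggy();     // failed commit cleans up
        System.out.println("buggy retry: " + buggy.newDataFilesAsManifests());
        // -> buggy retry: []

        SnapshotProducerSketch fixed = new SnapshotProducerSketch();
        fixed.addFile("file-1.parquet");
        fixed.newDataFilesAsManifests();
        fixed.cleanUncommittedFixed();
        System.out.println("fixed retry: " + fixed.newDataFilesAsManifests());
        // -> fixed retry: [file-1.parquet]
    }
}
```

With the buggy cleanup the retry silently drops the added file while the deletes (tracked elsewhere) still apply, which matches the partially applied rewrite observed above.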