jasonf20 commented on PR #9222:
URL: https://github.com/apache/iceberg/pull/9222#issuecomment-1841290690
Hi @nastra
I’m not very familiar with this codebase, and writing tests that fail at exactly the right point would take me a long time, so unfortunately I can’t do that right now.
However, below is the code I used to reproduce the issue locally, followed by a rough sketch of how a test might inject the failing commit:
```scala
import java.util
import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.iceberg.aws.glue.GlueCatalog
import org.apache.iceberg.catalog._
import org.apache.iceberg.data.GenericRecord
import org.apache.iceberg.data.parquet.GenericParquetWriter
import org.apache.iceberg.parquet.Parquet
import org.apache.iceberg.types.Types
import org.apache.iceberg.{DataFile, PartitionSpec, Schema, Table, data}

object TestRewriteCommits {

  def main(args: Array[String]): Unit = {
    val catalog = new GlueCatalog()
    catalog.initialize("iceberg", Map.empty[String, String].asJava)

    val schema = new Schema(
      Types.NestedField.required(1, "id", Types.StringType.get())
    )

    val tableName = "temp4"
    val tableId = TableIdentifier.of("prod_iceberg", tableName)
    val basePath = s"s3://s3-bucket-path/ice/tables/${tableName}/"
    val tableProperties: util.Map[String, String] = Map(
      "format-version" -> "2",
      "commit.retry.num-retries" -> "0" // turn off retries for more control during testing
    ).asJava

    if (!catalog.tableExists(tableId)) {
      catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), basePath, tableProperties)
    }
    val table = catalog.loadTable(tableId)

    // Commit two data files, each in its own append.
    val addedFiles = (1 to 2).map { _ =>
      val file: DataFile = writeFile(basePath, table)
      val append = table.newAppend()
      append.appendFile(file)
      append.commit()
      file
    }

    // In a transaction, rewrite the two appended files into a single new file.
    val transaction = table.newTransaction()
    val rewrite = transaction.newRewrite()
    addedFiles.foreach(rewrite.deleteFile)
    rewrite.addFile(writeFile(basePath, table))
    rewrite.commit()

    try {
      // Make sure this commit fails. (I failed it by breaking at
      // glue.updateTable(updateTableRequest.build()) and changing the table from Athena.)
      transaction.commitTransaction()
    } catch {
      case _: Throwable =>
        // This retry will run successfully, but the result will not contain
        // the data file added during the rewrite.
        transaction.commitTransaction()
    }
  }

  private def writeFile(basePath: String, table: Table): DataFile = {
    val writer = Parquet
      .writeData(table.io().newOutputFile(basePath + UUID.randomUUID().toString + ".parquet"))
      .forTable(table)
      .overwrite(true)
      .createWriterFunc(GenericParquetWriter.buildWriter)
      .build[data.Record]()
    writer.write(Iterable(GenericRecord.create(table.schema()).copy("id", "1")).asJava)
    writer.close()
    writer.toDataFile
  }
}
```
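
In case it helps whoever does add a test: the manual breakpoint step above could probably be replaced by a small `TableOperations` wrapper that throws `CommitFailedException` on the first commit. This is only an untested sketch; `FailingFirstCommitOps` and the failure counter are names I made up, not anything from this PR:

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.iceberg.{TableMetadata, TableOperations}
import org.apache.iceberg.exceptions.CommitFailedException
import org.apache.iceberg.io.{FileIO, LocationProvider}

// Hypothetical test helper: delegates everything to the real ops,
// but fails the first `failures` commits with CommitFailedException.
class FailingFirstCommitOps(delegate: TableOperations, failures: Int) extends TableOperations {
  private val remaining = new AtomicInteger(failures)

  override def current(): TableMetadata = delegate.current()

  override def refresh(): TableMetadata = delegate.refresh()

  override def commit(base: TableMetadata, metadata: TableMetadata): Unit = {
    if (remaining.getAndDecrement() > 0) {
      // Simulate the concurrent Glue/Athena table change winning the race.
      throw new CommitFailedException("injected commit failure for testing")
    }
    delegate.commit(base, metadata)
  }

  override def io(): FileIO = delegate.io()

  override def metadataFileLocation(fileName: String): String =
    delegate.metadataFileLocation(fileName)

  override def locationProvider(): LocationProvider = delegate.locationProvider()
}
```

A test could then build the transaction directly against the wrapper (e.g. via `Transactions.newTransaction(tableId.toString, new FailingFirstCommitOps(realOps, 1))`), run the rewrite, catch the injected failure, retry `commitTransaction()`, and assert that the file added by the rewrite is still present in the resulting snapshot.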