dchristle commented on issue #3703:
URL: https://github.com/apache/iceberg/issues/3703#issuecomment-1345717163
@RussellSpitzer We have also hit this issue after doing a large copy of rows
into a single Iceberg table. We could have avoided it by more carefully
partitioning before the insert, but it's too late for that. There are
definitely too many files -- we've cleaned them up by calling
`rewriteDataFiles` in a loop over subsets of the table. But we can't expire the
snapshots in a similar piece-by-piece way. The driver reports that it cannot
broadcast more than 8GB -- our driver memory is already set much higher than
this, so increasing driver memory won't fix it.
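For context, the piece-by-piece compaction looked roughly like the sketch below; the `event_date` partition column, the value list, and the `spark`/`table` handles are placeholders rather than our real setup.
```
// Rough sketch of the compaction loop: rewrite one partition-value subset per
// call. "partitionValues", "spark", "table", and "event_date" are placeholders.
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.spark.actions.SparkActions

partitionValues.foreach { day =>
  SparkActions.get(spark)
    .rewriteDataFiles(table)
    .filter(Expressions.equal("event_date", day))
    .execute()
}
```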
Is there any way to partially distribute this operation to executors? For
instance, here is rough pseudo-code based on the code in the original post
(made a bit more Scala-esque):
```
val joinCond = nameEqual.and(actualContains)
val orphanFiles = actualFileDF
  .join(validFileDF, joinCond, "leftanti")
  .persist(StorageLevel.DISK_ONLY)
val orphanFileCount = orphanFiles.count()

// ~1000 file paths per partition; each executor deletes its own batch.
// deleteFunc and LOG would need to be usable (serializable) on the executors.
orphanFiles
  .repartition(math.ceil(orphanFileCount / 1000.0).toInt)
  .rdd
  .foreachPartition { fileBatch =>
    val paths = fileBatch.map(_.getString(0)).toList
    Tasks.foreach(paths: _*)
      .noRetry()
      .suppressFailureWhenFinished()
      .onFailure((file: String, exc: Throwable) =>
        LOG.warn("Failed to delete file: {}", file, exc))
      .run(file => deleteFunc.accept(file))
  }
```
Adding a small `sleep` call would limit the QPS against the distributed file
store, if that's a concern.
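Reusing `paths`, `deleteFunc`, and `LOG` from the sketch above, a throttled version of one batch could look something like this (the QPS number is arbitrary):
```
// Illustrative throttle: sleep between deletes so each task stays under a
// rough per-task QPS budget. maxQpsPerTask is a made-up knob.
val maxQpsPerTask = 10
paths.foreach { file =>
  try deleteFunc.accept(file)
  catch { case e: Exception => LOG.warn("Failed to delete file: {}", file, e) }
  Thread.sleep(1000L / maxQpsPerTask)
}
```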
But without a change like this, I'm not sure how we can expire old
snapshots/delete orphan files. Are there any other workarounds, besides
increasing driver memory?