aokolnychyi commented on code in PR #11086: URL: https://github.com/apache/iceberg/pull/11086#discussion_r1757656126
########## core/src/main/java/org/apache/iceberg/SnapshotProducer.java: ########## @@ -554,6 +562,84 @@ protected boolean cleanupAfterCommit() { return true; } + protected List<ManifestFile> writeDataManifests(List<DataFile> files, PartitionSpec spec) { + return writeDataManifests(files, null /* inherit data seq */, spec); + } + + protected List<ManifestFile> writeDataManifests( + List<DataFile> files, Long dataSeq, PartitionSpec spec) { + return writeManifests(files, group -> writeDataFileGroup(group, dataSeq, spec)); + } + + private List<ManifestFile> writeDataFileGroup( + List<DataFile> files, Long dataSeq, PartitionSpec spec) { + RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec); + + try (RollingManifestWriter<DataFile> closableWriter = writer) { + if (dataSeq != null) { + files.forEach(file -> closableWriter.add(file, dataSeq)); + } else { + files.forEach(closableWriter::add); + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to write data manifests"); + } + + return writer.toManifestFiles(); + } + + protected List<ManifestFile> writeDeleteManifests( + List<DeleteFileHolder> files, PartitionSpec spec) { + return writeManifests(files, group -> writeDeleteFileGroup(group, spec)); + } + + private List<ManifestFile> writeDeleteFileGroup( + List<DeleteFileHolder> files, PartitionSpec spec) { + RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec); + + try (RollingManifestWriter<DeleteFile> closableWriter = writer) { + for (DeleteFileHolder file : files) { + if (file.dataSequenceNumber() != null) { + closableWriter.add(file.deleteFile(), file.dataSequenceNumber()); + } else { + closableWriter.add(file.deleteFile()); + } + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to write delete manifests"); + } + + return writer.toManifestFiles(); + } + + private static <F> List<ManifestFile> writeManifests( + List<F> files, Function<List<F>, List<ManifestFile>> writeFunc) { + int 
groupSize = manifestFileGroupSize(ThreadPools.WORKER_THREAD_POOL_SIZE, files.size()); + List<List<F>> groups = Lists.partition(files, groupSize); + Queue<ManifestFile> manifests = Queues.newConcurrentLinkedQueue(); + Tasks.foreach(groups) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(ThreadPools.getWorkerPool()) + .run(group -> manifests.addAll(writeFunc.apply(group))); + return ImmutableList.copyOf(manifests); + } + + /** + * Calculates how many files can be processed concurrently depending on the provided parallelism Review Comment: We will have a follow-up PR to make `ManifestWriter` smarter and avoid creating a new file for a few entries. This should cover 1. That said, it shouldn't be a big deal even today as the parallelism is always limited. 10K is actually the min target number of files that can be handled by a single rolling manifest writer, not the max. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org