dmgcodevil opened a new issue, #5675: URL: https://github.com/apache/iceberg/issues/5675
### Query engine

Flink

### Question

We have a streaming Flink job that continuously consumes records from Kafka and stores them in Iceberg. The [RewriteDataFilesAction](https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java) is getting slower over time, and the `TaskManager` is starting to fail with OOM. Is it possible to limit the number of data files for compaction? I have the following ideas:

1. Modify https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java#L49 to limit the number of `FileScanTask`s in each `CombinedScanTask`. We can create some sort of _wrapper_:

```java
import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;

class BoundedCombinedScanTask implements CombinedScanTask {
    private final CombinedScanTask original;
    private final int limit;

    public BoundedCombinedScanTask(CombinedScanTask original, int limit) {
        this.original = original;
        this.limit = limit;
    }

    @Override
    public Collection<FileScanTask> files() {
        // Expose only the first `limit` file tasks of the wrapped task.
        return original.files().stream().limit(limit).collect(Collectors.toList());
    }

    @Override
    public CombinedScanTask asCombinedScanTask() {
        // Return the wrapper itself so callers keep the bounded view.
        return this;
    }
}
```

The Flink code will look as follows:

```java
protected List<DataFile> rewriteDataForTasks(final List<CombinedScanTask> combinedScanTasks) {
    // Wrap every task so that it exposes at most 100 files.
    final List<CombinedScanTask> boundedCombinedScanTasks = combinedScanTasks.stream()
        .map(t -> new BoundedCombinedScanTask(t, 100))
        .collect(Collectors.toList());
    int size = boundedCombinedScanTasks.size();
    int parallelism = Math.min(size, this.maxParallelism);
    DataStream<CombinedScanTask> dataStream = this.env.fromCollection(boundedCombinedScanTasks);
    RowDataRewriter rowDataRewriter = new RowDataRewriter(
        this.table(), this.caseSensitive(), this.fileIO(), this.encryptionManager());
    try {
        return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
    } catch (Exception e) {
        throw new RuntimeException("Rewrite data file error.", e);
    }
}
```

2. 
Modify [BaseRewriteDataFilesAction](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java) at https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java#L268 so that each group contains at most N files.
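
To illustrate the second idea, here is a minimal sketch of splitting a list into groups of at most N entries. It is plain Java with no Iceberg dependency: `BoundedGroups`, `partition`, and `maxPerGroup` are hypothetical names that do not exist in Iceberg, and the strings stand in for `FileScanTask` objects. How such grouping would fit into `BaseRewriteDataFilesAction` is left open.

```java
import java.util.ArrayList;
import java.util.List;

class BoundedGroups {
    // Hypothetical helper, not part of Iceberg: splits `items` into
    // consecutive groups that each hold at most `maxPerGroup` elements.
    static <T> List<List<T>> partition(List<T> items, int maxPerGroup) {
        if (maxPerGroup <= 0) {
            throw new IllegalArgumentException("maxPerGroup must be positive");
        }
        List<List<T>> groups = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxPerGroup) {
            int end = Math.min(start + maxPerGroup, items.size());
            // Copy the sub-list so each group is independent of the input list.
            groups.add(new ArrayList<>(items.subList(start, end)));
        }
        return groups;
    }

    public static void main(String[] args) {
        // Strings stand in for file scan tasks in this sketch.
        List<String> files = List.of(
            "a.parquet", "b.parquet", "c.parquet", "d.parquet", "e.parquet");
        // prints [[a.parquet, b.parquet], [c.parquet, d.parquet], [e.parquet]]
        System.out.println(partition(files, 2));
    }
}
```

Unlike truncating each task with `limit`, this keeps every file in some group, so no input is dropped; it only bounds how many files a single group carries.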