dmgcodevil opened a new issue, #5675:
URL: https://github.com/apache/iceberg/issues/5675

   ### Query engine
   
   Flink
   
   ### Question
   
   We have a streaming Flink job that continuously consumes records from Kafka and writes them into Iceberg.
   The [RewriteDataFilesAction](https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java) has been getting slower over time, and the `TaskManager` eventually starts to fail with OOM.
   Is it possible to limit the number of data files per compaction run? I have the following ideas:
   
   1. Modify https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java#L49 to limit the number of `FileScanTask`s in each `CombinedScanTask`. We could create some sort of _wrapper_:
   
   ```java
   import java.util.Collection;
   import java.util.stream.Collectors;

   import org.apache.iceberg.CombinedScanTask;
   import org.apache.iceberg.FileScanTask;

   /** Exposes at most {@code limit} of the file scan tasks of the wrapped CombinedScanTask. */
   class BoundedCombinedScanTask implements CombinedScanTask {
       private final CombinedScanTask original;
       private final int limit;

       BoundedCombinedScanTask(CombinedScanTask original, int limit) {
           this.original = original;
           this.limit = limit;
       }

       @Override
       public Collection<FileScanTask> files() {
           // only the first `limit` file scan tasks of the original task are visible
           return original.files().stream().limit(limit).collect(Collectors.toList());
       }

       @Override
       public CombinedScanTask asCombinedScanTask() {
           // return the bounded view rather than the original, otherwise the limit would be bypassed
           return this;
       }
   }
   ```
   
   The Flink code would then look as follows:
   
   ```java
   protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
       // wrap every combined task so that it exposes at most 100 file scan tasks
       List<CombinedScanTask> boundedCombinedScanTasks = combinedScanTasks.stream()
           .map(task -> new BoundedCombinedScanTask(task, 100))
           .collect(Collectors.toList());

       int size = boundedCombinedScanTasks.size();
       int parallelism = Math.min(size, this.maxParallelism);
       DataStream<CombinedScanTask> dataStream = this.env.fromCollection(boundedCombinedScanTasks);
       RowDataRewriter rowDataRewriter =
           new RowDataRewriter(this.table(), this.caseSensitive(), this.fileIO(), this.encryptionManager());

       try {
           return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
       } catch (Exception e) {
           throw new RuntimeException("Rewrite data file error.", e);
       }
   }
   ```
   
   2. Modify [BaseRewriteDataFilesAction](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java), specifically https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java#L268, so that each group contains at most N files; a rough sketch follows below.
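   For option 2, a minimal sketch of the kind of regrouping step that could be applied to the planned groups is shown below. This is not the existing code at the linked line; the helper name `limitFilesPerGroup` and the `maxFilesPerGroup` parameter are hypothetical, and it assumes `BaseCombinedScanTask` can be used to rebuild smaller groups:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   import org.apache.iceberg.BaseCombinedScanTask;
   import org.apache.iceberg.CombinedScanTask;
   import org.apache.iceberg.FileScanTask;

   /** Splits any planned group holding more than maxFilesPerGroup files into smaller groups. */
   static List<CombinedScanTask> limitFilesPerGroup(List<CombinedScanTask> groups, int maxFilesPerGroup) {
       List<CombinedScanTask> bounded = new ArrayList<>();
       for (CombinedScanTask group : groups) {
           List<FileScanTask> files = new ArrayList<>(group.files());
           if (files.size() <= maxFilesPerGroup) {
               // small enough already, keep the group as planned
               bounded.add(group);
               continue;
           }
           // re-chunk the oversized group into slices of at most maxFilesPerGroup files
           for (int start = 0; start < files.size(); start += maxFilesPerGroup) {
               int end = Math.min(start + maxFilesPerGroup, files.size());
               bounded.add(new BaseCombinedScanTask(new ArrayList<>(files.subList(start, end))));
           }
       }
       return bounded;
   }
   ```

   Splitting oversized groups this way keeps the number of files, and therefore the memory footprint, of each rewrite task bounded, at the cost of producing more, smaller output groups.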
   
   
   
   

