singhpk234 commented on code in PR #12260: URL: https://github.com/apache/iceberg/pull/12260#discussion_r1955584774
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java: ########## @@ -309,6 +312,49 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim } } + private static int getMaxFiles(ReadLimit readLimit) { + if (readLimit instanceof ReadMaxFiles) { + return ((ReadMaxFiles) readLimit).maxFiles(); + } + + if (readLimit instanceof CompositeReadLimit) { + // We do not expect a CompositeReadLimit to contain a nested CompositeReadLimit. + // In fact, it should only be a composite of two or more of ReadMinRows, ReadMaxRows and + // ReadMaxFiles, with no more than one of each. + ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits(); + for (int i = 0; i < limits.length; i++) { + ReadLimit limit = limits[i]; + if (limit instanceof ReadMaxFiles) { + return ((ReadMaxFiles) limit).maxFiles(); + } + } Review Comment: [minor] can we use this ? ```suggestion for (ReadLimit limit: limits) { if (limit instanceof ReadMaxFiles) { return ((ReadMaxFiles) limit).maxFiles(); } } ``` ########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java: ########## @@ -458,7 +502,7 @@ public ReadLimit getDefaultReadLimit() { && maxRecordsPerMicroBatch != Integer.MAX_VALUE) { ReadLimit[] readLimits = new ReadLimit[2]; readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch); - readLimits[1] = ReadLimit.maxRows(maxFilesPerMicroBatch); + readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch); Review Comment: Thank you for catching ! This got missed, as we don't take the Readlimit we get from latestOffset API but rather from the configs which are set in constructor earlier! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org