singhpk234 commented on code in PR #12260:
URL: https://github.com/apache/iceberg/pull/12260#discussion_r1955584774


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -309,6 +312,49 @@ private static StreamingOffset determineStartingOffset(Table table, Long fromTim
     }
   }
 
+  private static int getMaxFiles(ReadLimit readLimit) {
+    if (readLimit instanceof ReadMaxFiles) {
+      return ((ReadMaxFiles) readLimit).maxFiles();
+    }
+
+    if (readLimit instanceof CompositeReadLimit) {
+      // We do not expect a CompositeReadLimit to contain a nested CompositeReadLimit.
+      // In fact, it should only be a composite of two or more of ReadMinRows, ReadMaxRows and
+      // ReadMaxFiles, with no more than one of each.
+      ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
+      for (int i = 0; i < limits.length; i++) {
+        ReadLimit limit = limits[i];
+        if (limit instanceof ReadMaxFiles) {
+          return ((ReadMaxFiles) limit).maxFiles();
+        }
+      }

Review Comment:
   [minor] Can we use an enhanced for-loop here?
   ```suggestion
         for (ReadLimit limit : limits) {
           if (limit instanceof ReadMaxFiles) {
             return ((ReadMaxFiles) limit).maxFiles();
           }
         }
   ```
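A minimal, self-contained sketch of the suggested loop. It assumes simplified stand-in types (`Limit`, `MaxFiles`, `MaxRows`, `Composite`, all hypothetical) in place of Spark's `ReadLimit`, `ReadMaxFiles`, `ReadMaxRows` and `CompositeReadLimit`, and it assumes `Integer.MAX_VALUE` is returned when no file limit is present, since the hunk above is cut off before the method's fallback:

```java
public class MaxFilesSketch {
  // Hypothetical stand-ins for Spark's ReadLimit, ReadMaxFiles, ReadMaxRows and
  // CompositeReadLimit, so this sketch compiles without Spark on the classpath.
  interface Limit {}

  static final class MaxFiles implements Limit {
    private final int maxFiles;

    MaxFiles(int maxFiles) {
      this.maxFiles = maxFiles;
    }

    int maxFiles() {
      return maxFiles;
    }
  }

  static final class MaxRows implements Limit {
    private final long maxRows;

    MaxRows(long maxRows) {
      this.maxRows = maxRows;
    }

    long maxRows() {
      return maxRows;
    }
  }

  static final class Composite implements Limit {
    private final Limit[] limits;

    Composite(Limit... limits) {
      this.limits = limits;
    }

    Limit[] getReadLimits() {
      return limits;
    }
  }

  // Assumed "no file limit" sentinel, matching the Integer.MAX_VALUE checks elsewhere in the diff.
  static final int NO_LIMIT = Integer.MAX_VALUE;

  static int getMaxFiles(Limit readLimit) {
    if (readLimit instanceof MaxFiles) {
      return ((MaxFiles) readLimit).maxFiles();
    }

    if (readLimit instanceof Composite) {
      // Enhanced for-loop as suggested: no index variable or limits[i] lookup needed.
      for (Limit limit : ((Composite) readLimit).getReadLimits()) {
        if (limit instanceof MaxFiles) {
          return ((MaxFiles) limit).maxFiles();
        }
      }
    }

    return NO_LIMIT;
  }

  public static void main(String[] args) {
    System.out.println(getMaxFiles(new MaxFiles(10))); // 10
    System.out.println(getMaxFiles(new Composite(new MaxRows(1000), new MaxFiles(5)))); // 5
    System.out.println(getMaxFiles(new MaxRows(1000))); // 2147483647
  }
}
```

The enhanced for-loop keeps the behavior identical (the first max-files limit found wins) while dropping the index bookkeeping.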



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -458,7 +502,7 @@ public ReadLimit getDefaultReadLimit() {
         && maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
       ReadLimit[] readLimits = new ReadLimit[2];
       readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch);
-      readLimits[1] = ReadLimit.maxRows(maxFilesPerMicroBatch);
+      readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch);

Review Comment:
   Thank you for catching this! It went unnoticed because we don't use the
   ReadLimit passed to the latestOffset API; we rely on the configs set earlier
   in the constructor instead.
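A minimal sketch of why the copy-paste slip matters. It assumes hypothetical string stand-ins for `ReadLimit.maxFiles` / `ReadLimit.maxRows` and a simplified class (`DefaultReadLimitSketch`, hypothetical) that returns a list of limits instead of a composite limit. It only illustrates that both limits are derived from settings captured at construction time, as described above; it is not the Iceberg implementation:

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultReadLimitSketch {
  // Hypothetical stand-ins for ReadLimit.maxFiles / ReadLimit.maxRows that just
  // describe the limit as a string, so the sketch runs without Spark.
  static String maxFiles(int maxFiles) {
    return "MaxFiles(" + maxFiles + ")";
  }

  static String maxRows(long maxRows) {
    return "MaxRows(" + maxRows + ")";
  }

  private final int maxFilesPerMicroBatch;
  private final int maxRecordsPerMicroBatch;

  // Both settings are fixed when the stream object is constructed.
  DefaultReadLimitSketch(int maxFilesPerMicroBatch, int maxRecordsPerMicroBatch) {
    this.maxFilesPerMicroBatch = maxFilesPerMicroBatch;
    this.maxRecordsPerMicroBatch = maxRecordsPerMicroBatch;
  }

  // Returns the configured limits; an empty list stands in for "no limit".
  List<String> getDefaultReadLimit() {
    List<String> limits = new ArrayList<>();
    if (maxFilesPerMicroBatch != Integer.MAX_VALUE) {
      limits.add(maxFiles(maxFilesPerMicroBatch));
    }
    if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
      // The row limit must come from the record setting, not the file setting.
      limits.add(maxRows(maxRecordsPerMicroBatch));
    }
    return limits;
  }

  public static void main(String[] args) {
    // Prints [MaxFiles(4), MaxRows(1000)]; with the old bug the second entry would be MaxRows(4).
    System.out.println(new DefaultReadLimitSketch(4, 1000).getDefaultReadLimit());
  }
}
```

Setting the two options to different values in a test makes this kind of mix-up visible, since identical values would mask it.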



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-h...@iceberg.apache.org
