sririshindra commented on code in PR #7279:
URL: https://github.com/apache/iceberg/pull/7279#discussion_r1157857579


##########
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java:
##########
@@ -154,21 +177,47 @@ public T next() {
     }
 
     private void advance() {
-      while (shouldSkip[nextRowGroup]) {
-        nextRowGroup += 1;
-        reader.skipNextRowGroup();
-      }
-      PageReadStore pages;
       try {
-        pages = reader.readNextRowGroup();
-      } catch (IOException e) {
-        throw new RuntimeIOException(e);
+        Preconditions.checkNotNull(prefetchRowGroupFuture, "future should not 
be null");
+        PageReadStore pages = prefetchRowGroupFuture.get();
+
+        if (prefetchedRowGroup >= totalRowGroups) {
+          return;
+        }
+        Preconditions.checkState(
+            pages != null,
+            "advance() should have been only when there was at least one row 
group to read");
+        long rowPosition = rowGroupsStartRowPos[prefetchedRowGroup];
+        model.setRowGroupInfo(pages, 
columnChunkMetadata.get(prefetchedRowGroup), rowPosition);
+        nextRowGroupStart += pages.getRowCount();
+        prefetchedRowGroup += 1;
+        prefetchNextRowGroup(); // eagerly fetch the next row group

Review Comment:
   Why stop with just the next row group? Can we instead maintain a RowGroup 
queue of RowGroup futures, where we keep populating the queue with the next 
RowGroup continuously and asynchronously till we exhaust all available 
RowGroups in the file? 
   We can deque the RowGroup queue as needed. This way, we may reduce the 
likelihood of the `prefetchRowGroupFuture.get()` call ever having to wait for 
the future computation to finish.. What do you think?



##########
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java:
##########
@@ -109,11 +117,23 @@ public CloseableIterator<T> iterator() {
     private final int batchSize;
     private final List<Map<ColumnPath, ColumnChunkMetaData>> 
columnChunkMetadata;
     private final boolean reuseContainers;
-    private int nextRowGroup = 0;
     private long nextRowGroupStart = 0;
     private long valuesRead = 0;
     private T last = null;
     private final long[] rowGroupsStartRowPos;
+    private final int totalRowGroups;
+    private static final ExecutorService prefetchService =
+        MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor)
+                Executors.newFixedThreadPool(
+                    4,

Review Comment:
   Just curious as why you chose the number of threads in the pool to to be 
'4'. In this current implementation you are prefetching only one row group at a 
time. Correct? shouldn't one thread in the thread pool suffice?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to