stevenzwu commented on code in PR #13935:
URL: https://github.com/apache/iceberg/pull/13935#discussion_r2305505502


##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java:
##########
@@ -212,6 +215,10 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
   }
 
   private void allocateFieldVector(boolean dictionaryEncodedVector) {
+    // There is a possibility that a vector of a different type was in use earlier, so close it if so.
+    if (this.vec != null) {
+      vec.close();
+    }

Review Comment:
   nit: newline



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java:
##########
@@ -56,7 +58,8 @@ public static ColumnarBatchReader buildReader(
       Schema expectedSchema,
       MessageType fileSchema,
       Map<Integer, ?> idToConstant,
-      DeleteFilter<InternalRow> deleteFilter) {
+      DeleteFilter<InternalRow> deleteFilter,
+      BufferAllocator bufferAllocator) {
     return (ColumnarBatchReader)
         TypeWithSchemaVisitor.visit(
             expectedSchema.asStruct(),

Review Comment:
   should we make this new factory method package-private and add the `@VisibleForTesting` annotation?
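   A minimal, self-contained sketch of the suggested pattern. Note this is illustrative only: the class, method, and parameter names below are placeholders, and in Iceberg the real annotation comes from the relocated Guava package (`org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting`), not a locally defined one.

   ```java
   // Locally defined stand-in for Guava's @VisibleForTesting, so this sketch compiles on its own.
   @interface VisibleForTesting {}

   public class ReaderFactory {
     // Package-private: reachable from tests in the same package, but not part of the public API.
     // The annotation documents that the relaxed visibility exists only for testing.
     @VisibleForTesting
     static String buildReader(String allocatorName) {
       return "reader(" + allocatorName + ")";
     }

     public static void main(String[] args) {
       System.out.println(buildReader("rootAllocator"));
     }
   }
   ```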



##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java:
##########
@@ -161,28 +164,34 @@ void assertRecordsMatch(
       boolean reuseContainers,
       int batchSize)
       throws IOException {
-    Parquet.ReadBuilder readBuilder =
-        Parquet.read(inputFile)
-            .project(schema)
-            .recordsPerBatch(batchSize)
-            .createBatchedReaderFunc(
-                type ->
-                    VectorizedSparkParquetReaders.buildReader(
-                        schema, type, Maps.newHashMap(), null));
-    if (reuseContainers) {
-      readBuilder.reuseContainers();
-    }
-    try (CloseableIterable<ColumnarBatch> batchReader = readBuilder.build()) {
-      Iterator<GenericData.Record> expectedIter = expected.iterator();
-      Iterator<ColumnarBatch> batches = batchReader.iterator();
-      int numRowsRead = 0;
-      while (batches.hasNext()) {
-        ColumnarBatch batch = batches.next();
-        numRowsRead += batch.numRows();
-        TestHelpers.assertEqualsBatch(schema.asStruct(), expectedIter, batch);
-      }
-      assertThat(numRowsRead).isEqualTo(expectedSize);
-    }
+    assertNoLeak(
+        "testRecordsMatch",

Review Comment:
   don't we need to pass in the test method name here?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java:
##########
@@ -67,7 +70,17 @@ public static ColumnarBatchReader buildReader(
                 NullCheckingForGet.NULL_CHECKING_ENABLED,
                 idToConstant,
                 ColumnarBatchReader::new,
-                deleteFilter));
+                deleteFilter,
+                bufferAllocator));
+  }
+
+  public static ColumnarBatchReader buildReader(
+      Schema expectedSchema,
+      MessageType fileSchema,
+      Map<Integer, ?> idToConstant,
+      DeleteFilter<InternalRow> deleteFilter) {
+    return buildReader(
        expectedSchema, fileSchema, idToConstant, deleteFilter, ArrowAllocation.rootAllocator());

Review Comment:
   do we also need to change the `buildCometReader` below?



##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java:
##########
@@ -336,4 +345,15 @@ public void testUuidReads() throws Exception {
     }
     assertRecordsMatch(schema, numRows, data, dataFile.toInputFile(), false, BATCH_SIZE);
   }
+
+  protected void assertNoLeak(String testName, Consumer<BufferAllocator> testFunction) {

Review Comment:
   beautiful solution
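   To illustrate the leak-check pattern being praised here: run the test body against a scoped allocator, then assert nothing is still allocated when it returns. The sketch below is self-contained and uses a hypothetical `TrackingAllocator` stand-in; the actual PR would use an Arrow `BufferAllocator` (e.g. a child allocator whose `getAllocatedMemory()` must be zero before close).

   ```java
   import java.util.function.Consumer;

   // Hypothetical stand-in for an Arrow BufferAllocator: tracks outstanding bytes
   // and refuses to close while any allocation is unreleased.
   class TrackingAllocator implements AutoCloseable {
     private long allocated = 0;

     long allocate(long bytes) {
       allocated += bytes;
       return bytes;
     }

     void release(long bytes) {
       allocated -= bytes;
     }

     long getAllocatedMemory() {
       return allocated;
     }

     @Override
     public void close() {
       if (allocated != 0) {
         throw new IllegalStateException("Memory leaked: " + allocated + " bytes");
       }
     }
   }

   public class LeakCheck {
     // The pattern from the review thread: wrap the test body so every allocation
     // made through the scoped allocator must be released before the test passes.
     static void assertNoLeak(String testName, Consumer<TrackingAllocator> testFunction) {
       TrackingAllocator allocator = new TrackingAllocator();
       testFunction.accept(allocator);
       if (allocator.getAllocatedMemory() != 0) {
         throw new AssertionError(
             testName + " leaked " + allocator.getAllocatedMemory() + " bytes");
       }
       allocator.close();
     }

     public static void main(String[] args) {
       assertNoLeak("balancedTest", alloc -> {
         long n = alloc.allocate(1024);
         alloc.release(n); // every allocation is released, so the check passes
       });
       System.out.println("no leak detected");
     }
   }
   ```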



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]