RussellSpitzer commented on PR #13880:
URL: https://github.com/apache/iceberg/pull/13880#issuecomment-3224651328

   Test for the memory leak. I'm still working on nailing down where the leak 
is happening, but it's unrelated to the test parameterization.
   
   ```
   diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetVectorizedScan.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetVectorizedScan.java
   index a6b5166b3..316ef762e 100644
   --- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetVectorizedScan.java
   +++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetVectorizedScan.java
   @@ -18,9 +18,163 @@
     */
    package org.apache.iceberg.spark.source;
    
   +import static org.apache.iceberg.Files.localOutput;
   +import static org.assertj.core.api.Assertions.assertThat;
   +
   +import java.io.File;
   +import java.io.IOException;
   +import java.lang.management.BufferPoolMXBean;
   +import java.lang.management.ManagementFactory;
   +import java.nio.file.Path;
   +import java.util.List;
   +import java.util.UUID;
   +import org.apache.avro.generic.GenericData;
   +import org.apache.hadoop.conf.Configuration;
   +import org.apache.iceberg.DataFile;
   +import org.apache.iceberg.DataFiles;
   +import org.apache.iceberg.FileFormat;
   +import org.apache.iceberg.PartitionSpec;
   +import org.apache.iceberg.Table;
   +import org.apache.iceberg.hadoop.HadoopTables;
   +import org.apache.iceberg.io.FileAppender;
   +import org.apache.iceberg.parquet.Parquet;
   +import org.apache.iceberg.spark.data.RandomData;
   +import org.apache.iceberg.types.Types;
   +import org.apache.spark.sql.Dataset;
   +import org.apache.spark.sql.Row;
   +import org.junit.jupiter.api.Test;
   +import org.junit.jupiter.api.io.TempDir;
   +
    public class TestParquetVectorizedScan extends TestParquetScan {
   +  
   +  private static final Configuration CONF = new Configuration();
   +  
   +  @TempDir private Path temp;
   +
      @Override
      protected boolean vectorized() {
        return true;
      }
   +
   +  /**
   +   * Verifies that direct memory used while reading a Parquet file through Spark is released
   +   * once the read has finished.
   +   *
   +   * <p>Steps: generate 1,000,000 random records (seed 42), write them to a single Parquet file,
   +   * assert that the file is larger than 50MB, append it to a freshly created unpartitioned Hadoop
   +   * table, read the table back with {@code collectAsList()}, then drop the local references,
   +   * request GC twice and sleep for one second before sampling direct memory a final time.
   +   *
   +   * <p>The test passes only if direct memory grew during the read and, after cleanup, is at most
   +   * 1MB above the level sampled before the read. {@code System.gc()} is only a request to the
   +   * JVM, so the final sample is best-effort.
   +   *
   +   * <p>NOTE(review): vectorization is presumably switched on by {@code configureTable(table)}
   +   * based on {@code vectorized()}; confirm in the parent class.
   +   *
   +   * @throws IOException if writing or closing the Parquet file fails
   +   */
   +  @Test
   +  public void testDirectMemoryReleaseAfterLargeVectorizedRead() throws 
IOException {
   +    // Create a schema with enough columns to generate significant data
   +    org.apache.iceberg.Schema schema =
   +        new org.apache.iceberg.Schema(
   +            Types.NestedField.required(1, "id", Types.LongType.get()),
   +            Types.NestedField.required(2, "data1", Types.StringType.get()),
   +            Types.NestedField.required(3, "data2", Types.StringType.get()),
   +            Types.NestedField.required(4, "data3", Types.StringType.get()),
   +            Types.NestedField.required(5, "data4", Types.StringType.get()),
   +            Types.NestedField.required(6, "data5", Types.StringType.get()),
   +            Types.NestedField.required(7, "number1", 
Types.DoubleType.get()),
   +            Types.NestedField.required(8, "number2", 
Types.DoubleType.get()));
   +
   +    File location = temp.resolve("memory_leak_test").toFile();
   +
   +    HadoopTables tables = new HadoopTables(CONF);
   +    Table table = tables.create(schema, PartitionSpec.unpartitioned(), 
location.toString());
   +    configureTable(table);
   +
   +    List<GenericData.Record> records = RandomData.generateList(schema, 
1000000, 42L);
   +
   +    // Write the large parquet file
   +    File dataFolder = new File(table.location(), "data");
   +    File parquetFile = new File(dataFolder, 
FileFormat.PARQUET.addExtension(UUID.randomUUID().toString()));
   +
   +    try (FileAppender<GenericData.Record> writer =
   +        Parquet.write(localOutput(parquetFile)).schema(schema).build()) {
   +      writer.addAll(records);
   +    }
   +
   +    // Verify the file is actually large enough (the assertion below requires more than 50MB)
   +    long fileSizeBytes = parquetFile.length();
   +    assertThat(fileSizeBytes)
   +        .as("Generated file should be at least 50MB")
   +        .isGreaterThan(50L * 1024 * 1024);
   +
   +    DataFile file =
   +        DataFiles.builder(PartitionSpec.unpartitioned())
   +            .withFileSizeInBytes(fileSizeBytes)
   +            .withPath(parquetFile.toString())
   +            .withRecordCount(records.size())
   +            .build();
   +
   +    table.newAppend().appendFile(file).commit();
   +
   +    // Get direct memory usage before reading
   +    long directMemoryBefore = getDirectMemoryUsed();
   +
   +    // Read the file using vectorized parquet reader and collect all results
   +    Dataset<Row> df = spark.read().format("iceberg").load(table.location());
   +    List<Row> rows = df.collectAsList();
   +
   +    // Get direct memory usage after reading but before cleanup
   +    long directMemoryAfterRead = getDirectMemoryUsed();
   +
   +    // Verify we read the expected number of rows
   +    assertThat(rows).as("Should contain all 
records").hasSize(records.size());
   +
   +    // Clear the collected data to release references
   +    rows = null;
   +    df = null;
   +
   +    // Force garbage collection to ensure any memory that should be 
released is released
   +    System.gc();
   +    System.gc();
   +
   +    // Wait a bit for GC to complete
   +    try {
   +      Thread.sleep(1000);
   +    } catch (InterruptedException e) {
   +      Thread.currentThread().interrupt();
   +    }
   +
   +    // Get direct memory usage after cleanup
   +    long directMemoryAfterCleanup = getDirectMemoryUsed();
   +
   +    // Calculate memory increases
   +    long memoryIncreaseFromRead = directMemoryAfterRead - 
directMemoryBefore;
   +    long memoryLeakAfterCleanup = directMemoryAfterCleanup - 
directMemoryBefore;
   +
   +    // Log memory usage for debugging
   +    System.out.printf(
   +        "Direct memory usage - Before: %d bytes, After read: %d bytes, 
After cleanup: %d bytes%n",
   +        directMemoryBefore, directMemoryAfterRead, 
directMemoryAfterCleanup);
   +    System.out.printf(
   +        "Memory increase from read: %d bytes, Potential leak: %d bytes%n",
   +        memoryIncreaseFromRead, memoryLeakAfterCleanup);
   +
   +    // We expect some memory to be used during reading (this verifies the 
test is actually testing something meaningful)
   +    assertThat(memoryIncreaseFromRead)
   +        .as("Reading a large file should use some direct memory")
   +        .isGreaterThan(0);
   +
   +    // The key assertion: after cleanup, direct memory usage should return 
close to the initial level
   +    // We allow for some small variance (1MB) due to JVM internals and 
other concurrent operations
   +    long allowableMemoryVariance = 1024 * 1024; // 1MB
   +    assertThat(memoryLeakAfterCleanup)
   +        .as("Direct memory should be released after reading (potential 
memory leak detected)")
   +        .isLessThanOrEqualTo(allowableMemoryVariance);
   +  }
   +
   +  /**
   +   * Returns the direct-buffer memory currently reported by the JVM, in bytes.
   +   *
   +   * <p>Sums {@link BufferPoolMXBean#getMemoryUsed()} over every platform buffer pool whose name
   +   * is exactly {@code "direct"}; pools with any other name are ignored.
   +   *
   +   * <p>NOTE(review): {@code getMemoryUsed()} is documented to return -1 when the value cannot
   +   * be determined; that case is not special-cased here.
   +   *
   +   * @return the summed memory-used value of the "direct" pools, or 0 if no such pool exists
   +   */
   +  private long getDirectMemoryUsed() {
   +    List<BufferPoolMXBean> bufferPoolMXBeans =
   +        ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
   +    long directMemory = 0;
   +    for (BufferPoolMXBean bufferPoolMXBean : bufferPoolMXBeans) {
   +      if (bufferPoolMXBean.getName().equals("direct")) {
   +        directMemory += bufferPoolMXBean.getMemoryUsed();
   +      }
   +    }
   +    return directMemory;
   +  }
    }
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to