[ 
https://issues.apache.org/jira/browse/HADOOP-19901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HADOOP-19901:
------------------------------------
    Labels: pull-request-available  (was: )

> ChecksumFileSystem.readVectored leaks buffers allocated through caller's 
> IntFunction allocator
> ----------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-19901
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19901
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs
>    Affects Versions: 3.5.0, 3.4.3
>            Reporter: Ismaël Mejía
>            Priority: Major
>              Labels: pull-request-available
>
> h3. Summary
> When {{ChecksumFileSystem.readVectored()}} is called with checksum 
> verification enabled (the default for {{LocalFileSystem}}), it allocates 
> buffers for *both* file data ranges and checksum ranges through the 
> caller-provided {{IntFunction<ByteBuffer> allocate}} function. However, the 
> checksum buffers are only used temporarily for verification and are never 
> released back to the caller. The caller has no reference to these buffers and 
> no mechanism to release them.
> This was discovered in Apache Parquet Java while upgrading from Hadoop 3.3.0 
> to 3.4.3 and testing with {{TrackingByteBufferAllocator}}, which detected 
> leaked {{ByteBuffer}} allocations.
> h3. Root cause
> In {{ChecksumFSInputChecker.readVectored()}} ([ChecksumFileSystem.java, 
> trunk|https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]):
> {code:java}
> @Override
> public void readVectored(final List<? extends FileRange> ranges,
>     final IntFunction<ByteBuffer> allocate,
>     final Consumer<ByteBuffer> release) throws IOException {
>   // ...
>   // Allocates checksum buffers via the caller's allocator.
>   sums.readVectored(checksumRanges, allocate, release);
>   // Allocates data buffers via the caller's allocator.
>   datas.readVectored(dataRanges, allocate, release);
>   for (CombinedFileRange checksumRange : checksumRanges) {
>     for (FileRange dataRange : checksumRange.getUnderlying()) {
>       CompletableFuture<ByteBuffer> result =
>           checksumRange.getData().thenCombineAsync(dataRange.getData(),
>               (sumBuffer, dataBuffer) ->
>                   checkBytes(sumBuffer, checksumRange.getOffset(),
>                       dataBuffer, dataRange.getOffset(), bytesPerSum, file));
>       for (FileRange original :
>           ((CombinedFileRange) dataRange).getUnderlying()) {
>         original.setData(result.thenApply(
>             (b) -> VectoredReadUtils.sliceTo(b, dataRange.getOffset(), original)));
>       }
>     }
>   }
> }
> {code}
> Two problems:
> # *Checksum buffers are never released.* {{sums.readVectored(checksumRanges, 
> allocate, release)}} allocates buffers through the caller's {{allocate}} 
> function to read checksum data. After {{checkBytes()}} verifies the data, the 
> checksum buffers ({{sumBuffer}}) are no longer needed, but they are never 
> passed to {{release}} and are invisible to the caller. They leak.
> # *The 2-arg API provides no release mechanism.* The 2-arg overload passes a 
> no-op release:
>    {code:java}
>    public void readVectored(List<? extends FileRange> ranges,
>                             IntFunction<ByteBuffer> allocate)
>        throws IOException {
>        readVectored(ranges, allocate, (b) -> { });
>    }
>    {code}
>    Even callers using the 3-arg API don't benefit, because 
> {{ChecksumFileSystem}} itself never calls {{release}} on the checksum buffers 
> -- it only passes {{release}} down to the underlying streams.
> h3. How this was discovered
> Apache Parquet Java uses a {{TrackingByteBufferAllocator}} in tests that 
> wraps the real allocator and tracks all allocations. When the allocator is 
> closed, it throws {{LeakedByteBufferException}} if any allocated buffers were 
> not released. After upgrading Hadoop from 3.3.0 to 3.4.3, the following test 
> classes started failing with buffer leak errors in the vectored I/O path:
> * {{TestRecordLevelFilters}} (15 tests)
> * {{TestColumnIndexFiltering}} (24 tests) 
> * {{TestParquetReader}} (6+ tests)
> The allocation stack trace showed:
> {code}
> TrackingByteBufferAllocator.allocate
>   -> VectorIOBufferPool.getBuffer
>     -> RawLocalFileSystem$AsyncHandler.initiateRead
> {code}
> Parquet's {{readVectored()}} method passes a {{ByteBufferAllocator}} to 
> Hadoop, but Hadoop uses it for internal temporary allocations (checksum 
> ranges) that are invisible to the caller.
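> The failure mode can be reproduced without Hadoop or Parquet. The sketch
> below is only an illustration: {{TrackingAllocatorSketch}} and
> {{readWithHiddenTemporary}} are hypothetical names, not Parquet's
> {{TrackingByteBufferAllocator}} or any Hadoop method. The reader takes one
> internal temporary from the caller's allocator and returns only the data
> buffer, so the caller cannot release the temporary.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.function.IntFunction;

/** Hypothetical leak-tracking allocator; a stand-in, not Parquet's class. */
final class TrackingAllocatorSketch {
  // Identity-based set: ByteBuffer.equals() compares contents, not instances.
  private final Set<ByteBuffer> outstanding =
      Collections.newSetFromMap(new IdentityHashMap<>());

  ByteBuffer allocate(int size) {
    ByteBuffer buf = ByteBuffer.allocate(size);
    outstanding.add(buf);
    return buf;
  }

  void release(ByteBuffer buf) {
    outstanding.remove(buf);
  }

  int outstandingCount() {
    return outstanding.size();
  }

  /**
   * Hypothetical reader mimicking the reported behaviour: it takes a data
   * buffer and an internal temporary from the same allocator, but only the
   * data buffer is returned to the caller.
   */
  static ByteBuffer readWithHiddenTemporary(IntFunction<ByteBuffer> allocate) {
    ByteBuffer data = allocate.apply(64);
    ByteBuffer checksumTemp = allocate.apply(8); // never handed back
    return data;
  }

  /** Returns the number of buffers still outstanding after the caller cleans up. */
  static int demoLeakCount() {
    TrackingAllocatorSketch tracker = new TrackingAllocatorSketch();
    ByteBuffer data = readWithHiddenTemporary(tracker::allocate);
    tracker.release(data); // the caller releases everything it can see
    return tracker.outstandingCount();
  }
}
```

> In this sketch {{demoLeakCount()}} reports one outstanding buffer: the
> temporary that the caller never saw.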
> h3. Workaround in Parquet
> We implemented a "capturing allocator" pattern that wraps the allocator to 
> track all buffers allocated during {{readVectored()}}, then registers them 
> all for release:
> {code:java}
> List<ByteBuffer> allocatedBuffers = new ArrayList<>();
> ByteBufferAllocator capturingAllocator = new ByteBufferAllocator() {
>     @Override
>     public ByteBuffer allocate(int size) {
>         ByteBuffer buf = options.getAllocator().allocate(size);
>         allocatedBuffers.add(buf);
>         return buf;
>     }
>     // ...
> };
> try {
>     f.readVectored(ranges, capturingAllocator);
>     // ... process futures ...
> } finally {
>     builder.addBuffersToRelease(allocatedBuffers);
> }
> {code}
> This ensures all buffers allocated through the caller's allocator are 
> eventually released, regardless of whether they are returned in a future or 
> used internally by ChecksumFileSystem. See [parquet-java commit 
> fc0586d68|https://github.com/apache/parquet-java/commit/fc0586d68].
> h3. Suggested fixes
> *Option A (minimal): Release checksum buffers after verification.*
> In {{ChecksumFSInputChecker.readVectored()}}, after {{checkBytes()}} 
> completes, call {{release}} on the checksum buffer:
> {code:java}
> CompletableFuture<ByteBuffer> result =
>     checksumRange.getData().thenCombineAsync(dataRange.getData(),
>         (sumBuffer, dataBuffer) -> {
>             ByteBuffer verified = checkBytes(sumBuffer, checksumRange.getOffset(),
>                 dataBuffer, dataRange.getOffset(), bytesPerSum, file);
>             // Release the checksum buffer after verification.
>             release.accept(sumBuffer);
>             return verified;
>         });
> {code}
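> One detail may be worth checking for Option A. In the loop quoted under
> Root cause, each checksum range is combined with every data range it
> covers, so a release inside the per-range callback could see the same
> {{sumBuffer}} more than once, and it is skipped when {{checkBytes()}}
> throws. The sketch below shows one possible way to release the buffer
> exactly once, after all dependent verifications have finished. It uses
> plain {{CompletableFuture}} only; {{ReleaseOnceSketch}}, {{verifyAll}} and
> {{verify}} are hypothetical names, and {{verify}} is a stand-in for
> {{checkBytes()}}, not its real logic.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class ReleaseOnceSketch {
  /** Hypothetical stand-in for checkBytes(): fails when the data buffer is empty. */
  static ByteBuffer verify(ByteBuffer sums, ByteBuffer data) {
    if (!data.hasRemaining()) {
      throw new IllegalStateException("checksum mismatch (simulated)");
    }
    return data;
  }

  /**
   * Verifies every data future against one shared checksum future and
   * releases the checksum buffer once, whether verification passed or failed.
   * Assumes at least one data future; with none, nothing is released here.
   */
  static List<CompletableFuture<ByteBuffer>> verifyAll(
      CompletableFuture<ByteBuffer> sumsFuture,
      List<CompletableFuture<ByteBuffer>> dataFutures,
      Consumer<ByteBuffer> release) {
    List<CompletableFuture<ByteBuffer>> results = new ArrayList<>();
    for (CompletableFuture<ByteBuffer> dataFuture : dataFutures) {
      results.add(sumsFuture.thenCombine(dataFuture, ReleaseOnceSketch::verify));
    }
    // whenComplete runs on both success and failure of the combined stage.
    CompletableFuture.allOf(results.toArray(new CompletableFuture[0]))
        .whenComplete((ignored, error) -> {
          // Only release if the checksum read itself produced a buffer.
          if (sumsFuture.isDone() && !sumsFuture.isCompletedExceptionally()) {
            release.accept(sumsFuture.join());
          }
        });
    return results;
  }

  /** Two data ranges share one checksum buffer; the second fails verification. */
  static int demoReleaseCount() {
    ByteBuffer sums = ByteBuffer.allocate(8);
    List<ByteBuffer> released = new ArrayList<>();
    List<CompletableFuture<ByteBuffer>> data = Arrays.asList(
        CompletableFuture.completedFuture(ByteBuffer.allocate(16)),
        CompletableFuture.completedFuture(ByteBuffer.allocate(0)));
    verifyAll(CompletableFuture.completedFuture(sums), data, released::add);
    return released.size();
  }
}
```

> Whether this applies to the real code depends on how {{release}}
> implementations treat repeated calls, which the issue does not state.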
> *Option B (comprehensive): Don't use the caller's allocator for internal 
> temporaries.*
> ChecksumFileSystem should allocate its own temporary buffers for checksum 
> data instead of using the caller-provided allocator. The caller's allocator 
> is intended for buffers that the caller will own and manage. Using it for 
> internal temporaries violates that expectation.
> {code:java}
> // Use internal allocation for checksums, not the caller's allocator
> sums.readVectored(checksumRanges, ByteBuffer::allocate, (b) -> { });
> // Only use caller's allocator for data ranges
> datas.readVectored(dataRanges, allocate, release);
> {code}
> *Option C (API improvement): Extend the API to support paired 
> allocate/release.*
> The current {{IntFunction<ByteBuffer>}} allocator is one-way -- there's no 
> way for Hadoop to release a buffer it allocated through the caller's 
> function. HADOOP-19303 added a {{Consumer<ByteBuffer> release}} parameter, 
> but it's separate from the allocate function and {{ChecksumFileSystem}} 
> doesn't use it for its own intermediate buffers. A paired allocator/releaser 
> interface (similar to Parquet's {{ByteBufferAllocator}} with both 
> {{allocate}} and {{release}} methods) would make the lifecycle explicit.
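> As a rough illustration of Option C, a paired interface might look like the
> sketch below. {{PairedBufferAllocator}} and its {{of}} adapter are
> hypothetical; nothing with these names exists in Hadoop, and the actual API
> shape would be for the Hadoop maintainers to decide. The adapter shows that
> the two callbacks of the existing 3-arg {{readVectored}} could be bundled
> without changing callers.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.IntFunction;

/** Hypothetical paired allocator; not an existing Hadoop interface. */
interface PairedBufferAllocator {
  ByteBuffer allocate(int size);

  void release(ByteBuffer buffer);

  /** Bundles separate allocate/release callbacks into one object. */
  static PairedBufferAllocator of(IntFunction<ByteBuffer> allocFn,
                                  Consumer<ByteBuffer> releaseFn) {
    return new PairedBufferAllocator() {
      @Override
      public ByteBuffer allocate(int size) {
        return allocFn.apply(size);
      }

      @Override
      public void release(ByteBuffer buffer) {
        releaseFn.accept(buffer);
      }
    };
  }
}

final class PairedAllocatorDemo {
  /**
   * Uses an internal temporary and always hands it back, so the owner of
   * the allocator never has to know the temporary existed.
   */
  static int useTemporary(PairedBufferAllocator allocator) {
    ByteBuffer temp = allocator.allocate(8);
    try {
      temp.putInt(42);
      temp.flip();
      return temp.getInt();
    } finally {
      allocator.release(temp);
    }
  }
}
```

> With a single object carrying both operations, internal code that takes a
> temporary has an obvious place to return it, which is the lifecycle this
> option aims to make explicit.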
> h3. Related issues
> * *HADOOP-19303* (VectorIO API to support releasing buffers on failure) -- 
> Added the 3-arg {{readVectored}} with {{release}} Consumer, but 
> {{ChecksumFileSystem}} doesn't call {{release}} on checksum buffers.
> * *HADOOP-18296* (Memory fragmentation in ChecksumFileSystem Vectored IO) -- 
> Fixed range merging fragmentation, but did not address checksum buffer leaks.
> * *PARQUET-2171* (Implement vectored IO in parquet file format) -- The 
> Parquet side implementation.


