[
https://issues.apache.org/jira/browse/HADOOP-19901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083041#comment-18083041
]
Ismaël Mejía commented on HADOOP-19901:
---------------------------------------
[[email protected]] this may interest you
> ChecksumFileSystem.readVectored leaks buffers allocated through caller's
> IntFunction allocator
> ----------------------------------------------------------------------------------------------
>
> Key: HADOOP-19901
> URL: https://issues.apache.org/jira/browse/HADOOP-19901
> Project: Hadoop Common
> Issue Type: Bug
> Components: fs
> Affects Versions: 3.5.0, 3.4.3
> Reporter: Ismaël Mejía
> Priority: Major
>
> h3. Summary
> When {{ChecksumFileSystem.readVectored()}} is called with checksum
> verification enabled (the default for {{LocalFileSystem}}), it allocates
> buffers for *both* file data ranges and checksum ranges through the
> caller-provided {{IntFunction<ByteBuffer> allocate}} function. However, the
> checksum buffers are only used temporarily for verification and are never
> released back to the caller. The caller has no reference to these buffers and
> no mechanism to release them.
> This was discovered in Apache Parquet Java while upgrading from Hadoop 3.3.0
> to 3.4.3 and testing with {{TrackingByteBufferAllocator}}, which detected
> leaked {{ByteBuffer}} allocations.
> h3. Root cause
> In {{ChecksumFSInputChecker.readVectored()}} ([ChecksumFileSystem.java,
> trunk|https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]):
> {code:java}
> @Override
> public void readVectored(final List<? extends FileRange> ranges,
>     final IntFunction<ByteBuffer> allocate,
>     final Consumer<ByteBuffer> release) throws IOException {
>   // ...
>   // allocates checksum buffers via caller's allocator
>   sums.readVectored(checksumRanges, allocate, release);
>   // allocates data buffers via caller's allocator
>   datas.readVectored(dataRanges, allocate, release);
>   for (CombinedFileRange checksumRange : checksumRanges) {
>     for (FileRange dataRange : checksumRange.getUnderlying()) {
>       CompletableFuture<ByteBuffer> result =
>           checksumRange.getData().thenCombineAsync(dataRange.getData(),
>               (sumBuffer, dataBuffer) ->
>                   checkBytes(sumBuffer, checksumRange.getOffset(),
>                       dataBuffer, dataRange.getOffset(), bytesPerSum, file));
>       for (FileRange original : ((CombinedFileRange) dataRange).getUnderlying()) {
>         original.setData(result.thenApply(
>             (b) -> VectoredReadUtils.sliceTo(b, dataRange.getOffset(), original)));
>       }
>     }
>   }
> }
> {code}
> Two problems:
> # *Checksum buffers are never released.* {{sums.readVectored(checksumRanges,
> allocate, release)}} allocates buffers through the caller's {{allocate}}
> function to read checksum data. After {{checkBytes()}} verifies the data, the
> checksum buffers ({{sumBuffer}}) are no longer needed, but they are never
> passed to {{release}} and are invisible to the caller. They leak.
> # *The 2-arg API provides no release mechanism.* The 2-arg overload passes a
> no-op release:
> {code:java}
> public void readVectored(List<? extends FileRange> ranges,
>     IntFunction<ByteBuffer> allocate) throws IOException {
>   readVectored(ranges, allocate, (b) -> { });
> }
> {code}
> Even callers using the 3-arg API don't benefit, because
> {{ChecksumFileSystem}} itself never calls {{release}} on the checksum buffers
> -- it only passes {{release}} down to the underlying streams.
> h3. How this was discovered
> Apache Parquet Java uses a {{TrackingByteBufferAllocator}} in tests that
> wraps the real allocator and tracks all allocations. When the allocator is
> closed, it throws {{LeakedByteBufferException}} if any allocated buffers were
> not released. After upgrading Hadoop from 3.3.0 to 3.4.3, the following test
> classes started failing with buffer leak errors in the vectored I/O path:
> * {{TestRecordLevelFilters}} (15 tests)
> * {{TestColumnIndexFiltering}} (24 tests)
> * {{TestParquetReader}} (6+ tests)
> The allocation stack trace showed:
> {code}
> TrackingByteBufferAllocator.allocate
> -> VectorIOBufferPool.getBuffer
> -> RawLocalFileSystem$AsyncHandler.initiateRead
> {code}
> Parquet's {{readVectored()}} method passes a {{ByteBufferAllocator}} to
> Hadoop, but Hadoop uses it for internal temporary allocations (checksum
> ranges) that are invisible to the caller.
> h3. Workaround in Parquet
> We implemented a "capturing allocator" pattern that wraps the allocator to
> track all buffers allocated during {{readVectored()}}, then registers them
> all for release:
> {code:java}
> List<ByteBuffer> allocatedBuffers = new ArrayList<>();
> ByteBufferAllocator capturingAllocator = new ByteBufferAllocator() {
>   @Override
>   public ByteBuffer allocate(int size) {
>     ByteBuffer buf = options.getAllocator().allocate(size);
>     allocatedBuffers.add(buf);
>     return buf;
>   }
>   // ...
> };
> try {
>   f.readVectored(ranges, capturingAllocator);
>   // ... process futures ...
> } finally {
>   builder.addBuffersToRelease(allocatedBuffers);
> }
> {code}
> This ensures all buffers allocated through the caller's allocator are
> eventually released, regardless of whether they are returned in a future or
> used internally by ChecksumFileSystem. See [parquet-java commit
> fc0586d68|https://github.com/apache/parquet-java/commit/fc0586d68].
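For readers without the Parquet sources at hand, the capturing pattern quoted above can be sketched with JDK types only. This is a minimal sketch, not Parquet's implementation: `CapturingAllocator`, `capturedCount` and `releaseAll` are hypothetical names, and the wrapped allocator is modelled as a plain `IntFunction<ByteBuffer>` with a `Consumer<ByteBuffer>` for release rather than Parquet's `ByteBufferAllocator`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

// Hypothetical helper (not a Hadoop or Parquet class): records every buffer
// it hands out so the caller can later release all of them, including
// buffers the callee used only as internal temporaries.
final class CapturingAllocator implements IntFunction<ByteBuffer> {
  private final IntFunction<ByteBuffer> delegate;
  private final List<ByteBuffer> captured = new ArrayList<>();

  CapturingAllocator(IntFunction<ByteBuffer> delegate) {
    this.delegate = delegate;
  }

  // Synchronized because an asynchronous read may allocate from other threads.
  @Override
  public synchronized ByteBuffer apply(int size) {
    ByteBuffer buf = delegate.apply(size);
    captured.add(buf);
    return buf;
  }

  synchronized int capturedCount() {
    return captured.size();
  }

  // Passes every captured buffer to the given release callback, then forgets them.
  synchronized void releaseAll(Consumer<ByteBuffer> release) {
    captured.forEach(release);
    captured.clear();
  }

  public static void main(String[] args) {
    CapturingAllocator allocator = new CapturingAllocator(ByteBuffer::allocate);
    allocator.apply(64); // stands in for a data buffer
    allocator.apply(8);  // stands in for an internal checksum buffer
    System.out.println("captured: " + allocator.capturedCount());
    allocator.releaseAll(b -> System.out.println("released " + b.capacity() + " bytes"));
  }
}
```

The trade-off of this approach is that the caller releases everything in one sweep, so data buffers must no longer be in use when `releaseAll` runs.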
> h3. Suggested fixes
> *Option A (minimal): Release checksum buffers after verification.*
> In {{ChecksumFSInputChecker.readVectored()}}, after {{checkBytes()}}
> completes, call {{release}} on the checksum buffer:
> {code:java}
> CompletableFuture<ByteBuffer> result =
>     checksumRange.getData().thenCombineAsync(dataRange.getData(),
>         (sumBuffer, dataBuffer) -> {
>           ByteBuffer verified = checkBytes(sumBuffer,
>               checksumRange.getOffset(),
>               dataBuffer, dataRange.getOffset(), bytesPerSum, file);
>           // release checksum buffer after verification
>           release.accept(sumBuffer);
>           return verified;
>         });
> {code}
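The buffer lifecycle that Option A aims for can be modelled outside Hadoop with JDK types only. The following is a sketch under stated assumptions, not the proposed patch: `ReleaseAfterVerify`, `read` and `verify` are hypothetical names, `verify` merely stands in for `checkBytes()`, and the buffer sizes are arbitrary. Unlike the quoted snippet, it releases in a `finally` block so the temporary is also handed back when verification throws.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntFunction;

// Stand-alone model of Option A; none of these names exist in Hadoop.
final class ReleaseAfterVerify {

  // Stand-in for checksum verification: returns the data buffer unchanged.
  static ByteBuffer verify(ByteBuffer sums, ByteBuffer data) {
    return data;
  }

  static CompletableFuture<ByteBuffer> read(IntFunction<ByteBuffer> allocate,
                                            Consumer<ByteBuffer> release) {
    // Both buffers come from the caller's allocator, as in the issue.
    CompletableFuture<ByteBuffer> sums =
        CompletableFuture.supplyAsync(() -> allocate.apply(8));
    CompletableFuture<ByteBuffer> data =
        CompletableFuture.supplyAsync(() -> allocate.apply(64));
    return sums.thenCombine(data, (sumBuffer, dataBuffer) -> {
      try {
        return verify(sumBuffer, dataBuffer);
      } finally {
        // The checksum buffer is a temporary: hand it back whether or not
        // verification succeeded.
        release.accept(sumBuffer);
      }
    });
  }

  public static void main(String[] args) {
    AtomicInteger outstanding = new AtomicInteger();
    IntFunction<ByteBuffer> allocate = size -> {
      outstanding.incrementAndGet();
      return ByteBuffer.allocate(size);
    };
    Consumer<ByteBuffer> release = b -> outstanding.decrementAndGet();

    ByteBuffer result = read(allocate, release).join();
    // Only the data buffer, which the caller now owns, is still outstanding.
    System.out.println("outstanding after read: " + outstanding.get());
    release.accept(result);
    System.out.println("outstanding after caller release: " + outstanding.get());
  }
}
```

One limitation of this shape: the combining function does not run when either input future completes exceptionally, so a checksum buffer whose matching data read failed would still need a separate release path.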
> *Option B (comprehensive): Don't use the caller's allocator for internal
> temporaries.*
> ChecksumFileSystem should allocate its own temporary buffers for checksum
> data instead of using the caller-provided allocator. The caller's allocator
> is intended for buffers that the caller will own and manage. Using it for
> internal temporaries violates that expectation.
> {code:java}
> // Use internal allocation for checksums, not the caller's allocator
> sums.readVectored(checksumRanges, ByteBuffer::allocate, (b) -> { });
> // Only use caller's allocator for data ranges
> datas.readVectored(dataRanges, allocate, release);
> {code}
> *Option C (API improvement): Extend the API to support paired
> allocate/release.*
> The current {{IntFunction<ByteBuffer>}} allocator is one-way -- there's no
> way for Hadoop to release a buffer it allocated through the caller's
> function. HADOOP-19303 added a {{Consumer<ByteBuffer> release}} parameter,
> but it's separate from the allocate function and {{ChecksumFileSystem}}
> doesn't use it for its own intermediate buffers. A paired allocator/releaser
> interface (similar to Parquet's {{ByteBufferAllocator}} with both
> {{allocate}} and {{release}} methods) would make the lifecycle explicit.
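To make Option C concrete, a paired interface could look roughly like the following. This is purely illustrative: `BufferLifecycle`, its `of` adapter and `BufferLifecycleDemo` are hypothetical names that do not exist in Hadoop, and the actual shape of any such API would be for the Hadoop maintainers to decide.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.IntFunction;

// Hypothetical interface sketching Option C; it is not part of Hadoop.
// Allocation and release travel together, so any code that allocates
// through it also has the means to give the buffer back.
interface BufferLifecycle {
  ByteBuffer allocate(int size);

  void release(ByteBuffer buffer);

  // Adapter for callers that already hold a separate allocate/release pair,
  // such as the arguments of the existing 3-arg readVectored.
  static BufferLifecycle of(IntFunction<ByteBuffer> allocator,
                            Consumer<ByteBuffer> releaser) {
    return new BufferLifecycle() {
      @Override
      public ByteBuffer allocate(int size) {
        return allocator.apply(size);
      }

      @Override
      public void release(ByteBuffer buffer) {
        releaser.accept(buffer);
      }
    };
  }
}

final class BufferLifecycleDemo {
  public static void main(String[] args) {
    BufferLifecycle buffers = BufferLifecycle.of(
        ByteBuffer::allocate,
        b -> System.out.println("released " + b.capacity() + " bytes"));
    ByteBuffer temporary = buffers.allocate(8); // e.g. a checksum buffer
    buffers.release(temporary);                 // returned once no longer needed
  }
}
```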
> h3. Related issues
> * *HADOOP-19303* (VectorIO API to support releasing buffers on failure) --
> Added the 3-arg {{readVectored}} with {{release}} Consumer, but
> {{ChecksumFileSystem}} doesn't call {{release}} on checksum buffers.
> * *HADOOP-18296* (Memory fragmentation in ChecksumFileSystem Vectored IO) --
> Fixed range merging fragmentation, but did not address checksum buffer leaks.
> * *PARQUET-2171* (Implement vectored IO in parquet file format) -- The
> Parquet side implementation.