[
https://issues.apache.org/jira/browse/HADOOP-18296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17966764#comment-17966764
]
ASF GitHub Bot commented on HADOOP-18296:
-----------------------------------------
YanivKunda commented on code in PR #7732:
URL: https://github.com/apache/hadoop/pull/7732#discussion_r2143605258
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.io.ByteBufferPool;
+
+/**
+ * A wrapper {@link ByteBufferPool} implementation that tracks whether all
+ * allocated buffers are released.
+ * <p>
+ * It throws the related exception at {@link #close()} if any buffer remains
+ * un-released.
+ * It also clears the buffers at release, so continued use of a released
+ * buffer will generate errors.
+ * <p>
+ * To be used for testing only.
+ * <p>
+ * The stacktraces of the allocations are not stored by default because
+ * collecting them can significantly decrease unit test performance.
+ * Configuring this class to log at DEBUG will trigger their collection.
+ * @see ByteBufferAllocationStacktraceException
+ * <p>
+ * Adapted from the Parquet class
+ * {@code org.apache.parquet.bytes.TrackingByteBufferAllocator}.
+ */
+public final class TrackingByteBufferPool
+    implements ByteBufferPool, AutoCloseable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TrackingByteBufferPool.class);
+
+ /**
+ * Wrap an existing allocator with this tracking allocator.
+ * @param allocator allocator to wrap.
+ * @return a new allocator.
+ */
+ public static TrackingByteBufferPool wrap(ByteBufferPool allocator) {
+ return new TrackingByteBufferPool(allocator);
+ }
+
+ /**
+ * Key for the tracker map.
+ * This uses the identity hash code of the buffer as the hash code
+ * for the map.
+ */
+ private static class Key {
+
+ private final int hashCode;
+
+ private final ByteBuffer buffer;
+
+ Key(ByteBuffer buffer) {
+ hashCode = System.identityHashCode(buffer);
+ this.buffer = buffer;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Key key = (Key) o;
+ return this.buffer == key.buffer;
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ @Override
+ public String toString() {
+ return buffer.toString();
+ }
+ }
+
+  public static class LeakDetectorHeapByteBufferPoolException
+      extends RuntimeException {
+
+ private LeakDetectorHeapByteBufferPoolException(String msg) {
+ super(msg);
+ }
+
+    private LeakDetectorHeapByteBufferPoolException(String msg,
+        Throwable cause) {
+ super(msg, cause);
+ }
+
+ private LeakDetectorHeapByteBufferPoolException(
+ String message,
+ Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+ }
+
+ /**
+   * Stack trace of an allocation, as saved in the tracking map.
+ */
+  public static final class ByteBufferAllocationStacktraceException
+      extends LeakDetectorHeapByteBufferPoolException {
+
+ /**
+ * Single stack trace instance to use when DEBUG is not enabled.
+ */
+    private static final ByteBufferAllocationStacktraceException
+        WITHOUT_STACKTRACE =
+            new ByteBufferAllocationStacktraceException(false);
+
+ /**
+ * Create a stack trace for the map, either using the shared static one
+ * or a dynamically created one.
+     * @return a stack trace exception
+ */
+ private static ByteBufferAllocationStacktraceException create() {
+ return LOG.isDebugEnabled()
+ ? new ByteBufferAllocationStacktraceException()
+ : WITHOUT_STACKTRACE;
+ }
+
+ private ByteBufferAllocationStacktraceException() {
+ super("Allocation stacktrace of the first ByteBuffer:");
+ }
+
+ /**
+     * Private constructor for the singleton {@link #WITHOUT_STACKTRACE},
+     * telling developers how to see a trace per buffer.
+ */
+ private ByteBufferAllocationStacktraceException(boolean unused) {
+      super("Log org.apache.hadoop.fs.impl.TrackingByteBufferPool"
+              + " at DEBUG for stack traces",
+ null,
+ false,
+ false);
+ }
+ }
+
+ /**
+   * Exception raised in {@link TrackingByteBufferPool#putBuffer(ByteBuffer)}
+   * if the buffer to release was not in the hash map.
+ */
+  public static final class ReleasingUnallocatedByteBufferException
+      extends LeakDetectorHeapByteBufferPoolException {
+
+ private ReleasingUnallocatedByteBufferException(final ByteBuffer b) {
+      super(String.format("Releasing a ByteBuffer instance that was not"
+          + " allocated by this buffer pool or has already been released:"
+          + " %s size %d", b, b.capacity()));
+ }
+ }
+
+ /**
+ * Exception raised in {@link TrackingByteBufferPool#close()} if there
+ * was an unreleased buffer.
+ */
+  public static final class LeakedByteBufferException
+      extends LeakDetectorHeapByteBufferPoolException {
+
+ private final int count;
+
+    private LeakedByteBufferException(int count,
+        ByteBufferAllocationStacktraceException e) {
+      super(count + " ByteBuffer object(s) remained unreleased"
+          + " after closing this buffer pool.", e);
+ this.count = count;
+ }
+
+ /**
+ * Get the number of unreleased buffers.
+ * @return number of unreleased buffers
+ */
+ public int getCount() {
+ return count;
+ }
+ }
+
+ /**
+ * Tracker of allocations.
+ * <p>
+   * Keys are compared by the identity of the buffer object; each value is
+   * either a shared stack trace exception or one created per allocation.
+ */
+ private final Map<Key, ByteBufferAllocationStacktraceException> allocated =
Review Comment:
I think this can be replaced with `java.util.IdentityHashMap`, which will
make the `Key` class redundant:
```suggestion
  private final Map<ByteBuffer, ByteBufferAllocationStacktraceException> allocated =
      new IdentityHashMap<>();
```
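The difference the suggestion relies on is that `IdentityHashMap` compares keys with `==`, while `HashMap` uses `ByteBuffer.equals()`, which compares buffer *contents*. A minimal sketch of why that matters for an allocation tracker (the class name `IdentityMapDemo` is illustrative, not code from the PR):

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityMapDemo {
  public static void main(String[] args) {
    // Two distinct buffers with identical (all-zero) contents.
    ByteBuffer a = ByteBuffer.allocate(16);
    ByteBuffer b = ByteBuffer.allocate(16);

    // HashMap uses ByteBuffer.equals(), which compares contents,
    // so the two distinct instances collide on one key.
    Map<ByteBuffer, String> byValue = new HashMap<>();
    byValue.put(a, "first");
    byValue.put(b, "second");
    System.out.println(byValue.size()); // 1

    // IdentityHashMap uses reference equality (==), so each buffer
    // instance is tracked separately -- what a leak tracker needs,
    // and what the hand-written Key class achieves via
    // System.identityHashCode plus a == comparison.
    Map<ByteBuffer, String> byIdentity = new IdentityHashMap<>();
    byIdentity.put(a, "first");
    byIdentity.put(b, "second");
    System.out.println(byIdentity.size()); // 2
  }
}
```

Note one caveat the reviewer's suggestion glosses over: `IdentityHashMap` also means buffer mutation cannot corrupt the map, whereas content-based hashing would break if a tracked buffer's contents change after insertion.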
> Memory fragmentation in ChecksumFileSystem Vectored IO implementation.
> ----------------------------------------------------------------------
>
> Key: HADOOP-18296
> URL: https://issues.apache.org/jira/browse/HADOOP-18296
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: common
> Affects Versions: 3.4.0
> Reporter: Mukund Thakur
> Assignee: Steve Loughran
> Priority: Minor
> Labels: fs, pull-request-available
>
> Since we implemented merging of ranges in the ChecksumFSInputChecker
> implementation of the vectored IO API, it can lead to memory fragmentation.
> Let me explain with an example.
>
> Suppose a client requests 3 ranges:
> 0-500, 700-1000 and 1200-1500.
> Because of merging, all the above ranges will be merged into one, and we
> will allocate one big byte buffer covering 0-1500 but return sliced byte
> buffers for the requested ranges.
> Once the client is done reading all the ranges, it can only free the memory
> of the requested ranges; the memory of the gaps (here 500-700 and 1000-1200)
> is never released.
>
> Note this only happens for direct byte buffers.
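The slicing behaviour described in the issue can be sketched as follows; `sliceRange` is a hypothetical helper for illustration, not code from the Hadoop PR. All three slices share the merged buffer's backing memory, so the whole 1500-byte direct allocation stays live until every slice becomes unreachable:

```java
import java.nio.ByteBuffer;

public class MergedRangeDemo {
  public static void main(String[] args) {
    // One merged allocation covering offsets 0-1500 (per the example above).
    ByteBuffer merged = ByteBuffer.allocateDirect(1500);

    // Slices for the client's requested ranges; no data is copied,
    // each slice is a view over the merged buffer's memory.
    ByteBuffer r1 = sliceRange(merged, 0, 500);
    ByteBuffer r2 = sliceRange(merged, 700, 300);
    ByteBuffer r3 = sliceRange(merged, 1200, 300);

    // The client sees only 1100 bytes of requested ranges, but the
    // backing allocation, including the 500-700 and 1000-1200 gaps,
    // cannot be freed piecemeal: direct memory is released only when
    // the whole merged buffer is reclaimed.
    System.out.println(r1.capacity() + r2.capacity() + r3.capacity()); // 1100
    System.out.println(merged.capacity()); // 1500
  }

  // Bracket a region with position/limit on a duplicate, then slice it.
  static ByteBuffer sliceRange(ByteBuffer buf, int offset, int length) {
    ByteBuffer dup = buf.duplicate();
    dup.position(offset);
    dup.limit(offset + length);
    return dup.slice();
  }
}
```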
--
This message was sent by Atlassian Jira
(v8.20.10#820010)