[
https://issues.apache.org/jira/browse/HADOOP-18521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630800#comment-17630800
]
ASF GitHub Bot commented on HADOOP-18521:
-----------------------------------------
pranavsaxena-microsoft commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1017458506
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -534,17 +676,31 @@ void callTryEvict() {
tryEvict();
}
-
/**
* Purging the buffers associated with an {@link AbfsInputStream}
* from {@link ReadBufferManager} when stream is closed.
+ * Before HADOOP-18521 this would purge in progress reads, which
+ * would return the active buffer to the free pool while it was
+ * still in use.
* @param stream input stream.
*/
public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
+
+ // remove from the queue
+ int before = readAheadQueue.size();
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
- purgeList(stream, completedReadList);
- purgeList(stream, inProgressList);
+ int readaheadPurged = readAheadQueue.size() - before;
Review Comment:
By the time the thread reaches this line, more blocks may have been added to
readAheadQueue, which would inflate the metric. Also, before should be >=
readAheadQueue.size() (when no additional blocks were enqueued), so
readAheadQueue.size() - before is zero or negative; the subtraction is
reversed.
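A minimal sketch of the alternative hinted at above: count each removal as it
happens instead of diffing queue sizes, so the result is exact even if other
threads enqueue concurrently. Class and method names here are illustrative,
not from the patch.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

public class PurgeCountSketch {

  // Count removals directly rather than computing size-before minus
  // size-after; concurrent enqueues can no longer skew the metric or
  // make it negative (assumes the caller holds the lock guarding the queue).
  static int purgeForStream(Queue<String> queue, String stream) {
    int purged = 0;
    for (Iterator<String> it = queue.iterator(); it.hasNext();) {
      if (it.next().equals(stream)) {
        it.remove();
        purged++;
      }
    }
    return purged;
  }

  public static void main(String[] args) {
    Queue<String> q = new LinkedList<>();
    q.add("s1");
    q.add("s2");
    q.add("s1");
    System.out.println(purgeForStream(q, "s1")); // prints 2
    System.out.println(q.size());                // prints 1
  }
}
```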
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -454,31 +588,39 @@ ReadBuffer getNextBlockToRead() throws InterruptedException {
   */
  void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
      final int bytesActuallyRead) {
    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}",
-          buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead);
-    }
-    synchronized (this) {
-      // If this buffer has already been purged during
-      // close of InputStream then we don't update the lists.
-      if (inProgressList.contains(buffer)) {
-        inProgressList.remove(buffer);
+      LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
+          buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
+    }
+    try {
+      synchronized (this) {
+        checkState(inProgressList.remove(buffer),
+            "Read completed from an operation not declared as in progress %s", buffer);
+        // If this buffer has already been purged during
+        // close of InputStream then we don't update the lists.
        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
          buffer.setStatus(ReadBufferStatus.AVAILABLE);
          buffer.setLength(bytesActuallyRead);
        } else {
-          freeList.push(buffer.getBufferindex());
-          // buffer will be deleted as per the eviction policy.
+          // there is no data, so it is immediately returned to the free list.
+          placeBufferOnFreeList("failed read", buffer);
Review Comment:
This may result in an IllegalStateException propagating to AbfsInputStream.
This line adds the buffer's index to freeList, from which the index can then
be taken by another readBuffer b1.
Later, when this buffer (still on completedList) needs to be evicted, control
reaches
https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L408,
and two things can happen:
1. freeList still contains this index: it will throw IllegalStateException.
2. freeList no longer contains it: it will throw IllegalStateException from
https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L411.
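An illustrative sketch of the double-return hazard described above (the class
and method names are hypothetical, not the patch itself): once an index has
been pushed to the free list, a second return of the same index should fail
fast rather than silently hand the buffer to two owners.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class FreeListGuard {

  // Stack of buffer indices currently available for reuse.
  private final Deque<Integer> freeList = new ArrayDeque<>();

  // Reject a second return of an index that is already free; in the real
  // manager this corresponds to the checkState() ownership verification.
  synchronized void placeOnFreeList(int bufferIndex) {
    if (freeList.contains(bufferIndex)) {
      throw new IllegalStateException(
          "Buffer index " + bufferIndex + " already on free list");
    }
    freeList.push(bufferIndex);
  }

  public static void main(String[] args) {
    FreeListGuard g = new FreeListGuard();
    g.placeOnFreeList(14);        // first return: accepted
    try {
      g.placeOnFreeList(14);      // double return: fails fast
    } catch (IllegalStateException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```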
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -454,31 +588,39 @@ ReadBuffer getNextBlockToRead() throws InterruptedException {
   */
  void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
      final int bytesActuallyRead) {
    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}",
-          buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead);
-    }
-    synchronized (this) {
-      // If this buffer has already been purged during
-      // close of InputStream then we don't update the lists.
-      if (inProgressList.contains(buffer)) {
-        inProgressList.remove(buffer);
+      LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
+          buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
+    }
+    try {
+      synchronized (this) {
+        checkState(inProgressList.remove(buffer),
+            "Read completed from an operation not declared as in progress %s", buffer);
+        // If this buffer has already been purged during
+        // close of InputStream then we don't update the lists.
        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
          buffer.setStatus(ReadBufferStatus.AVAILABLE);
          buffer.setLength(bytesActuallyRead);
        } else {
-          freeList.push(buffer.getBufferindex());
-          // buffer will be deleted as per the eviction policy.
+          // there is no data, so it is immediately returned to the free list.
+          placeBufferOnFreeList("failed read", buffer);
Review Comment:
Made a suggested change which prevents this:
https://github.com/pranavsaxena-microsoft/hadoop/commit/0d09a0de501bdc928139263075f82feb064fd6bc
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -454,31 +588,39 @@ ReadBuffer getNextBlockToRead() throws InterruptedException {
   */
  void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
      final int bytesActuallyRead) {
    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}",
-          buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead);
-    }
-    synchronized (this) {
-      // If this buffer has already been purged during
-      // close of InputStream then we don't update the lists.
-      if (inProgressList.contains(buffer)) {
-        inProgressList.remove(buffer);
+      LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
+          buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
+    }
+    try {
+      synchronized (this) {
+        checkState(inProgressList.remove(buffer),
+            "Read completed from an operation not declared as in progress %s", buffer);
+        // If this buffer has already been purged during
+        // close of InputStream then we don't update the lists.
        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
          buffer.setStatus(ReadBufferStatus.AVAILABLE);
          buffer.setLength(bytesActuallyRead);
        } else {
-          freeList.push(buffer.getBufferindex());
-          // buffer will be deleted as per the eviction policy.
+          // there is no data, so it is immediately returned to the free list.
+          placeBufferOnFreeList("failed read", buffer);
Review Comment:
Test for the same:
https://github.com/pranavsaxena-microsoft/hadoop/commit/18da3752f3f72a953cecba0525a01bfab6be89ee.
In a separate run:
```
java.lang.IllegalStateException: Buffer 14 returned to free buffer list by
non-owner ReadBuffer{status=AVAILABLE, offset=4194304, length=0,
requestedLength=4194304, bufferindex=14, timeStamp=46807492,
isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false,
errException=org.apache.hadoop.fs.PathIOException: `/testfilef6b6f93ac245':
Input/output error: Buffer index 14 found in buffer collection
completedReadList,
stream=org.apache.hadoop.fs.azurebfs.services.AbfsInputStream@652e2419{counters=((stream_read_bytes_backwards_on_seek=0)
(stream_read_operations=1) (remote_read_op=2)
(stream_read_seek_backward_operations=0) (action_http_get_request.failures=0)
(action_http_get_request=0) (bytes_read_buffer=0) (stream_read_bytes=0)
(seek_in_buffer=0) (remote_bytes_read=0) (stream_read_seek_bytes_skipped=0)
(stream_read_seek_operations=2) (read_ahead_bytes_read=0)
(stream_read_seek_forward_operations=2));
gauges=();
minimums=((action_http_get_request.failures.min=-1)
(action_http_get_request.min=-1));
maximums=((action_http_get_request.max=-1)
(action_http_get_request.failures.max=-1));
means=((action_http_get_request.failures.mean=(samples=0, sum=0,
mean=0.0000)) (action_http_get_request.mean=(samples=0, sum=0, mean=0.0000)));
}AbfsInputStream@(1697522713){StreamStatistics{counters=((remote_bytes_read=0)
(stream_read_seek_backward_operations=0) (remote_read_op=2)
(stream_read_seek_forward_operations=2) (bytes_read_buffer=0)
(seek_in_buffer=0) (stream_read_bytes=0) (stream_read_operations=1)
(read_ahead_bytes_read=0) (stream_read_bytes_backwards_on_seek=0)
(stream_read_seek_operations=2) (action_http_get_request.failures=0)
(stream_read_seek_bytes_skipped=0) (action_http_get_request=0));
gauges=();
minimums=((action_http_get_request.min=-1)
(action_http_get_request.failures.min=-1));
maximums=((action_http_get_request.failures.max=-1)
(action_http_get_request.max=-1));
means=((action_http_get_request.mean=(samples=0, sum=0, mean=0.0000))
(action_http_get_request.failures.mean=(samples=0, sum=0, mean=0.0000)));
}}}
at
org.apache.hadoop.util.Preconditions.checkState(Preconditions.java:298)
at
org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.verifyReadOwnsBufferAtIndex(ReadBufferManager.java:430)
at
org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.placeBufferOnFreeList(ReadBufferManager.java:411)
```
> ABFS ReadBufferManager buffer sharing across concurrent HTTP requests
> ---------------------------------------------------------------------
>
> Key: HADOOP-18521
> URL: https://issues.apache.org/jira/browse/HADOOP-18521
> Project: Hadoop Common
> Issue Type: Bug
> Components: fs/azure
> Affects Versions: 3.3.2, 3.3.3, 3.3.4
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Critical
> Labels: pull-request-available
>
> AbfsInputStream.close() can trigger the return of buffers used for active
> prefetch GET requests into the ReadBufferManager free buffer pool.
> A subsequent prefetch by a different stream in the same process may acquire
> the same buffer, risking corruption of that stream's prefetched data; the
> corrupted data may then be returned to the other thread.
> On releases without the fix (3.3.2 to 3.3.4), the bug can be avoided by
> disabling all prefetching, i.e. setting
> {code}
> fs.azure.readaheadqueue.depth = 0
> {code}
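> A sketch of that workaround as a core-site.xml fragment (assuming the
> standard Hadoop configuration file layout):
> {code}
> <property>
>   <name>fs.azure.readaheadqueue.depth</name>
>   <value>0</value>
> </property>
> {code}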
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]