[
https://issues.apache.org/jira/browse/HADOOP-18521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17639972#comment-17639972
]
ASF GitHub Bot commented on HADOOP-18521:
-----------------------------------------
anmolanmol1234 commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1033420242
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -555,7 +579,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
throw new FileNotFoundException(ere.getMessage());
}
}
- throw new IOException(ex);
+ throw ex;
Review Comment:
Any specific reason for changing the exception type from IOException to AzureBlobFileSystemException?
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -851,4 +880,67 @@ private void resetReadBufferManager(int bufferSize, int threshold) {
// by successive tests can lead to OOM based on the dev VM/machine capacity.
System.gc();
}
+
+ /**
+ * The first readahead closes the stream.
+ */
+ @Test
+ public void testStreamCloseInFirstReadAhead() throws Exception {
+ describe("close a stream during prefetch, verify outcome is good");
+
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ AbfsInputStream inputStream = getAbfsInputStream(client, getMethodName());
+ ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+
+ final long initialInProgressBlocksDiscarded =
+     bufferManager.getInProgressBlocksDiscarded();
+
+ // on first read, the op succeeds but the stream is closed, which
+ // means that the request should be considered a failure
+ doAnswer(invocation -> {
+ LOG.info("in read call with {}", inputStream);
+ inputStream.close();
+ return successOp;
+ }).doReturn(successOp)
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class), any(TracingContext.class));
+
+ // kick these off before the read() to avoid various race conditions.
+ queueReadAheads(inputStream);
+
+ // AbfsInputStream Read would have waited for the read-ahead for the requested offset
+ // as we are testing from ReadAheadManager directly, sleep for a sec to
+ // get the read ahead threads to complete
+ waitForPrefetchCompletion();
+
+ // this triggers prefetching, which closes the stream while the read
+ // is queued. which causes the prefetch to not return.
+ // which triggers a blocking read, which will then fail.
+ intercept(PathIOException.class, FSExceptionMessages.STREAM_IS_CLOSED, () -> {
+ // should fail
+ int bytesRead = inputStream.read(new byte[ONE_KB]);
+ // diagnostics info if failure wasn't raised
+ return "read " + bytesRead + " bytes from " + inputStream;
+ });
+
+ Assertions.assertThat(bufferManager.getCompletedReadListCopy())
+ .filteredOn(rb -> rb.getStream() == inputStream)
+ .describedAs("list of completed reads")
+ .isEmpty();
+ IOStatisticsStore ios = inputStream.getIOStatistics();
+ assertThatStatisticCounter(ios, STREAM_READ_PREFETCH_BLOCKS_DISCARDED)
+ .describedAs("blocks discarded by %s", inputStream)
+ .isGreaterThan(0);
+
+ // at least one of the blocks was discarded in progress.
+ // this is guaranteed because the mockito callback must have been invoked
+ // by the prefetcher
+ Assertions.assertThat(bufferManager.getInProgressBlocksDiscarded())
+ .describedAs("in progress blocks discarded")
+ .isGreaterThan(initialInProgressBlocksDiscarded);
+ }
+
}
Review Comment:
nit: Add a newline at the end of the file.
##########
hadoop-tools/hadoop-azure/src/test/resources/log4j.properties:
##########
@@ -26,6 +26,8 @@
log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG
log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG
log4j.logger.org.apache.hadoop.fs.azurebfs.contracts.services.TracingService=TRACE
log4j.logger.org.apache.hadoop.fs.azurebfs.services.AbfsClient=DEBUG
+log4j.logger.org.apache.hadoop.fs.azurebfs.services.ReadBufferManager=TRACE
Review Comment:
Was this added for testing? It might add a lot of logging.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java:
##########
@@ -162,4 +182,120 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) {
this.isAnyByteConsumed = isAnyByteConsumed;
}
+ @Override
+ public String toString() {
+ return super.toString() +
+ "{ status=" + status +
+ ", offset=" + offset +
+ ", length=" + length +
+ ", requestedLength=" + requestedLength +
+ ", bufferindex=" + bufferindex +
+ ", timeStamp=" + timeStamp +
+ ", isFirstByteConsumed=" + isFirstByteConsumed +
+ ", isLastByteConsumed=" + isLastByteConsumed +
+ ", isAnyByteConsumed=" + isAnyByteConsumed +
+ ", errException=" + errException +
+ ", stream=" + (stream != null ? stream.getStreamID() : "none") +
+ ", stream closed=" + isStreamClosed() +
+ ", latch=" + latch +
+ '}';
+ }
+
+ /**
+ * Is the stream closed.
+ * @return stream closed status.
+ */
+ public boolean isStreamClosed() {
+ return stream != null && stream.isClosed();
+ }
+
+ /**
+ * IOStatistics of stream.
+ * @return the stream's IOStatisticsStore.
+ */
+ public IOStatisticsStore getStreamIOStatistics() {
+ return stream.getIOStatistics();
+ }
+
+ /**
+ * Start using the buffer.
+ * Sets the byte consumption flags as appropriate, then
+ * updates the stream statistics with the use of this buffer.
+ * @param offset offset in buffer where copy began
+ * @param bytesCopied bytes copied.
+ */
+ void dataConsumedByStream(int offset, int bytesCopied) {
+ boolean isFirstUse = !isAnyByteConsumed;
+ setAnyByteConsumed(true);
+ if (offset == 0) {
+ setFirstByteConsumed(true);
+ }
+ if (offset + bytesCopied == getLength()) {
+ setLastByteConsumed(true);
+ }
+ IOStatisticsStore iostats = getStreamIOStatistics();
+ if (isFirstUse) {
+ // first use, update the use
+ iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_USED, 1);
+ }
+ // every use, update the count of bytes read
+ iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_USED, bytesCopied);
+ }
+
+ /**
+ * The (completed) buffer was evicted; update stream statistics
+ * as appropriate.
+ */
+ void evicted() {
+ IOStatisticsStore iostats = getStreamIOStatistics();
+ iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_EVICTED, 1);
+ if (!isAnyByteConsumed()) {
+ // nothing was read, so consider it discarded.
+ iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED, 1);
+ iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED,
+     getLength());
+ }
+ }
+
+ /**
+ * The (completed) buffer was discarded; no data was read.
+ */
+ void discarded() {
+ if (getBufferindex() >= 0) {
+ IOStatisticsStore iostats = getStreamIOStatistics();
+ iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED, 1);
+ iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED,
+     getLength());
+ }
+ }
+
+ /**
+ * Release the buffer: update fields as appropriate.
+ */
+ void releaseBuffer() {
+ setBuffer(null);
+ setBufferindex(-1);
+ }
+
+
Review Comment:
nit: Extra blank line.
> ABFS ReadBufferManager buffer sharing across concurrent HTTP requests
> ---------------------------------------------------------------------
>
> Key: HADOOP-18521
> URL: https://issues.apache.org/jira/browse/HADOOP-18521
> Project: Hadoop Common
> Issue Type: Bug
> Components: fs/azure
> Affects Versions: 3.3.2, 3.3.3, 3.3.4
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Critical
> Labels: pull-request-available
>
> AbfsInputStream.close() can trigger the return of buffers used for active
> prefetch GET requests into the ReadBufferManager free buffer pool.
> A subsequent prefetch by a different stream in the same process may acquire
> this same buffer, risking corruption of that stream's own prefetched data,
> data which may then be returned to that other thread.
> On releases without the fix (3.3.2 through 3.3.4), the bug can be avoided by
> disabling all prefetching:
> {code}
> fs.azure.readaheadqueue.depth = 0
> {code}
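> The same workaround can be applied cluster-wide in core-site.xml. This is a
> minimal sketch: the property name is the one given above, and the XML wrapper
> is the standard Hadoop configuration format.
> {code}
> <!-- Disable ABFS read-ahead prefetching entirely
>      (workaround for HADOOP-18521 on releases without the fix) -->
> <property>
>   <name>fs.azure.readaheadqueue.depth</name>
>   <value>0</value>
> </property>
> {code}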
--
This message was sent by Atlassian Jira
(v8.20.10#820010)