pranavsaxena-microsoft commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1019839338
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -454,31 +588,65 @@ ReadBuffer getNextBlockToRead() throws InterruptedException {
*/
void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
final int bytesActuallyRead) {
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}",
- buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead);
- }
- synchronized (this) {
- // If this buffer has already been purged during
- // close of InputStream then we don't update the lists.
- if (inProgressList.contains(buffer)) {
- inProgressList.remove(buffer);
+ LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}; {}",
+ buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead, buffer);
+ }
+ // decrement counter.
+ buffer.prefetchFinished();
+
+ try {
+ synchronized (this) {
+ // remove from the list
+ if (!inProgressList.remove(buffer)) {
+ // this is a sign of inconsistent state, so a major problem
+ String message =
+ String.format("Read completed from an operation not declared as in progress %s",
+ buffer);
+ LOGGER.warn(message);
+ // release the buffer (which may raise an exception)
+ placeBufferOnFreeList("read not in progress", buffer);
+ // report the failure
+ throw new IllegalStateException(message);
+ }
+
+ boolean shouldFreeBuffer = false;
+ String freeBufferReason = "";
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
buffer.setStatus(ReadBufferStatus.AVAILABLE);
buffer.setLength(bytesActuallyRead);
} else {
- freeList.push(buffer.getBufferindex());
- // buffer will be deleted as per the eviction policy.
+ // read failed or there was no data, -the buffer can be returned to the free list.
+ shouldFreeBuffer = true;
+ freeBufferReason = "failed read";
}
// completed list also contains FAILED read buffers
// for sending exception message to clients.
buffer.setStatus(result);
buffer.setTimeStamp(currentTimeMillis());
- completedReadList.add(buffer);
+ if (!buffer.isStreamClosed()) {
+ // completed reads are added to the list.
+ LOGGER.trace("Adding buffer to completed list {}", buffer);
+ completedReadList.add(buffer);
Review Comment:
Let's not add the buffer to completedReadList in the cases where we are going to add it to freeList (due to bytesActuallyRead == 0).
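A minimal sketch of the suggested control flow, assuming a heavily simplified, hypothetical model: `DoneReadingSketch`, `Buf`, and `Status` are illustrative names, not the real `ReadBufferManager` API. A read that returned data is listed in `completedReadList`; a failed or empty read goes straight to the free list and is never listed, so a "not in completedReadList" check like the one in the trace below cannot trip. The sketch does not cover how the real code surfaces FAILED-read exceptions to clients.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical, simplified model of the doneReading() bookkeeping.
 * Names mirror the PR, but this is not the real ReadBufferManager.
 */
public class DoneReadingSketch {

  enum Status { READING_IN_PROGRESS, AVAILABLE, READ_FAILED }

  static final class Buf {
    final int bufferIndex;
    Status status = Status.READING_IN_PROGRESS;
    int length;

    Buf(int bufferIndex) {
      this.bufferIndex = bufferIndex;
    }
  }

  final Deque<Integer> freeList = new ArrayDeque<>();
  final List<Buf> inProgressList = new ArrayList<>();
  final List<Buf> completedReadList = new ArrayList<>();

  /** Mirrors the PR's check: a buffer must not be freed while still listed as completed. */
  synchronized void placeBufferOnFreeList(String reason, Buf buf) {
    if (completedReadList.contains(buf)) {
      throw new IllegalStateException("Buffer index " + buf.bufferIndex
          + " found in buffer collection completedReadList (" + reason + ")");
    }
    freeList.push(buf.bufferIndex);
  }

  synchronized void doneReading(Buf buf, Status result, int bytesActuallyRead) {
    if (!inProgressList.remove(buf)) {
      throw new IllegalStateException("Read completed from an operation not in progress");
    }
    buf.status = result;
    if (result == Status.AVAILABLE && bytesActuallyRead > 0) {
      // Successful read with data: keep it so readers can consume it.
      buf.length = bytesActuallyRead;
      completedReadList.add(buf);
    } else {
      // Failed or empty read: free it directly and never list it as completed,
      // so exactly one path owns the buffer from here on.
      placeBufferOnFreeList("failed read", buf);
    }
  }

  public static void main(String[] args) {
    DoneReadingSketch m = new DoneReadingSketch();
    Buf b = new Buf(0);
    m.inProgressList.add(b);
    m.doneReading(b, Status.AVAILABLE, 0); // the zero-byte case from the trace
    System.out.println("free=" + m.freeList + " completed=" + m.completedReadList.size());
  }
}
```

The design choice is that a buffer has a single owner at any time: either the completed list (and later the evictor) or the free list, never both.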
- Got this exception:
```
2022-11-10 20:48:22,516 TRACE [ABFS-prefetch-7]: services.ReadBufferManager (ReadBufferManager.java:doneReading(591)) - ReadBufferWorker completed file /testfilefb393e327a88 for offset 4194304 bytes 0; org.apache.hadoop.fs.azurebfs.services.ReadBuffer@6777a743{ status=READING_IN_PROGRESS, offset=4194304, length=0, requestedLength=4194304, bufferindex=0, timeStamp=0, isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false, errException=null, stream=9d85521627a8, stream closed=false}
2022-11-10 20:48:22,516 TRACE [ABFS-prefetch-7]: services.ReadBufferManager (ReadBufferManager.java:doneReading(633)) - Adding buffer to completed list org.apache.hadoop.fs.azurebfs.services.ReadBuffer@6777a743{ status=AVAILABLE, offset=4194304, length=0, requestedLength=4194304, bufferindex=0, timeStamp=86052487, isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false, errException=null, stream=9d85521627a8, stream closed=false}
2022-11-10 20:48:22,516 DEBUG [ABFS-prefetch-7]: services.ReadBufferManager (ReadBufferManager.java:placeBufferOnFreeList(407)) - Returning buffer index 0 to free list for 'failed read'; owner org.apache.hadoop.fs.azurebfs.services.ReadBuffer@6777a743{ status=AVAILABLE, offset=4194304, length=0, requestedLength=4194304, bufferindex=0, timeStamp=86052487, isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false, errException=null, stream=9d85521627a8, stream closed=false}
2022-11-10 20:48:22,517 TRACE [ABFS-prefetch-7]: services.ReadBufferWorker (ReadBufferWorker.java:run(95)) - Exception received:
org.apache.hadoop.fs.PathIOException: `/testfilefb393e327a88': Input/output error: Buffer index 0 found in buffer collection completedReadList
    at org.apache.hadoop.fs.azurebfs.services.ReadBufferWorker.run(ReadBufferWorker.java:93)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalStateException: Buffer index 0 found in buffer collection completedReadList
    at org.apache.hadoop.util.Preconditions.checkState(Preconditions.java:298)
    at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.verifyByteBufferNotInCollection(ReadBufferManager.java:471)
    at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.verifyByteBufferNotInUse(ReadBufferManager.java:457)
    at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.placeBufferOnFreeList(ReadBufferManager.java:413)
    at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.doneReading(ReadBufferManager.java:646)
    at org.apache.hadoop.fs.azurebfs.services.ReadBufferWorker.run(ReadBufferWorker.java:87)
    ... 1 more
```
Reason:
https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L629
->
https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L641
->
https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L413
->
https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L457
-> exception.
- Double addition to freeList:
-- Suppose that in doneReading the prefetch thread reaches
https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L415
and a context switch happens, so this thread doesn't get the CPU for some time. Meanwhile, the buffer that was added to completedReadList is picked up for eviction, evicted, and added to freeList. When the prefetch thread gets the CPU again, it runs
https://github.com/steveloughran/hadoop/blob/1ee18eeb4922d18168bd1fc8ec4a5c75610447cc/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L415-L422
and adds the same buffer to freeList a second time.
-- I wrote an experiment to reproduce this:
https://github.com/pranavsaxena-microsoft/hadoop/commit/7b6ac1558fa12c46217a296f197bf51a8b86c10b
```
2022-11-10 22:22:57,147 TRACE [Thread-28]: services.ReadBufferManager (ReadBufferManager.java:lambda$init$0(147)) - INCONSISTENCY!! on index 8
Exception in thread "Thread-16" java.lang.AssertionError: At index4194304
    at org.junit.Assert.fail(Assert.java:89)
    at org.junit.Assert.assertTrue(Assert.java:42)
    at org.junit.Assert.assertFalse(Assert.java:65)
    at org.apache.hadoop.fs.azurebfs.ITestPartialRead.lambda$purgeIssue$0(ITestPartialRead.java:154)
    at java.lang.Thread.run(Thread.java:750)
```
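The double-addition interleaving can be replayed deterministically in a small, hypothetical model (`DoubleFreeReplay`, `evict`, and `replay` are illustrative names, not Hadoop code). Each call stands for one locked step by one thread, assuming, as in the scenario described, that the prefetch thread has already decided to free the buffer before the evictor gets the CPU. Listing the empty read in completedReadList lets both the evictor and the prefetch thread recycle index 0; not listing it, as this comment suggests, leaves a single owner.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical single-threaded replay of the race. Each method call models
 * one synchronized section run by one thread; no real threads are used.
 */
public class DoubleFreeReplay {
  final Deque<Integer> freeList = new ArrayDeque<>();
  final List<Integer> completedReadList = new ArrayList<>();

  synchronized void placeOnFreeList(int index) {
    freeList.push(index);
  }

  /** Evictor thread: drop a completed buffer and recycle its index. */
  synchronized void evict(int index) {
    // remove(Object), not remove(int position)
    if (completedReadList.remove(Integer.valueOf(index))) {
      placeOnFreeList(index);
    }
  }

  /** Returns how many times buffer index 0 ends up on the free list. */
  static int replay(boolean addEmptyReadToCompletedList) {
    DoubleFreeReplay m = new DoubleFreeReplay();
    int index = 0;
    // Step 1 (prefetch thread, zero-byte read, first locked section).
    if (addEmptyReadToCompletedList) {
      m.completedReadList.add(index);
    }
    // Step 2 (context switch: evictor thread runs in between).
    m.evict(index);
    // Step 3 (prefetch thread resumes; it had already decided to free).
    m.placeOnFreeList(index);
    return Collections.frequency(m.freeList, index);
  }

  public static void main(String[] args) {
    System.out.println("as in PR:     " + replay(true));  // prints 2: freed twice
    System.out.println("as suggested: " + replay(false)); // prints 1: freed once
  }
}
```

A replay like this only shows the ordering problem; the actual window in `ReadBufferManager` depends on where the lock is released between adding to completedReadList and calling `placeBufferOnFreeList`.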