This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 58d1b6595ff HDDS-13974. Use the same gRPC stream for reading the same block. (#9369) Contributed by Stephen O'Donnell
58d1b6595ff is described below

commit 58d1b6595ff1bab9bb49d7bba9acabf83fd576c7
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Nov 25 17:35:04 2025 -0800

    HDDS-13974. Use the same gRPC stream for reading the same block. (#9369) Contributed by Stephen O'Donnell
---
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  | 179 ++---
 .../hdds/scm/storage/BlockExtendedInputStream.java |  97 +++
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |  93 +--
 .../hdds/scm/storage/StreamBlockInputStream.java   | 792 +++++++--------------
 .../client/io/BlockInputStreamFactoryImpl.java     |   8 +-
 .../scm/storage/DummyStreamBlockInputStream.java   | 143 ----
 .../hdds/scm/storage/TestBlockInputStream.java     |   4 +-
 .../scm/storage/TestStreamBlockInputStream.java    | 460 +++++++-----
 .../hadoop/hdds/scm/StreamingReadResponse.java     |  46 ++
 .../hadoop/hdds/scm/StreamingReaderSpi.java}       |  23 +-
 .../apache/hadoop/hdds/scm/XceiverClientSpi.java   |   9 +
 .../ContainerCommandResponseBuilders.java          |  20 +-
 .../hdds/scm/storage/ContainerProtocolCalls.java   |  37 +-
 .../container/common/impl/HddsDispatcher.java      |  16 +-
 .../common/interfaces/ContainerDispatcher.java     |   3 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  | 136 ++--
 .../container/keyvalue/TestKeyValueHandler.java    |  93 ---
 .../src/main/proto/DatanodeClientProtocol.proto    |   6 +-
 .../rpc/read/TestStreamBlockInputStream.java       | 244 ++-----
 19 files changed, 939 insertions(+), 1470 deletions(-)
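
At a high level, this patch replaces the one-shot ReadBlock call in XceiverClientGrpc with a long-lived gRPC stream per block: XceiverClientGrpc.streamRead() opens the stream against a single datanode, hands a StreamingReadResponse to the supplied StreamingReaderSpi, and the caller must later invoke completeStreamRead() to release the concurrency semaphore (see the Javadoc added in the diff below). The sketch that follows is illustrative only and is not part of the commit; it assumes StreamingReaderSpi is an interface extending StreamObserver<ContainerCommandResponseProto> with the setStreamingReadResponse hook shown in the diff, and the observer body is a placeholder. In the patch itself, the real consumer is the StreamingReader inner class of StreamBlockInputStream, which also releases the semaphore from its onCompleted/onError callbacks.

    // Hypothetical caller, assumed to run inside a method declaring
    // "throws IOException, InterruptedException"; readBlockRequest and
    // xceiverClient (an XceiverClientSpi) are assumed to already exist.
    AtomicReference<StreamingReadResponse> handle = new AtomicReference<>();
    StreamingReaderSpi observer = new StreamingReaderSpi() {
      @Override
      public void setStreamingReadResponse(StreamingReadResponse r) {
        handle.set(r); // remembers the datanode and request observer used for this stream
      }
      @Override
      public void onNext(ContainerProtos.ContainerCommandResponseProto resp) {
        // buffer resp.getReadBlock().getData() for the reading thread
      }
      @Override
      public void onError(Throwable t) {
        // record the failure so the reader can surface it
      }
      @Override
      public void onCompleted() {
        // mark the end of the stream
      }
    };
    xceiverClient.streamRead(readBlockRequest, observer); // acquires a semaphore slot on success
    try {
      // ... consume the buffered ReadBlock responses ...
    } finally {
      xceiverClient.completeStreamRead(handle.get()); // must always be called to release the slot
    }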

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 09e01593feb..b6a3d00f010 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -44,8 +44,6 @@
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -65,6 +63,7 @@
 import org.apache.ratis.thirdparty.io.grpc.Status;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.slf4j.Logger;
@@ -386,25 +385,9 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(
         });
   }
 
-  private XceiverClientReply sendCommandWithRetry(
-      ContainerCommandRequestProto request, List<Validator> validators)
-      throws IOException {
-    ContainerCommandResponseProto responseProto = null;
-    IOException ioException = null;
-
-    // In case of an exception or an error, we will try to read from the
-    // datanodes in the pipeline in a round-robin fashion.
-    XceiverClientReply reply = new XceiverClientReply(null);
+  private List<DatanodeDetails> sortDatanodes(ContainerCommandRequestProto 
request) throws IOException {
     List<DatanodeDetails> datanodeList = null;
-
-    DatanodeBlockID blockID = null;
-    if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
-      blockID = request.getGetBlock().getBlockID();
-    } else if  (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
-      blockID = request.getReadChunk().getBlockID();
-    } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
-      blockID = request.getGetSmallFile().getBlock().getBlockID();
-    }
+    DatanodeBlockID blockID = getRequestBlockID(request);
 
     if (blockID != null) {
       if (request.getCmdType() != ContainerProtos.Type.ReadChunk) {
@@ -442,6 +425,33 @@ private XceiverClientReply sendCommandWithRetry(
     if (!allInService) {
       datanodeList = sortDatanodeByOperationalState(datanodeList);
     }
+    return datanodeList;
+  }
+
+  private static DatanodeBlockID 
getRequestBlockID(ContainerCommandRequestProto request) {
+    DatanodeBlockID blockID = null;
+    if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
+      blockID = request.getGetBlock().getBlockID();
+    } else if  (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
+      blockID = request.getReadChunk().getBlockID();
+    } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
+      blockID = request.getGetSmallFile().getBlock().getBlockID();
+    } else if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
+      blockID = request.getReadBlock().getBlockID();
+    }
+    return blockID;
+  }
+
+  private XceiverClientReply sendCommandWithRetry(
+      ContainerCommandRequestProto request, List<Validator> validators)
+      throws IOException {
+    ContainerCommandResponseProto responseProto = null;
+    IOException ioException = null;
+
+    // In case of an exception or an error, we will try to read from the
+    // datanodes in the pipeline in a round-robin fashion.
+    XceiverClientReply reply = new XceiverClientReply(null);
+    List<DatanodeDetails> datanodeList = sortDatanodes(request);
 
     for (DatanodeDetails dn : datanodeList) {
       try {
@@ -453,11 +463,7 @@ private XceiverClientReply sendCommandWithRetry(
         // sendCommandAsyncCall will create a new channel and async stub
         // in case these don't exist for the specific datanode.
         reply.addDatanode(dn);
-        if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
-          responseProto = sendCommandReadBlock(request, 
dn).getResponse().get();
-        } else {
-          responseProto = sendCommandAsync(request, dn).getResponse().get();
-        }
+        responseProto = sendCommandAsync(request, dn).getResponse().get();
         if (validators != null && !validators.isEmpty()) {
           for (Validator validator : validators) {
             validator.accept(request, responseProto);
@@ -510,6 +516,66 @@ private XceiverClientReply sendCommandWithRetry(
     }
   }
 
+  /**
+   * Starts a streaming read operation, intended to read entire blocks from the datanodes. This method expects a
+   * {@link StreamingReaderSpi} to be passed in, which will be used to receive the streamed data from the datanode.
+   * Upon successfully starting the streaming read, a {@link StreamingReadResponse} is set on the passed streamObserver,
+   * which contains information about the datanode used for the read, and the request observer that can be used to
+   * manage the stream (e.g., to cancel it if needed). A semaphore is acquired to limit the number of concurrent
+   * streaming reads, so upon successful return of this method, the caller must call
+   * {@link #completeStreamRead(StreamingReadResponse)} to release the semaphore once the streaming read is complete.
+   * @param request The container command request to initiate the streaming read.
+   * @param streamObserver The observer that will handle the streamed responses.
+   * @throws IOException if the streaming read could not be started on any datanode.
+   * @throws InterruptedException if interrupted while acquiring the read semaphore.
+   */
+  @Override
+  public void streamRead(ContainerCommandRequestProto request,
+      StreamingReaderSpi streamObserver) throws IOException, 
InterruptedException {
+    List<DatanodeDetails> datanodeList = sortDatanodes(request);
+    IOException lastException = null;
+    for (DatanodeDetails dn : datanodeList) {
+      try {
+        checkOpen(dn);
+        semaphore.acquire();
+        XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
+        if (stub == null) {
+          throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Executing command {} on datanode {}", 
processForDebug(request), dn);
+        }
+        StreamObserver<ContainerCommandRequestProto> requestObserver = stub
+            .withDeadlineAfter(timeout, TimeUnit.SECONDS)
+            .send(streamObserver);
+        streamObserver.setStreamingReadResponse(new StreamingReadResponse(dn,
+            (ClientCallStreamObserver<ContainerCommandRequestProto>) 
requestObserver));
+        requestObserver.onNext(request);
+        requestObserver.onCompleted();
+        return;
+      } catch (IOException e) {
+        LOG.error("Failed to start streaming read to DataNode {}", dn, e);
+        semaphore.release();
+        lastException = e;
+      }
+    }
+    if (lastException != null) {
+      throw lastException;
+    } else {
+      throw new IOException("Failed to start streaming read to any available 
DataNodes");
+    }
+  }
+
+  /**
+   * This method should be called to indicate the end of a streaming read. Its primary purpose is to release the
+   * semaphore acquired when starting the streaming read, but it can also be used to update any metrics or debug
+   * logs as needed.
+   */
+  @Override
+  public void completeStreamRead(StreamingReadResponse streamingReadResponse) {
+    semaphore.release();
+  }
+
   private static List<DatanodeDetails> sortDatanodeByOperationalState(
       List<DatanodeDetails> datanodeList) {
     List<DatanodeDetails> sortedDatanodeList = new ArrayList<>(datanodeList);
@@ -629,69 +695,6 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
     return new XceiverClientReply(replyFuture);
   }
 
-  public XceiverClientReply sendCommandReadBlock(
-      ContainerCommandRequestProto request, DatanodeDetails dn)
-      throws IOException, InterruptedException {
-
-    CompletableFuture<ContainerCommandResponseProto> future =
-        new CompletableFuture<>();
-    ContainerCommandResponseProto.Builder response =
-        ContainerCommandResponseProto.newBuilder();
-    ContainerProtos.ReadBlockResponseProto.Builder readBlock =
-        ContainerProtos.ReadBlockResponseProto.newBuilder();
-    checkOpen(dn);
-    DatanodeID dnId = dn.getID();
-    Type cmdType = request.getCmdType();
-    semaphore.acquire();
-    long requestTime = System.currentTimeMillis();
-    metrics.incrPendingContainerOpsMetrics(cmdType);
-
-    final StreamObserver<ContainerCommandRequestProto> requestObserver =
-        asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS)
-            .send(new StreamObserver<ContainerCommandResponseProto>() {
-              @Override
-              public void onNext(
-                  ContainerCommandResponseProto responseProto) {
-                if (responseProto.getResult() == Result.SUCCESS) {
-                  readBlock.addReadChunk(responseProto.getReadChunk());
-                } else {
-                  future.complete(
-                      ContainerCommandResponseProto.newBuilder(responseProto)
-                          .setCmdType(Type.ReadBlock).build());
-                }
-              }
-
-              @Override
-              public void onError(Throwable t) {
-                future.completeExceptionally(t);
-                metrics.decrPendingContainerOpsMetrics(cmdType);
-                metrics.addContainerOpsLatency(
-                    cmdType, Time.monotonicNow() - requestTime);
-                semaphore.release();
-              }
-
-              @Override
-              public void onCompleted() {
-                if (readBlock.getReadChunkCount() > 0) {
-                  future.complete(response.setReadBlock(readBlock)
-                      
.setCmdType(Type.ReadBlock).setResult(Result.SUCCESS).build());
-                }
-                if (!future.isDone()) {
-                  future.completeExceptionally(new IOException(
-                      "Stream completed but no reply for request " +
-                          processForDebug(request)));
-                }
-                metrics.decrPendingContainerOpsMetrics(cmdType);
-                metrics.addContainerOpsLatency(
-                    cmdType, System.currentTimeMillis() - requestTime);
-                semaphore.release();
-              }
-            });
-    requestObserver.onNext(request);
-    requestObserver.onCompleted();
-    return new XceiverClientReply(future);
-  }
-
   private synchronized void checkOpen(DatanodeDetails dn)
       throws IOException {
     if (closed) {
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
index 6753f600a91..dc47a9edaca 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
@@ -17,7 +17,20 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import java.io.IOException;
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.thirdparty.io.grpc.Status;
 
 /**
  * Abstract class used as an interface for input streams related to Ozone
@@ -26,6 +39,8 @@
 public abstract class BlockExtendedInputStream extends ExtendedInputStream
     implements PartInputStream {
 
+  private static final org.slf4j.Logger LOG = 
org.slf4j.LoggerFactory.getLogger(BlockExtendedInputStream.class);
+
   public abstract BlockID getBlockID();
 
   @Override
@@ -38,4 +53,86 @@ public long getRemaining() {
 
   @Override
   public abstract long getPos();
+
+  protected Pipeline setPipeline(Pipeline pipeline) throws IOException {
+    if (pipeline == null) {
+      return null;
+    }
+    long replicaIndexes = 
pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();
+
+    if (replicaIndexes > 1) {
+      throw new IOException(String.format("Pipeline: %s has nodes containing 
different replica indexes.",
+          pipeline));
+    }
+
+    // irrespective of the container state, we will always read via Standalone 
protocol.
+    boolean okForRead = pipeline.getType() == 
HddsProtos.ReplicationType.STAND_ALONE
+            || pipeline.getType() == HddsProtos.ReplicationType.EC;
+    return okForRead ? pipeline : pipeline.copyForRead();
+  }
+
+  protected boolean shouldRetryRead(IOException cause, RetryPolicy 
retryPolicy, int retries) throws IOException {
+    RetryPolicy.RetryAction retryAction;
+    try {
+      retryAction = retryPolicy.shouldRetry(cause, retries, 0, true);
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
+      if (retryAction.delayMillis > 0) {
+        try {
+          LOG.debug("Retry read after {}ms", retryAction.delayMillis);
+          Thread.sleep(retryAction.delayMillis);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          String msg = "Interrupted: action=" + retryAction.action + ", retry 
policy=" + retryPolicy;
+          throw new IOException(msg, e);
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  protected RetryPolicy getReadRetryPolicy(OzoneClientConfig config) {
+    return HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
+        TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
+  }
+
+  protected void refreshBlockInfo(IOException cause, BlockID blockID, 
AtomicReference<Pipeline> pipelineRef,
+      AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef, 
Function<BlockID, BlockLocationInfo> refreshFunction)
+      throws IOException {
+    LOG.info("Attempting to update pipeline and block token for block {} from 
pipeline {}: {}",
+        blockID, pipelineRef.get().getId(), cause.getMessage());
+    if (refreshFunction != null) {
+      LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
+      BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
+      if (blockLocationInfo == null) {
+        LOG.warn("No new block location info for block {}", blockID);
+      } else {
+        pipelineRef.set(setPipeline(blockLocationInfo.getPipeline()));
+        LOG.info("New pipeline for block {}: {}", blockID, 
blockLocationInfo.getPipeline());
+
+        tokenRef.set(blockLocationInfo.getToken());
+        if (blockLocationInfo.getToken() != null) {
+          OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
+          tokenId.readFromByteArray(tokenRef.get().getIdentifier());
+          LOG.info("A new token is added for block {}. Expiry: {}", blockID,
+              Instant.ofEpochMilli(tokenId.getExpiryDate()));
+        }
+      }
+    } else {
+      throw cause;
+    }
+  }
+
+  /**
+   * Check if this exception is because datanodes are not reachable.
+   */
+  protected boolean isConnectivityIssue(IOException ex) {
+    return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
+  }
+
 }
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index c9731bcc461..6f6b513422f 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -22,11 +22,9 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -35,19 +33,16 @@
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.security.token.Token;
-import org.apache.ratis.thirdparty.io.grpc.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,14 +118,12 @@ public BlockInputStream(
     this.blockInfo = blockInfo;
     this.blockID = blockInfo.getBlockID();
     this.length = blockInfo.getLength();
-    setPipeline(pipeline);
+    pipelineRef.set(setPipeline(pipeline));
     tokenRef.set(token);
     this.verifyChecksum = config.isChecksumVerify();
     this.xceiverClientFactory = xceiverClientFactory;
     this.refreshFunction = refreshFunction;
-    this.retryPolicy =
-        HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
-            TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
+    this.retryPolicy = getReadRetryPolicy(config);
   }
 
   // only for unit tests
@@ -182,7 +175,7 @@ public synchronized void initialize() throws IOException {
         }
         catchEx = ex;
       }
-    } while (shouldRetryRead(catchEx));
+    } while (shouldRetryRead(catchEx, retryPolicy, ++retries));
 
     if (chunks == null) {
       throw catchEx;
@@ -215,37 +208,8 @@ public synchronized void initialize() throws IOException {
     }
   }
 
-  /**
-   * Check if this exception is because datanodes are not reachable.
-   */
-  private boolean isConnectivityIssue(IOException ex) {
-    return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
-  }
-
   private void refreshBlockInfo(IOException cause) throws IOException {
-    LOG.info("Attempting to update pipeline and block token for block {} from 
pipeline {}: {}",
-        blockID, pipelineRef.get().getId(), cause.getMessage());
-    if (refreshFunction != null) {
-      LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
-      BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
-      if (blockLocationInfo == null) {
-        LOG.warn("No new block location info for block {}", blockID);
-      } else {
-        setPipeline(blockLocationInfo.getPipeline());
-        LOG.info("New pipeline for block {}: {}", blockID,
-            blockLocationInfo.getPipeline());
-
-        tokenRef.set(blockLocationInfo.getToken());
-        if (blockLocationInfo.getToken() != null) {
-          OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
-          tokenId.readFromByteArray(tokenRef.get().getIdentifier());
-          LOG.info("A new token is added for block {}. Expiry: {}",
-              blockID, Instant.ofEpochMilli(tokenId.getExpiryDate()));
-        }
-      }
-    } else {
-      throw cause;
-    }
+    refreshBlockInfo(cause, blockID, pipelineRef, tokenRef, refreshFunction);
   }
 
   /**
@@ -277,26 +241,6 @@ protected BlockData getBlockDataUsingClient() throws 
IOException {
     return response.getBlockData();
   }
 
-  private void setPipeline(Pipeline pipeline) throws IOException {
-    if (pipeline == null) {
-      return;
-    }
-    long replicaIndexes = 
pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();
-
-    if (replicaIndexes > 1) {
-      throw new IOException(String.format("Pipeline: %s has nodes containing 
different replica indexes.",
-          pipeline));
-    }
-
-    // irrespective of the container state, we will always read via Standalone
-    // protocol.
-    boolean okForRead =
-        pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
-            || pipeline.getType() == HddsProtos.ReplicationType.EC;
-    Pipeline readPipeline = okForRead ? pipeline : pipeline.copyForRead();
-    pipelineRef.set(readPipeline);
-  }
-
   private static void validate(ContainerCommandResponseProto response)
       throws IOException {
     if (!response.hasGetBlock()) {
@@ -382,14 +326,14 @@ protected synchronized int 
readWithStrategy(ByteReaderStrategy strategy)
       } catch (SCMSecurityException ex) {
         throw ex;
       } catch (StorageContainerException e) {
-        if (shouldRetryRead(e)) {
+        if (shouldRetryRead(e, retryPolicy, ++retries)) {
           handleReadError(e);
           continue;
         } else {
           throw e;
         }
       } catch (IOException ex) {
-        if (shouldRetryRead(ex)) {
+        if (shouldRetryRead(ex, retryPolicy, ++retries)) {
           if (isConnectivityIssue(ex)) {
             handleReadError(ex);
           } else {
@@ -573,31 +517,6 @@ private synchronized void storePosition() {
     blockPosition = getPos();
   }
 
-  private boolean shouldRetryRead(IOException cause) throws IOException {
-    RetryPolicy.RetryAction retryAction;
-    try {
-      retryAction = retryPolicy.shouldRetry(cause, ++retries, 0, true);
-    } catch (IOException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-    if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
-      if (retryAction.delayMillis > 0) {
-        try {
-          LOG.debug("Retry read after {}ms", retryAction.delayMillis);
-          Thread.sleep(retryAction.delayMillis);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          String msg = "Interrupted: action=" + retryAction.action + ", retry 
policy=" + retryPolicy;
-          throw new IOException(msg, e);
-        }
-      }
-      return true;
-    }
-    return false;
-  }
-
   private void handleReadError(IOException cause) throws IOException {
     releaseClient();
     final List<ChunkInputStream> inputStreams = this.chunkStreams;
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
index 3db7fd8f660..cb2f80ca7c8 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
@@ -17,46 +17,36 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
-import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
-import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamingReadResponse;
+import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
-import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.security.token.Token;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,31 +56,26 @@
  */
 public class StreamBlockInputStream extends BlockExtendedInputStream
     implements Seekable, CanUnbuffer, ByteBufferReadable {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(StreamBlockInputStream.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamBlockInputStream.class);
+  private static final int EOF = -1;
+  private static final Throwable CANCELLED_EXCEPTION = new 
Throwable("Cancelled by client");
+
   private final BlockID blockID;
   private final long blockLength;
-  private final AtomicReference<Pipeline> pipelineRef =
-      new AtomicReference<>();
-  private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
-      new AtomicReference<>();
+  private final AtomicReference<Pipeline> pipelineRef = new 
AtomicReference<>();
+  private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef = 
new AtomicReference<>();
   private XceiverClientFactory xceiverClientFactory;
   private XceiverClientSpi xceiverClient;
 
-  private List<Long> bufferOffsets;
-  private int bufferIndex;
-  private long blockPosition = -1;
-  private List<ByteBuffer> buffers;
-  // Checks if the StreamBlockInputStream has already read data from the 
container.
-  private boolean allocated = false;
-  private long bufferOffsetWrtBlockData;
-  private long buffersSize;
-  private static final int EOF = -1;
-  private final List<XceiverClientSpi.Validator> validators;
+  private ByteBuffer buffer;
+  private long position = 0;
+  private boolean initialized = false;
+  private StreamingReader streamingReader;
+
   private final boolean verifyChecksum;
   private final Function<BlockID, BlockLocationInfo> refreshFunction;
   private final RetryPolicy retryPolicy;
-  private int retries;
+  private int retries = 0;
 
   public StreamBlockInputStream(
       BlockID blockID, long length, Pipeline pipeline,
@@ -100,17 +85,12 @@ public StreamBlockInputStream(
       OzoneClientConfig config) throws IOException {
     this.blockID = blockID;
     this.blockLength = length;
-    setPipeline(pipeline);
+    pipelineRef.set(setPipeline(pipeline));
     tokenRef.set(token);
     this.xceiverClientFactory = xceiverClientFactory;
-    this.validators = ContainerProtocolCalls.toValidatorList(
-        (request, response) -> validateBlock(response));
     this.verifyChecksum = config.isChecksumVerify();
+    this.retryPolicy = getReadRetryPolicy(config);
     this.refreshFunction = refreshFunction;
-    this.retryPolicy =
-        HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
-            TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
-
   }
 
   @Override
@@ -125,170 +105,54 @@ public long getLength() {
 
   @Override
   public synchronized long getPos() {
-    if (blockLength == 0) {
-      return 0;
-    }
-    if (blockPosition >= 0) {
-      return blockPosition;
-    }
-
-    if (buffersHaveData()) {
-      // BufferOffset w.r.t to BlockData + BufferOffset w.r.t buffers +
-      // Position of current Buffer
-      return bufferOffsetWrtBlockData + bufferOffsets.get(bufferIndex) +
-          buffers.get(bufferIndex).position();
-    }
-    if (allocated && !dataRemainingInBlock()) {
-      Preconditions.checkState(
-          bufferOffsetWrtBlockData + buffersSize == blockLength,
-          "EOF detected but not at the last byte of the chunk");
-      return blockLength;
-    }
-    if (buffersAllocated()) {
-      return bufferOffsetWrtBlockData + buffersSize;
-    }
-    return 0;
+    return position;
   }
 
   @Override
   public synchronized int read() throws IOException {
-    int dataout = EOF;
-    int len = 1;
-    int available;
-    while (len > 0) {
-      try {
-        acquireClient();
-        available = prepareRead(1);
-        retries = 0;
-      } catch (SCMSecurityException ex) {
-        throw ex;
-      } catch (StorageContainerException e) {
-        handleStorageContainerException(e);
-        continue;
-      } catch (IOException ioe) {
-        handleIOException(ioe);
-        continue;
-      }
-      if (available == EOF) {
-        // There is no more data in the chunk stream. The buffers should have
-        // been released by now
-        Preconditions.checkState(buffers == null);
-      } else {
-        dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
-      }
-
-      len -= available;
-      if (bufferEOF()) {
-        releaseBuffers(bufferIndex);
-      }
+    checkOpen();
+    if (!dataAvailableToRead()) {
+      return EOF;
     }
-
-
-    return dataout;
-
-
+    position++;
+    return Byte.toUnsignedInt(buffer.get());
   }
 
   @Override
   public synchronized int read(byte[] b, int off, int len) throws IOException {
-    // According to the JavaDocs for InputStream, it is recommended that
-    // subclasses provide an override of bulk read if possible for performance
-    // reasons.  In addition to performance, we need to do it for correctness
-    // reasons.  The Ozone REST service uses PipedInputStream and
-    // PipedOutputStream to relay HTTP response data between a Jersey thread 
and
-    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
-    // have a subtle dependency (bug?) on the wrapped stream providing separate
-    // implementations of single-byte read and bulk read.  Without this, get 
key
-    // responses might close the connection before writing all of the bytes
-    // advertised in the Content-Length.
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || len > b.length - off) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (len == 0) {
-      return 0;
-    }
-    int total = 0;
-    int available;
-    while (len > 0) {
-      try {
-        acquireClient();
-        available = prepareRead(len);
-        retries = 0;
-      } catch (SCMSecurityException ex) {
-        throw ex;
-      } catch (StorageContainerException e) {
-        handleStorageContainerException(e);
-        continue;
-      } catch (IOException ioe) {
-        handleIOException(ioe);
-        continue;
-      }
-      if (available == EOF) {
-        // There is no more data in the block stream. The buffers should have
-        // been released by now
-        Preconditions.checkState(buffers == null);
-        return total != 0 ? total : EOF;
-      }
-      buffers.get(bufferIndex).get(b, off + total, available);
-      len -= available;
-      total += available;
+    ByteBuffer tmpBuffer = ByteBuffer.wrap(b, off, len);
+    return read(tmpBuffer);
+  }
 
-      if (bufferEOF()) {
-        releaseBuffers(bufferIndex);
+  @Override
+  public synchronized int read(ByteBuffer targetBuf) throws IOException {
+    checkOpen();
+    int read = 0;
+    while (targetBuf.hasRemaining()) {
+      if (!dataAvailableToRead()) {
+        break;
       }
+      int toCopy = Math.min(buffer.remaining(), targetBuf.remaining());
+      ByteBuffer tmpBuf = buffer.duplicate();
+      tmpBuf.limit(tmpBuf.position() + toCopy);
+      targetBuf.put(tmpBuf);
+      buffer.position(tmpBuf.position());
+      position += toCopy;
+      read += toCopy;
     }
-    return total;
-
+    return read > 0 ? read : EOF;
   }
 
-  @Override
-  public synchronized int read(ByteBuffer byteBuffer) throws IOException {
-    if (byteBuffer == null) {
-      throw new NullPointerException();
-    }
-    int len = byteBuffer.remaining();
-    if (len == 0) {
-      return 0;
+  private boolean dataAvailableToRead() throws IOException {
+    if (position >= blockLength) {
+      return false;
     }
-    int total = 0;
-    int available;
-    while (len > 0) {
-      try {
-        acquireClient();
-        available = prepareRead(len);
-        retries = 0;
-      } catch (SCMSecurityException ex) {
-        throw ex;
-      } catch (StorageContainerException e) {
-        handleStorageContainerException(e);
-        continue;
-      } catch (IOException ioe) {
-        handleIOException(ioe);
-        continue;
-      }
-      if (available == EOF) {
-        // There is no more data in the block stream. The buffers should have
-        // been released by now
-        Preconditions.checkState(buffers == null);
-        return total != 0 ? total : EOF;
-      }
-      ByteBuffer readBuf = buffers.get(bufferIndex);
-      ByteBuffer tmpBuf = readBuf.duplicate();
-      tmpBuf.limit(tmpBuf.position() + available);
-      byteBuffer.put(tmpBuf);
-      readBuf.position(tmpBuf.position());
-
-      len -= available;
-      total += available;
-
-      if (bufferEOF()) {
-        releaseBuffers(bufferIndex);
-      }
+    initialize();
+    if (buffer == null || buffer.remaining() == 0) {
+      int loaded = fillBuffer();
+      return loaded != EOF;
     }
-    return total;
+    return true;
   }
 
   @Override
@@ -298,62 +162,44 @@ protected int readWithStrategy(ByteReaderStrategy 
strategy) throws IOException {
 
   @Override
   public synchronized void seek(long pos) throws IOException {
-    if (pos == 0 && blockLength == 0) {
-      // It is possible for length and pos to be zero in which case
-      // seek should return instead of throwing exception
-      return;
+    checkOpen();
+    if (pos < 0) {
+      throw new IOException("Cannot seek to negative offset");
     }
-    if (pos < 0 || pos > blockLength) {
-      throw new EOFException("EOF encountered at pos: " + pos + " for block: " 
+ blockID);
+    if (pos > blockLength) {
+      throw new IOException("Cannot seek after the end of the block");
     }
-
-    if (buffersHavePosition(pos)) {
-      // The bufferPosition is w.r.t the current block.
-      // Adjust the bufferIndex and position to the seeked position.
-      adjustBufferPosition(pos - bufferOffsetWrtBlockData);
-    } else {
-      blockPosition = pos;
+    if (pos == position) {
+      return;
     }
+    closeStream();
+    position = pos;
   }
 
   @Override
+  // The Seekable interface indicates that seekToNewSource should seek to a new source of the data,
+  // i.e. a different datanode. This is not supported for now.
   public synchronized boolean seekToNewSource(long l) throws IOException {
     return false;
   }
 
   @Override
   public synchronized void unbuffer() {
-    blockPosition = getPos();
     releaseClient();
-    releaseBuffers();
   }
 
-  private void setPipeline(Pipeline pipeline) throws IOException {
-    if (pipeline == null) {
-      return;
-    }
-    long replicaIndexes = 
pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();
-
-    if (replicaIndexes > 1) {
-      throw new IOException(String.format("Pipeline: %s has nodes containing 
different replica indexes.",
-          pipeline));
+  private void closeStream() {
+    if (streamingReader != null) {
+      streamingReader.cancel();
+      streamingReader = null;
     }
-
-    // irrespective of the container state, we will always read via Standalone
-    // protocol.
-    boolean okForRead =
-        pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
-            || pipeline.getType() == HddsProtos.ReplicationType.EC;
-    Pipeline readPipeline = okForRead ? pipeline : 
pipeline.copyForRead().toBuilder()
-        .setReplicationConfig(StandaloneReplicationConfig.getInstance(
-            getLegacyFactor(pipeline.getReplicationConfig())))
-        .build();
-    pipelineRef.set(readPipeline);
+    initialized = false;
+    buffer = null;
   }
 
   protected synchronized void checkOpen() throws IOException {
     if (xceiverClientFactory == null) {
-      throw new IOException("StreamBlockInputStream has been closed.");
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED + " Block: " 
+ blockID);
     }
   }
 
@@ -364,384 +210,234 @@ protected synchronized void acquireClient() throws 
IOException {
       try {
         xceiverClient = 
xceiverClientFactory.acquireClientForReadData(pipeline);
       } catch (IOException ioe) {
-        LOG.warn("Failed to acquire client for pipeline {}, block {}",
-            pipeline, blockID);
+        LOG.warn("Failed to acquire client for pipeline {}, block {}", 
pipeline, blockID);
         throw ioe;
       }
     }
   }
 
-  private synchronized int prepareRead(int len) throws IOException {
-    for (;;) {
-      if (blockPosition >= 0) {
-        if (buffersHavePosition(blockPosition)) {
-          // The current buffers have the seeked position. Adjust the buffer
-          // index and position to point to the buffer position.
-          adjustBufferPosition(blockPosition - bufferOffsetWrtBlockData);
-        } else {
-          // Read a required block data to fill the buffers with seeked
-          // position data
-          readDataFromContainer(len);
-        }
+  private void initialize() throws IOException {
+    if (initialized) {
+      return;
+    }
+    while (true) {
+      try {
+        acquireClient();
+        streamingReader = new StreamingReader();
+        ContainerProtocolCalls.readBlock(xceiverClient, position, blockID, 
tokenRef.get(),
+            pipelineRef.get().getReplicaIndexes(), streamingReader);
+        initialized = true;
+        return;
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        handleExceptions(new IOException("Interrupted", ie));
+      } catch (IOException ioe) {
+        handleExceptions(ioe);
       }
-      if (buffersHaveData()) {
-        // Data is available from buffers
-        ByteBuffer bb = buffers.get(bufferIndex);
-        return Math.min(len, bb.remaining());
-      } else if (dataRemainingInBlock()) {
-        // There is more data in the block stream which has not
-        // been read into the buffers yet.
-        readDataFromContainer(len);
+    }
+  }
+
+  private void handleExceptions(IOException cause) throws IOException {
+    if (cause instanceof StorageContainerException || 
isConnectivityIssue(cause)) {
+      if (shouldRetryRead(cause, retryPolicy, retries++)) {
+        releaseClient();
+        refreshBlockInfo(cause);
+        LOG.warn("Refreshing block data to read block {} due to {}", blockID, 
cause.getMessage());
       } else {
-        // All available input from this block stream has been consumed.
-        return EOF;
+        throw cause;
       }
+    } else {
+      throw cause;
     }
+  }
 
-
+  private int fillBuffer() throws IOException {
+    if (!streamingReader.hasNext()) {
+      return EOF;
+    }
+    buffer = streamingReader.readNext();
+    return buffer == null ? EOF : buffer.limit();
   }
 
-  private boolean buffersHavePosition(long pos) {
-    // Check if buffers have been allocated
-    if (buffersAllocated()) {
-      // Check if the current buffers cover the input position
-      // Released buffers should not be considered when checking if position
-      // is available
-      return pos >= bufferOffsetWrtBlockData +
-          bufferOffsets.get(0) &&
-          pos < bufferOffsetWrtBlockData + buffersSize;
+  protected synchronized void releaseClient() {
+    if (xceiverClientFactory != null && xceiverClient != null) {
+      closeStream();
+      xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
+      xceiverClient = null;
     }
-    return false;
   }
 
-  /**
-   * Check if the buffers have been allocated data and false otherwise.
-   */
-  @VisibleForTesting
-  protected boolean buffersAllocated() {
-    return buffers != null && !buffers.isEmpty();
+  @Override
+  public synchronized void close() throws IOException {
+    releaseClient();
+    xceiverClientFactory = null;
   }
 
-  /**
-   * Adjust the buffers position to account for seeked position and/ or 
checksum
-   * boundary reads.
-   * @param bufferPosition the position to which the buffers must be advanced
-   */
-  private void adjustBufferPosition(long bufferPosition) {
-    // The bufferPosition is w.r.t the current buffers.
-    // Adjust the bufferIndex and position to the seeked bufferPosition.
-    bufferIndex = Collections.binarySearch(bufferOffsets, bufferPosition);
-    // bufferIndex is negative if bufferPosition isn't found in bufferOffsets
-    // count (bufferIndex = -bufferIndex - 2) to get bufferPosition is between 
which offsets.
-    if (bufferIndex < 0) {
-      bufferIndex = -bufferIndex - 2;
-    }
+  private void refreshBlockInfo(IOException cause) throws IOException {
+    refreshBlockInfo(cause, blockID, pipelineRef, tokenRef, refreshFunction);
+  }
 
-    buffers.get(bufferIndex).position(
-        (int) (bufferPosition - bufferOffsets.get(bufferIndex)));
-
-    // Reset buffers > bufferIndex to position 0. We do this to reset any
-    // previous reads/ seeks which might have updated any buffer position.
-    // For buffers < bufferIndex, we do not need to reset the position as it
-    // not required for this read. If a seek was done to a position in the
-    // previous indices, the buffer position reset would be performed in the
-    // seek call.
-    for (int i = bufferIndex + 1; i < buffers.size(); i++) {
-      buffers.get(i).position(0);
+  private synchronized void releaseStreamResources(StreamingReadResponse 
response) {
+    if (xceiverClient != null) {
+      xceiverClient.completeStreamRead(response);
     }
-
-    // Reset the blockPosition as chunk stream has been initialized i.e. the
-    // buffers have been allocated.
-    blockPosition = -1;
   }
 
   /**
-   * Reads full or partial Chunk from DN Container based on the current
-   * position of the ChunkInputStream, the number of bytes of data to read
-   * and the checksum boundaries.
-   * If successful, then the read data in saved in the buffers so that
-   * subsequent read calls can utilize it.
-   * @param len number of bytes of data to be read
-   * @throws IOException if there is an I/O error while performing the call
-   * to Datanode
+   * Implementation of a StreamObserver used to receive and buffer streaming gRPC reads.
    */
-  private synchronized void readDataFromContainer(int len) throws IOException {
-    // index of first byte to be read from the block
-    long startByteIndex;
-    if (blockPosition >= 0) {
-      // If seek operation was called to advance the buffer position, the
-      // chunk should be read from that position onwards.
-      startByteIndex = blockPosition;
-    } else {
-      // Start reading the block from the last blockPosition onwards.
-      startByteIndex = bufferOffsetWrtBlockData + buffersSize;
-    }
+  public class StreamingReader implements StreamingReaderSpi {
 
-    // bufferOffsetWrtChunkData and buffersSize are updated after the data
-    // is read from Container and put into the buffers, but if read fails
-    // and is retried, we need the previous position.  Position is reset after
-    // successful read in adjustBufferPosition()
-    blockPosition = getPos();
-    bufferOffsetWrtBlockData = readData(startByteIndex, len);
-    long tempOffset = 0L;
-    buffersSize = 0L;
-    bufferOffsets = new ArrayList<>(buffers.size());
-    for (ByteBuffer buffer : buffers) {
-      bufferOffsets.add(tempOffset);
-      tempOffset += buffer.limit();
-      buffersSize += buffer.limit();
+    private final BlockingQueue<ContainerProtos.ReadBlockResponseProto> 
responseQueue = new LinkedBlockingQueue<>(1);
+    private final AtomicBoolean completed = new AtomicBoolean(false);
+    private final AtomicBoolean failed = new AtomicBoolean(false);
+    private final AtomicBoolean semaphoreReleased = new AtomicBoolean(false);
+    private final AtomicReference<Throwable> error = new AtomicReference<>();
+    private volatile StreamingReadResponse response;
 
+    public boolean hasNext() {
+      return !responseQueue.isEmpty() || !completed.get();
     }
-    bufferIndex = 0;
-    allocated = true;
-    adjustBufferPosition(startByteIndex - bufferOffsetWrtBlockData);
 
-  }
+    public ByteBuffer readNext() throws IOException {
+      if (failed.get()) {
+        Throwable cause = error.get();
+        throw new IOException("Streaming read failed", cause);
+      }
 
-  @VisibleForTesting
-  protected long readData(long startByteIndex, long len)
-      throws IOException {
-    Pipeline pipeline = pipelineRef.get();
-    buffers = new ArrayList<>();
-    ReadBlockResponseProto response =
-        ContainerProtocolCalls.readBlock(xceiverClient, startByteIndex,
-        len, blockID, validators, tokenRef.get(), 
pipeline.getReplicaIndexes(), verifyChecksum);
-    List<ReadChunkResponseProto> readBlocks = response.getReadChunkList();
-
-    for (ReadChunkResponseProto readBlock : readBlocks) {
-      if (readBlock.hasDataBuffers()) {
-        buffers.addAll(BufferUtils.getReadOnlyByteBuffers(
-            readBlock.getDataBuffers().getBuffersList()));
-      } else {
-        throw new IOException("Unexpected error while reading chunk data " +
-            "from container. No data returned.");
+      if (completed.get() && responseQueue.isEmpty()) {
+        return null; // Stream ended
       }
-    }
-    return response.getReadChunk(0)
-        .getChunkData().getOffset();
-  }
 
-  /**
-   * Check if the buffers have any data remaining between the current
-   * position and the limit.
-   */
-  private boolean buffersHaveData() {
-    boolean hasData = false;
-    if (buffersAllocated()) {
-      int buffersLen = buffers.size();
-      while (bufferIndex < buffersLen) {
-        ByteBuffer buffer = buffers.get(bufferIndex);
-        if (buffer != null && buffer.hasRemaining()) {
-          // current buffer has data
-          hasData = true;
-          break;
+      ReadBlockResponseProto readBlock;
+      try {
+        readBlock = responseQueue.poll(30, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for response", e);
+      }
+      if (readBlock == null) {
+        if (failed.get()) {
+          Throwable cause = error.get();
+          throw new IOException("Streaming read failed", cause);
+        } else if (completed.get()) {
+          return null; // Stream ended
         } else {
-          if (bufferIndex < buffersLen - 1) {
-            // move to next available buffer
-            ++bufferIndex;
-            Preconditions.checkState(bufferIndex < buffers.size());
-          } else {
-            // no more buffers remaining
-            break;
-          }
+          throw new IOException("Timed out waiting for response");
         }
       }
+      // The server always returns data starting from the last checksum boundary. Therefore if the reader position is
+      // ahead of the position we received from the server, we need to adjust the buffer position accordingly.
+      // If the reader position is behind the received offset, the data arrived out of order, which indicates a bug.
+      ByteBuffer buf = readBlock.getData().asReadOnlyByteBuffer();
+      long blockOffset = readBlock.getOffset();
+      long pos = getPos();
+      if (pos < blockOffset) {
+        // This should not happen, and if it does, we have a bug.
+        throw new IOException("Received data out of order. Position is " + pos 
+ " but received data at "
+            + blockOffset);
+      }
+      if (pos > readBlock.getOffset()) {
+        int offset = (int)(pos - readBlock.getOffset());
+        buf.position(offset);
+      }
+      return buf;
     }
 
-    return hasData;
-  }
-
-  /**
-   * Check if there is more data in the chunk which has not yet been read
-   * into the buffers.
-   */
-  private boolean dataRemainingInBlock() {
-    long bufferPos;
-    if (blockPosition >= 0) {
-      bufferPos = blockPosition;
-    } else {
-      bufferPos = bufferOffsetWrtBlockData + buffersSize;
+    private void releaseResources() {
+      boolean wasNotYetReleased = !semaphoreReleased.getAndSet(true);
+      if (wasNotYetReleased) {
+        releaseStreamResources(response);
+      }
     }
 
-    return bufferPos < blockLength;
-  }
-
-  /**
-   * Check if current buffer had been read till the end.
-   */
-  private boolean bufferEOF() {
-    return allocated && buffersAllocated() && 
!buffers.get(bufferIndex).hasRemaining();
-  }
-
-  /**
-   * Release the buffers upto the given index.
-   * @param releaseUptoBufferIndex bufferIndex (inclusive) upto which the
-   *                               buffers must be released
-   */
-  private void releaseBuffers(int releaseUptoBufferIndex) {
-    int buffersLen = buffers.size();
-    if (releaseUptoBufferIndex == buffersLen - 1) {
-      // Before releasing all the buffers, if block EOF is not reached, then
-      // blockPosition should be set to point to the last position of the
-      // buffers. This should be done so that getPos() can return the current
-      // block position
-      blockPosition = bufferOffsetWrtBlockData +
-          bufferOffsets.get(releaseUptoBufferIndex) +
-          buffers.get(releaseUptoBufferIndex).capacity();
-      // Release all the buffers
-      releaseBuffers();
-    } else {
-      buffers = buffers.subList(releaseUptoBufferIndex + 1, buffersLen);
-      bufferOffsets = bufferOffsets.subList(
-          releaseUptoBufferIndex + 1, buffersLen);
-      bufferIndex = 0;
+    @Override
+    public void onNext(ContainerProtos.ContainerCommandResponseProto 
containerCommandResponseProto) {
+      try {
+        ReadBlockResponseProto readBlock = 
containerCommandResponseProto.getReadBlock();
+        ByteBuffer data = readBlock.getData().asReadOnlyByteBuffer();
+        if (verifyChecksum) {
+          ChecksumData checksumData = 
ChecksumData.getFromProtoBuf(readBlock.getChecksumData());
+          Checksum.verifyChecksum(data, checksumData, 0);
+        }
+        offerToQueue(readBlock);
+      } catch (OzoneChecksumException e) {
+        LOG.warn("Checksum verification failed for block {} from datanode {}",
+            getBlockID(), response.getDatanodeDetails(), e);
+        cancelDueToError(e);
+      }
     }
-  }
-
-  /**
-   * If EOF is reached, release the buffers.
-   */
-  private void releaseBuffers() {
-    buffers = null;
-    bufferIndex = 0;
-    // We should not reset bufferOffsetWrtBlockData and buffersSize here
-    // because when getPos() is called we use these
-    // values and determine whether chunk is read completely or not.
-  }
 
-  protected synchronized void releaseClient() {
-    if (xceiverClientFactory != null && xceiverClient != null) {
-      xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
-      xceiverClient = null;
+    @Override
+    public void onError(Throwable throwable) {
+      if (throwable instanceof StatusRuntimeException) {
+        if (((StatusRuntimeException) 
throwable).getStatus().getCode().name().equals("CANCELLED")) {
+          // This is expected when the client cancels the stream.
+          setCompleted();
+        }
+      } else {
+        setFailed(throwable);
+      }
+      releaseResources();
     }
-  }
-
-  private void validateBlock(
-      ContainerProtos.ContainerCommandResponseProto response
-  ) throws IOException {
 
-    ReadBlockResponseProto readBlock = response.getReadBlock();
-    for (ReadChunkResponseProto readChunk : readBlock.getReadChunkList()) {
-      List<ByteString> byteStrings;
+    @Override
+    public void onCompleted() {
+      setCompleted();
+      releaseResources();
+    }
 
-      ContainerProtos.ChunkInfo chunkInfo =
-          readChunk.getChunkData();
-      if (chunkInfo.getLen() <= 0) {
-        throw new IOException("Failed to get chunk: chunkName == "
-            + chunkInfo.getChunkName() + "len == " + chunkInfo.getLen());
-      }
-      byteStrings = readChunk.getDataBuffers().getBuffersList();
-      long buffersLen = BufferUtils.getBuffersLen(byteStrings);
-      if (buffersLen != chunkInfo.getLen()) {
-        // Bytes read from chunk should be equal to chunk size.
-        throw new OzoneChecksumException(String.format(
-            "Inconsistent read for chunk=%s len=%d bytesRead=%d",
-            chunkInfo.getChunkName(), chunkInfo.getLen(),
-            buffersLen));
+    /**
+     * By calling cancel, the client will send a cancel signal to the server, which will stop sending more data and
+     * cause the onError() to be called in this observer with a CANCELLED exception.
+     */
+    public void cancel() {
+      if (response != null && response.getRequestObserver() != null) {
+        response.getRequestObserver().cancel("Cancelled by client", CANCELLED_EXCEPTION);
+        setCompleted();
+        releaseResources();
       }
+    }
 
-
-      if (verifyChecksum) {
-        ChecksumData checksumData = ChecksumData.getFromProtoBuf(
-            chunkInfo.getChecksumData());
-        int startIndex = (int) readChunk.getChunkData().getOffset() / checksumData.getBytesPerChecksum();
-
-        // ChecksumData stores checksum for each 'numBytesPerChecksum'
-        // number of bytes in a list. Compute the index of the first
-        // checksum to match with the read data
-
-        Checksum.verifyChecksum(byteStrings, checksumData, startIndex);
+    public void cancelDueToError(Throwable exception) {
+      if (response != null && response.getRequestObserver() != null) {
+        response.getRequestObserver().onError(exception);
+        setFailed(exception);
+        releaseResources();
       }
     }
-  }
-
-  @VisibleForTesting
-  protected synchronized void setBuffers(List<ByteBuffer> buffers) {
-    this.buffers = buffers;
-  }
 
-  private boolean shouldRetryRead(IOException cause) throws IOException {
-    RetryPolicy.RetryAction retryAction;
-    try {
-      retryAction = retryPolicy.shouldRetry(cause, ++retries, 0, true);
-    } catch (IOException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new IOException(e);
+    private void setFailed(Throwable throwable) {
+      if (completed.get()) {
+        throw new IllegalArgumentException("Cannot mark a completed stream as failed");
+      }
+      failed.set(true);
+      error.set(throwable);
     }
-    return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
-  }
 
-  @VisibleForTesting
-  public boolean isVerifyChecksum() {
-    return verifyChecksum;
-  }
+    private void setCompleted() {
+      if (!failed.get()) {
+        completed.set(true);
+      }
+    }
 
-  private void refreshBlockInfo(IOException cause) throws IOException {
-    LOG.info("Attempting to update pipeline and block token for block {} from 
pipeline {}: {}",
-        blockID, pipelineRef.get().getId(), cause.getMessage());
-    if (refreshFunction != null) {
-      LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
-      BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
-      if (blockLocationInfo == null) {
-        LOG.warn("No new block location info for block {}", blockID);
-      } else {
-        setPipeline(blockLocationInfo.getPipeline());
-        LOG.info("New pipeline for block {}: {}", blockID,
-            blockLocationInfo.getPipeline());
-
-        tokenRef.set(blockLocationInfo.getToken());
-        if (blockLocationInfo.getToken() != null) {
-          OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
-          tokenId.readFromByteArray(tokenRef.get().getIdentifier());
-          LOG.info("A new token is added for block {}. Expiry: {}",
-              blockID, Instant.ofEpochMilli(tokenId.getExpiryDate()));
+    private void offerToQueue(ReadBlockResponseProto item) {
+      while (!completed.get() && !failed.get()) {
+        try {
+          if (responseQueue.offer(item, 100, TimeUnit.MILLISECONDS)) {
+            return;
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
         }
       }
-    } else {
-      throw cause;
     }
-  }
-
-  @VisibleForTesting
-  public synchronized ByteBuffer[] getCachedBuffers() {
-    return buffers == null ? null :
-        BufferUtils.getReadOnlyByteBuffers(buffers.toArray(new ByteBuffer[0]));
-  }
 
-  /**
-   * Check if this exception is because datanodes are not reachable.
-   */
-  private boolean isConnectivityIssue(IOException ex) {
-    return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    releaseClient();
-    releaseBuffers();
-    xceiverClientFactory = null;
-  }
-
-  private void handleStorageContainerException(StorageContainerException e) throws IOException {
-    if (shouldRetryRead(e)) {
-      releaseClient();
-      refreshBlockInfo(e);
-    } else {
-      throw e;
+    @Override
+    public void setStreamingReadResponse(StreamingReadResponse streamingReadResponse) {
+      response = streamingReadResponse;
     }
   }
 
-  private void handleIOException(IOException ioe) throws IOException {
-    if (shouldRetryRead(ioe)) {
-      if (isConnectivityIssue(ioe)) {
-        releaseClient();
-        refreshBlockInfo(ioe);
-      } else {
-        releaseClient();
-      }
-    } else {
-      throw ioe;
-    }
-  }
 }
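
The response observer above hands each ReadBlockResponseProto from the gRPC callback thread to the reading thread
through a bounded queue, and gives up as soon as the stream is marked completed or failed. The following standalone
sketch (hypothetical class and field names, not part of this patch) illustrates the same offer-with-timeout pattern
in isolation:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class BoundedHandoffSketch {
      // A small capacity keeps memory bounded and naturally back-pressures the producer.
      private final BlockingQueue<byte[]> responseQueue = new ArrayBlockingQueue<>(2);
      private final AtomicBoolean completed = new AtomicBoolean();
      private final AtomicBoolean failed = new AtomicBoolean();

      // Producer side, like offerToQueue(): retry with a short timeout, but stop
      // as soon as the stream has ended so a stalled reader cannot block forever.
      void offer(byte[] item) {
        while (!completed.get() && !failed.get()) {
          try {
            if (responseQueue.offer(item, 100, TimeUnit.MILLISECONDS)) {
              return;
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }

      // Consumer side: the reading thread polls for the next piece of data, or null on timeout.
      byte[] poll() throws InterruptedException {
        return responseQueue.poll(100, TimeUnit.MILLISECONDS);
      }
    }
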
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 9249abaf42a..2765e2a00a8 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -90,9 +90,8 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
           blockInfo, xceiverFactory, refreshFunction,
           ecBlockStreamFactory, config);
     } else if (config.isStreamReadBlock() && allDataNodesSupportStreamBlock(pipeline)) {
-      return new StreamBlockInputStream(
-          blockInfo.getBlockID(), blockInfo.getLength(),
-          pipeline, token, xceiverFactory, refreshFunction, config);
+      return new StreamBlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(), pipeline, token, xceiverFactory,
+          refreshFunction, config);
     } else {
       return new BlockInputStream(blockInfo,
           pipeline, token, xceiverFactory, refreshFunction,
@@ -104,8 +103,7 @@ private boolean allDataNodesSupportStreamBlock(Pipeline pipeline) {
     // return true only if all DataNodes in the pipeline are on a version
     // that supports reading a block by streaming chunks.
     for (DatanodeDetails dn : pipeline.getNodes()) {
-      if (dn.getCurrentVersion() <
-          STREAM_BLOCK_SUPPORT.toProtoValue()) {
+      if (dn.getCurrentVersion() < STREAM_BLOCK_SUPPORT.toProtoValue()) {
         return false;
       }
     }
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java
deleted file mode 100644
index e141845954d..00000000000
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.storage;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.utils.BufferUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-
-/**
- * A dummy StreamBlockInputStream to mock read block call to DN.
- */
-class DummyStreamBlockInputStream extends StreamBlockInputStream {
-
-  private final List<ByteString> readByteBuffers = new ArrayList<>();
-  private final List<ChunkInfo> chunks;
-  private final long[] chunkOffsets;
-  private final Map<String, byte[]> chunkDataMap;
-
-  @SuppressWarnings("parameternumber")
-  DummyStreamBlockInputStream(
-      BlockID blockId,
-      long blockLen,
-      Pipeline pipeline,
-      Token<OzoneBlockTokenIdentifier> token,
-      XceiverClientFactory xceiverClientManager,
-      Function<BlockID, BlockLocationInfo> refreshFunction,
-      OzoneClientConfig config,
-      List<ChunkInfo> chunks,
-      Map<String, byte[]> chunkDataMap) throws IOException {
-    super(blockId, blockLen, pipeline, token, xceiverClientManager,
-        refreshFunction, config);
-    this.chunks = chunks;
-    this.chunkDataMap = chunkDataMap;
-    chunkOffsets = new long[chunks.size()];
-    long temp = 0;
-    for (int i = 0; i < chunks.size(); i++) {
-      chunkOffsets[i] = temp;
-      temp += chunks.get(i).getLen();
-    }
-  }
-
-  @Override
-  protected synchronized void checkOpen() throws IOException {
-    // No action needed
-  }
-
-  @Override
-  protected void acquireClient() {
-    // No action needed
-  }
-
-  @Override
-  protected void releaseClient() {
-    // no-op
-  }
-
-  @Override
-  protected long readData(long offset, long len) {
-    int chunkIndex = Arrays.binarySearch(chunkOffsets, offset);
-    if (chunkIndex < 0) {
-      chunkIndex = -chunkIndex - 2;
-    }
-    ChunkInfo chunkInfo = chunks.get(chunkIndex);
-    readByteBuffers.clear();
-    long chunkOffset = offset - chunkInfo.getOffset();
-    if (isVerifyChecksum()) {
-      ChecksumData checksumData = ChecksumData.getFromProtoBuf(
-          chunkInfo.getChecksumData());
-      int bytesPerChecksum = checksumData.getBytesPerChecksum();
-      chunkOffset = (chunkOffset / bytesPerChecksum) * bytesPerChecksum;
-    }
-    long bufferOffsetWrtBlockDataData = chunkOffsets[chunkIndex] + chunkOffset;
-    while (len > 0) {
-      ChunkInfo currentChunk = chunks.get(chunkIndex);
-      int bufferCapacity = currentChunk.getChecksumData().getBytesPerChecksum();
-      long chunkLen = currentChunk.getLen();
-      long remainingToRead = Math.min(chunkLen, len);
-      if (isVerifyChecksum()) {
-        if (len < chunkLen) {
-          final ChecksumData checksumData = ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
-          final long endByteIndex = len - 1;
-          final int bytesPerChecksum = checksumData.getBytesPerChecksum();
-          remainingToRead = (endByteIndex / bytesPerChecksum + 1) * bytesPerChecksum;
-        } else {
-          remainingToRead = chunkLen;
-        }
-      }
-
-      long bufferLen;
-      while (remainingToRead > 0) {
-        if (remainingToRead < bufferCapacity) {
-          bufferLen = remainingToRead;
-        } else {
-          bufferLen = bufferCapacity;
-        }
-        ByteString byteString = ByteString.copyFrom(chunkDataMap.get(chunks.get(chunkIndex).getChunkName()),
-            (int) chunkOffset, (int) bufferLen);
-
-        readByteBuffers.add(byteString);
-
-        chunkOffset += bufferLen;
-        remainingToRead -= bufferLen;
-        len -= bufferLen;
-      }
-      chunkOffset = 0;
-      chunkIndex++;
-    }
-    setBuffers(BufferUtils.getReadOnlyByteBuffers(readByteBuffers));
-    return bufferOffsetWrtBlockDataData;
-  }
-
-  public List<ByteString> getReadByteBuffers() {
-    return readByteBuffers;
-  }
-}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index 2e9a84cad42..1b5e7667e84 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -267,8 +267,8 @@ public void testSeekAndRead() throws Exception {
 
   @Test
   public void testRefreshPipelineFunction() throws Exception {
-    LogCapturer logCapturer = LogCapturer.captureLogs(BlockInputStream.class);
-    GenericTestUtils.setLogLevel(BlockInputStream.class, Level.DEBUG);
+    LogCapturer logCapturer = LogCapturer.captureLogs(BlockExtendedInputStream.class);
+    GenericTestUtils.setLogLevel(BlockExtendedInputStream.class, Level.DEBUG);
     BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
     AtomicBoolean isRefreshed = new AtomicBoolean();
     createChunkList(5);
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
index 81cb6d4d62c..b2cb3fb865c 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
@@ -17,248 +17,338 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-
-import com.google.common.primitives.Bytes;
-import java.io.EOFException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
+import java.util.stream.Stream;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamingReadResponse;
+import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.StatusException;
+import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.invocation.InvocationOnMock;
 
 /**
  * Tests for {@link StreamBlockInputStream}'s functionality.
  */
 public class TestStreamBlockInputStream {
-  private int blockSize;
-  private static final int CHUNK_SIZE = 100;
-  private static final int BYTES_PER_CHECKSUM = 20;
-  private static final Random RANDOM = new Random();
-  private DummyStreamBlockInputStream blockStream;
-  private byte[] blockData;
-  private List<ChunkInfo> chunks;
-  private Map<String, byte[]> chunkDataMap;
+  private static final int BYTES_PER_CHECKSUM = 1024;
+  private static final int BLOCK_SIZE = 1024;
+  private StreamBlockInputStream blockStream;
+  private final OzoneConfiguration conf = new OzoneConfiguration();
+  private XceiverClientFactory xceiverClientFactory;
+  private XceiverClientGrpc xceiverClient;
   private Checksum checksum;
-  private BlockID blockID;
-  private static final String CHUNK_NAME = "chunk-";
-  private OzoneConfiguration conf = new OzoneConfiguration();
+  private ChecksumData checksumData;
+  private byte[] data;
+  private ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> requestObserver;
+  private Function<BlockID, BlockLocationInfo> refreshFunction;
 
   @BeforeEach
   public void setup() throws Exception {
+    Token<OzoneBlockTokenIdentifier> token = mock(Token.class);
+    when(token.encodeToUrlString()).thenReturn("url");
+
+    Set<HddsProtos.BlockTokenSecretProto.AccessModeProto> modes =
+        Collections.singleton(HddsProtos.BlockTokenSecretProto.AccessModeProto.READ);
+    OzoneBlockTokenIdentifier tokenIdentifier = new OzoneBlockTokenIdentifier("owner", new BlockID(1, 1),
+        modes, Time.monotonicNow() + 10000, 10);
+    tokenIdentifier.setSecretKeyId(UUID.randomUUID());
+    when(token.getIdentifier()).thenReturn(tokenIdentifier.getBytes());
+    Pipeline pipeline = MockPipeline.createSingleNodePipeline();
+
+    BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class);
+    when(blockLocationInfo.getPipeline()).thenReturn(pipeline);
+    when(blockLocationInfo.getToken()).thenReturn(token);
+
+    xceiverClient = mock(XceiverClientGrpc.class);
+    when(xceiverClient.getPipeline()).thenReturn(pipeline);
+    xceiverClientFactory = mock(XceiverClientFactory.class);
+    when(xceiverClientFactory.acquireClientForReadData(any()))
+        .thenReturn(xceiverClient);
+    requestObserver = mock(ClientCallStreamObserver.class);
+
     OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
     clientConfig.setStreamReadBlock(true);
-    blockID = new BlockID(new ContainerBlockID(1, 1));
+    clientConfig.setMaxReadRetryCount(1);
+    refreshFunction = mock(Function.class);
+    when(refreshFunction.apply(any())).thenReturn(blockLocationInfo);
+    BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
     checksum = new Checksum(ChecksumType.CRC32, BYTES_PER_CHECKSUM);
-    createChunkList(5);
-
-    Pipeline pipeline = MockPipeline.createSingleNodePipeline();
-    blockStream = new DummyStreamBlockInputStream(blockID, blockSize, pipeline,
-        null, null, mock(Function.class), clientConfig, chunks, chunkDataMap);
+    createDataAndChecksum();
+    blockStream = new StreamBlockInputStream(blockID, BLOCK_SIZE, pipeline,
+        token, xceiverClientFactory, refreshFunction, clientConfig);
   }
 
-  /**
-   * Create a mock list of chunks. The first n-1 chunks of length CHUNK_SIZE
-   * and the last chunk with length CHUNK_SIZE/2.
-   */
-  private void createChunkList(int numChunks)
-      throws Exception {
-
-    chunks = new ArrayList<>(numChunks);
-    chunkDataMap = new HashMap<>();
-    blockData = new byte[0];
-    int i, chunkLen;
-    byte[] byteData;
-    String chunkName;
-
-    for (i = 0; i < numChunks; i++) {
-      chunkName = CHUNK_NAME + i;
-      chunkLen = CHUNK_SIZE;
-      if (i == numChunks - 1) {
-        chunkLen = CHUNK_SIZE / 2;
+  @AfterEach
+  public void teardown() {
+    if (blockStream != null) {
+      try {
+        blockStream.close();
+      } catch (IOException e) {
+        // ignore
       }
-      byteData = generateRandomData(chunkLen);
-      ChunkInfo chunkInfo = ChunkInfo.newBuilder()
-          .setChunkName(chunkName)
-          .setOffset(0)
-          .setLen(chunkLen)
-          .setChecksumData(checksum.computeChecksum(
-              byteData, 0, chunkLen).getProtoBufMessage())
-          .build();
-
-      chunkDataMap.put(chunkName, byteData);
-      chunks.add(chunkInfo);
-
-      blockSize += chunkLen;
-      blockData = Bytes.concat(blockData, byteData);
     }
   }
 
-  static byte[] generateRandomData(int length) {
-    byte[] bytes = new byte[length];
-    RANDOM.nextBytes(bytes);
-    return bytes;
+  @Test
+  public void testCloseStreamReleasesResources() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    assertEquals(data[0], blockStream.read());
+    blockStream.close();
+    // Verify that cancel() was called on the requestObserver mock
+    verify(requestObserver).cancel(any(), any());
+    // Verify that release() was called on the xceiverClient mock
+    verify(xceiverClientFactory).releaseClientForReadData(xceiverClient, false);
+    verify(xceiverClient, times(1)).completeStreamRead(any());
   }
 
-  /**
-   * Match readData with the chunkData byte-wise.
-   * @param readData Data read through ChunkInputStream
-   * @param inputDataStartIndex first index (inclusive) in chunkData to compare
-   *                            with read data
-   * @param length the number of bytes of data to match starting from
-   *               inputDataStartIndex
-   */
-  private void matchWithInputData(byte[] readData, int inputDataStartIndex,
-                                  int length) {
-    for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
-      assertEquals(blockData[i], readData[i - inputDataStartIndex], "i: " + i);
-    }
+  @Test
+  public void testUnbufferReleasesResourcesAndResumesFromLastPosition() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    assertEquals(data[0], blockStream.read());
+    assertEquals(1, blockStream.getPos());
+    blockStream.unbuffer();
+    // Verify that cancel() was called on the requestObserver mock
+    verify(requestObserver).cancel(any(), any());
+    // Verify that release() was called on the xceiverClient mock
+    verify(xceiverClientFactory).releaseClientForReadData(xceiverClient, false);
+    verify(xceiverClient, times(1)).completeStreamRead(any());
+    // The next read should "rebuffer" and continue from the last position
+    assertEquals(data[1], blockStream.read());
+    assertEquals(2, blockStream.getPos());
   }
 
-  private void matchWithInputData(List<ByteString> byteStrings,
-                                  int inputDataStartIndex, int length) {
-    int offset = inputDataStartIndex;
-    int totalBufferLen = 0;
-    for (ByteString byteString : byteStrings) {
-      int bufferLen = byteString.size();
-      matchWithInputData(byteString.toByteArray(), offset, bufferLen);
-      offset += bufferLen;
-      totalBufferLen += bufferLen;
-    }
-    assertEquals(length, totalBufferLen);
+  @Test
+  public void testSeekReleasesTheStreamAndStartsFromNewPosition() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    assertEquals(data[0], blockStream.read());
+    blockStream.seek(100);
+    assertEquals(100, blockStream.getPos());
+    // Verify that cancel() was called on the requestObserver mock
+    verify(requestObserver).cancel(any(), any());
+    verify(xceiverClient, times(1)).completeStreamRead(any());
+    // The xceiverClient should not be released
+    verify(xceiverClientFactory, never())
+        .releaseClientForReadData(xceiverClient, false);
+
+    assertEquals(data[100], blockStream.read());
+    assertEquals(101, blockStream.getPos());
   }
 
-  /**
-   * Seek to a position and verify through getPos().
-   */
-  private void seekAndVerify(int pos) throws Exception {
-    blockStream.seek(pos);
-    assertEquals(pos, blockStream.getPos(),
-        "Current position of buffer does not match with the sought position");
+  @Test
+  public void testErrorThrownIfStreamReturnsError() throws IOException, InterruptedException {
+    // Note the error will only be thrown when the buffer needs to be refilled. In this case, as it is the first
+    // read, it will try to fill the buffer and encounter the error, but a reader could continue reading until the
+    // buffer is exhausted before seeing the error.
+    doAnswer((InvocationOnMock invocation) -> {
+      StreamingReaderSpi streamObserver = invocation.getArgument(1);
+      StreamingReadResponse resp =
+          new StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(), requestObserver);
+      streamObserver.setStreamingReadResponse(resp);
+      streamObserver.onError(new IOException("Test induced error"));
+      return null;
+    }).when(xceiverClient).streamRead(any(), any());
+    assertThrows(IOException.class, () -> blockStream.read());
+    verify(xceiverClient, times(0)).completeStreamRead(any());
   }
 
   @Test
-  public void testFullChunkRead() throws Exception {
-    byte[] b = new byte[blockSize];
-    int numBytesRead = blockStream.read(b, 0, blockSize);
-    assertEquals(blockSize, numBytesRead);
-    matchWithInputData(b, 0, blockSize);
+  public void seekOutOfBounds() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    assertThrows(IOException.class, () -> blockStream.seek(-1));
+    assertThrows(IOException.class, () -> blockStream.seek(BLOCK_SIZE + 1));
   }
 
   @Test
-  public void testPartialChunkRead() throws Exception {
-    int len = blockSize / 2;
-    byte[] b = new byte[len];
-
-    int numBytesRead = blockStream.read(b, 0, len);
-    assertEquals(len, numBytesRead);
-    matchWithInputData(b, 0, len);
-
-    // To read block data from index 0 to 225 (len = 225), we need to read
-    // chunk from offset 0 to 240 as the checksum boundary is at every 20
-    // bytes. Verify that 60 bytes of chunk data are read and stored in the
-    // buffers. Since checksum boundary is at every 20 bytes, there should be
-    // 240/20 number of buffers.
-    matchWithInputData(blockStream.getReadByteBuffers(), 0, 240);
+  public void readPastEOFReturnsEOF() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    blockStream.seek(BLOCK_SIZE);
+    // Ensure the stream is at EOF even after two attempts to read
+    assertEquals(-1, blockStream.read());
+    assertEquals(-1, blockStream.read());
+    assertEquals(BLOCK_SIZE, blockStream.getPos());
   }
 
   @Test
-  public void testSeek() throws Exception {
-    seekAndVerify(0);
-    EOFException eofException = assertThrows(EOFException.class, () ->  seekAndVerify(blockSize + 1));
-    assertThat(eofException).hasMessage("EOF encountered at pos: " + (blockSize + 1) + " for block: " + blockID);
-
-    // Seek before read should update the BlockInputStream#blockPosition
-    seekAndVerify(25);
-
-    // Read from the sought position.
-    // Reading from index 25 to 54 should result in the BlockInputStream
-    // copying chunk data from index 20 to 59 into the buffers (checksum
-    // boundaries).
-    byte[] b = new byte[30];
-    int numBytesRead = blockStream.read(b, 0, 30);
-    assertEquals(30, numBytesRead);
-    matchWithInputData(b, 25, 30);
-    matchWithInputData(blockStream.getReadByteBuffers(), 20, 40);
-
-    // After read, the position of the blockStream is evaluated from the
-    // buffers and the chunkPosition should be reset to -1.
-
-    // Only the last BYTES_PER_CHECKSUM will be cached in the buffers as
-    // buffers are released after each checksum boundary is read. So the
-    // buffers should contain data from index 40 to 59.
-    // Seek to a position within the cached buffers. BlockPosition should
-    // still not be used to set the position.
-    seekAndVerify(45);
-
-    // Seek to a position outside the current cached buffers. In this case, the
-    // chunkPosition should be updated to the seeked position.
-    seekAndVerify(75);
-
-    // Read upto checksum boundary should result in all the buffers being
-    // released and hence chunkPosition updated with current position of chunk.
-    seekAndVerify(25);
-    b = new byte[15];
-    numBytesRead = blockStream.read(b, 0, 15);
-    assertEquals(15, numBytesRead);
-    matchWithInputData(b, 25, 15);
+  public void ensureExceptionThrownForReadAfterClosed() throws IOException, InterruptedException {
+    setupSuccessfulRead();
+    blockStream.close();
+    ByteBuffer byteBuffer = ByteBuffer.allocate(10);
+    byte[] byteArray = new byte[10];
+    assertThrows(IOException.class, () -> blockStream.read());
+    assertThrows(IOException.class, () -> {
+      // Findbugs complains about ignored return value without this :(
+      int r = blockStream.read(byteArray, 0, 10);
+    });
+    assertThrows(IOException.class, () -> blockStream.read(byteBuffer));
+    assertThrows(IOException.class, () -> blockStream.seek(10));
+  }
+
+  @ParameterizedTest
+  @MethodSource("exceptionsTriggeringRefresh")
+  public void testRefreshFunctionCalledForAllDNsBadOnInitialize(IOException thrown)
+      throws IOException, InterruptedException {
+    // In this case, if the first attempt to connect to any of the DNs fails, it should retry by refreshing the pipeline.
+
+    doAnswer((InvocationOnMock invocation) -> {
+      throw thrown;
+    }).doAnswer((InvocationOnMock invocation) -> {
+      StreamingReaderSpi streamObserver = invocation.getArgument(1);
+      StreamingReadResponse resp =
+          new StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(), requestObserver);
+      streamObserver.setStreamingReadResponse(resp);
+      streamObserver.onNext(createChunkResponse(false));
+      streamObserver.onCompleted();
+      return null;
+    }).when(xceiverClient).streamRead(any(), any());
+    blockStream.read();
+    verify(refreshFunction, times(1)).apply(any());
+  }
+
+  @ParameterizedTest
+  @MethodSource("exceptionsNotTriggeringRefresh")
+  public void testRefreshNotCalledForAllDNsBadOnInitialize(IOException thrown)
+      throws IOException, InterruptedException {
+    // In this case, these errors should not trigger a pipeline refresh, so the read fails without retrying.
+    doAnswer((InvocationOnMock invocation) -> {
+      throw thrown;
+    }).when(xceiverClient).streamRead(any(), any());
+    assertThrows(IOException.class, () -> blockStream.read());
+    verify(refreshFunction, times(0)).apply(any());
   }
 
   @Test
-  public void testSeekAndRead() throws Exception {
-    // Seek to a position and read data
-    seekAndVerify(50);
-    byte[] b1 = new byte[20];
-    int numBytesRead = blockStream.read(b1, 0, 20);
-    assertEquals(20, numBytesRead);
-    matchWithInputData(b1, 50, 20);
-
-    // Next read should start from the position of the last read + 1 i.e. 70
-    byte[] b2 = new byte[20];
-    numBytesRead = blockStream.read(b2, 0, 20);
-    assertEquals(20, numBytesRead);
-    matchWithInputData(b2, 70, 20);
-
-    byte[] b3 = new byte[20];
-    seekAndVerify(80);
-    numBytesRead = blockStream.read(b3, 0, 20);
-    assertEquals(20, numBytesRead);
-    matchWithInputData(b3, 80, 20);
+  public void testExceptionThrownAfterRetriesExhausted() throws IOException, InterruptedException {
+    // Every attempt fails, so once the single configured retry (with a pipeline refresh) is used up, the error is
+    // thrown to the caller.
+    doAnswer((InvocationOnMock invocation) -> {
+      throw new StorageContainerException(CONTAINER_NOT_FOUND);
+    }).when(xceiverClient).streamRead(any(), any());
+
+    assertThrows(IOException.class, () -> blockStream.read());
+    verify(refreshFunction, times(1)).apply(any());
   }
 
   @Test
-  public void testUnbuffered() throws Exception {
-    byte[] b1 = new byte[20];
-    int numBytesRead = blockStream.read(b1, 0, 20);
-    assertEquals(20, numBytesRead);
-    matchWithInputData(b1, 0, 20);
+  public void testInvalidChecksumThrowsException() throws IOException, InterruptedException {
+    doAnswer((InvocationOnMock invocation) -> {
+      StreamingReaderSpi streamObserver = invocation.getArgument(1);
+      StreamingReadResponse resp =
+          new StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(), requestObserver);
+      streamObserver.setStreamingReadResponse(resp);
+      streamObserver.onNext(createChunkResponse(true));
+      streamObserver.onCompleted();
+      return null;
+    }).when(xceiverClient).streamRead(any(), any());
+    assertThrows(IOException.class, () -> blockStream.read());
+  }
 
-    blockStream.unbuffer();
+  private void createDataAndChecksum() throws OzoneChecksumException {
+    data = new byte[BLOCK_SIZE];
+    new SecureRandom().nextBytes(data);
+    checksumData = checksum.computeChecksum(data);
+  }
+
+  private void setupSuccessfulRead() throws IOException, InterruptedException {
+    doAnswer((InvocationOnMock invocation) -> {
+      StreamingReaderSpi streamObserver = invocation.getArgument(1);
+      StreamingReadResponse resp =
+          new StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(), requestObserver);
+      streamObserver.setStreamingReadResponse(resp);
+      streamObserver.onNext(createChunkResponse(false));
+      streamObserver.onCompleted();
+      return null;
+    }).when(xceiverClient).streamRead(any(), any());
+  }
+
+  private ContainerProtos.ContainerCommandResponseProto createChunkResponse(boolean invalidChecksum) {
+    ContainerProtos.ReadBlockResponseProto response = invalidChecksum ?
+        createInValidChecksumResponse() : createValidResponse();
 
-    assertFalse(blockStream.buffersAllocated());
+    return ContainerProtos.ContainerCommandResponseProto.newBuilder()
+        .setCmdType(ContainerProtos.Type.ReadBlock)
+        .setReadBlock(response)
+        .setResult(ContainerProtos.Result.SUCCESS)
+        .build();
+  }
+
+  private ContainerProtos.ReadBlockResponseProto createValidResponse() {
+    return ContainerProtos.ReadBlockResponseProto.newBuilder()
+        .setChecksumData(checksumData.getProtoBufMessage())
+        .setData(ByteString.copyFrom(data))
+        .setOffset(0)
+        .build();
+  }
+
+  private ContainerProtos.ReadBlockResponseProto createInValidChecksumResponse() {
+    byte[] invalidData = new byte[data.length];
+    System.arraycopy(data, 0, invalidData, 0, data.length);
+    // Corrupt the data
+    invalidData[0] = (byte) (invalidData[0] + 1);
+    return ContainerProtos.ReadBlockResponseProto.newBuilder()
+        .setChecksumData(checksumData.getProtoBufMessage())
+        .setData(ByteString.copyFrom(invalidData))
+        .setOffset(0)
+        .build();
+  }
+
+  private static Stream<Arguments> exceptionsTriggeringRefresh() {
+    return Stream.of(
+        Arguments.of(new StorageContainerException(CONTAINER_NOT_FOUND)),
+        Arguments.of(new IOException(new ExecutionException(
+            new StatusException(Status.UNAVAILABLE))))
+    );
+  }
 
-    // Next read should start from the position of the last read + 1 i.e. 20
-    byte[] b2 = new byte[20];
-    numBytesRead = blockStream.read(b2, 0, 20);
-    assertEquals(20, numBytesRead);
-    matchWithInputData(b2, 20, 20);
+  private static Stream<Arguments> exceptionsNotTriggeringRefresh() {
+    return Stream.of(
+        Arguments.of(new SCMSecurityException("Security problem")),
+        Arguments.of(new OzoneChecksumException("checksum missing")),
+        Arguments.of(new IOException("Some random exception."))
+    );
   }
 
 }
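
The two @MethodSource providers above encode which failures are worth a pipeline refresh: container-level errors such
as CONTAINER_NOT_FOUND and connectivity errors (gRPC UNAVAILABLE) lead the client to call refreshFunction, while
security, checksum and other generic IO failures do not. A minimal sketch of that classification, derived only from
what these tests assert (the helper name and shape are hypothetical, not the client's actual code):

    import java.io.IOException;
    import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
    import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
    import org.apache.hadoop.ozone.common.OzoneChecksumException;
    import org.apache.ratis.thirdparty.io.grpc.Status;

    public final class RefreshDecisionSketch {

      private RefreshDecisionSketch() {
      }

      // Illustrative only: mirrors the expectations of the tests above.
      static boolean shouldRefreshPipeline(IOException cause) {
        if (cause instanceof SCMSecurityException || cause instanceof OzoneChecksumException) {
          // Token or data problems will not be fixed by fetching a new pipeline.
          return false;
        }
        if (cause instanceof StorageContainerException) {
          // e.g. CONTAINER_NOT_FOUND: the replica may have moved, so refresh.
          return true;
        }
        // Connectivity problems (gRPC UNAVAILABLE somewhere in the cause chain) also justify a refresh.
        return Status.fromThrowable(cause).getCode() == Status.UNAVAILABLE.getCode();
      }
    }
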
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java
new file mode 100644
index 00000000000..ea8694cd8b7
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver;
+
+/**
+ * Streaming read response holding datanode details and
+ * request observer to send read requests.
+ */
+public class StreamingReadResponse {
+
+  private final DatanodeDetails dn;
+  private final ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> requestObserver;
+
+  public StreamingReadResponse(DatanodeDetails dn,
+      ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> requestObserver) {
+    this.dn = dn;
+    this.requestObserver = requestObserver;
+  }
+
+  public DatanodeDetails getDatanodeDetails() {
+    return dn;
+  }
+
+  public ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> getRequestObserver() {
+    return requestObserver;
+  }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReaderSpi.java
similarity index 62%
copy from hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
copy to hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReaderSpi.java
index 6753f600a91..0206253784c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReaderSpi.java
@@ -15,27 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hdds.scm.storage;
+package org.apache.hadoop.hdds.scm;
 
-import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 
 /**
- * Abstract class used as an interface for input streams related to Ozone
- * blocks.
+ * SPI for streaming reader to set the streaming read response.
  */
-public abstract class BlockExtendedInputStream extends ExtendedInputStream
-    implements PartInputStream {
+public interface StreamingReaderSpi extends StreamObserver<ContainerProtos.ContainerCommandResponseProto> {
 
-  public abstract BlockID getBlockID();
+  void setStreamingReadResponse(StreamingReadResponse streamingReadResponse);
 
-  @Override
-  public long getRemaining() {
-    return getLength() - getPos();
-  }
-
-  @Override
-  public abstract long getLength();
-
-  @Override
-  public abstract long getPos();
 }
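
StreamingReaderSpi is simply a gRPC StreamObserver over ContainerCommandResponseProto with one extra hook for handing
back the StreamingReadResponse once the call is set up. A minimal illustrative implementation (hypothetical class,
assuming only what the interface above declares) might look like:

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
    import org.apache.hadoop.hdds.scm.StreamingReadResponse;
    import org.apache.hadoop.hdds.scm.StreamingReaderSpi;

    // Logs each response and remembers the read handle so a caller could later
    // cancel the server-side stream via response.getRequestObserver().
    public class LoggingStreamingReader implements StreamingReaderSpi {

      private volatile StreamingReadResponse response;

      @Override
      public void setStreamingReadResponse(StreamingReadResponse streamingReadResponse) {
        this.response = streamingReadResponse;
      }

      @Override
      public void onNext(ContainerProtos.ContainerCommandResponseProto value) {
        System.out.println("Received " + value.getReadBlock().getData().size() + " bytes");
      }

      @Override
      public void onError(Throwable t) {
        System.err.println("Stream failed: " + t);
      }

      @Override
      public void onCompleted() {
        System.out.println("Stream completed"
            + (response == null ? "" : " from " + response.getDatanodeDetails()));
      }
    }
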
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 35e65271bb1..f1bf7a8ef85 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -144,6 +144,15 @@ public ContainerCommandResponseProto sendCommand(
     }
   }
 
+  public void streamRead(ContainerCommandRequestProto request,
+      StreamingReaderSpi streamObserver) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Stream read is not supported");
+  }
+
+  public void completeStreamRead(StreamingReadResponse streamingReadResponse) {
+    throw new UnsupportedOperationException("Stream read is not supported");
+  }
+
   public static IOException getIOExceptionForSendCommand(
       ContainerCommandRequestProto request, Exception e) {
     return new IOException("Failed to execute command "
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index ededa8d070b..3fc97e97ffa 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkResponseProto;
+import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.ChunkBufferToByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
@@ -335,19 +336,16 @@ public static ContainerCommandResponseProto getReadChunkResponse(
   }
 
   public static ContainerCommandResponseProto getReadBlockResponse(
-      ContainerCommandRequestProto request, DatanodeBlockID blockID,
-      ChunkInfo chunkInfo, ChunkBufferToByteString data,
-      Function<ByteBuffer, ByteString> byteBufferToByteString) {
+      ContainerCommandRequestProto request, ChecksumData checksumData, ByteBuffer data, long offset) {
+
+    ContainerProtos.ReadBlockResponseProto response = ContainerProtos.ReadBlockResponseProto.newBuilder()
+        .setChecksumData(checksumData.getProtoBufMessage())
+        .setData(ByteString.copyFrom(data))
+        .setOffset(offset)
+        .build();
 
-    ReadChunkResponseProto.Builder response;
-    response = ReadChunkResponseProto.newBuilder()
-        .setChunkData(chunkInfo)
-        .setDataBuffers(DataBuffers.newBuilder()
-            .addAllBuffers(data.toByteStringList(byteBufferToByteString))
-            .build())
-        .setBlockID(blockID);
     return getSuccessResponseBuilder(request)
-        .setReadChunk(response)
+        .setReadBlock(response)
         .build();
   }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 1117d29c1aa..c6e5d75b5ca 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -56,13 +56,13 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
@@ -911,23 +911,18 @@ public static List<Validator> toValidatorList(Validator validator) {
    *
    * @param xceiverClient  client to perform call
    * @param offset         offset where block starts
-   * @param len            length of data to read
    * @param blockID        ID of the block
-   * @param validators     functions to validate the response
    * @param token          a token for this block (may be null)
-   * @return container protocol read chunk response
    * @throws IOException if there is an I/O error while performing the call
    */
   @SuppressWarnings("checkstyle:ParameterNumber")
-  public static ContainerProtos.ReadBlockResponseProto readBlock(
-      XceiverClientSpi xceiverClient, long offset, long len, BlockID blockID,
-      List<Validator> validators, Token<? extends TokenIdentifier> token,
-      Map<DatanodeDetails, Integer> replicaIndexes, boolean verifyChecksum) throws IOException {
+  public static void readBlock(
+      XceiverClientSpi xceiverClient, long offset, BlockID blockID, Token<? extends TokenIdentifier> token,
+      Map<DatanodeDetails, Integer> replicaIndexes, StreamingReaderSpi streamObserver)
+      throws IOException, InterruptedException {
     final ReadBlockRequestProto.Builder readBlockRequest =
         ReadBlockRequestProto.newBuilder()
-            .setOffset(offset)
-            .setVerifyChecksum(verifyChecksum)
-            .setLen(len);
+            .setOffset(offset);
     final ContainerCommandRequestProto.Builder builder =
         ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadBlock)
             .setContainerID(blockID.getContainerID());
@@ -935,18 +930,14 @@ public static ContainerProtos.ReadBlockResponseProto readBlock(
       builder.setEncodedToken(token.encodeToUrlString());
     }
 
-    return tryEachDatanode(xceiverClient.getPipeline(),
-        d -> readBlock(xceiverClient,
-            validators, blockID, builder, readBlockRequest, d, replicaIndexes),
-        d -> toErrorMessage(blockID, d));
+    readBlock(xceiverClient, blockID, builder, readBlockRequest, xceiverClient.getPipeline().getFirstNode(),
+        replicaIndexes, streamObserver);
   }
 
-  private static ReadBlockResponseProto readBlock(XceiverClientSpi xceiverClient,
-                                                  List<Validator> validators, BlockID blockID,
-                                                  ContainerCommandRequestProto.Builder builder,
-                                                  ReadBlockRequestProto.Builder readBlockBuilder,
-                                                  DatanodeDetails datanode,
-                                                  Map<DatanodeDetails, Integer> replicaIndexes) throws IOException {
+  private static void readBlock(XceiverClientSpi xceiverClient,
+      BlockID blockID, ContainerCommandRequestProto.Builder builder, ReadBlockRequestProto.Builder readBlockBuilder,
+      DatanodeDetails datanode, Map<DatanodeDetails, Integer> replicaIndexes,
+      StreamingReaderSpi streamObserver) throws IOException, InterruptedException {
     final DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder();
     int replicaIndex = replicaIndexes.getOrDefault(datanode, 0);
     if (replicaIndex > 0) {
@@ -956,8 +947,6 @@ private static ReadBlockResponseProto readBlock(XceiverClientSpi xceiverClient,
     final ContainerCommandRequestProto request = builder
         .setDatanodeUuid(datanode.getUuidString())
         .setReadBlock(readBlockBuilder).build();
-    ContainerCommandResponseProto response =
-        xceiverClient.sendCommand(request, validators);
-    return response.getReadBlock();
+    xceiverClient.streamRead(request, streamObserver);
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index e6be4a490b4..6dba6abf9d0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -818,8 +818,8 @@ public StateMachine.DataChannel getStreamDataChannel(
 
   @Override
   public void streamDataReadOnly(ContainerCommandRequestProto msg,
-                                 StreamObserver<ContainerCommandResponseProto> streamObserver,
-                                 DispatcherContext dispatcherContext) {
+      StreamObserver<ContainerCommandResponseProto> streamObserver,
+      DispatcherContext dispatcherContext) {
     Type cmdType = msg.getCmdType();
     String traceID = msg.getTraceID();
     Span span = TracingUtil.importAndCreateSpan(cmdType.toString(), traceID);
@@ -829,8 +829,7 @@ public void streamDataReadOnly(ContainerCommandRequestProto msg,
     try (UncheckedAutoCloseable ignored = protocolMetrics.measure(cmdType)) {
       Preconditions.checkNotNull(msg);
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Command {}, trace ID: {}.", msg.getCmdType(),
-            traceID);
+        LOG.trace("Command {}, trace ID: {}.", msg.getCmdType(), traceID);
       }
 
       PerformanceStringBuilder perf = new PerformanceStringBuilder();
@@ -849,20 +848,17 @@ public void streamDataReadOnly(ContainerCommandRequestProto msg,
             ContainerProtos.Result.CONTAINER_MISSING);
       }
       if (container == null) {
-        throw new StorageContainerException(
-            "ContainerID " + containerID + " does not exist",
+        throw new StorageContainerException("ContainerID " + containerID + " 
does not exist",
             ContainerProtos.Result.CONTAINER_NOT_FOUND);
       }
       ContainerType containerType = getContainerType(container);
       Handler handler = getHandler(containerType);
       if (handler == null) {
-        throw new StorageContainerException("Invalid " +
-            "ContainerType " + containerType,
+        throw new StorageContainerException("Invalid " + "ContainerType " + 
containerType,
             ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
       }
       perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime);
-      responseProto = handler.readBlock(
-          msg, container, dispatcherContext, streamObserver);
+      responseProto = handler.readBlock(msg, container, dispatcherContext, streamObserver);
       long oPLatencyMS = Time.monotonicNow() - startTime;
       metrics.incContainerOpsLatencies(cmdType, oPLatencyMS);
       if (responseProto == null) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index d6c33455008..8a4a675187d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -98,7 +98,6 @@ default void streamDataReadOnly(
        ContainerCommandRequestProto msg,
        StreamObserver<ContainerCommandResponseProto> streamObserver,
        DispatcherContext dispatcherContext) {
-    throw new UnsupportedOperationException(
-         "streamDataReadOnly not supported.");
+    throw new UnsupportedOperationException("streamDataReadOnly not 
supported.");
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index ecf13cf12af..2f4177b2d06 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -59,6 +59,7 @@
 import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
 import static org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient.createSingleNodePipeline;
 import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.DEFAULT_LAYOUT;
+import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -69,7 +70,9 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -77,6 +80,7 @@
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -88,7 +92,6 @@
 import java.util.concurrent.locks.Lock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -124,6 +127,7 @@
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
 import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.ChunkBufferToByteString;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
@@ -176,6 +180,7 @@ public class KeyValueHandler extends Handler {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       KeyValueHandler.class);
+  private static final int STREAMING_BYTES_PER_CHUNK = 1024 * 64;
 
   private final BlockManager blockManager;
   private final ChunkManager chunkManager;
@@ -2055,110 +2060,81 @@ public ContainerCommandResponseProto readBlock(
       ContainerCommandRequestProto request, Container kvContainer,
       DispatcherContext dispatcherContext,
       StreamObserver<ContainerCommandResponseProto> streamObserver) {
+
+    if (kvContainer.getContainerData().getLayoutVersion() != FILE_PER_BLOCK) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Only File Per Block is supported", 
IO_EXCEPTION), request);
+    }
+
     ContainerCommandResponseProto responseProto = null;
     if (!request.hasReadBlock()) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Malformed Read Block request. trace ID: {}",
-            request.getTraceID());
+        LOG.debug("Malformed Read Block request. trace ID: {}", 
request.getTraceID());
       }
       return malformedRequest(request);
     }
     try {
       ReadBlockRequestProto readBlock = request.getReadBlock();
 
-      BlockID blockID = BlockID.getFromProtobuf(
-          readBlock.getBlockID());
+      BlockID blockID = BlockID.getFromProtobuf(readBlock.getBlockID());
      // This is a new API, so the block should always be checked.
       BlockUtils.verifyReplicaIdx(kvContainer, blockID);
       BlockUtils.verifyBCSId(kvContainer, blockID);
 
+      File blockFile = 
FILE_PER_BLOCK.getChunkFile(kvContainer.getContainerData(), blockID, "unused");
+
       BlockData blockData = getBlockManager().getBlock(kvContainer, blockID);
       List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
-      long blockOffset = 0;
-      int chunkIndex = -1;
-      long chunkOffset = 0;
-      long offset = readBlock.getOffset();
-      for (int i = 0; i < chunkInfos.size(); i++) {
-        final long chunkLen = chunkInfos.get(i).getLen();
-        blockOffset += chunkLen;
-        if (blockOffset > offset) {
-          chunkIndex = i;
-          chunkOffset = offset - blockOffset + chunkLen;
-          break;
-        }
-      }
-      Preconditions.checkState(chunkIndex >= 0);
-
-      if (dispatcherContext == null) {
-        dispatcherContext = DispatcherContext.getHandleReadBlock();
-      }
-
-      ChunkBufferToByteString data;
+      // To get the chunk size, check the first chunk: either there is only one chunk and it is the largest,
+      // or there are multiple chunks and they are all the same size except the last one.
+      long bytesPerChunk = chunkInfos.get(0).getLen();
+      // The bytes-per-checksum value is stored in the checksum data of each chunk; all chunks must use the
+      // same value, so it is taken from the first chunk.
+      ContainerProtos.ChecksumType checksumType = 
chunkInfos.get(0).getChecksumData().getType();
+      ChecksumData checksumData = null;
+      int bytesPerChecksum = STREAMING_BYTES_PER_CHUNK;
+      if (checksumType == ContainerProtos.ChecksumType.NONE) {
+        checksumData = new ChecksumData(checksumType, 0);
+      } else {
+        bytesPerChecksum = 
chunkInfos.get(0).getChecksumData().getBytesPerChecksum();
+      }
+      // We have to align the read to checksum boundaries, so whatever offset 
is requested, we have to move back to the
+      // previous checksum boundary.
+      // e.g. if bytesPerChecksum is 512 and the requested offset is 600, we move back to 512.
+      // If the checksum type is NONE, we don't have to do this, but using no 
checksums should be rare in practice and
+      // it simplifies the code to always do this.
+      long adjustedOffset = readBlock.getOffset() - readBlock.getOffset() % 
bytesPerChecksum;
+      try (RandomAccessFile file = new RandomAccessFile(blockFile, "r");
+           FileChannel channel = file.getChannel()) {
+        ByteBuffer buffer = ByteBuffer.allocate(bytesPerChecksum);
+        channel.position(adjustedOffset);
+        while (channel.read(buffer) != -1) {
+          buffer.flip();
+          if (checksumType != ContainerProtos.ChecksumType.NONE) {
+            // As the checksums are stored "chunk by chunk", we need to figure 
out which chunk we start reading from,
+            // and its offset to pull out the correct checksum bytes for each 
read.
+            int chunkIndex = (int) (adjustedOffset / bytesPerChunk);
+            int chunkOffset = (int) (adjustedOffset % bytesPerChunk);
+            int checksumIndex = chunkOffset / bytesPerChecksum;
+            ByteString checksum = 
blockData.getChunks().get(chunkIndex).getChecksumData().getChecksums(checksumIndex);
+            checksumData = new ChecksumData(checksumType, bytesPerChecksum, 
Collections.singletonList(checksum));
+          }
+          streamObserver.onNext(getReadBlockResponse(request, checksumData, 
buffer, adjustedOffset));
+          buffer.clear();
 
-      long len =  readBlock.getLen();
-      long adjustedChunkOffset, adjustedChunkLen;
-      do {
-        ContainerProtos.ChunkInfo chunk = chunkInfos.get(chunkIndex);
-        if (readBlock.getVerifyChecksum()) {
-          Pair<Long, Long> adjustedOffsetAndLength =
-              computeChecksumBoundaries(chunk, chunkOffset, len);
-          adjustedChunkOffset = adjustedOffsetAndLength.getLeft();
-          adjustedChunkLen = adjustedOffsetAndLength.getRight();
-          adjustedChunkOffset += chunk.getOffset();
-        } else {
-          adjustedChunkOffset = chunkOffset;
-          adjustedChunkLen = Math.min(
-              chunk.getLen() + chunk.getOffset() - chunkOffset, len);
+          adjustedOffset += bytesPerChecksum;
         }
-
-        ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
-            ContainerProtos.ChunkInfo.newBuilder(chunk)
-                .setOffset(adjustedChunkOffset)
-                .setLen(adjustedChunkLen).build());
-        BlockUtils.verifyReplicaIdx(kvContainer, blockID);
-        BlockUtils.verifyBCSId(kvContainer, blockID);
-        data = getChunkManager().readChunk(
-            kvContainer, blockID, chunkInfo, dispatcherContext);
-
-        Preconditions.checkNotNull(data, "Chunk data is null");
-        streamObserver.onNext(
-            getReadBlockResponse(request,
-                blockData.getProtoBufMessage().getBlockID(),
-                chunkInfo.getProtoBufMessage(),
-                data, byteBufferToByteString));
-        len -= adjustedChunkLen + adjustedChunkOffset - chunkOffset;
-        chunkOffset = 0;
-        chunkIndex++;
-      } while (len > 0);
-
-      metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen());
+      }
+      // TODO: re-enable metrics.incContainerBytesStats for ReadBlock; the request no longer carries a len field.
     } catch (StorageContainerException ex) {
       responseProto = ContainerUtils.logAndReturnError(LOG, ex, request);
     } catch (IOException ioe) {
       responseProto = ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("Read Block failed", ioe, 
IO_EXCEPTION),
-          request);
+          new StorageContainerException("Read Block failed", ioe, 
IO_EXCEPTION), request);
     }
     return responseProto;
   }
 
-  private Pair<Long, Long> computeChecksumBoundaries(
-      ContainerProtos.ChunkInfo chunkInfo, long startByteIndex, long dataLen) {
-
-    int bytesPerChecksum = chunkInfo.getChecksumData().getBytesPerChecksum();
-
-    // index of the last byte to be read from chunk, inclusively.
-    final long endByteIndex = startByteIndex + dataLen - 1;
-
-    long adjustedChunkOffset = (startByteIndex / bytesPerChecksum)
-        * bytesPerChecksum; // inclusive
-    final long endIndex = ((endByteIndex / bytesPerChecksum) + 1)
-        * bytesPerChecksum; // exclusive
-    long adjustedChunkLen =
-        Math.min(endIndex, chunkInfo.getLen()) - adjustedChunkOffset;
-    return Pair.of(adjustedChunkOffset, adjustedChunkLen);
-  }
-
   @Override
   public void addFinalizedBlock(Container container, long localID) {
     KeyValueContainer keyValueContainer = (KeyValueContainer)container;
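
Illustrative note on the new readBlock flow above: the requested offset is first aligned back to the
previous checksum boundary, and for each bytesPerChecksum-sized buffer streamed from the block file the
matching stored checksum is located via the chunk index and the checksum index within that chunk. A
minimal Java sketch of that arithmetic, with made-up constants (not taken from the patch):

    long requestedOffset = 600;             // offset asked for by the client
    int bytesPerChecksum = 512;             // from the first chunk's checksum data
    long bytesPerChunk = 4L * 1024 * 1024;  // length of the first chunk

    // Move back to the previous checksum boundary: 600 -> 512.
    long adjustedOffset = requestedOffset - requestedOffset % bytesPerChecksum;

    // Checksums are stored per chunk, so find the chunk containing the aligned
    // offset and the index of the checksum inside that chunk.
    int chunkIndex = (int) (adjustedOffset / bytesPerChunk);   // -> 0
    int chunkOffset = (int) (adjustedOffset % bytesPerChunk);  // -> 512
    int checksumIndex = chunkOffset / bytesPerChecksum;        // -> 1
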
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index efede65e524..6afee1c5d77 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -42,10 +42,8 @@
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.atMostOnce;
-import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -55,11 +53,9 @@
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Clock;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -72,29 +68,22 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData;
-import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
-import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
-import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
 import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -105,28 +94,22 @@
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
 import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner;
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.GenericTestUtils.LogCapturer;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -144,10 +127,7 @@ public class TestKeyValueHandler {
   private Path dbFile;
 
   private static final long DUMMY_CONTAINER_ID = 9999;
-  private static final long LOCAL_ID = 1;
   private static final String DUMMY_PATH = "dummy/dir/doesnt/exist";
-  private static final long CHUNK_SIZE = 1024 * 1024;          // 1MB
-  private static final long BYTES_PER_CHECKSUM = 256 * 1024;
   private static final String DATANODE_UUID = UUID.randomUUID().toString();
   private static final String CLUSTER_ID = UUID.randomUUID().toString();
 
@@ -958,77 +938,4 @@ private KeyValueHandler createKeyValueHandler(Path path) 
throws IOException {
 
     return kvHandler;
   }
-
-  @Test
-  public void testReadBlock() throws IOException {
-
-    StreamObserver<ContainerCommandResponseProto> streamObserver = 
mock(StreamObserver.class);
-    KeyValueContainer container = mock(KeyValueContainer.class);
-    final KeyValueHandler kvHandler = new KeyValueHandler(new 
OzoneConfiguration(),
-        UUID.randomUUID().toString(), mock(ContainerSet.class), 
mock(VolumeSet.class), mock(ContainerMetrics.class),
-        mock(IncrementalReportSender.class), 
mock(ContainerChecksumTreeManager.class));
-    final KeyValueHandler keyValueHandler = spy(kvHandler);
-    DispatcherContext dispatcherContext = mock(DispatcherContext.class);
-
-    List<ContainerProtos.ChunkInfo> chunkInfoList = new ArrayList<>();
-    BlockData blockData = new BlockData(new BlockID(1, 1));
-    for (int i = 0; i < 4; i++) {
-      chunkInfoList.add(ContainerProtos.ChunkInfo
-          .newBuilder()
-          .setOffset(0)
-          .setLen(CHUNK_SIZE)
-          .setChecksumData(
-              ChecksumData.newBuilder().setBytesPerChecksum((int) 
BYTES_PER_CHECKSUM)
-                  .setType(ChecksumType.CRC32).build())
-          .setChunkName("chunkName" + i)
-          .build());
-    }
-    blockData.setChunks(chunkInfoList);
-
-    try (MockedStatic<BlockUtils> blockUtils = mockStatic(BlockUtils.class)) {
-      BlockManager blockManager = mock(BlockManager.class);
-      ChunkManager chunkManager = mock(ChunkManager.class);
-      when(keyValueHandler.getBlockManager()).thenReturn(blockManager);
-      when(keyValueHandler.getChunkManager()).thenReturn(chunkManager);
-      when(blockManager.getBlock(any(), any())).thenReturn(blockData);
-      ChunkBuffer data = ChunkBuffer.wrap(ByteBuffer.allocate(0));
-      when(chunkManager.readChunk(any(), any(),
-          any(), any()))
-          .thenReturn(data);
-      testReadBlock(0, 1, keyValueHandler, dispatcherContext,
-          streamObserver, container);
-      testReadBlock(0, CHUNK_SIZE + 1, keyValueHandler, dispatcherContext,
-          streamObserver, container);
-      testReadBlock(CHUNK_SIZE / 2, 2 * CHUNK_SIZE, keyValueHandler, 
dispatcherContext,
-          streamObserver, container);
-    }
-  }
-
-  private static ContainerCommandRequestProto readBlockRequest(
-      long offset, long length) {
-    return ContainerCommandRequestProto.newBuilder()
-        .setCmdType(Type.ReadBlock)
-        .setReadBlock(
-            ContainerProtos.ReadBlockRequestProto.newBuilder()
-                .setBlockID(
-                    ContainerProtos.DatanodeBlockID.newBuilder()
-                        .setContainerID(DUMMY_CONTAINER_ID)
-                        .setLocalID(LOCAL_ID))
-                .setOffset(offset)
-                .setLen(length)
-                .setVerifyChecksum(true))
-        .setContainerID(DUMMY_CONTAINER_ID)
-        .setDatanodeUuid(UUID.randomUUID().toString())
-        .build();
-  }
-
-  private static void testReadBlock(
-      long offset, long length, KeyValueHandler keyValueHandler, 
DispatcherContext dispatcherContext,
-      StreamObserver<ContainerCommandResponseProto> streamObserver, 
KeyValueContainer container) {
-    int responseCount = (int) (((offset + length - 1) / CHUNK_SIZE) + 1 - 
(offset / CHUNK_SIZE));
-    ContainerCommandRequestProto requestProto = readBlockRequest(offset, 
length);
-    keyValueHandler.readBlock(requestProto, container, dispatcherContext, 
streamObserver);
-    verify(streamObserver, times(responseCount)).onNext(any());
-    clearInvocations(streamObserver);
-  }
 }
diff --git 
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto 
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index c5548244560..6b4d8f1bd7f 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -401,12 +401,12 @@ message ListBlockResponseProto {
 message ReadBlockRequestProto {
   required DatanodeBlockID blockID = 1;
   required uint64 offset = 2;
-  required uint64 len = 3;
-  required bool verifyChecksum = 4;
 }
 
 message ReadBlockResponseProto {
-  repeated ReadChunkResponseProto readChunk = 1;
+  required ChecksumData checksumData = 1;
+  required uint64 offset = 2;
+  required bytes data = 3;
 }
 
 message EchoRequestProto {
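
With the proto change above, ReadBlockRequestProto carries only the block ID and a starting offset; the
len and verifyChecksum fields are gone, and each ReadBlockResponseProto message returns a slice of data
together with its offset and ChecksumData. A hedged sketch of building the slimmed-down request,
following the protobuf builder pattern used elsewhere in the codebase (containerId, localId and offset
are placeholders):

    ContainerProtos.ReadBlockRequestProto readBlock =
        ContainerProtos.ReadBlockRequestProto.newBuilder()
            .setBlockID(ContainerProtos.DatanodeBlockID.newBuilder()
                .setContainerID(containerId)   // placeholder
                .setLocalID(localId))          // placeholder
            .setOffset(offset)  // datanode streams from the aligned offset to the end of the block
            .build();
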
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
index 80ae5118467..bb66a303155 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
@@ -18,24 +18,21 @@
 package org.apache.hadoop.ozone.client.rpc.read;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
-import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
-import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
 import org.apache.hadoop.ozone.om.TestBucket;
+import org.junit.jupiter.api.Test;
 
 /**
  * Tests {@link StreamBlockInputStream}.
@@ -45,8 +42,12 @@ public class TestStreamBlockInputStream extends 
TestInputStreamBase {
    * Run the tests as a single test method to avoid needing a new mini-cluster
    * for each test.
    */
-  @ContainerLayoutTestInfo.ContainerTest
-  void testAll(ContainerLayoutVersion layout) throws Exception {
+  private static final int DATA_LENGTH = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
+  private byte[] inputData;
+  private TestBucket bucket;
+
+  @Test
+  void testAll() throws Exception {
     try (MiniOzoneCluster cluster = newCluster()) {
       cluster.waitForClusterToBeReady();
 
@@ -55,14 +56,22 @@ void testAll(ContainerLayoutVersion layout) throws 
Exception {
       clientConfig.setStreamReadBlock(true);
       OzoneConfiguration copy = new OzoneConfiguration(conf);
       copy.setFromObject(clientConfig);
+      String keyName = getNewKeyName();
       try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
-        updateConfig(layout);
-        TestBucket bucket = TestBucket.newBuilder(client).build();
-
-        testBlockReadBuffers(bucket);
-        testBufferRelease(bucket);
-        testCloseReleasesBuffers(bucket);
-        testReadEmptyBlock(bucket);
+        bucket = TestBucket.newBuilder(client).build();
+        inputData = bucket.writeRandomBytes(keyName, DATA_LENGTH);
+        testReadKeyFully(keyName);
+        testSeek(keyName);
+        testReadEmptyBlock();
+      }
+      keyName = getNewKeyName();
+      clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
+      copy.setFromObject(clientConfig);
+      try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
+        bucket = TestBucket.newBuilder(client).build();
+        inputData = bucket.writeRandomBytes(keyName, DATA_LENGTH);
+        testReadKeyFully(keyName);
+        testSeek(keyName);
       }
     }
   }
@@ -71,175 +80,66 @@ void testAll(ContainerLayoutVersion layout) throws 
Exception {
   * Test that the full key contents can be read back correctly via byte-array,
   * single-byte and ByteBuffer reads.
   */
-  private void testBlockReadBuffers(TestBucket bucket) throws Exception {
-    String keyName = getNewKeyName();
-    int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
-    byte[] inputData = bucket.writeRandomBytes(keyName, dataLength);
-
-    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-
-      StreamBlockInputStream block0Stream =
-          (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
-
-
-      // To read 1 byte of chunk data, ChunkInputStream should get one full
-      // checksum boundary worth of data from Container and store it in 
buffers.
-      IOUtils.readFully(block0Stream, new byte[1]);
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
-      // Read > checksum boundary of data from chunk0
-      int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
-      byte[] readData = readDataFromBlock(block0Stream, 0, readDataLen);
-      bucket.validateData(inputData, 0, readData);
-
-      // The first checksum boundary size of data was already existing in the
-      // ChunkStream buffers. Once that data is read, the next checksum
-      // boundary size of data will be fetched again to read the remaining 
data.
-      // Hence, there should be 1 checksum boundary size of data stored in the
-      // ChunkStreams buffers at the end of the read.
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
-      // Seek to a position in the third checksum boundary (so that current
-      // buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM
-      // bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of
-      // data being read into the buffers. There should be 2 buffers in the
-      // stream but the first buffer should be released after it is read
-      // and the second buffer should have BYTES_PER_CHECKSUM capacity.
-      int offset = 2 * BYTES_PER_CHECKSUM + 1;
-      readData = readDataFromBlock(block0Stream, offset, readDataLen);
-      bucket.validateData(inputData, offset, readData);
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
-
-      // Read the full chunk data -1 and verify that all chunk data is read 
into
-      // buffers. We read CHUNK_SIZE - 1 as otherwise all the buffers will be
-      // released once all chunk data is read.
-      readData = readDataFromBlock(block0Stream, 0, CHUNK_SIZE - 1);
-      bucket.validateData(inputData, 0, readData);
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
-      // Read the last byte of chunk and verify that the buffers are released.
-      IOUtils.readFully(block0Stream, new byte[1]);
-      assertNull(block0Stream.getCachedBuffers(),
-          "ChunkInputStream did not release buffers after reaching EOF.");
+  private void testReadKeyFully(String key) throws Exception {
+    // Read the data fully into a large enough byte array
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+      byte[] readData = new byte[DATA_LENGTH];
+      int totalRead = keyInputStream.read(readData, 0, DATA_LENGTH);
+      assertEquals(DATA_LENGTH, totalRead);
+      for (int i = 0; i < DATA_LENGTH; i++) {
+        assertEquals(inputData[i], readData[i],
+            "Read data is not same as written data at index " + i);
+      }
     }
-  }
-
-  private void testCloseReleasesBuffers(TestBucket bucket) throws Exception {
-    String keyName = getNewKeyName();
-    bucket.writeRandomBytes(keyName, CHUNK_SIZE);
-
-    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-      StreamBlockInputStream block0Stream =
-          (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
-
-      readDataFromBlock(block0Stream, 0, 1);
-      assertNotNull(block0Stream.getCachedBuffers());
-
-      block0Stream.close();
-
-      assertNull(block0Stream.getCachedBuffers());
+    // Read the data 1 byte at a time
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+      for (int i = 0; i < DATA_LENGTH; i++) {
+        int b = keyInputStream.read();
+        assertEquals(inputData[i], (byte) b,
+            "Read data is not same as written data at index " + i);
+      }
     }
-  }
-
-  /**
-   * Test that ChunkInputStream buffers are released as soon as the last byte
-   * of the buffer is read.
-   */
-  private void testBufferRelease(TestBucket bucket) throws Exception {
-    String keyName = getNewKeyName();
-    byte[] inputData = bucket.writeRandomBytes(keyName, CHUNK_SIZE);
-
-    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-
-      StreamBlockInputStream block0Stream =
-          (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
-
-      // Read checksum boundary - 1 bytes of data
-      int readDataLen = BYTES_PER_CHECKSUM - 1;
-      byte[] readData = readDataFromBlock(block0Stream, 0, readDataLen);
-      bucket.validateData(inputData, 0, readData);
-
-      // There should be 1 byte of data remaining in the buffer which is not
-      // yet read. Hence, the buffer should not be released.
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-      assertEquals(1, block0Stream.getCachedBuffers()[0].remaining());
-
-      // Reading the last byte in the buffer should result in all the buffers
-      // being released.
-      readData = readDataFromBlock(block0Stream, 1);
-      bucket.validateData(inputData, readDataLen, readData);
-      assertNull(block0Stream.getCachedBuffers(),
-          "Chunk stream buffers not released after last byte is read");
-
-      // Read more data to get the data till the next checksum boundary.
-      readDataLen = BYTES_PER_CHECKSUM / 2;
-      readDataFromBlock(block0Stream, readDataLen);
-      // There should be one buffer and the buffer should not be released as
-      // there is data pending to be read from the buffer
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-      ByteBuffer lastCachedBuffer = block0Stream.getCachedBuffers()[0];
-      assertEquals(BYTES_PER_CHECKSUM - readDataLen,
-          lastCachedBuffer.remaining());
-
-      // Read more than the remaining data in buffer (but less than the next
-      // checksum boundary).
-      int position = (int) block0Stream.getPos();
-      readDataLen = lastCachedBuffer.remaining() + BYTES_PER_CHECKSUM / 2;
-      readData = readDataFromBlock(block0Stream, readDataLen);
-      bucket.validateData(inputData, position, readData);
-      // After reading the remaining data in the buffer, the buffer should be
-      // released and next checksum size of data must be read into the buffers
-      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-      // Verify that the previously cached buffer is released by comparing it
-      // with the current cached buffer
-      assertNotEquals(lastCachedBuffer,
-          block0Stream.getCachedBuffers()[0]);
+    // Read the data into a large enough ByteBuffer
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+      ByteBuffer readBuf = ByteBuffer.allocate(DATA_LENGTH);
+      int totalRead = keyInputStream.read(readBuf);
+      assertEquals(DATA_LENGTH, totalRead);
+      readBuf.flip();
+      for (int i = 0; i < DATA_LENGTH; i++) {
+        assertEquals(inputData[i], readBuf.get(),
+            "Read data is not same as written data at index " + i);
+      }
     }
   }
 
-  private byte[] readDataFromBlock(StreamBlockInputStream 
streamBlockInputStream,
-                                   int offset, int readDataLength) throws 
IOException {
-    byte[] readData = new byte[readDataLength];
-    streamBlockInputStream.seek(offset);
-    IOUtils.readFully(streamBlockInputStream, readData);
-    return readData;
-  }
-
-  private byte[] readDataFromBlock(StreamBlockInputStream 
streamBlockInputStream,
-                                   int readDataLength) throws IOException {
-    byte[] readData = new byte[readDataLength];
-    IOUtils.readFully(streamBlockInputStream, readData);
-    return readData;
-  }
-
-  /**
-   * Verify number of buffers and their capacities.
-   * @param buffers chunk stream buffers
-   */
-  private void checkBufferSizeAndCapacity(ByteBuffer[] buffers) {
-    assertEquals(1, buffers.length,
-        "ChunkInputStream does not have expected number of " +
-            "ByteBuffers");
-    for (ByteBuffer buffer : buffers) {
-      assertEquals(BYTES_PER_CHECKSUM, buffer.capacity(),
-          "ChunkInputStream ByteBuffer capacity is wrong");
+  private void testSeek(String key) throws IOException {
+    java.util.Random random = new java.util.Random();
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+      for (int i = 0; i < 100; i++) {
+        int position = random.nextInt(DATA_LENGTH);
+        keyInputStream.seek(position);
+        int b = keyInputStream.read();
+        assertEquals(inputData[position], (byte) b, "Read data is not the same as written data at index " + position);
+      }
+      StreamBlockInputStream blockStream = (StreamBlockInputStream) 
keyInputStream.getPartStreams().get(0);
+      long length = blockStream.getLength();
+      blockStream.seek(10);
+      long position = blockStream.getPos();
+      assertThrows(IOException.class, () -> blockStream.seek(length + 1),
+          "Seek beyond block length should throw exception");
+      assertThrows(IOException.class, () -> blockStream.seek(-1),
+          "Seeking to a negative position should throw exception");
+      assertEquals(position, blockStream.getPos(),
+          "Position should not change after failed seek attempts");
     }
   }
 
-  private void testReadEmptyBlock(TestBucket bucket) throws Exception {
+  private void testReadEmptyBlock() throws Exception {
     String keyName = getNewKeyName();
-    int dataLength = 10;
     bucket.writeRandomBytes(keyName, 0);
-
     try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-
-      byte[] readData = new byte[dataLength];
       assertTrue(keyInputStream.getPartStreams().isEmpty());
-      IOUtils.read(keyInputStream, readData);
-      for (byte b : readData) {
-        assertEquals((byte) 0, b);
-      }
+      assertEquals(-1, keyInputStream.read());
     }
   }
 }
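
For completeness, the streaming read path exercised by this test is opt-in on the client side. A minimal
usage sketch, assuming the standard OzoneClientConfig round-trip used in the test above (the surrounding
method and error handling are omitted):

    OzoneConfiguration conf = new OzoneConfiguration();
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    clientConfig.setStreamReadBlock(true);   // enable streaming block reads
    conf.setFromObject(clientConfig);
    try (OzoneClient client = OzoneClientFactory.getRpcClient(conf)) {
      // Keys read through this client now use StreamBlockInputStream per block.
    }
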


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

