This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f52c16620fe HDDS-13973. The ground work to support stream read block (#9342)
f52c16620fe is described below

commit f52c16620fe3a0032824ea0aa3a976f1e84bee07
Author: Chung En Lee <[email protected]>
AuthorDate: Wed Nov 26 04:14:31 2025 +0800

    HDDS-13973. The ground work to support stream read block (#9342)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  |  19 +-
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |  73 +-
 .../hdds/scm/storage/StreamBlockInputStream.java   | 747 +++++++++++++++++++++
 .../client/io/BlockInputStreamFactoryImpl.java     |  20 +
 .../scm/storage/DummyStreamBlockInputStream.java   | 143 ++++
 .../scm/storage/TestStreamBlockInputStream.java    | 264 ++++++++
 .../client/io/TestBlockInputStreamFactoryImpl.java |  15 +-
 .../org/apache/hadoop/hdds/DatanodeVersion.java    |   2 +
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |   7 +
 .../ContainerCommandResponseBuilders.java          |  17 +
 .../hdds/scm/storage/ContainerProtocolCalls.java   |  56 ++
 .../org/apache/hadoop/ozone/audit/DNAction.java    |   3 +-
 .../container/common/impl/HddsDispatcher.java      |  79 +++
 .../common/interfaces/ContainerDispatcher.java     |  12 +
 .../ozone/container/common/interfaces/Handler.java |   7 +
 .../transport/server/GrpcXceiverService.java       |   8 +-
 .../transport/server/ratis/DispatcherContext.java  |   7 +
 .../ozone/container/keyvalue/KeyValueHandler.java  | 115 +++-
 .../container/keyvalue/TestKeyValueHandler.java    |  93 +++
 .../src/main/proto/DatanodeClientProtocol.proto    |  16 +
 .../rpc/read/TestStreamBlockInputStream.java       | 245 +++++++
 21 files changed, 1937 insertions(+), 11 deletions(-)
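
For context, a minimal sketch (not part of this commit) of how the new client-side
switch added below might be enabled programmatically. It assumes the usual
OzoneConfiguration/OzoneClientConfig wiring that this commit's tests also use; the
config key added below is "stream.readblock.enable" (default false) under the
client config group:

    OzoneConfiguration conf = new OzoneConfiguration();
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    // Opt in to the streaming ReadBlock path. The factory still falls back to
    // the regular BlockInputStream unless every datanode in the pipeline
    // reports the STREAM_BLOCK_SUPPORT version.
    clientConfig.setStreamReadBlock(true);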

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 5dc44f4d4ec..7329f2c16b7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -113,6 +113,13 @@ public class OzoneClientConfig {
       tags = ConfigTag.CLIENT)
   private long streamBufferMaxSize = 32 * 1024 * 1024;
 
+  @Config(key = "stream.readblock.enable",
+      defaultValue = "false",
+      type = ConfigType.BOOLEAN,
+      description = "Allow ReadBlock to stream all the readChunk responses in one request.",
+      tags = ConfigTag.CLIENT)
+  private boolean streamReadBlock = false;
+
   @Config(key = "ozone.client.max.retries",
       defaultValue = "5",
       description = "Maximum number of retries by Ozone Client on "
@@ -151,7 +158,7 @@ public class OzoneClientConfig {
       description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] "
           + "determines which algorithm would be used to compute checksum for "
           + "chunk data. Default checksum type is CRC32.",
-      tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE })
+      tags = {ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE})
   private String checksumType = ChecksumType.CRC32.name();
 
   @Config(key = "ozone.client.bytes.per.checksum",
@@ -160,7 +167,7 @@ public class OzoneClientConfig {
       description = "Checksum will be computed for every bytes per checksum "
           + "number of bytes and stored sequentially. The minimum value for "
           + "this config is 8KB.",
-      tags = { ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE })
+      tags = {ConfigTag.CLIENT, ConfigTag.CRYPTO_COMPLIANCE})
   private int bytesPerChecksum = 16 * 1024;
 
   @Config(key = "ozone.client.verify.checksum",
@@ -538,6 +545,14 @@ public int getMaxConcurrentWritePerKey() {
     return this.maxConcurrentWritePerKey;
   }
 
+  public boolean isStreamReadBlock() {
+    return streamReadBlock;
+  }
+
+  public void setStreamReadBlock(boolean streamReadBlock) {
+    this.streamReadBlock = streamReadBlock;
+  }
+
   /**
    * Enum for indicating what mode to use when combining chunk and block
    * checksums to define an aggregate FileChecksum. This should be considered
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index b07cee4097c..09e01593feb 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -44,6 +44,8 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -451,7 +453,11 @@ private XceiverClientReply sendCommandWithRetry(
         // sendCommandAsyncCall will create a new channel and async stub
         // in case these don't exist for the specific datanode.
         reply.addDatanode(dn);
-        responseProto = sendCommandAsync(request, dn).getResponse().get();
+        if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
+          responseProto = sendCommandReadBlock(request, dn).getResponse().get();
+        } else {
+          responseProto = sendCommandAsync(request, dn).getResponse().get();
+        }
         if (validators != null && !validators.isEmpty()) {
           for (Validator validator : validators) {
             validator.accept(request, responseProto);
@@ -495,7 +501,7 @@ private XceiverClientReply sendCommandWithRetry(
       String message = "Failed to execute command {}";
       if (LOG.isDebugEnabled()) {
         LOG.debug(message + " on the pipeline {}.",
-                processForDebug(request), pipeline);
+            processForDebug(request), pipeline);
       } else {
         LOG.warn(message + " on the pipeline {}.",
                 request.getCmdType(), pipeline);
@@ -623,6 +629,69 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
     return new XceiverClientReply(replyFuture);
   }
 
+  public XceiverClientReply sendCommandReadBlock(
+      ContainerCommandRequestProto request, DatanodeDetails dn)
+      throws IOException, InterruptedException {
+
+    CompletableFuture<ContainerCommandResponseProto> future =
+        new CompletableFuture<>();
+    ContainerCommandResponseProto.Builder response =
+        ContainerCommandResponseProto.newBuilder();
+    ContainerProtos.ReadBlockResponseProto.Builder readBlock =
+        ContainerProtos.ReadBlockResponseProto.newBuilder();
+    checkOpen(dn);
+    DatanodeID dnId = dn.getID();
+    Type cmdType = request.getCmdType();
+    semaphore.acquire();
+    long requestTime = System.currentTimeMillis();
+    metrics.incrPendingContainerOpsMetrics(cmdType);
+
+    final StreamObserver<ContainerCommandRequestProto> requestObserver =
+        asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS)
+            .send(new StreamObserver<ContainerCommandResponseProto>() {
+              @Override
+              public void onNext(
+                  ContainerCommandResponseProto responseProto) {
+                if (responseProto.getResult() == Result.SUCCESS) {
+                  readBlock.addReadChunk(responseProto.getReadChunk());
+                } else {
+                  future.complete(
+                      ContainerCommandResponseProto.newBuilder(responseProto)
+                          .setCmdType(Type.ReadBlock).build());
+                }
+              }
+
+              @Override
+              public void onError(Throwable t) {
+                future.completeExceptionally(t);
+                metrics.decrPendingContainerOpsMetrics(cmdType);
+                metrics.addContainerOpsLatency(
+                    cmdType, System.currentTimeMillis() - requestTime);
+                semaphore.release();
+              }
+
+              @Override
+              public void onCompleted() {
+                if (readBlock.getReadChunkCount() > 0) {
+                  future.complete(response.setReadBlock(readBlock)
+                      .setCmdType(Type.ReadBlock).setResult(Result.SUCCESS).build());
+                }
+                if (!future.isDone()) {
+                  future.completeExceptionally(new IOException(
+                      "Stream completed but no reply for request " +
+                          processForDebug(request)));
+                }
+                metrics.decrPendingContainerOpsMetrics(cmdType);
+                metrics.addContainerOpsLatency(
+                    cmdType, System.currentTimeMillis() - requestTime);
+                semaphore.release();
+              }
+            });
+    requestObserver.onNext(request);
+    requestObserver.onCompleted();
+    return new XceiverClientReply(future);
+  }
+
   private synchronized void checkOpen(DatanodeDetails dn)
       throws IOException {
     if (closed) {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
new file mode 100644
index 00000000000..3db7fd8f660
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link java.io.InputStream} called from KeyInputStream to read a block
+ * from the container.
+ */
+public class StreamBlockInputStream extends BlockExtendedInputStream
+    implements Seekable, CanUnbuffer, ByteBufferReadable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamBlockInputStream.class);
+  private final BlockID blockID;
+  private final long blockLength;
+  private final AtomicReference<Pipeline> pipelineRef =
+      new AtomicReference<>();
+  private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
+      new AtomicReference<>();
+  private XceiverClientFactory xceiverClientFactory;
+  private XceiverClientSpi xceiverClient;
+
+  private List<Long> bufferOffsets;
+  private int bufferIndex;
+  private long blockPosition = -1;
+  private List<ByteBuffer> buffers;
+  // Checks if the StreamBlockInputStream has already read data from
+  // the container.
+  private boolean allocated = false;
+  private long bufferOffsetWrtBlockData;
+  private long buffersSize;
+  private static final int EOF = -1;
+  private final List<XceiverClientSpi.Validator> validators;
+  private final boolean verifyChecksum;
+  private final Function<BlockID, BlockLocationInfo> refreshFunction;
+  private final RetryPolicy retryPolicy;
+  private int retries;
+
+  public StreamBlockInputStream(
+      BlockID blockID, long length, Pipeline pipeline,
+      Token<OzoneBlockTokenIdentifier> token,
+      XceiverClientFactory xceiverClientFactory,
+      Function<BlockID, BlockLocationInfo> refreshFunction,
+      OzoneClientConfig config) throws IOException {
+    this.blockID = blockID;
+    this.blockLength = length;
+    setPipeline(pipeline);
+    tokenRef.set(token);
+    this.xceiverClientFactory = xceiverClientFactory;
+    this.validators = ContainerProtocolCalls.toValidatorList(
+        (request, response) -> validateBlock(response));
+    this.verifyChecksum = config.isChecksumVerify();
+    this.refreshFunction = refreshFunction;
+    this.retryPolicy =
+        HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
+            TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
+
+  }
+
+  @Override
+  public BlockID getBlockID() {
+    return blockID;
+  }
+
+  @Override
+  public long getLength() {
+    return blockLength;
+  }
+
+  @Override
+  public synchronized long getPos() {
+    if (blockLength == 0) {
+      return 0;
+    }
+    if (blockPosition >= 0) {
+      return blockPosition;
+    }
+
+    if (buffersHaveData()) {
+      // BufferOffset w.r.t. BlockData + BufferOffset w.r.t. buffers +
+      // Position of current Buffer
+      return bufferOffsetWrtBlockData + bufferOffsets.get(bufferIndex) +
+          buffers.get(bufferIndex).position();
+    }
+    if (allocated && !dataRemainingInBlock()) {
+      Preconditions.checkState(
+          bufferOffsetWrtBlockData + buffersSize == blockLength,
+          "EOF detected but not at the last byte of the chunk");
+      return blockLength;
+    }
+    if (buffersAllocated()) {
+      return bufferOffsetWrtBlockData + buffersSize;
+    }
+    return 0;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    int dataout = EOF;
+    int len = 1;
+    int available;
+    while (len > 0) {
+      try {
+        acquireClient();
+        available = prepareRead(1);
+        retries = 0;
+      } catch (SCMSecurityException ex) {
+        throw ex;
+      } catch (StorageContainerException e) {
+        handleStorageContainerException(e);
+        continue;
+      } catch (IOException ioe) {
+        handleIOException(ioe);
+        continue;
+      }
+      if (available == EOF) {
+        // There is no more data in the block stream. The buffers should have
+        // been released by now
+        Preconditions.checkState(buffers == null);
+      } else {
+        dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
+      }
+
+      len -= available;
+      if (bufferEOF()) {
+        releaseBuffers(bufferIndex);
+      }
+    }
+
+    return dataout;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    // According to the JavaDocs for InputStream, it is recommended that
+    // subclasses provide an override of bulk read if possible for performance
+    // reasons.  In addition to performance, we need to do it for correctness
+    // reasons.  The Ozone REST service uses PipedInputStream and
+    // PipedOutputStream to relay HTTP response data between a Jersey thread and
+    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
+    // have a subtle dependency (bug?) on the wrapped stream providing separate
+    // implementations of single-byte read and bulk read.  Without this, get key
+    // responses might close the connection before writing all of the bytes
+    // responses might close the connection before writing all of the bytes
+    // advertised in the Content-Length.
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    int total = 0;
+    int available;
+    while (len > 0) {
+      try {
+        acquireClient();
+        available = prepareRead(len);
+        retries = 0;
+      } catch (SCMSecurityException ex) {
+        throw ex;
+      } catch (StorageContainerException e) {
+        handleStorageContainerException(e);
+        continue;
+      } catch (IOException ioe) {
+        handleIOException(ioe);
+        continue;
+      }
+      if (available == EOF) {
+        // There is no more data in the block stream. The buffers should have
+        // been released by now
+        Preconditions.checkState(buffers == null);
+        return total != 0 ? total : EOF;
+      }
+      buffers.get(bufferIndex).get(b, off + total, available);
+      len -= available;
+      total += available;
+
+      if (bufferEOF()) {
+        releaseBuffers(bufferIndex);
+      }
+    }
+    return total;
+
+  }
+
+  @Override
+  public synchronized int read(ByteBuffer byteBuffer) throws IOException {
+    if (byteBuffer == null) {
+      throw new NullPointerException();
+    }
+    int len = byteBuffer.remaining();
+    if (len == 0) {
+      return 0;
+    }
+    int total = 0;
+    int available;
+    while (len > 0) {
+      try {
+        acquireClient();
+        available = prepareRead(len);
+        retries = 0;
+      } catch (SCMSecurityException ex) {
+        throw ex;
+      } catch (StorageContainerException e) {
+        handleStorageContainerException(e);
+        continue;
+      } catch (IOException ioe) {
+        handleIOException(ioe);
+        continue;
+      }
+      if (available == EOF) {
+        // There is no more data in the block stream. The buffers should have
+        // been released by now
+        Preconditions.checkState(buffers == null);
+        return total != 0 ? total : EOF;
+      }
+      ByteBuffer readBuf = buffers.get(bufferIndex);
+      ByteBuffer tmpBuf = readBuf.duplicate();
+      tmpBuf.limit(tmpBuf.position() + available);
+      byteBuffer.put(tmpBuf);
+      readBuf.position(tmpBuf.position());
+
+      len -= available;
+      total += available;
+
+      if (bufferEOF()) {
+        releaseBuffers(bufferIndex);
+      }
+    }
+    return total;
+  }
+
+  @Override
+  protected int readWithStrategy(ByteReaderStrategy strategy) throws IOException {
+    throw new NotImplementedException("readWithStrategy is not implemented.");
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    if (pos == 0 && blockLength == 0) {
+      // It is possible for length and pos to be zero in which case
+      // seek should return instead of throwing exception
+      return;
+    }
+    if (pos < 0 || pos > blockLength) {
+      throw new EOFException("EOF encountered at pos: " + pos + " for block: " + blockID);
+    }
+
+    if (buffersHavePosition(pos)) {
+      // The bufferPosition is w.r.t the current block.
+      // Adjust the bufferIndex and position to the seeked position.
+      adjustBufferPosition(pos - bufferOffsetWrtBlockData);
+    } else {
+      blockPosition = pos;
+    }
+  }
+
+  @Override
+  public synchronized boolean seekToNewSource(long l) throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized void unbuffer() {
+    blockPosition = getPos();
+    releaseClient();
+    releaseBuffers();
+  }
+
+  private void setPipeline(Pipeline pipeline) throws IOException {
+    if (pipeline == null) {
+      return;
+    }
+    long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();
+
+    if (replicaIndexes > 1) {
+      throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.",
+          pipeline));
+    }
+
+    // irrespective of the container state, we will always read via Standalone
+    // protocol.
+    boolean okForRead =
+        pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
+            || pipeline.getType() == HddsProtos.ReplicationType.EC;
+    Pipeline readPipeline = okForRead ? pipeline : pipeline.copyForRead().toBuilder()
+        .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+            getLegacyFactor(pipeline.getReplicationConfig())))
+        .build();
+    pipelineRef.set(readPipeline);
+  }
+
+  protected synchronized void checkOpen() throws IOException {
+    if (xceiverClientFactory == null) {
+      throw new IOException("StreamBlockInputStream has been closed.");
+    }
+  }
+
+  protected synchronized void acquireClient() throws IOException {
+    checkOpen();
+    if (xceiverClient == null) {
+      final Pipeline pipeline = pipelineRef.get();
+      try {
+        xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
+      } catch (IOException ioe) {
+        LOG.warn("Failed to acquire client for pipeline {}, block {}",
+            pipeline, blockID);
+        throw ioe;
+      }
+    }
+  }
+
+  private synchronized int prepareRead(int len) throws IOException {
+    for (;;) {
+      if (blockPosition >= 0) {
+        if (buffersHavePosition(blockPosition)) {
+          // The current buffers have the seeked position. Adjust the buffer
+          // index and position to point to the buffer position.
+          adjustBufferPosition(blockPosition - bufferOffsetWrtBlockData);
+        } else {
+          // Read the required block data to fill the buffers with the
+          // seeked position data
+          readDataFromContainer(len);
+        }
+      }
+      if (buffersHaveData()) {
+        // Data is available from buffers
+        ByteBuffer bb = buffers.get(bufferIndex);
+        return Math.min(len, bb.remaining());
+      } else if (dataRemainingInBlock()) {
+        // There is more data in the block stream which has not
+        // been read into the buffers yet.
+        readDataFromContainer(len);
+      } else {
+        // All available input from this block stream has been consumed.
+        return EOF;
+      }
+    }
+  }
+
+  private boolean buffersHavePosition(long pos) {
+    // Check if buffers have been allocated
+    if (buffersAllocated()) {
+      // Check if the current buffers cover the input position
+      // Released buffers should not be considered when checking if position
+      // is available
+      return pos >= bufferOffsetWrtBlockData +
+          bufferOffsets.get(0) &&
+          pos < bufferOffsetWrtBlockData + buffersSize;
+    }
+    return false;
+  }
+
+  /**
+   * Return true if the buffers have been allocated and false otherwise.
+   */
+  @VisibleForTesting
+  protected boolean buffersAllocated() {
+    return buffers != null && !buffers.isEmpty();
+  }
+
+  /**
+   * Adjust the buffers position to account for seeked position and/or
+   * checksum boundary reads.
+   * @param bufferPosition the position to which the buffers must be advanced
+   */
+  private void adjustBufferPosition(long bufferPosition) {
+    // The bufferPosition is w.r.t the current buffers.
+    // Adjust the bufferIndex and position to the seeked bufferPosition.
+    bufferIndex = Collections.binarySearch(bufferOffsets, bufferPosition);
+    // bufferIndex is negative if bufferPosition isn't found in bufferOffsets.
+    // Compute (bufferIndex = -bufferIndex - 2) to find which two offsets the
+    // bufferPosition falls between.
+    if (bufferIndex < 0) {
+      bufferIndex = -bufferIndex - 2;
+    }
+
+    buffers.get(bufferIndex).position(
+        (int) (bufferPosition - bufferOffsets.get(bufferIndex)));
+
+    // Reset buffers > bufferIndex to position 0. We do this to reset any
+    // previous reads/ seeks which might have updated any buffer position.
+    // For buffers < bufferIndex, we do not need to reset the position as it
+    // not required for this read. If a seek was done to a position in the
+    // previous indices, the buffer position reset would be performed in the
+    // seek call.
+    for (int i = bufferIndex + 1; i < buffers.size(); i++) {
+      buffers.get(i).position(0);
+    }
+
+    // Reset the blockPosition as the block stream has been initialized i.e. the
+    // buffers have been allocated.
+    blockPosition = -1;
+  }
+
+  /**
+   * Reads full or partial block data from the DN container based on the
+   * current position of the stream, the number of bytes of data to read
+   * and the checksum boundaries.
+   * If successful, the read data is saved in the buffers so that
+   * subsequent read calls can utilize it.
+   * @param len number of bytes of data to be read
+   * @throws IOException if there is an I/O error while performing the call
+   * to Datanode
+   */
+  private synchronized void readDataFromContainer(int len) throws IOException {
+    // index of first byte to be read from the block
+    long startByteIndex;
+    if (blockPosition >= 0) {
+      // If seek operation was called to advance the buffer position, the
+      // block should be read from that position onwards.
+      startByteIndex = blockPosition;
+    } else {
+      // Start reading the block from the last blockPosition onwards.
+      startByteIndex = bufferOffsetWrtBlockData + buffersSize;
+    }
+
+    // bufferOffsetWrtBlockData and buffersSize are updated after the data
+    // is read from Container and put into the buffers, but if read fails
+    // and is retried, we need the previous position.  Position is reset after
+    // successful read in adjustBufferPosition()
+    blockPosition = getPos();
+    bufferOffsetWrtBlockData = readData(startByteIndex, len);
+    long tempOffset = 0L;
+    buffersSize = 0L;
+    bufferOffsets = new ArrayList<>(buffers.size());
+    for (ByteBuffer buffer : buffers) {
+      bufferOffsets.add(tempOffset);
+      tempOffset += buffer.limit();
+      buffersSize += buffer.limit();
+
+    }
+    bufferIndex = 0;
+    allocated = true;
+    adjustBufferPosition(startByteIndex - bufferOffsetWrtBlockData);
+
+  }
+
+  @VisibleForTesting
+  protected long readData(long startByteIndex, long len)
+      throws IOException {
+    Pipeline pipeline = pipelineRef.get();
+    buffers = new ArrayList<>();
+    ReadBlockResponseProto response =
+        ContainerProtocolCalls.readBlock(xceiverClient, startByteIndex,
+        len, blockID, validators, tokenRef.get(), pipeline.getReplicaIndexes(), verifyChecksum);
+    List<ReadChunkResponseProto> readBlocks = response.getReadChunkList();
+
+    for (ReadChunkResponseProto readBlock : readBlocks) {
+      if (readBlock.hasDataBuffers()) {
+        buffers.addAll(BufferUtils.getReadOnlyByteBuffers(
+            readBlock.getDataBuffers().getBuffersList()));
+      } else {
+        throw new IOException("Unexpected error while reading chunk data " +
+            "from container. No data returned.");
+      }
+    }
+    return response.getReadChunk(0)
+        .getChunkData().getOffset();
+  }
+
+  /**
+   * Check if the buffers have any data remaining between the current
+   * position and the limit.
+   */
+  private boolean buffersHaveData() {
+    boolean hasData = false;
+    if (buffersAllocated()) {
+      int buffersLen = buffers.size();
+      while (bufferIndex < buffersLen) {
+        ByteBuffer buffer = buffers.get(bufferIndex);
+        if (buffer != null && buffer.hasRemaining()) {
+          // current buffer has data
+          hasData = true;
+          break;
+        } else {
+          if (bufferIndex < buffersLen - 1) {
+            // move to next available buffer
+            ++bufferIndex;
+            Preconditions.checkState(bufferIndex < buffers.size());
+          } else {
+            // no more buffers remaining
+            break;
+          }
+        }
+      }
+    }
+
+    return hasData;
+  }
+
+  /**
+   * Check if there is more data in the chunk which has not yet been read
+   * into the buffers.
+   */
+  private boolean dataRemainingInBlock() {
+    long bufferPos;
+    if (blockPosition >= 0) {
+      bufferPos = blockPosition;
+    } else {
+      bufferPos = bufferOffsetWrtBlockData + buffersSize;
+    }
+
+    return bufferPos < blockLength;
+  }
+
+  /**
+   * Check if the current buffer has been read till the end.
+   */
+  private boolean bufferEOF() {
+    return allocated && buffersAllocated() && !buffers.get(bufferIndex).hasRemaining();
+  }
+
+  /**
+   * Release the buffers upto the given index.
+   * @param releaseUptoBufferIndex bufferIndex (inclusive) upto which the
+   *                               buffers must be released
+   */
+  private void releaseBuffers(int releaseUptoBufferIndex) {
+    int buffersLen = buffers.size();
+    if (releaseUptoBufferIndex == buffersLen - 1) {
+      // Before releasing all the buffers, if block EOF is not reached, then
+      // blockPosition should be set to point to the last position of the
+      // buffers. This should be done so that getPos() can return the current
+      // block position
+      blockPosition = bufferOffsetWrtBlockData +
+          bufferOffsets.get(releaseUptoBufferIndex) +
+          buffers.get(releaseUptoBufferIndex).capacity();
+      // Release all the buffers
+      releaseBuffers();
+    } else {
+      buffers = buffers.subList(releaseUptoBufferIndex + 1, buffersLen);
+      bufferOffsets = bufferOffsets.subList(
+          releaseUptoBufferIndex + 1, buffersLen);
+      bufferIndex = 0;
+    }
+  }
+
+  /**
+   * If EOF is reached, release the buffers.
+   */
+  private void releaseBuffers() {
+    buffers = null;
+    bufferIndex = 0;
+    // We should not reset bufferOffsetWrtBlockData and buffersSize here
+    // because when getPos() is called we use these
+    // values and determine whether chunk is read completely or not.
+  }
+
+  protected synchronized void releaseClient() {
+    if (xceiverClientFactory != null && xceiverClient != null) {
+      xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
+      xceiverClient = null;
+    }
+  }
+
+  private void validateBlock(
+      ContainerProtos.ContainerCommandResponseProto response
+  ) throws IOException {
+
+    ReadBlockResponseProto readBlock = response.getReadBlock();
+    for (ReadChunkResponseProto readChunk : readBlock.getReadChunkList()) {
+      List<ByteString> byteStrings;
+
+      ContainerProtos.ChunkInfo chunkInfo =
+          readChunk.getChunkData();
+      if (chunkInfo.getLen() <= 0) {
+        throw new IOException("Failed to get chunk: chunkName == "
+            + chunkInfo.getChunkName() + ", len == " + chunkInfo.getLen());
+      }
+      byteStrings = readChunk.getDataBuffers().getBuffersList();
+      long buffersLen = BufferUtils.getBuffersLen(byteStrings);
+      if (buffersLen != chunkInfo.getLen()) {
+        // Bytes read from chunk should be equal to chunk size.
+        throw new OzoneChecksumException(String.format(
+            "Inconsistent read for chunk=%s len=%d bytesRead=%d",
+            chunkInfo.getChunkName(), chunkInfo.getLen(),
+            buffersLen));
+      }
+
+      if (verifyChecksum) {
+        ChecksumData checksumData = ChecksumData.getFromProtoBuf(
+            chunkInfo.getChecksumData());
+        int startIndex = (int) readChunk.getChunkData().getOffset() / checksumData.getBytesPerChecksum();
+
+        // ChecksumData stores checksum for each 'numBytesPerChecksum'
+        // number of bytes in a list. Compute the index of the first
+        // checksum to match with the read data
+
+        Checksum.verifyChecksum(byteStrings, checksumData, startIndex);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  protected synchronized void setBuffers(List<ByteBuffer> buffers) {
+    this.buffers = buffers;
+  }
+
+  private boolean shouldRetryRead(IOException cause) throws IOException {
+    RetryPolicy.RetryAction retryAction;
+    try {
+      retryAction = retryPolicy.shouldRetry(cause, ++retries, 0, true);
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
+  }
+
+  @VisibleForTesting
+  public boolean isVerifyChecksum() {
+    return verifyChecksum;
+  }
+
+  private void refreshBlockInfo(IOException cause) throws IOException {
+    LOG.info("Attempting to update pipeline and block token for block {} from 
pipeline {}: {}",
+        blockID, pipelineRef.get().getId(), cause.getMessage());
+    if (refreshFunction != null) {
+      LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
+      BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
+      if (blockLocationInfo == null) {
+        LOG.warn("No new block location info for block {}", blockID);
+      } else {
+        setPipeline(blockLocationInfo.getPipeline());
+        LOG.info("New pipeline for block {}: {}", blockID,
+            blockLocationInfo.getPipeline());
+
+        tokenRef.set(blockLocationInfo.getToken());
+        if (blockLocationInfo.getToken() != null) {
+          OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
+          tokenId.readFromByteArray(tokenRef.get().getIdentifier());
+          LOG.info("A new token is added for block {}. Expiry: {}",
+              blockID, Instant.ofEpochMilli(tokenId.getExpiryDate()));
+        }
+      }
+    } else {
+      throw cause;
+    }
+  }
+
+  @VisibleForTesting
+  public synchronized ByteBuffer[] getCachedBuffers() {
+    return buffers == null ? null :
+        BufferUtils.getReadOnlyByteBuffers(buffers.toArray(new ByteBuffer[0]));
+  }
+
+  /**
+   * Check if this exception is because datanodes are not reachable.
+   */
+  private boolean isConnectivityIssue(IOException ex) {
+    return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    releaseClient();
+    releaseBuffers();
+    xceiverClientFactory = null;
+  }
+
+  private void handleStorageContainerException(StorageContainerException e) throws IOException {
+    if (shouldRetryRead(e)) {
+      releaseClient();
+      refreshBlockInfo(e);
+    } else {
+      throw e;
+    }
+  }
+
+  private void handleIOException(IOException ioe) throws IOException {
+    if (shouldRetryRead(ioe)) {
+      if (isConnectivityIssue(ioe)) {
+        releaseClient();
+        refreshBlockInfo(ioe);
+      } else {
+        releaseClient();
+      }
+    } else {
+      throw ioe;
+    }
+  }
+}
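
A minimal usage sketch of the new stream, assuming it was obtained from
BlockInputStreamFactoryImpl with stream read block enabled; the variable names
(factory, blockInfo, etc.) are illustrative, only the methods used here come
from this commit:

    BlockExtendedInputStream in = factory.create(repConfig, blockInfo,
        blockInfo.getPipeline(), blockInfo.getToken(), xceiverFactory,
        refreshFunction, clientConfig);
    byte[] buf = new byte[4096];
    in.seek(128);                          // position within the block
    int n = in.read(buf, 0, buf.length);   // fills the cached buffers via ReadBlock
    long pos = in.getPos();                // derived from the cached buffers
    in.unbuffer();                         // drop cached buffers, release the client
    in.close();
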
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 3fbee6be871..9249abaf42a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.ozone.client.io;
 
+import static org.apache.hadoop.hdds.DatanodeVersion.STREAM_BLOCK_SUPPORT;
+
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -25,6 +27,7 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -32,6 +35,7 @@
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
+import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
@@ -85,6 +89,10 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
       return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
           blockInfo, xceiverFactory, refreshFunction,
           ecBlockStreamFactory, config);
+    } else if (config.isStreamReadBlock() && allDataNodesSupportStreamBlock(pipeline)) {
+      return new StreamBlockInputStream(
+          blockInfo.getBlockID(), blockInfo.getLength(),
+          pipeline, token, xceiverFactory, refreshFunction, config);
     } else {
       return new BlockInputStream(blockInfo,
           pipeline, token, xceiverFactory, refreshFunction,
@@ -92,4 +100,16 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
     }
   }
 
+  private boolean allDataNodesSupportStreamBlock(Pipeline pipeline) {
+    // Return true only if all DataNodes in the pipeline are on a version
+    // that supports reading a block by streaming chunks.
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      if (dn.getCurrentVersion() <
+          STREAM_BLOCK_SUPPORT.toProtoValue()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
 }
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java
new file mode 100644
index 00000000000..e141845954d
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+/**
+ * A dummy StreamBlockInputStream to mock read block call to DN.
+ */
+class DummyStreamBlockInputStream extends StreamBlockInputStream {
+
+  private final List<ByteString> readByteBuffers = new ArrayList<>();
+  private final List<ChunkInfo> chunks;
+  private final long[] chunkOffsets;
+  private final Map<String, byte[]> chunkDataMap;
+
+  @SuppressWarnings("parameternumber")
+  DummyStreamBlockInputStream(
+      BlockID blockId,
+      long blockLen,
+      Pipeline pipeline,
+      Token<OzoneBlockTokenIdentifier> token,
+      XceiverClientFactory xceiverClientManager,
+      Function<BlockID, BlockLocationInfo> refreshFunction,
+      OzoneClientConfig config,
+      List<ChunkInfo> chunks,
+      Map<String, byte[]> chunkDataMap) throws IOException {
+    super(blockId, blockLen, pipeline, token, xceiverClientManager,
+        refreshFunction, config);
+    this.chunks = chunks;
+    this.chunkDataMap = chunkDataMap;
+    chunkOffsets = new long[chunks.size()];
+    long temp = 0;
+    for (int i = 0; i < chunks.size(); i++) {
+      chunkOffsets[i] = temp;
+      temp += chunks.get(i).getLen();
+    }
+  }
+
+  @Override
+  protected synchronized void checkOpen() throws IOException {
+    // No action needed
+  }
+
+  @Override
+  protected void acquireClient() {
+    // No action needed
+  }
+
+  @Override
+  protected void releaseClient() {
+    // no-op
+  }
+
+  @Override
+  protected long readData(long offset, long len) {
+    int chunkIndex = Arrays.binarySearch(chunkOffsets, offset);
+    if (chunkIndex < 0) {
+      chunkIndex = -chunkIndex - 2;
+    }
+    ChunkInfo chunkInfo = chunks.get(chunkIndex);
+    readByteBuffers.clear();
+    long chunkOffset = offset - chunkInfo.getOffset();
+    if (isVerifyChecksum()) {
+      ChecksumData checksumData = ChecksumData.getFromProtoBuf(
+          chunkInfo.getChecksumData());
+      int bytesPerChecksum = checksumData.getBytesPerChecksum();
+      chunkOffset = (chunkOffset / bytesPerChecksum) * bytesPerChecksum;
+    }
+    long bufferOffsetWrtBlockDataData = chunkOffsets[chunkIndex] + chunkOffset;
+    while (len > 0) {
+      ChunkInfo currentChunk = chunks.get(chunkIndex);
+      int bufferCapacity = currentChunk.getChecksumData().getBytesPerChecksum();
+      long chunkLen = currentChunk.getLen();
+      long remainingToRead = Math.min(chunkLen, len);
+      if (isVerifyChecksum()) {
+        if (len < chunkLen) {
+          final ChecksumData checksumData = ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
+          final long endByteIndex = len - 1;
+          final int bytesPerChecksum = checksumData.getBytesPerChecksum();
+          remainingToRead = (endByteIndex / bytesPerChecksum + 1) * bytesPerChecksum;
+        } else {
+          remainingToRead = chunkLen;
+        }
+      }
+
+      long bufferLen;
+      while (remainingToRead > 0) {
+        if (remainingToRead < bufferCapacity) {
+          bufferLen = remainingToRead;
+        } else {
+          bufferLen = bufferCapacity;
+        }
+        ByteString byteString = ByteString.copyFrom(chunkDataMap.get(chunks.get(chunkIndex).getChunkName()),
+            (int) chunkOffset, (int) bufferLen);
+
+        readByteBuffers.add(byteString);
+
+        chunkOffset += bufferLen;
+        remainingToRead -= bufferLen;
+        len -= bufferLen;
+      }
+      chunkOffset = 0;
+      chunkIndex++;
+    }
+    setBuffers(BufferUtils.getReadOnlyByteBuffers(readByteBuffers));
+    return bufferOffsetWrtBlockDataData;
+  }
+
+  public List<ByteString> getReadByteBuffers() {
+    return readByteBuffers;
+  }
+}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
new file mode 100644
index 00000000000..81cb6d4d62c
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import com.google.common.primitives.Bytes;
+import java.io.EOFException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Function;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link StreamBlockInputStream}'s functionality.
+ */
+public class TestStreamBlockInputStream {
+  private int blockSize;
+  private static final int CHUNK_SIZE = 100;
+  private static final int BYTES_PER_CHECKSUM = 20;
+  private static final Random RANDOM = new Random();
+  private DummyStreamBlockInputStream blockStream;
+  private byte[] blockData;
+  private List<ChunkInfo> chunks;
+  private Map<String, byte[]> chunkDataMap;
+  private Checksum checksum;
+  private BlockID blockID;
+  private static final String CHUNK_NAME = "chunk-";
+  private OzoneConfiguration conf = new OzoneConfiguration();
+
+  @BeforeEach
+  public void setup() throws Exception {
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    clientConfig.setStreamReadBlock(true);
+    blockID = new BlockID(new ContainerBlockID(1, 1));
+    checksum = new Checksum(ChecksumType.CRC32, BYTES_PER_CHECKSUM);
+    createChunkList(5);
+
+    Pipeline pipeline = MockPipeline.createSingleNodePipeline();
+    blockStream = new DummyStreamBlockInputStream(blockID, blockSize, pipeline,
+        null, null, mock(Function.class), clientConfig, chunks, chunkDataMap);
+  }
+
+  /**
+   * Create a mock list of chunks. The first n-1 chunks of length CHUNK_SIZE
+   * and the last chunk with length CHUNK_SIZE/2.
+   */
+  private void createChunkList(int numChunks)
+      throws Exception {
+
+    chunks = new ArrayList<>(numChunks);
+    chunkDataMap = new HashMap<>();
+    blockData = new byte[0];
+    int i, chunkLen;
+    byte[] byteData;
+    String chunkName;
+
+    for (i = 0; i < numChunks; i++) {
+      chunkName = CHUNK_NAME + i;
+      chunkLen = CHUNK_SIZE;
+      if (i == numChunks - 1) {
+        chunkLen = CHUNK_SIZE / 2;
+      }
+      byteData = generateRandomData(chunkLen);
+      ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+          .setChunkName(chunkName)
+          .setOffset(0)
+          .setLen(chunkLen)
+          .setChecksumData(checksum.computeChecksum(
+              byteData, 0, chunkLen).getProtoBufMessage())
+          .build();
+
+      chunkDataMap.put(chunkName, byteData);
+      chunks.add(chunkInfo);
+
+      blockSize += chunkLen;
+      blockData = Bytes.concat(blockData, byteData);
+    }
+  }
+
+  static byte[] generateRandomData(int length) {
+    byte[] bytes = new byte[length];
+    RANDOM.nextBytes(bytes);
+    return bytes;
+  }
+
+  /**
+   * Match readData with the blockData byte-wise.
+   * @param readData Data read through the stream
+   * @param inputDataStartIndex first index (inclusive) in blockData to compare
+   *                            with read data
+   * @param length the number of bytes of data to match starting from
+   *               inputDataStartIndex
+   */
+  private void matchWithInputData(byte[] readData, int inputDataStartIndex,
+                                  int length) {
+    for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
+      assertEquals(blockData[i], readData[i - inputDataStartIndex], "i: " + i);
+    }
+  }
+
+  private void matchWithInputData(List<ByteString> byteStrings,
+                                  int inputDataStartIndex, int length) {
+    int offset = inputDataStartIndex;
+    int totalBufferLen = 0;
+    for (ByteString byteString : byteStrings) {
+      int bufferLen = byteString.size();
+      matchWithInputData(byteString.toByteArray(), offset, bufferLen);
+      offset += bufferLen;
+      totalBufferLen += bufferLen;
+    }
+    assertEquals(length, totalBufferLen);
+  }
+
+  /**
+   * Seek to a position and verify through getPos().
+   */
+  private void seekAndVerify(int pos) throws Exception {
+    blockStream.seek(pos);
+    assertEquals(pos, blockStream.getPos(),
+        "Current position of buffer does not match with the sought position");
+  }
+
+  @Test
+  public void testFullChunkRead() throws Exception {
+    byte[] b = new byte[blockSize];
+    int numBytesRead = blockStream.read(b, 0, blockSize);
+    assertEquals(blockSize, numBytesRead);
+    matchWithInputData(b, 0, blockSize);
+  }
+
+  @Test
+  public void testPartialChunkRead() throws Exception {
+    int len = blockSize / 2;
+    byte[] b = new byte[len];
+
+    int numBytesRead = blockStream.read(b, 0, len);
+    assertEquals(len, numBytesRead);
+    matchWithInputData(b, 0, len);
+
+    // To read block data from index 0 to 224 (len = 225), we need to read
+    // the block from offset 0 to 240 as the checksum boundary is at every 20
+    // bytes. Verify that 240 bytes of block data are read and stored in the
+    // buffers. Since the checksum boundary is at every 20 bytes, there should
+    // be 240/20 buffers.
+    matchWithInputData(blockStream.getReadByteBuffers(), 0, 240);
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    seekAndVerify(0);
+    EOFException eofException = assertThrows(EOFException.class, () -> seekAndVerify(blockSize + 1));
+    assertThat(eofException).hasMessage("EOF encountered at pos: " + (blockSize + 1) + " for block: " + blockID);
+
+    // Seek before read should update the BlockInputStream#blockPosition
+    seekAndVerify(25);
+
+    // Read from the sought position.
+    // Reading from index 25 to 54 should result in the BlockInputStream
+    // copying chunk data from index 20 to 59 into the buffers (checksum
+    // boundaries).
+    byte[] b = new byte[30];
+    int numBytesRead = blockStream.read(b, 0, 30);
+    assertEquals(30, numBytesRead);
+    matchWithInputData(b, 25, 30);
+    matchWithInputData(blockStream.getReadByteBuffers(), 20, 40);
+
+    // After read, the position of the blockStream is evaluated from the
+    // buffers and the blockPosition should be reset to -1.
+
+    // Only the last BYTES_PER_CHECKSUM will be cached in the buffers as
+    // buffers are released after each checksum boundary is read. So the
+    // buffers should contain data from index 40 to 59.
+    // Seek to a position within the cached buffers. BlockPosition should
+    // still not be used to set the position.
+    seekAndVerify(45);
+
+    // Seek to a position outside the current cached buffers. In this case, the
+    // blockPosition should be updated to the seeked position.
+    seekAndVerify(75);
+
+    // Reading up to the checksum boundary should result in all the buffers
+    // being released and hence blockPosition updated with the current
+    // position in the block.
+    seekAndVerify(25);
+    b = new byte[15];
+    numBytesRead = blockStream.read(b, 0, 15);
+    assertEquals(15, numBytesRead);
+    matchWithInputData(b, 25, 15);
+  }
+
+  @Test
+  public void testSeekAndRead() throws Exception {
+    // Seek to a position and read data
+    seekAndVerify(50);
+    byte[] b1 = new byte[20];
+    int numBytesRead = blockStream.read(b1, 0, 20);
+    assertEquals(20, numBytesRead);
+    matchWithInputData(b1, 50, 20);
+
+    // Next read should start from the position of the last read + 1 i.e. 70
+    byte[] b2 = new byte[20];
+    numBytesRead = blockStream.read(b2, 0, 20);
+    assertEquals(20, numBytesRead);
+    matchWithInputData(b2, 70, 20);
+
+    byte[] b3 = new byte[20];
+    seekAndVerify(80);
+    numBytesRead = blockStream.read(b3, 0, 20);
+    assertEquals(20, numBytesRead);
+    matchWithInputData(b3, 80, 20);
+  }
+
+  @Test
+  public void testUnbuffered() throws Exception {
+    byte[] b1 = new byte[20];
+    int numBytesRead = blockStream.read(b1, 0, 20);
+    assertEquals(20, numBytesRead);
+    matchWithInputData(b1, 0, 20);
+
+    blockStream.unbuffer();
+
+    assertFalse(blockStream.buffersAllocated());
+
+    // Next read should start from the position of the last read + 1 i.e. 20
+    byte[] b2 = new byte[20];
+    numBytesRead = blockStream.read(b2, 0, 20);
+    assertEquals(20, numBytesRead);
+    matchWithInputData(b2, 20, 20);
+  }
+
+}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java
index dbc42816036..2fd81fc91dc 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java
@@ -39,7 +39,10 @@
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
+import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mockito;
 
 /**
@@ -49,8 +52,9 @@ public class TestBlockInputStreamFactoryImpl {
 
   private OzoneConfiguration conf = new OzoneConfiguration();
 
-  @Test
-  public void testNonECGivesBlockInputStream() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testNonECGivesBlockInputStream(boolean streamReadBlockEnabled) throws IOException {
     BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
     ReplicationConfig repConfig =
         RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
@@ -62,11 +66,16 @@ public void testNonECGivesBlockInputStream() throws IOException {
     
Mockito.when(pipeline.getReplicaIndex(any(DatanodeDetails.class))).thenReturn(1);
     OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
     clientConfig.setChecksumVerify(true);
+    clientConfig.setStreamReadBlock(streamReadBlockEnabled);
     BlockExtendedInputStream stream =
         factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
             blockInfo.getToken(), null, null,
             clientConfig);
-    assertInstanceOf(BlockInputStream.class, stream);
+    if (streamReadBlockEnabled) {
+      assertInstanceOf(StreamBlockInputStream.class, stream);
+    } else {
+      assertInstanceOf(BlockInputStream.class, stream);
+    }
     assertEquals(stream.getBlockID(), blockInfo.getBlockID());
     assertEquals(stream.getLength(), blockInfo.getLength());
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
index 4c0bb03c165..2717e8eb3d9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
@@ -33,6 +33,8 @@ public enum DatanodeVersion implements ComponentVersion {
   SEPARATE_RATIS_PORTS_AVAILABLE(1, "Version with separated Ratis port."),
   COMBINED_PUTBLOCK_WRITECHUNK_RPC(2, "WriteChunk can optionally support " +
           "a PutBlock request"),
+  STREAM_BLOCK_SUPPORT(3,
+      "This version has support for reading a block by streaming chunks."),
 
   FUTURE_VERSION(-1, "Used internally in the client when the server side is "
       + " newer and an unknown server version has arrived to the client.");
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index c07a21680ef..78ff1feabca 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -418,6 +418,7 @@ public static boolean isReadOnly(
     switch (proto.getCmdType()) {
     case ReadContainer:
     case ReadChunk:
+    case ReadBlock:
     case ListBlock:
     case GetBlock:
     case GetSmallFile:
@@ -478,6 +479,7 @@ public static boolean requireBlockToken(Type cmdType) {
     case PutBlock:
     case PutSmallFile:
     case ReadChunk:
+    case ReadBlock:
     case WriteChunk:
     case FinalizeBlock:
       return true;
@@ -553,6 +555,11 @@ public static BlockID getBlockID(ContainerCommandRequestProtoOrBuilder msg) {
         blockID = msg.getReadChunk().getBlockID();
       }
       break;
+    case ReadBlock:
+      if (msg.hasReadBlock()) {
+        blockID = msg.getReadBlock().getBlockID();
+      }
+      break;
     case WriteChunk:
       if (msg.hasWriteChunk()) {
         blockID = msg.getWriteChunk().getBlockID();
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index 5e547898662..ededa8d070b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -334,6 +334,23 @@ public static ContainerCommandResponseProto getReadChunkResponse(
         .build();
   }
 
+  public static ContainerCommandResponseProto getReadBlockResponse(
+      ContainerCommandRequestProto request, DatanodeBlockID blockID,
+      ChunkInfo chunkInfo, ChunkBufferToByteString data,
+      Function<ByteBuffer, ByteString> byteBufferToByteString) {
+
+    ReadChunkResponseProto.Builder response;
+    response = ReadChunkResponseProto.newBuilder()
+        .setChunkData(chunkInfo)
+        .setDataBuffers(DataBuffers.newBuilder()
+            .addAllBuffers(data.toByteStringList(byteBufferToByteString))
+            .build())
+        .setBlockID(blockID);
+    return getSuccessResponseBuilder(request)
+        .setReadChunk(response)
+        .build();
+  }
+
   public static ContainerCommandResponseProto getFinalizeBlockResponse(
       ContainerCommandRequestProto msg, BlockData data) {
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index a934fc51372..1117d29c1aa 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -55,6 +55,8 @@
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockRequestProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
@@ -904,4 +906,58 @@ public static List<Validator> toValidatorList(Validator validator) {
     return datanodeToResponseMap;
   }
 
+  /**
+   * Calls the container protocol to read a block by streaming its chunks.
+   *
+   * @param xceiverClient  client to perform call
+   * @param offset         offset within the block at which to start reading
+   * @param len            length of data to read
+   * @param blockID        ID of the block
+   * @param validators     functions to validate the response
+   * @param token          a token for this block (may be null)
+   * @return container protocol read block response
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  public static ContainerProtos.ReadBlockResponseProto readBlock(
+      XceiverClientSpi xceiverClient, long offset, long len, BlockID blockID,
+      List<Validator> validators, Token<? extends TokenIdentifier> token,
+      Map<DatanodeDetails, Integer> replicaIndexes, boolean verifyChecksum) 
throws IOException {
+    final ReadBlockRequestProto.Builder readBlockRequest =
+        ReadBlockRequestProto.newBuilder()
+            .setOffset(offset)
+            .setVerifyChecksum(verifyChecksum)
+            .setLen(len);
+    final ContainerCommandRequestProto.Builder builder =
+        ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadBlock)
+            .setContainerID(blockID.getContainerID());
+    if (token != null) {
+      builder.setEncodedToken(token.encodeToUrlString());
+    }
+
+    return tryEachDatanode(xceiverClient.getPipeline(),
+        d -> readBlock(xceiverClient,
+            validators, blockID, builder, readBlockRequest, d, replicaIndexes),
+        d -> toErrorMessage(blockID, d));
+  }
+
+  private static ReadBlockResponseProto readBlock(XceiverClientSpi 
xceiverClient,
+                                                  List<Validator> validators, 
BlockID blockID,
+                                                  
ContainerCommandRequestProto.Builder builder,
+                                                  
ReadBlockRequestProto.Builder readBlockBuilder,
+                                                  DatanodeDetails datanode,
+                                                  Map<DatanodeDetails, 
Integer> replicaIndexes) throws IOException {
+    final DatanodeBlockID.Builder datanodeBlockID = 
blockID.getDatanodeBlockIDProtobufBuilder();
+    int replicaIndex = replicaIndexes.getOrDefault(datanode, 0);
+    if (replicaIndex > 0) {
+      datanodeBlockID.setReplicaIndex(replicaIndex);
+    }
+    readBlockBuilder.setBlockID(datanodeBlockID);
+    final ContainerCommandRequestProto request = builder
+        .setDatanodeUuid(datanode.getUuidString())
+        .setReadBlock(readBlockBuilder).build();
+    ContainerCommandResponseProto response =
+        xceiverClient.sendCommand(request, validators);
+    return response.getReadBlock();
+  }
 }
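
For readers unfamiliar with the request wiring above, here is a minimal, self-contained sketch (not part of this patch) of how a ReadBlock command is assembled from the new protobuf messages; the container/block IDs and the datanode UUID below are placeholder values.

import java.util.UUID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

public final class ReadBlockRequestSketch {

  // Builds a ReadBlock command covering [offset, offset + len) of a block,
  // mirroring the builder calls used in ContainerProtocolCalls#readBlock.
  public static ContainerProtos.ContainerCommandRequestProto build(
      long containerId, long localId, long offset, long len) {
    ContainerProtos.ReadBlockRequestProto readBlock =
        ContainerProtos.ReadBlockRequestProto.newBuilder()
            .setBlockID(ContainerProtos.DatanodeBlockID.newBuilder()
                .setContainerID(containerId)
                .setLocalID(localId))
            .setOffset(offset)
            .setLen(len)
            .setVerifyChecksum(true)
            .build();
    return ContainerProtos.ContainerCommandRequestProto.newBuilder()
        .setCmdType(ContainerProtos.Type.ReadBlock)
        .setContainerID(containerId)
        .setDatanodeUuid(UUID.randomUUID().toString())  // placeholder target datanode
        .setReadBlock(readBlock)
        .build();
  }
}
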
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index 09117335617..61d1c49da04 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -43,7 +43,8 @@ public enum DNAction implements AuditAction {
   STREAM_INIT,
   FINALIZE_BLOCK,
   ECHO,
-  GET_CONTAINER_CHECKSUM_INFO;
+  GET_CONTAINER_CHECKSUM_INFO,
+  READ_BLOCK;
 
   @Override
   public String getAction() {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index ea47c4945b8..e6be4a490b4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -24,6 +24,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ServiceException;
+import io.opentelemetry.api.trace.Span;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
@@ -52,6 +53,7 @@
 import org.apache.hadoop.hdds.security.token.NoopTokenVerifier;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 import org.apache.hadoop.ozone.audit.AuditAction;
 import org.apache.hadoop.ozone.audit.AuditEventStatus;
@@ -75,6 +77,8 @@
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -812,6 +816,80 @@ public StateMachine.DataChannel getStreamDataChannel(
     }
   }
 
+  @Override
+  public void streamDataReadOnly(ContainerCommandRequestProto msg,
+                                 StreamObserver<ContainerCommandResponseProto> 
streamObserver,
+                                 DispatcherContext dispatcherContext) {
+    Type cmdType = msg.getCmdType();
+    String traceID = msg.getTraceID();
+    Span span = TracingUtil.importAndCreateSpan(cmdType.toString(), traceID);
+    AuditAction action = getAuditAction(msg.getCmdType());
+    EventType eventType = getEventType(msg);
+
+    try (UncheckedAutoCloseable ignored = protocolMetrics.measure(cmdType)) {
+      Preconditions.checkNotNull(msg);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Command {}, trace ID: {}.", msg.getCmdType(),
+            traceID);
+      }
+
+      PerformanceStringBuilder perf = new PerformanceStringBuilder();
+      ContainerCommandResponseProto responseProto = null;
+      long containerID = msg.getContainerID();
+      Container container = getContainer(containerID);
+      long startTime = Time.monotonicNow();
+
+      if (DispatcherContext.op(dispatcherContext).validateToken()) {
+        validateToken(msg);
+      }
+      if (getMissingContainerSet().contains(containerID)) {
+        throw new StorageContainerException(
+            "ContainerID " + containerID
+                + " has been lost and and cannot be recreated on this 
DataNode",
+            ContainerProtos.Result.CONTAINER_MISSING);
+      }
+      if (container == null) {
+        throw new StorageContainerException(
+            "ContainerID " + containerID + " does not exist",
+            ContainerProtos.Result.CONTAINER_NOT_FOUND);
+      }
+      ContainerType containerType = getContainerType(container);
+      Handler handler = getHandler(containerType);
+      if (handler == null) {
+        throw new StorageContainerException("Invalid " +
+            "ContainerType " + containerType,
+            ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+      }
+      perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime);
+      responseProto = handler.readBlock(
+          msg, container, dispatcherContext, streamObserver);
+      long oPLatencyMS = Time.monotonicNow() - startTime;
+      metrics.incContainerOpsLatencies(cmdType, oPLatencyMS);
+      if (responseProto == null) {
+        audit(action, eventType, msg, dispatcherContext, 
AuditEventStatus.SUCCESS, null);
+      } else {
+        containerSet.scanContainer(containerID, "ReadBlock failed " + 
responseProto.getResult());
+        audit(action, eventType, msg, dispatcherContext, 
AuditEventStatus.FAILURE,
+            new Exception(responseProto.getMessage()));
+        streamObserver.onNext(responseProto);
+      }
+      perf.appendOpLatencyMs(oPLatencyMS);
+      performanceAudit(action, msg, dispatcherContext, perf, oPLatencyMS);
+
+    } catch (StorageContainerException sce) {
+      audit(action, eventType, msg, dispatcherContext, 
AuditEventStatus.FAILURE, sce);
+      streamObserver.onNext(ContainerUtils.logAndReturnError(LOG, sce, msg));
+    } catch (IOException ioe) {
+      final String s = ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED
+          + " for " + dispatcherContext + ": " + ioe.getMessage();
+      final StorageContainerException sce = new StorageContainerException(
+          s, ioe, ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
+      streamObserver.onNext(ContainerUtils.logAndReturnError(LOG, sce, msg));
+    } finally {
+      span.end();
+    }
+  }
+
   private static DNAction getAuditAction(Type cmdType) {
     switch (cmdType) {
     case CreateContainer  : return DNAction.CREATE_CONTAINER;
@@ -836,6 +914,7 @@ private static DNAction getAuditAction(Type cmdType) {
     case FinalizeBlock    : return DNAction.FINALIZE_BLOCK;
     case Echo             : return DNAction.ECHO;
     case GetContainerChecksumInfo: return DNAction.GET_CONTAINER_CHECKSUM_INFO;
+    case ReadBlock        : return DNAction.READ_BLOCK;
     default :
       LOG.debug("Invalid command type - {}", cmdType);
       return null;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index 1c3071a3791..d6c33455008 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -24,6 +24,7 @@
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 
 /**
  * Dispatcher acts as the bridge between the transport layer and
@@ -89,4 +90,15 @@ default StateMachine.DataChannel getStreamDataChannel(
     throw new UnsupportedOperationException(
         "getStreamDataChannel not supported.");
   }
+
+  /**
+   * Streams read-only data to the client as a sequence of chunk responses.
+   */
+  default void streamDataReadOnly(
+       ContainerCommandRequestProto msg,
+       StreamObserver<ContainerCommandResponseProto> streamObserver,
+       DispatcherContext dispatcherContext) {
+    throw new UnsupportedOperationException(
+         "streamDataReadOnly not supported.");
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 4337d667618..0abcab5afea 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -40,9 +40,11 @@
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 
 /**
  * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@@ -264,4 +266,9 @@ public void setClusterID(String clusterID) {
     this.clusterId = clusterID;
   }
 
+  public abstract ContainerCommandResponseProto readBlock(
+      ContainerCommandRequestProto msg, Container container,
+      DispatcherContext dispatcherContext,
+      StreamObserver<ContainerCommandResponseProto> streamObserver);
+
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
index 6728744d147..041958b4227 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -104,8 +104,12 @@ public void onNext(ContainerCommandRequestProto request) {
             .build();
 
         try {
-          final ContainerCommandResponseProto resp = 
dispatcher.dispatch(request, context);
-          responseObserver.onNext(resp);
+          if (request.getCmdType() == Type.ReadBlock) {
+            dispatcher.streamDataReadOnly(request, responseObserver, null);
+          } else {
+            final ContainerCommandResponseProto resp = 
dispatcher.dispatch(request, context);
+            responseObserver.onNext(resp);
+          }
         } catch (Throwable e) {
           LOG.error("Got exception when processing"
                     + " ContainerCommandRequestProto {}", request, e);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
index f9ee0a4bd0f..e0f0ccbe193 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
@@ -34,6 +34,8 @@
 public final class DispatcherContext {
   private static final DispatcherContext HANDLE_READ_CHUNK
       = newBuilder(Op.HANDLE_READ_CHUNK).build();
+  private static final DispatcherContext HANDLE_READ_BLOCK
+      = newBuilder(Op.HANDLE_READ_BLOCK).build();
   private static final DispatcherContext HANDLE_WRITE_CHUNK
       = newBuilder(Op.HANDLE_WRITE_CHUNK).build();
   private static final DispatcherContext HANDLE_GET_SMALL_FILE
@@ -60,6 +62,10 @@ public static DispatcherContext getHandleReadChunk() {
     return HANDLE_READ_CHUNK;
   }
 
+  public static DispatcherContext getHandleReadBlock() {
+    return HANDLE_READ_BLOCK;
+  }
+
   public static DispatcherContext getHandleWriteChunk() {
     return HANDLE_WRITE_CHUNK;
   }
@@ -92,6 +98,7 @@ public enum Op {
     NULL,
 
     HANDLE_READ_CHUNK,
+    HANDLE_READ_BLOCK,
     HANDLE_WRITE_CHUNK,
     HANDLE_GET_SMALL_FILE,
     HANDLE_PUT_SMALL_FILE,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 584cb98b367..ecf13cf12af 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -46,6 +46,7 @@
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess;
+import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadBlockResponse;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse;
@@ -87,6 +88,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -102,6 +104,7 @@
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -162,6 +165,7 @@
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -2046,6 +2050,115 @@ public void deleteUnreferenced(Container container, long localID)
     }
   }
 
+  @Override
+  public ContainerCommandResponseProto readBlock(
+      ContainerCommandRequestProto request, Container kvContainer,
+      DispatcherContext dispatcherContext,
+      StreamObserver<ContainerCommandResponseProto> streamObserver) {
+    ContainerCommandResponseProto responseProto = null;
+    if (!request.hasReadBlock()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Malformed Read Block request. trace ID: {}",
+            request.getTraceID());
+      }
+      return malformedRequest(request);
+    }
+    try {
+      ReadBlockRequestProto readBlock = request.getReadBlock();
+
+      BlockID blockID = BlockID.getFromProtobuf(
+          readBlock.getBlockID());
+      // This is a new API, so the block should always be checked.
+      BlockUtils.verifyReplicaIdx(kvContainer, blockID);
+      BlockUtils.verifyBCSId(kvContainer, blockID);
+
+      BlockData blockData = getBlockManager().getBlock(kvContainer, blockID);
+      List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+      long blockOffset = 0;
+      int chunkIndex = -1;
+      long chunkOffset = 0;
+      long offset = readBlock.getOffset();
+      for (int i = 0; i < chunkInfos.size(); i++) {
+        final long chunkLen = chunkInfos.get(i).getLen();
+        blockOffset += chunkLen;
+        if (blockOffset > offset) {
+          chunkIndex = i;
+          chunkOffset = offset - blockOffset + chunkLen;
+          break;
+        }
+      }
+      Preconditions.checkState(chunkIndex >= 0);
+
+      if (dispatcherContext == null) {
+        dispatcherContext = DispatcherContext.getHandleReadBlock();
+      }
+
+      ChunkBufferToByteString data;
+
+      long len = readBlock.getLen();
+      long adjustedChunkOffset, adjustedChunkLen;
+      do {
+        ContainerProtos.ChunkInfo chunk = chunkInfos.get(chunkIndex);
+        if (readBlock.getVerifyChecksum()) {
+          Pair<Long, Long> adjustedOffsetAndLength =
+              computeChecksumBoundaries(chunk, chunkOffset, len);
+          adjustedChunkOffset = adjustedOffsetAndLength.getLeft();
+          adjustedChunkLen = adjustedOffsetAndLength.getRight();
+          adjustedChunkOffset += chunk.getOffset();
+        } else {
+          adjustedChunkOffset = chunkOffset;
+          adjustedChunkLen = Math.min(
+              chunk.getLen() + chunk.getOffset() - chunkOffset, len);
+        }
+
+        ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
+            ContainerProtos.ChunkInfo.newBuilder(chunk)
+                .setOffset(adjustedChunkOffset)
+                .setLen(adjustedChunkLen).build());
+        BlockUtils.verifyReplicaIdx(kvContainer, blockID);
+        BlockUtils.verifyBCSId(kvContainer, blockID);
+        data = getChunkManager().readChunk(
+            kvContainer, blockID, chunkInfo, dispatcherContext);
+
+        Preconditions.checkNotNull(data, "Chunk data is null");
+        streamObserver.onNext(
+            getReadBlockResponse(request,
+                blockData.getProtoBufMessage().getBlockID(),
+                chunkInfo.getProtoBufMessage(),
+                data, byteBufferToByteString));
+        len -= adjustedChunkLen + adjustedChunkOffset - chunkOffset;
+        chunkOffset = 0;
+        chunkIndex++;
+      } while (len > 0);
+
+      metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen());
+    } catch (StorageContainerException ex) {
+      responseProto = ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ioe) {
+      responseProto = ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Read Block failed", ioe, 
IO_EXCEPTION),
+          request);
+    }
+    return responseProto;
+  }
+
+  private Pair<Long, Long> computeChecksumBoundaries(
+      ContainerProtos.ChunkInfo chunkInfo, long startByteIndex, long dataLen) {
+
+    int bytesPerChecksum = chunkInfo.getChecksumData().getBytesPerChecksum();
+
+    // index of the last byte to be read from the chunk, inclusive.
+    final long endByteIndex = startByteIndex + dataLen - 1;
+
+    long adjustedChunkOffset = (startByteIndex / bytesPerChecksum)
+        * bytesPerChecksum; // inclusive
+    final long endIndex = ((endByteIndex / bytesPerChecksum) + 1)
+        * bytesPerChecksum; // exclusive
+    long adjustedChunkLen =
+        Math.min(endIndex, chunkInfo.getLen()) - adjustedChunkOffset;
+    return Pair.of(adjustedChunkOffset, adjustedChunkLen);
+  }
+
   @Override
   public void addFinalizedBlock(Container container, long localID) {
     KeyValueContainer keyValueContainer = (KeyValueContainer)container;
@@ -2084,7 +2197,7 @@ private boolean logBlocksIfNonZero(Container container)
       }
       if (nonZero) {
         LOG.error("blocks in rocksDB on container delete: {}",
-            stringBuilder.toString());
+            stringBuilder);
       }
     }
     return nonZero;
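
As a worked example of the checksum-boundary alignment performed by computeChecksumBoundaries above, the standalone sketch below (not part of this patch) shows how a read range is widened to whole bytesPerChecksum units so the datanode can verify complete checksum blocks.

public final class ChecksumBoundarySketch {

  // Same arithmetic as KeyValueHandler#computeChecksumBoundaries: widen
  // [startByteIndex, startByteIndex + dataLen) to full bytesPerChecksum
  // units, capped at the chunk length.
  static long[] align(long startByteIndex, long dataLen,
      int bytesPerChecksum, long chunkLen) {
    final long endByteIndex = startByteIndex + dataLen - 1;          // inclusive
    final long alignedOffset =
        (startByteIndex / bytesPerChecksum) * bytesPerChecksum;      // inclusive
    final long alignedEnd =
        ((endByteIndex / bytesPerChecksum) + 1) * bytesPerChecksum;  // exclusive
    return new long[] {alignedOffset, Math.min(alignedEnd, chunkLen) - alignedOffset};
  }

  public static void main(String[] args) {
    // With 256 KB per checksum, a 100-byte read starting at byte 300000 of a
    // 1 MB chunk is widened to the single checksum unit [262144, 524288).
    long[] r = align(300_000, 100, 256 * 1024, 1024 * 1024);
    System.out.println(r[0] + " " + r[1]);   // prints: 262144 262144
  }
}
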
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 6afee1c5d77..efede65e524 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -42,8 +42,10 @@
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -53,9 +55,11 @@
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Clock;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -68,22 +72,29 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
 import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -94,22 +105,28 @@
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
 import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner;
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.GenericTestUtils.LogCapturer;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -127,7 +144,10 @@ public class TestKeyValueHandler {
   private Path dbFile;
 
   private static final long DUMMY_CONTAINER_ID = 9999;
+  private static final long LOCAL_ID = 1;
   private static final String DUMMY_PATH = "dummy/dir/doesnt/exist";
+  private static final long CHUNK_SIZE = 1024 * 1024;          // 1MB
+  private static final long BYTES_PER_CHECKSUM = 256 * 1024;
   private static final String DATANODE_UUID = UUID.randomUUID().toString();
   private static final String CLUSTER_ID = UUID.randomUUID().toString();
 
@@ -938,4 +958,77 @@ private KeyValueHandler createKeyValueHandler(Path path) throws IOException {
 
     return kvHandler;
   }
+
+  @Test
+  public void testReadBlock() throws IOException {
+
+    StreamObserver<ContainerCommandResponseProto> streamObserver = 
mock(StreamObserver.class);
+    KeyValueContainer container = mock(KeyValueContainer.class);
+    final KeyValueHandler kvHandler = new KeyValueHandler(new 
OzoneConfiguration(),
+        UUID.randomUUID().toString(), mock(ContainerSet.class), 
mock(VolumeSet.class), mock(ContainerMetrics.class),
+        mock(IncrementalReportSender.class), 
mock(ContainerChecksumTreeManager.class));
+    final KeyValueHandler keyValueHandler = spy(kvHandler);
+    DispatcherContext dispatcherContext = mock(DispatcherContext.class);
+
+    List<ContainerProtos.ChunkInfo> chunkInfoList = new ArrayList<>();
+    BlockData blockData = new BlockData(new BlockID(1, 1));
+    for (int i = 0; i < 4; i++) {
+      chunkInfoList.add(ContainerProtos.ChunkInfo
+          .newBuilder()
+          .setOffset(0)
+          .setLen(CHUNK_SIZE)
+          .setChecksumData(
+              ChecksumData.newBuilder().setBytesPerChecksum((int) 
BYTES_PER_CHECKSUM)
+                  .setType(ChecksumType.CRC32).build())
+          .setChunkName("chunkName" + i)
+          .build());
+    }
+    blockData.setChunks(chunkInfoList);
+
+    try (MockedStatic<BlockUtils> blockUtils = mockStatic(BlockUtils.class)) {
+      BlockManager blockManager = mock(BlockManager.class);
+      ChunkManager chunkManager = mock(ChunkManager.class);
+      when(keyValueHandler.getBlockManager()).thenReturn(blockManager);
+      when(keyValueHandler.getChunkManager()).thenReturn(chunkManager);
+      when(blockManager.getBlock(any(), any())).thenReturn(blockData);
+      ChunkBuffer data = ChunkBuffer.wrap(ByteBuffer.allocate(0));
+      when(chunkManager.readChunk(any(), any(),
+          any(), any()))
+          .thenReturn(data);
+      testReadBlock(0, 1, keyValueHandler, dispatcherContext,
+          streamObserver, container);
+      testReadBlock(0, CHUNK_SIZE + 1, keyValueHandler, dispatcherContext,
+          streamObserver, container);
+      testReadBlock(CHUNK_SIZE / 2, 2 * CHUNK_SIZE, keyValueHandler, 
dispatcherContext,
+          streamObserver, container);
+    }
+  }
+
+  private static ContainerCommandRequestProto readBlockRequest(
+      long offset, long length) {
+    return ContainerCommandRequestProto.newBuilder()
+        .setCmdType(Type.ReadBlock)
+        .setReadBlock(
+            ContainerProtos.ReadBlockRequestProto.newBuilder()
+                .setBlockID(
+                    ContainerProtos.DatanodeBlockID.newBuilder()
+                        .setContainerID(DUMMY_CONTAINER_ID)
+                        .setLocalID(LOCAL_ID))
+                .setOffset(offset)
+                .setLen(length)
+                .setVerifyChecksum(true))
+        .setContainerID(DUMMY_CONTAINER_ID)
+        .setDatanodeUuid(UUID.randomUUID().toString())
+        .build();
+  }
+
+  private static void testReadBlock(
+      long offset, long length, KeyValueHandler keyValueHandler, 
DispatcherContext dispatcherContext,
+      StreamObserver<ContainerCommandResponseProto> streamObserver, 
KeyValueContainer container) {
+    int responseCount = (int) (((offset + length - 1) / CHUNK_SIZE) + 1 - 
(offset / CHUNK_SIZE));
+    ContainerCommandRequestProto requestProto = readBlockRequest(offset, 
length);
+    keyValueHandler.readBlock(requestProto, container, dispatcherContext, 
streamObserver);
+    verify(streamObserver, times(responseCount)).onNext(any());
+    clearInvocations(streamObserver);
+  }
 }
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index bd890eae64a..c5548244560 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -77,6 +77,8 @@ package hadoop.hdds.datanode;
  *  18. CopyContainer - Copies a container from a remote machine.
  *
  *  19. FinalizeBlock - Finalize block request from client.
+ *
+ *  20. ReadBlock - Reads a block by streaming its chunk data.
  */
 
 enum Type {
@@ -108,6 +110,7 @@ enum Type {
   FinalizeBlock = 21;
   Echo = 22;
   GetContainerChecksumInfo = 23;
+  ReadBlock = 24;
 }
 
 
@@ -218,6 +221,7 @@ message ContainerCommandRequestProto {
   optional   FinalizeBlockRequestProto finalizeBlock = 25;
   optional   EchoRequestProto echo = 26;
   optional   GetContainerChecksumInfoRequestProto getContainerChecksumInfo = 
27;
+  optional   ReadBlockRequestProto readBlock = 28;
 }
 
 message ContainerCommandResponseProto {
@@ -250,6 +254,7 @@ message ContainerCommandResponseProto {
   optional   FinalizeBlockResponseProto finalizeBlock = 22;
   optional   EchoResponseProto echo = 23;
   optional   GetContainerChecksumInfoResponseProto getContainerChecksumInfo = 
24;
+  optional   ReadBlockResponseProto readBlock = 25;
 }
 
 message ContainerDataProto {
@@ -393,6 +398,17 @@ message ListBlockResponseProto {
   repeated BlockData blockData = 1;
 }
 
+message ReadBlockRequestProto {
+  required DatanodeBlockID blockID = 1;
+  required uint64 offset = 2;
+  required uint64 len = 3;
+  required bool verifyChecksum = 4;
+}
+
+message ReadBlockResponseProto {
+  repeated ReadChunkResponseProto readChunk = 1;
+}
+
 message EchoRequestProto {
   optional bytes payload = 1;
   optional int32 payloadSizeResp = 2;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
new file mode 100644
index 00000000000..80ae5118467
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
+import org.apache.hadoop.ozone.om.TestBucket;
+
+/**
+ * Tests {@link StreamBlockInputStream}.
+ */
+public class TestStreamBlockInputStream extends TestInputStreamBase {
+  /**
+   * Run the tests as a single test method to avoid needing a new mini-cluster
+   * for each test.
+   */
+  @ContainerLayoutTestInfo.ContainerTest
+  void testAll(ContainerLayoutVersion layout) throws Exception {
+    try (MiniOzoneCluster cluster = newCluster()) {
+      cluster.waitForClusterToBeReady();
+
+      OzoneConfiguration conf = cluster.getConf();
+      OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+      clientConfig.setStreamReadBlock(true);
+      OzoneConfiguration copy = new OzoneConfiguration(conf);
+      copy.setFromObject(clientConfig);
+      try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
+        updateConfig(layout);
+        TestBucket bucket = TestBucket.newBuilder(client).build();
+
+        testBlockReadBuffers(bucket);
+        testBufferRelease(bucket);
+        testCloseReleasesBuffers(bucket);
+        testReadEmptyBlock(bucket);
+      }
+    }
+  }
+
+  /**
+   * Test to verify that data read from blocks is stored in a list of buffers
+   * with max capacity equal to the bytes per checksum.
+   */
+  private void testBlockReadBuffers(TestBucket bucket) throws Exception {
+    String keyName = getNewKeyName();
+    int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
+    byte[] inputData = bucket.writeRandomBytes(keyName, dataLength);
+
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
+
+      StreamBlockInputStream block0Stream =
+          (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
+
+
+      // To read 1 byte of data, the stream should fetch one full checksum
+      // boundary worth of data from the container and store it in its
+      // buffers.
+      IOUtils.readFully(block0Stream, new byte[1]);
+      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+
+      // Read > checksum boundary of data from chunk0
+      int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
+      byte[] readData = readDataFromBlock(block0Stream, 0, readDataLen);
+      bucket.validateData(inputData, 0, readData);
+
+      // The first checksum boundary worth of data already exists in the
+      // stream's buffers. Once that data is read, the next checksum boundary
+      // worth of data is fetched to read the remaining data. Hence, there
+      // should be 1 checksum boundary worth of data stored in the stream's
+      // buffers at the end of the read.
+      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+
+      // Seek to a position in the third checksum boundary (so that current
+      // buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM
+      // bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of
+      // data being read into the buffers. There should be 2 buffers in the
+      // stream but the first buffer should be released after it is read
+      // and the second buffer should have BYTES_PER_CHECKSUM capacity.
+      int offset = 2 * BYTES_PER_CHECKSUM + 1;
+      readData = readDataFromBlock(block0Stream, offset, readDataLen);
+      bucket.validateData(inputData, offset, readData);
+      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+
+
+      // Read CHUNK_SIZE - 1 bytes of data and verify that all the chunk data
+      // read so far is in the buffers. We read CHUNK_SIZE - 1 as otherwise
+      // all the buffers would be released once all chunk data is read.
+      readData = readDataFromBlock(block0Stream, 0, CHUNK_SIZE - 1);
+      bucket.validateData(inputData, 0, readData);
+      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+
+      // Read the last byte of chunk and verify that the buffers are released.
+      IOUtils.readFully(block0Stream, new byte[1]);
+      assertNull(block0Stream.getCachedBuffers(),
+          "ChunkInputStream did not release buffers after reaching EOF.");
+    }
+  }
+
+  private void testCloseReleasesBuffers(TestBucket bucket) throws Exception {
+    String keyName = getNewKeyName();
+    bucket.writeRandomBytes(keyName, CHUNK_SIZE);
+
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
+      StreamBlockInputStream block0Stream =
+          (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
+
+      readDataFromBlock(block0Stream, 0, 1);
+      assertNotNull(block0Stream.getCachedBuffers());
+
+      block0Stream.close();
+
+      assertNull(block0Stream.getCachedBuffers());
+    }
+  }
+
+  /**
+   * Test that StreamBlockInputStream buffers are released as soon as the last
+   * byte of the buffer is read.
+   */
+  private void testBufferRelease(TestBucket bucket) throws Exception {
+    String keyName = getNewKeyName();
+    byte[] inputData = bucket.writeRandomBytes(keyName, CHUNK_SIZE);
+
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
+
+      StreamBlockInputStream block0Stream =
+          (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
+
+      // Read checksum boundary - 1 bytes of data
+      int readDataLen = BYTES_PER_CHECKSUM - 1;
+      byte[] readData = readDataFromBlock(block0Stream, 0, readDataLen);
+      bucket.validateData(inputData, 0, readData);
+
+      // There should be 1 byte of data remaining in the buffer which is not
+      // yet read. Hence, the buffer should not be released.
+      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+      assertEquals(1, block0Stream.getCachedBuffers()[0].remaining());
+
+      // Reading the last byte in the buffer should result in all the buffers
+      // being released.
+      readData = readDataFromBlock(block0Stream, 1);
+      bucket.validateData(inputData, readDataLen, readData);
+      assertNull(block0Stream.getCachedBuffers(),
+          "Chunk stream buffers not released after last byte is read");
+
+      // Read more data to get the data till the next checksum boundary.
+      readDataLen = BYTES_PER_CHECKSUM / 2;
+      readDataFromBlock(block0Stream, readDataLen);
+      // There should be one buffer and the buffer should not be released as
+      // there is data pending to be read from the buffer
+      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+      ByteBuffer lastCachedBuffer = block0Stream.getCachedBuffers()[0];
+      assertEquals(BYTES_PER_CHECKSUM - readDataLen,
+          lastCachedBuffer.remaining());
+
+      // Read more than the remaining data in buffer (but less than the next
+      // checksum boundary).
+      int position = (int) block0Stream.getPos();
+      readDataLen = lastCachedBuffer.remaining() + BYTES_PER_CHECKSUM / 2;
+      readData = readDataFromBlock(block0Stream, readDataLen);
+      bucket.validateData(inputData, position, readData);
+      // After reading the remaining data in the buffer, the buffer should be
+      // released and next checksum size of data must be read into the buffers
+      checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
+      // Verify that the previously cached buffer is released by comparing it
+      // with the current cached buffer
+      assertNotEquals(lastCachedBuffer,
+          block0Stream.getCachedBuffers()[0]);
+    }
+  }
+
+  private byte[] readDataFromBlock(StreamBlockInputStream 
streamBlockInputStream,
+                                   int offset, int readDataLength) throws 
IOException {
+    byte[] readData = new byte[readDataLength];
+    streamBlockInputStream.seek(offset);
+    IOUtils.readFully(streamBlockInputStream, readData);
+    return readData;
+  }
+
+  private byte[] readDataFromBlock(StreamBlockInputStream 
streamBlockInputStream,
+                                   int readDataLength) throws IOException {
+    byte[] readData = new byte[readDataLength];
+    IOUtils.readFully(streamBlockInputStream, readData);
+    return readData;
+  }
+
+  /**
+   * Verify number of buffers and their capacities.
+   * @param buffers chunk stream buffers
+   */
+  private void checkBufferSizeAndCapacity(ByteBuffer[] buffers) {
+    assertEquals(1, buffers.length,
+        "ChunkInputStream does not have expected number of " +
+            "ByteBuffers");
+    for (ByteBuffer buffer : buffers) {
+      assertEquals(BYTES_PER_CHECKSUM, buffer.capacity(),
+          "ChunkInputStream ByteBuffer capacity is wrong");
+    }
+  }
+
+  private void testReadEmptyBlock(TestBucket bucket) throws Exception {
+    String keyName = getNewKeyName();
+    int dataLength = 10;
+    bucket.writeRandomBytes(keyName, 0);
+
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
+
+      byte[] readData = new byte[dataLength];
+      assertTrue(keyInputStream.getPartStreams().isEmpty());
+      IOUtils.read(keyInputStream, readData);
+      for (byte b : readData) {
+        assertEquals((byte) 0, b);
+      }
+    }
+  }
+}

