This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d673fbf634c HDDS-13975. Limit the number of responses in stream read block. (#9375)
d673fbf634c is described below

commit d673fbf634c2c8af79ffbbd7358365621f6e7195
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Dec 1 08:40:14 2025 -0800

    HDDS-13975. Limit the number of responses in stream read block. (#9375)
---
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |  46 +--
 .../hdds/scm/storage/StreamBlockInputStream.java   | 260 ++++++++-------
 .../scm/storage/TestStreamBlockInputStream.java    | 354 ---------------------
 .../hadoop/hdds/scm/StreamingReadResponse.java     |   9 +
 .../apache/hadoop/hdds/scm/XceiverClientSpi.java   |  10 +-
 .../common/helpers/RandomAccessBlockFile.java      |  92 ++++++
 .../hdds/scm/storage/ContainerProtocolCalls.java   |  52 ++-
 .../org/apache/hadoop/ozone/common/Checksum.java   |   9 +
 .../container/common/impl/HddsDispatcher.java      |   5 +-
 .../common/interfaces/ContainerDispatcher.java     |   2 +
 .../ozone/container/common/interfaces/Handler.java |   4 +-
 .../transport/server/GrpcXceiverService.java       |  25 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  | 169 ++++++----
 .../src/main/proto/DatanodeClientProtocol.proto    |   2 +
 .../rpc/read/TestStreamBlockInputStream.java       |  67 ++++
 .../ozone/client/rpc/read/TestStreamRead.java      | 227 +++++++++++++
 16 files changed, 748 insertions(+), 585 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index b6a3d00f010..d53cc957cbf 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -37,6 +37,7 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeID;
@@ -59,6 +60,7 @@
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.util.Time;
+import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.Status;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
@@ -386,11 +388,14 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(
   }
 
   private List<DatanodeDetails> sortDatanodes(ContainerCommandRequestProto request) throws IOException {
+    return sortDatanodes(getRequestBlockID(request), request.getCmdType());
+  }
+
+  List<DatanodeDetails> sortDatanodes(DatanodeBlockID blockID, ContainerProtos.Type cmdType) throws IOException {
     List<DatanodeDetails> datanodeList = null;
-    DatanodeBlockID blockID = getRequestBlockID(request);
 
     if (blockID != null) {
-      if (request.getCmdType() != ContainerProtos.Type.ReadChunk) {
+      if (cmdType != ContainerProtos.Type.ReadChunk) {
         datanodeList = pipeline.getNodes();
         int getBlockDNLeaderIndex = datanodeList.indexOf(pipeline.getLeaderNode());
         if (getBlockDNLeaderIndex > 0) {
@@ -516,23 +521,19 @@ private XceiverClientReply sendCommandWithRetry(
     }
   }
 
-  /**
-   * Starts a streaming read operation, intended to read entire blocks from the datanodes. This method expects a
-   * {@link StreamingReaderSpi} to be passed in, which will be used to receive the streamed data from the datanode.
-   * Upon successfully starting the streaming read, a {@link StreamingReadResponse} is set into the pass StreamObserver,
-   * which contains information about the datanode used for the read, and the request observer that can be used to
-   * manage the stream (e.g., to cancel it if needed). A semaphore is acquired to limit the number of concurrent
-   * streaming reads so upon successful return of this method, the caller must ensure to call
-   * {@link #completeStreamRead(StreamingReadResponse)} to release the semaphore once the streaming read is complete.
-   * @param request The container command request to initiate the streaming read.
-   * @param streamObserver The observer that will handle the streamed responses.=
-   * @throws IOException
-   * @throws InterruptedException
-   */
   @Override
   public void streamRead(ContainerCommandRequestProto request,
-      StreamingReaderSpi streamObserver) throws IOException, InterruptedException {
-    List<DatanodeDetails> datanodeList = sortDatanodes(request);
+      StreamingReadResponse streamObserver) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("->{}, send onNext request {}",
+          streamObserver, TextFormat.shortDebugString(request.getReadBlock()));
+    }
+    streamObserver.getRequestObserver().onNext(request);
+  }
+
+  @Override
+  public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) throws IOException {
+    final List<DatanodeDetails> datanodeList = sortDatanodes(null, ContainerProtos.Type.ReadBlock);
     IOException lastException = null;
     for (DatanodeDetails dn : datanodeList) {
       try {
@@ -542,21 +543,20 @@ public void streamRead(ContainerCommandRequestProto request,
         if (stub == null) {
           throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
         }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Executing command {} on datanode {}", 
processForDebug(request), dn);
-        }
+        LOG.debug("initStreamRead {} on datanode {}", 
blockID.getContainerBlockID(), dn);
         StreamObserver<ContainerCommandRequestProto> requestObserver = stub
             .withDeadlineAfter(timeout, TimeUnit.SECONDS)
             .send(streamObserver);
         streamObserver.setStreamingReadResponse(new StreamingReadResponse(dn,
             (ClientCallStreamObserver<ContainerCommandRequestProto>) requestObserver));
-        requestObserver.onNext(request);
-        requestObserver.onCompleted();
         return;
       } catch (IOException e) {
         LOG.error("Failed to start streaming read to DataNode {}", dn, e);
         semaphore.release();
         lastException = e;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted initStreamRead to " + dn + " for " 
+ blockID, e);
       }
     }
     if (lastException != null) {
@@ -572,7 +572,7 @@ public void streamRead(ContainerCommandRequestProto request,
    * needed.
    */
   @Override
-  public void completeStreamRead(StreamingReadResponse streamingReadResponse) {
+  public void completeStreamRead() {
     semaphore.release();
   }
 
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
index 223ce65881f..8ddd5c4220d 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
@@ -17,9 +17,13 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import static org.apache.ratis.thirdparty.io.grpc.Status.Code.CANCELLED;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,6 +34,7 @@
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
@@ -37,6 +42,7 @@
 import org.apache.hadoop.hdds.scm.StreamingReadResponse;
 import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -44,9 +50,10 @@
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
+import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,18 +65,19 @@ public class StreamBlockInputStream extends BlockExtendedInputStream
     implements Seekable, CanUnbuffer, ByteBufferReadable {
   private static final Logger LOG = LoggerFactory.getLogger(StreamBlockInputStream.class);
   private static final int EOF = -1;
-  private static final Throwable CANCELLED_EXCEPTION = new Throwable("Cancelled by client");
 
   private final BlockID blockID;
   private final long blockLength;
+  private final int responseDataSize = 1 << 20; // 1 MB
+  private final long preReadSize = 32 << 20; // 32 MB
   private final AtomicReference<Pipeline> pipelineRef = new AtomicReference<>();
   private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef = new AtomicReference<>();
   private XceiverClientFactory xceiverClientFactory;
-  private XceiverClientSpi xceiverClient;
+  private XceiverClientGrpc xceiverClient;
 
   private ByteBuffer buffer;
   private long position = 0;
-  private boolean initialized = false;
+  private long requestedLength = 0;
   private StreamingReader streamingReader;
 
   private final boolean verifyChecksum;
@@ -111,7 +119,7 @@ public synchronized long getPos() {
   @Override
   public synchronized int read() throws IOException {
     checkOpen();
-    if (!dataAvailableToRead()) {
+    if (!dataAvailableToRead(1)) {
       return EOF;
     }
     position++;
@@ -129,7 +137,7 @@ public synchronized int read(ByteBuffer targetBuf) throws IOException {
     checkOpen();
     int read = 0;
     while (targetBuf.hasRemaining()) {
-      if (!dataAvailableToRead()) {
+      if (!dataAvailableToRead(targetBuf.remaining())) {
         break;
       }
       int toCopy = Math.min(buffer.remaining(), targetBuf.remaining());
@@ -143,16 +151,21 @@ public synchronized int read(ByteBuffer targetBuf) throws IOException {
     return read > 0 ? read : EOF;
   }
 
-  private boolean dataAvailableToRead() throws IOException {
+  private synchronized boolean dataAvailableToRead(int length) throws IOException {
     if (position >= blockLength) {
       return false;
     }
     initialize();
-    if (buffer == null || buffer.remaining() == 0) {
-      int loaded = fillBuffer();
-      return loaded != EOF;
+
+    if (bufferHasRemaining()) {
+      return true;
     }
-    return true;
+    buffer = streamingReader.read(length);
+    return bufferHasRemaining();
+  }
+
+  private synchronized boolean bufferHasRemaining() {
+    return buffer != null && buffer.hasRemaining();
   }
 
   @Override
@@ -174,6 +187,7 @@ public synchronized void seek(long pos) throws IOException {
     }
     closeStream();
     position = pos;
+    requestedLength = pos;
   }
 
   @Override
@@ -188,12 +202,11 @@ public synchronized void unbuffer() {
     releaseClient();
   }
 
-  private void closeStream() {
+  private synchronized void closeStream() {
     if (streamingReader != null) {
-      streamingReader.cancel();
+      streamingReader.onCompleted();
       streamingReader = null;
     }
-    initialized = false;
     buffer = null;
   }
 
@@ -207,36 +220,61 @@ protected synchronized void acquireClient() throws IOException {
     checkOpen();
     if (xceiverClient == null) {
       final Pipeline pipeline = pipelineRef.get();
+      final XceiverClientSpi client;
       try {
-        xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
+        client = xceiverClientFactory.acquireClientForReadData(pipeline);
       } catch (IOException ioe) {
         LOG.warn("Failed to acquire client for pipeline {}, block {}", 
pipeline, blockID);
         throw ioe;
       }
+
+      if (client == null) {
+        throw new IOException("Failed to acquire client for " + pipeline);
+      }
+      if (!(client instanceof XceiverClientGrpc)) {
+        throw new IOException("Unexpected client class: " + 
client.getClass().getName() + ", " + pipeline);
+      }
+
+      xceiverClient =  (XceiverClientGrpc) client;
     }
   }
 
-  private void initialize() throws IOException {
-    if (initialized) {
-      return;
-    }
-    while (true) {
+  private synchronized void initialize() throws IOException {
+    while (streamingReader == null) {
       try {
         acquireClient();
-        streamingReader = new StreamingReader();
-        ContainerProtocolCalls.readBlock(xceiverClient, position, blockID, tokenRef.get(),
-            pipelineRef.get().getReplicaIndexes(), streamingReader);
-        initialized = true;
-        return;
-      } catch (InterruptedException ie) {
-        Thread.currentThread().interrupt();
-        handleExceptions(new IOException("Interrupted", ie));
       } catch (IOException ioe) {
         handleExceptions(ioe);
       }
+
+      streamingReader = new StreamingReader();
+      xceiverClient.initStreamRead(blockID, streamingReader);
     }
   }
 
+  synchronized void readBlock(int length) throws IOException {
+    final long diff = position + length - requestedLength;
+    if (diff > 0) {
+      final long rounded = roundUp(diff + preReadSize, responseDataSize);
+      LOG.debug("position {}, length {}, requested {}, diff {}, rounded {}, 
preReadSize={}",
+          position, length, requestedLength, diff, rounded, preReadSize);
+      readBlockImpl(rounded);
+      requestedLength += rounded;
+    }
+  }
+
+  synchronized void readBlockImpl(long length) throws IOException {
+    if (streamingReader == null) {
+      throw new IOException("Uninitialized StreamingReader: " + blockID);
+    }
+    final StreamingReadResponse r = streamingReader.getResponse();
+    if (r == null) {
+      throw new IOException("Uninitialized StreamingReadResponse: " + blockID);
+    }
+    xceiverClient.streamRead(ContainerProtocolCalls.buildReadBlockCommandProto(
+        blockID, requestedLength, length, responseDataSize, tokenRef.get(), pipelineRef.get()), r);
+  }
+
   private void handleExceptions(IOException cause) throws IOException {
     if (cause instanceof StorageContainerException || isConnectivityIssue(cause)) {
       if (shouldRetryRead(cause, retryPolicy, retries++)) {
@@ -251,14 +289,6 @@ private void handleExceptions(IOException cause) throws IOException {
     }
   }
 
-  private int fillBuffer() throws IOException {
-    if (!streamingReader.hasNext()) {
-      return EOF;
-    }
-    buffer = streamingReader.readNext();
-    return buffer == null ? EOF : buffer.limit();
-  }
-
   protected synchronized void releaseClient() {
     if (xceiverClientFactory != null && xceiverClient != null) {
       closeStream();
@@ -277,9 +307,9 @@ private void refreshBlockInfo(IOException cause) throws IOException {
     refreshBlockInfo(cause, blockID, pipelineRef, tokenRef, refreshFunction);
   }
 
-  private synchronized void releaseStreamResources(StreamingReadResponse response) {
+  private synchronized void releaseStreamResources() {
     if (xceiverClient != null) {
-      xceiverClient.completeStreamRead(response);
+      xceiverClient.completeStreamRead();
     }
   }
 
@@ -288,44 +318,52 @@ private synchronized void releaseStreamResources(StreamingReadResponse response)
    */
   public class StreamingReader implements StreamingReaderSpi {
 
-    private final BlockingQueue<ContainerProtos.ReadBlockResponseProto> responseQueue = new LinkedBlockingQueue<>(1);
-    private final AtomicBoolean completed = new AtomicBoolean(false);
-    private final AtomicBoolean failed = new AtomicBoolean(false);
+    /** Response queue: poll is blocking while offer is non-blocking. */
+    private final BlockingQueue<ReadBlockResponseProto> responseQueue = new LinkedBlockingQueue<>();
+
+    private final CompletableFuture<Void> future = new CompletableFuture<>();
     private final AtomicBoolean semaphoreReleased = new AtomicBoolean(false);
-    private final AtomicReference<Throwable> error = new AtomicReference<>();
-    private volatile StreamingReadResponse response;
+    private final AtomicReference<StreamingReadResponse> response = new AtomicReference<>();
 
-    public boolean hasNext() {
-      return !responseQueue.isEmpty() || !completed.get();
+    void checkError() throws IOException {
+      if (future.isCompletedExceptionally()) {
+        try {
+          future.get();
+        } catch (InterruptedException | ExecutionException e) {
+          throw new IOException("Streaming read failed", e);
+        }
+      }
     }
 
-    public ByteBuffer readNext() throws IOException {
-      if (failed.get()) {
-        Throwable cause = error.get();
-        throw new IOException("Streaming read failed", cause);
+    ReadBlockResponseProto poll() throws IOException {
+      while (true) {
+        checkError();
+        if (future.isDone()) {
+          return null; // Stream ended
+        }
+
+        final ReadBlockResponseProto proto;
+        try {
+          proto = responseQueue.poll(1, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while waiting for response", e);
+        }
+        if (proto != null) {
+          return proto;
+        }
       }
+    }
 
-      if (completed.get() && responseQueue.isEmpty()) {
+    private ByteBuffer read(int length) throws IOException {
+      checkError();
+      if (future.isDone()) {
         return null; // Stream ended
       }
 
-      ReadBlockResponseProto readBlock;
-      try {
-        readBlock = responseQueue.poll(30, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Interrupted while waiting for response", e);
-      }
-      if (readBlock == null) {
-        if (failed.get()) {
-          Throwable cause = error.get();
-          throw new IOException("Streaming read failed", cause);
-        } else if (completed.get()) {
-          return null; // Stream ended
-        } else {
-          throw new IOException("Timed out waiting for response");
-        }
-      }
+      readBlock(length);
+
+      final ReadBlockResponseProto readBlock = poll();
       // The server always returns data starting from the last checksum boundary. Therefore if the reader position is
       // ahead of the position we received from the server, we need to adjust the buffer position accordingly.
       // If the reader position is behind
@@ -339,39 +377,48 @@ public ByteBuffer readNext() throws IOException {
       }
       if (pos > readBlock.getOffset()) {
         int offset = (int)(pos - readBlock.getOffset());
+        if (offset > buf.limit()) {
+          offset = buf.limit();
+        }
         buf.position(offset);
       }
       return buf;
     }
 
     private void releaseResources() {
-      // release resources only if it was not yet completed
       if (semaphoreReleased.compareAndSet(false, true)) {
-        releaseStreamResources(response);
+        releaseStreamResources();
       }
     }
 
     @Override
     public void onNext(ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto) {
+      final ReadBlockResponseProto readBlock = containerCommandResponseProto.getReadBlock();
       try {
-        ReadBlockResponseProto readBlock = containerCommandResponseProto.getReadBlock();
         ByteBuffer data = readBlock.getData().asReadOnlyByteBuffer();
         if (verifyChecksum) {
           ChecksumData checksumData = ChecksumData.getFromProtoBuf(readBlock.getChecksumData());
           Checksum.verifyChecksum(data, checksumData, 0);
         }
         offerToQueue(readBlock);
-      } catch (OzoneChecksumException e) {
-        LOG.warn("Checksum verification failed for block {} from datanode {}",
-            getBlockID(), response.getDatanodeDetails(), e);
-        cancelDueToError(e);
+      } catch (Exception e) {
+        final ByteString data = readBlock.getData();
+        final long offset = readBlock.getOffset();
+        final StreamingReadResponse r = getResponse();
+        LOG.warn("Failed to process block {} response at offset={}, size={}: 
{}, {}",
+            getBlockID().getContainerBlockID(),
+            offset, data.size(), StringUtils.bytes2Hex(data.substring(0, 
10).asReadOnlyByteBuffer()),
+            readBlock.getChecksumData(), e);
+        setFailed(e);
+        r.getRequestObserver().onError(e);
+        releaseResources();
       }
     }
 
     @Override
     public void onError(Throwable throwable) {
       if (throwable instanceof StatusRuntimeException) {
-        if (((StatusRuntimeException) throwable).getStatus().getCode().name().equals("CANCELLED")) {
+        if (((StatusRuntimeException) throwable).getStatus().getCode() == CANCELLED) {
           // This is expected when the client cancels the stream.
           setCompleted();
         }
@@ -387,57 +434,52 @@ public void onCompleted() {
       releaseResources();
     }
 
-    /**
-     * By calling cancel, the client will send a cancel signal to the server, 
which will stop sending more data and
-     * cause the onError() to be called in this observer with a CANCELLED 
exception.
-     */
-    public void cancel() {
-      if (response != null && response.getRequestObserver() != null) {
-        response.getRequestObserver().cancel("Cancelled by client", 
CANCELLED_EXCEPTION);
-        setCompleted();
-        releaseResources();
-      }
-    }
-
-    public void cancelDueToError(Throwable exception) {
-      if (response != null && response.getRequestObserver() != null) {
-        response.getRequestObserver().onError(exception);
-        setFailed(exception);
-        releaseResources();
-      }
+    StreamingReadResponse getResponse() {
+      return response.get();
     }
 
     private void setFailed(Throwable throwable) {
-      if (completed.get()) {
-        throw new IllegalArgumentException("Cannot mark a completed stream as failed");
+      final boolean completed = future.completeExceptionally(throwable);
+      if (!completed) {
+        LOG.warn("Already failed: suppressed ", throwable);
       }
-      failed.set(true);
-      error.set(throwable);
     }
 
     private void setCompleted() {
-      if (!failed.get()) {
-        completed.set(true);
+      final boolean changed = future.complete(null);
+      if (!changed) {
+        try {
+          future.get();
+        } catch (InterruptedException | ExecutionException e) {
+          LOG.warn("Failed to setCompleted", e);
+        }
       }
+
+      releaseResources();
     }
 
     private void offerToQueue(ReadBlockResponseProto item) {
-      while (!completed.get() && !failed.get()) {
-        try {
-          if (responseQueue.offer(item, 100, TimeUnit.MILLISECONDS)) {
-            return;
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return;
-        }
+      if (LOG.isDebugEnabled()) {
+        final ContainerProtos.ChecksumData checksumData = item.getChecksumData();
+        LOG.debug("offerToQueue {} bytes, numChecksums {}, bytesPerChecksum={}",
+            item.getData().size(), checksumData.getChecksumsList().size(), checksumData.getBytesPerChecksum());
       }
+      final boolean offered = responseQueue.offer(item);
+      Preconditions.assertTrue(offered, () -> "Failed to offer " + item);
     }
 
     @Override
     public void setStreamingReadResponse(StreamingReadResponse streamingReadResponse) {
-      response = streamingReadResponse;
+      final boolean set = response.compareAndSet(null, streamingReadResponse);
+      Preconditions.assertTrue(set, () -> "Failed to set streamingReadResponse");
     }
   }
 
+  static long roundUp(long required, int packet) {
+    final long n = (required - 1) / packet;
+    final long rounded = (n + 1) * packet;
+    Preconditions.assertTrue(rounded >= required);
+    Preconditions.assertTrue(rounded - packet < required);
+    return rounded;
+  }
 }
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
deleted file mode 100644
index 83784499110..00000000000
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.storage;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.SecureRandom;
-import java.util.Collections;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
-import java.util.stream.Stream;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.client.ContainerBlockID;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.StreamingReadResponse;
-import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.ozone.common.Checksum;
-import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.OzoneChecksumException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Time;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.Status;
-import org.apache.ratis.thirdparty.io.grpc.StatusException;
-import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.mockito.invocation.InvocationOnMock;
-
-/**
- * Tests for {@link TestStreamBlockInputStream}'s functionality.
- */
-public class TestStreamBlockInputStream {
-  private static final int BYTES_PER_CHECKSUM = 1024;
-  private static final int BLOCK_SIZE = 1024;
-  private StreamBlockInputStream blockStream;
-  private final OzoneConfiguration conf = new OzoneConfiguration();
-  private XceiverClientFactory xceiverClientFactory;
-  private XceiverClientGrpc xceiverClient;
-  private Checksum checksum;
-  private ChecksumData checksumData;
-  private byte[] data;
-  private ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> requestObserver;
-  private Function<BlockID, BlockLocationInfo> refreshFunction;
-
-  @BeforeEach
-  public void setup() throws Exception {
-    Token<OzoneBlockTokenIdentifier> token = mock(Token.class);
-    when(token.encodeToUrlString()).thenReturn("url");
-
-    Set<HddsProtos.BlockTokenSecretProto.AccessModeProto> modes =
-        Collections.singleton(HddsProtos.BlockTokenSecretProto.AccessModeProto.READ);
-    OzoneBlockTokenIdentifier tokenIdentifier = new OzoneBlockTokenIdentifier("owner", new BlockID(1, 1),
-        modes, Time.monotonicNow() + 10000, 10);
-    tokenIdentifier.setSecretKeyId(UUID.randomUUID());
-    when(token.getIdentifier()).thenReturn(tokenIdentifier.getBytes());
-    Pipeline pipeline = MockPipeline.createSingleNodePipeline();
-
-    BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class);
-    when(blockLocationInfo.getPipeline()).thenReturn(pipeline);
-    when(blockLocationInfo.getToken()).thenReturn(token);
-
-    xceiverClient = mock(XceiverClientGrpc.class);
-    when(xceiverClient.getPipeline()).thenReturn(pipeline);
-    xceiverClientFactory = mock(XceiverClientFactory.class);
-    when(xceiverClientFactory.acquireClientForReadData(any()))
-        .thenReturn(xceiverClient);
-    requestObserver = mock(ClientCallStreamObserver.class);
-
-    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
-    clientConfig.setStreamReadBlock(true);
-    clientConfig.setMaxReadRetryCount(1);
-    refreshFunction = mock(Function.class);
-    when(refreshFunction.apply(any())).thenReturn(blockLocationInfo);
-    BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
-    checksum = new Checksum(ChecksumType.CRC32, BYTES_PER_CHECKSUM);
-    createDataAndChecksum();
-    blockStream = new StreamBlockInputStream(blockID, BLOCK_SIZE, pipeline,
-        token, xceiverClientFactory, refreshFunction, clientConfig);
-  }
-
-  @AfterEach
-  public void teardown() {
-    if (blockStream != null) {
-      try {
-        blockStream.close();
-      } catch (IOException e) {
-        // ignore
-      }
-    }
-  }
-
-  @Test
-  public void testCloseStreamReleasesResources() throws IOException, InterruptedException {
-    setupSuccessfulRead();
-    assertEquals(data[0], blockStream.read());
-    blockStream.close();
-    // Verify that cancel() was called on the requestObserver mock
-    verify(requestObserver).cancel(any(), any());
-    // Verify that release() was called on the xceiverClient mock
-    verify(xceiverClientFactory).releaseClientForReadData(xceiverClient, false);
-    verify(xceiverClient, times(1)).completeStreamRead(any());
-  }
-
-  @Test
-  public void testUnbufferReleasesResourcesAndResumesFromLastPosition() throws IOException, InterruptedException {
-    setupSuccessfulRead();
-    assertEquals(data[0], blockStream.read());
-    assertEquals(1, blockStream.getPos());
-    blockStream.unbuffer();
-    // Verify that cancel() was called on the requestObserver mock
-    verify(requestObserver).cancel(any(), any());
-    // Verify that release() was called on the xceiverClient mock
-    verify(xceiverClientFactory).releaseClientForReadData(xceiverClient, false);
-    verify(xceiverClient, times(1)).completeStreamRead(any());
-    // The next read should "rebuffer" and continue from the last position
-    assertEquals(data[1], blockStream.read());
-    assertEquals(2, blockStream.getPos());
-  }
-
-  @Test
-  public void testSeekReleasesTheStreamAndStartsFromNewPosition() throws IOException, InterruptedException {
-    setupSuccessfulRead();
-    assertEquals(data[0], blockStream.read());
-    blockStream.seek(100);
-    assertEquals(100, blockStream.getPos());
-    // Verify that cancel() was called on the requestObserver mock
-    verify(requestObserver).cancel(any(), any());
-    verify(xceiverClient, times(1)).completeStreamRead(any());
-    // The xceiverClient should not be released
-    verify(xceiverClientFactory, never())
-        .releaseClientForReadData(xceiverClient, false);
-
-    assertEquals(data[100], blockStream.read());
-    assertEquals(101, blockStream.getPos());
-  }
-
-  @Test
-  public void testErrorThrownIfStreamReturnsError() throws IOException, InterruptedException {
-    // Note the error will only be thrown when the buffer needs to be refilled. I think case, as its the first
-    // read it will try to fill the buffer and encounter the error, but a reader could continue reading until the
-    // buffer is exhausted before seeing the error.
-    doAnswer((InvocationOnMock invocation) -> {
-      StreamingReaderSpi streamObserver = invocation.getArgument(1);
-      StreamingReadResponse resp =
-          new StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(), requestObserver);
-      streamObserver.setStreamingReadResponse(resp);
-      streamObserver.onError(new IOException("Test induced error"));
-      return null;
-    }).when(xceiverClient).streamRead(any(), any());
-    assertThrows(IOException.class, () -> blockStream.read());
-    verify(xceiverClient, times(1)).completeStreamRead(any());
-  }
-
-  @Test
-  public void seekOutOfBounds() throws IOException, InterruptedException {
-    setupSuccessfulRead();
-    assertThrows(IOException.class, () -> blockStream.seek(-1));
-    assertThrows(IOException.class, () -> blockStream.seek(BLOCK_SIZE + 1));
-  }
-
-  @Test
-  public void readPastEOFReturnsEOF() throws IOException, InterruptedException {
-    setupSuccessfulRead();
-    blockStream.seek(BLOCK_SIZE);
-    // Ensure the stream is at EOF even after two attempts to read
-    assertEquals(-1, blockStream.read());
-    assertEquals(-1, blockStream.read());
-    assertEquals(BLOCK_SIZE, blockStream.getPos());
-  }
-
-  @Test
-  public void ensureExceptionThrownForReadAfterClosed() throws IOException, InterruptedException {
-    setupSuccessfulRead();
-    blockStream.close();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(10);
-    byte[] byteArray = new byte[10];
-    assertThrows(IOException.class, () -> blockStream.read());
-    assertThrows(IOException.class, () -> {
-      // Findbugs complains about ignored return value without this :(
-      int r = blockStream.read(byteArray, 0, 10);
-    });
-    assertThrows(IOException.class, () -> blockStream.read(byteBuffer));
-    assertThrows(IOException.class, () -> blockStream.seek(10));
-  }
-
-  @ParameterizedTest
-  @MethodSource("exceptionsTriggeringRefresh")
-  public void testRefreshFunctionCalledForAllDNsBadOnInitialize(IOException thrown)
-      throws IOException, InterruptedException {
-    // In this case, if the first attempt to connect to any of the DNs fails, it should retry by refreshing the pipeline
-
-    doAnswer((InvocationOnMock invocation) -> {
-      throw thrown;
-    }).doAnswer((InvocationOnMock invocation) -> {
-      StreamingReaderSpi streamObserver = invocation.getArgument(1);
-      StreamingReadResponse resp =
-          new StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(), requestObserver);
-      streamObserver.setStreamingReadResponse(resp);
-      streamObserver.onNext(createChunkResponse(false));
-      streamObserver.onCompleted();
-      return null;
-    }).when(xceiverClient).streamRead(any(), any());
-    blockStream.read();
-    verify(refreshFunction, times(1)).apply(any());
-  }
-
-  @ParameterizedTest
-  @MethodSource("exceptionsNotTriggeringRefresh")
-  public void testRefreshNotCalledForAllDNsBadOnInitialize(IOException thrown)
-      throws IOException, InterruptedException {
-    // In this case, if the first attempt to connect to any of the DNs fails, it should retry by refreshing the pipeline
-    doAnswer((InvocationOnMock invocation) -> {
-      throw thrown;
-    }).when(xceiverClient).streamRead(any(), any());
-    assertThrows(IOException.class, () -> blockStream.read());
-    verify(refreshFunction, times(0)).apply(any());
-  }
-
-  @Test
-  public void testExceptionThrownAfterRetriesExhausted() throws IOException, InterruptedException {
-    // In this case, if the first attempt to connect to any of the DNs fails, it should retry by refreshing the pipeline
-    doAnswer((InvocationOnMock invocation) -> {
-      throw new StorageContainerException(CONTAINER_NOT_FOUND);
-    }).when(xceiverClient).streamRead(any(), any());
-
-    assertThrows(IOException.class, () -> blockStream.read());
-    verify(refreshFunction, times(1)).apply(any());
-  }
-
-  @Test
-  public void testInvalidChecksumThrowsException() throws IOException, InterruptedException {
-    doAnswer((InvocationOnMock invocation) -> {
-      StreamingReaderSpi streamObserver = invocation.getArgument(1);
-      StreamingReadResponse resp =
-          new StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(), requestObserver);
-      streamObserver.setStreamingReadResponse(resp);
-      streamObserver.onNext(createChunkResponse(true));
-      streamObserver.onCompleted();
-      return null;
-    }).when(xceiverClient).streamRead(any(), any());
-    assertThrows(IOException.class, () -> blockStream.read());
-  }
-
-  private void createDataAndChecksum() throws OzoneChecksumException {
-    data = new byte[BLOCK_SIZE];
-    new SecureRandom().nextBytes(data);
-    checksumData = checksum.computeChecksum(data);
-  }
-
-  private void setupSuccessfulRead() throws IOException, InterruptedException {
-    doAnswer((InvocationOnMock invocation) -> {
-      StreamingReaderSpi streamObserver = invocation.getArgument(1);
-      StreamingReadResponse resp =
-          new StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(), requestObserver);
-      streamObserver.setStreamingReadResponse(resp);
-      streamObserver.onNext(createChunkResponse(false));
-      streamObserver.onCompleted();
-      return null;
-    }).when(xceiverClient).streamRead(any(), any());
-  }
-
-  private ContainerProtos.ContainerCommandResponseProto createChunkResponse(boolean invalidChecksum) {
-    ContainerProtos.ReadBlockResponseProto response = invalidChecksum ?
-        createInValidChecksumResponse() : createValidResponse();
-
-    return ContainerProtos.ContainerCommandResponseProto.newBuilder()
-        .setCmdType(ContainerProtos.Type.ReadBlock)
-        .setReadBlock(response)
-        .setResult(ContainerProtos.Result.SUCCESS)
-        .build();
-  }
-
-  private ContainerProtos.ReadBlockResponseProto createValidResponse() {
-    return ContainerProtos.ReadBlockResponseProto.newBuilder()
-        .setChecksumData(checksumData.getProtoBufMessage())
-        .setData(ByteString.copyFrom(data))
-        .setOffset(0)
-        .build();
-  }
-
-  private ContainerProtos.ReadBlockResponseProto createInValidChecksumResponse() {
-    byte[] invalidData = new byte[data.length];
-    System.arraycopy(data, 0, invalidData, 0, data.length);
-    // Corrupt the data
-    invalidData[0] = (byte) (invalidData[0] + 1);
-    return ContainerProtos.ReadBlockResponseProto.newBuilder()
-        .setChecksumData(checksumData.getProtoBufMessage())
-        .setData(ByteString.copyFrom(invalidData))
-        .setOffset(0)
-        .build();
-  }
-
-  private static Stream<Arguments> exceptionsTriggeringRefresh() {
-    return Stream.of(
-        Arguments.of(new StorageContainerException(CONTAINER_NOT_FOUND)),
-        Arguments.of(new IOException(new ExecutionException(
-            new StatusException(Status.UNAVAILABLE))))
-    );
-  }
-
-  private static Stream<Arguments> exceptionsNotTriggeringRefresh() {
-    return Stream.of(
-        Arguments.of(new SCMSecurityException("Security problem")),
-        Arguments.of(new OzoneChecksumException("checksum missing")),
-        Arguments.of(new IOException("Some random exception."))
-    );
-  }
-
-}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java
index ea8694cd8b7..3018fda7ea6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java
@@ -29,11 +29,15 @@ public class StreamingReadResponse {
 
   private final DatanodeDetails dn;
   private final ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> requestObserver;
+  private final String name;
 
   public StreamingReadResponse(DatanodeDetails dn,
       ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> requestObserver) {
     this.dn = dn;
     this.requestObserver = requestObserver;
+
+    final String s = dn.getID().toString();
+    this.name = "dn" + s.substring(s.lastIndexOf('-')) + "_stream";
   }
 
   public DatanodeDetails getDatanodeDetails() {
@@ -43,4 +47,9 @@ public DatanodeDetails getDatanodeDetails() {
   public ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto> getRequestObserver() {
     return requestObserver;
   }
+
+  @Override
+  public String toString() {
+    return name;
+  }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index f1bf7a8ef85..54be3c5686a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -26,6 +26,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -144,12 +145,15 @@ public ContainerCommandResponseProto sendCommand(
     }
   }
 
-  public void streamRead(ContainerCommandRequestProto request,
-      StreamingReaderSpi streamObserver) throws IOException, InterruptedException {
+  public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) throws IOException {
     throw new UnsupportedOperationException("Stream read is not supported");
   }
 
-  public void completeStreamRead(StreamingReadResponse streamingReadResponse) {
+  public void streamRead(ContainerCommandRequestProto request, StreamingReadResponse streamObserver) {
+    throw new UnsupportedOperationException("Stream read is not supported");
+  }
+
+  public void completeStreamRead() {
     throw new UnsupportedOperationException("Stream read is not supported");
   }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/RandomAccessBlockFile.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/RandomAccessBlockFile.java
new file mode 100644
index 00000000000..fa5dd1ca21c
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/RandomAccessBlockFile.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Objects;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** {@link RandomAccessFile} for blocks. */
+public class RandomAccessBlockFile {
+  private static final Logger LOG = LoggerFactory.getLogger(RandomAccessBlockFile.class);
+
+  private File blockFile;
+  private RandomAccessFile raf;
+  private FileChannel channel;
+
+  public RandomAccessBlockFile() {
+  }
+
+  public synchronized boolean isOpen() {
+    return blockFile != null;
+  }
+
+  public synchronized void open(File file) throws FileNotFoundException {
+    Preconditions.assertNull(blockFile, "blockFile");
+    blockFile = Objects.requireNonNull(file, "blockFile == null");
+    raf = new RandomAccessFile(blockFile, "r");
+    channel = raf.getChannel();
+  }
+
+  public synchronized void position(long newPosition) throws IOException {
+    Preconditions.assertTrue(isOpen(), "Not opened");
+    final long oldPosition = channel.position();
+    if (newPosition != oldPosition) {
+      LOG.debug("seek {} -> {} for file {}", oldPosition, newPosition, 
blockFile);
+      channel.position(newPosition);
+    }
+  }
+
+  public synchronized boolean read(ByteBuffer buffer) throws IOException {
+    Preconditions.assertTrue(isOpen(), "Not opened");
+    while (buffer.hasRemaining()) {
+      final int r = channel.read(buffer);
+      if (r == -1) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public synchronized void close() {
+    if (blockFile == null) {
+      return;
+    }
+    blockFile = null;
+    try {
+      channel.close();
+      channel = null;
+    } catch (IOException e) {
+      LOG.warn("Failed to close channel for {}", blockFile, e);
+      throw new RuntimeException(e);
+    }
+    try {
+      raf.close();
+      raf = null;
+    } catch (IOException e) {
+      LOG.warn("Failed to close RandomAccessFile for {}", blockFile, e);
+    }
+  }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index c6e5d75b5ca..15879fb4764 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -62,7 +62,6 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
-import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
@@ -906,23 +905,17 @@ public static List<Validator> toValidatorList(Validator validator) {
     return datanodeToResponseMap;
   }
 
-  /**
-   * Calls the container protocol to read a chunk.
-   *
-   * @param xceiverClient  client to perform call
-   * @param offset         offset where block starts
-   * @param blockID        ID of the block
-   * @param token          a token for this block (may be null)
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  @SuppressWarnings("checkstyle:ParameterNumber")
-  public static void readBlock(
-      XceiverClientSpi xceiverClient, long offset, BlockID blockID, Token<? extends TokenIdentifier> token,
-      Map<DatanodeDetails, Integer> replicaIndexes, StreamingReaderSpi streamObserver)
-      throws IOException, InterruptedException {
-    final ReadBlockRequestProto.Builder readBlockRequest =
-        ReadBlockRequestProto.newBuilder()
-            .setOffset(offset);
+  public static ContainerCommandRequestProto buildReadBlockCommandProto(
+      BlockID blockID, long offset, long length, int responseDataSize,
+      Token<? extends TokenIdentifier> token, Pipeline pipeline)
+      throws IOException {
+    final DatanodeDetails datanode = pipeline.getClosestNode();
+    final DatanodeBlockID datanodeBlockID = getDatanodeBlockID(blockID, datanode, pipeline.getReplicaIndexes());
+    final ReadBlockRequestProto.Builder readBlockRequest = ReadBlockRequestProto.newBuilder()
+        .setOffset(offset)
+        .setLength(length)
+        .setResponseDataSize(responseDataSize)
+        .setBlockID(datanodeBlockID);
     final ContainerCommandRequestProto.Builder builder =
         ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadBlock)
             .setContainerID(blockID.getContainerID());
@@ -930,23 +923,18 @@ public static void readBlock(
       builder.setEncodedToken(token.encodeToUrlString());
     }
 
-    readBlock(xceiverClient, blockID, builder, readBlockRequest, xceiverClient.getPipeline().getFirstNode(),
-        replicaIndexes, streamObserver);
+    return builder.setDatanodeUuid(datanode.getUuidString())
+        .setReadBlock(readBlockRequest)
+        .build();
   }
 
-  private static void readBlock(XceiverClientSpi xceiverClient,
-      BlockID blockID, ContainerCommandRequestProto.Builder builder, ReadBlockRequestProto.Builder readBlockBuilder,
-      DatanodeDetails datanode, Map<DatanodeDetails, Integer> replicaIndexes,
-      StreamingReaderSpi streamObserver) throws IOException, InterruptedException {
-    final DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder();
-    int replicaIndex = replicaIndexes.getOrDefault(datanode, 0);
+  static DatanodeBlockID getDatanodeBlockID(BlockID blockID, DatanodeDetails datanode,
+      Map<DatanodeDetails, Integer> replicaIndexes) {
+    final DatanodeBlockID.Builder b = blockID.getDatanodeBlockIDProtobufBuilder();
+    final int replicaIndex = replicaIndexes.getOrDefault(datanode, 0);
     if (replicaIndex > 0) {
-      datanodeBlockID.setReplicaIndex(replicaIndex);
+      b.setReplicaIndex(replicaIndex);
     }
-    readBlockBuilder.setBlockID(datanodeBlockID);
-    final ContainerCommandRequestProto request = builder
-        .setDatanodeUuid(datanode.getUuidString())
-        .setReadBlock(readBlockBuilder).build();
-    xceiverClient.streamRead(request, streamObserver);
+    return b.build();
   }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
index fbb29cfcd70..e911e1cb7a8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
@@ -23,6 +23,7 @@
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -270,6 +271,14 @@ protected static ByteString computeChecksum(ByteBuffer data,
     }
   }
 
+  public static void verifySingleChecksum(ByteBuffer buffer, int offset, int bytesPerChecksum,
+      ByteString checksum, ChecksumType checksumType) throws OzoneChecksumException {
+    final ByteBuffer duplicated = buffer.duplicate();
+    duplicated.position(offset).limit(offset + bytesPerChecksum);
+    final ChecksumData cd = new ChecksumData(checksumType, bytesPerChecksum, Collections.singletonList(checksum));
+    verifyChecksum(duplicated, cd, 0);
+  }
+
   /**
    * Computes the ChecksumData for the input data and verifies that it
    * matches with that of the input checksumData.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 6dba6abf9d0..4e8becfb10c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.InvalidContainerStateException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.RandomAccessBlockFile;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.security.token.NoopTokenVerifier;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
@@ -819,7 +820,7 @@ public StateMachine.DataChannel getStreamDataChannel(
   @Override
   public void streamDataReadOnly(ContainerCommandRequestProto msg,
       StreamObserver<ContainerCommandResponseProto> streamObserver,
-      DispatcherContext dispatcherContext) {
+      RandomAccessBlockFile blockFile, DispatcherContext dispatcherContext) {
     Type cmdType = msg.getCmdType();
     String traceID = msg.getTraceID();
     Span span = TracingUtil.importAndCreateSpan(cmdType.toString(), traceID);
@@ -858,7 +859,7 @@ public void streamDataReadOnly(ContainerCommandRequestProto msg,
             ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
       }
       perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime);
-      responseProto = handler.readBlock(msg, container, dispatcherContext, streamObserver);
+      responseProto = handler.readBlock(msg, container, blockFile, streamObserver);
       long oPLatencyMS = Time.monotonicNow() - startTime;
       metrics.incContainerOpsLatencies(cmdType, oPLatencyMS);
       if (responseProto == null) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index 8a4a675187d..8edf6c34b9f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.RandomAccessBlockFile;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.ratis.statemachine.StateMachine;
@@ -97,6 +98,7 @@ default StateMachine.DataChannel getStreamDataChannel(
   default void streamDataReadOnly(
        ContainerCommandRequestProto msg,
        StreamObserver<ContainerCommandResponseProto> streamObserver,
+       RandomAccessBlockFile blockFile,
        DispatcherContext dispatcherContext) {
     throw new UnsupportedOperationException("streamDataReadOnly not supported.");
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 0abcab5afea..af4670bc3a8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.RandomAccessBlockFile;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
@@ -40,7 +41,6 @@
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.ratis.statemachine.StateMachine;
@@ -268,7 +268,7 @@ public void setClusterID(String clusterID) {
 
   public abstract ContainerCommandResponseProto readBlock(
       ContainerCommandRequestProto msg, Container container,
-      DispatcherContext dispatcherContext,
+      RandomAccessBlockFile blockFile,
       StreamObserver<ContainerCommandResponseProto> streamObserver);
 
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
index 041958b4227..5a05b3f48ca 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
+import org.apache.hadoop.hdds.scm.container.common.helpers.RandomAccessBlockFile;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
@@ -31,6 +32,8 @@
 import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
 import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler;
 import org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,6 +98,15 @@ public StreamObserver<ContainerCommandRequestProto> send(
       StreamObserver<ContainerCommandResponseProto> responseObserver) {
     return new StreamObserver<ContainerCommandRequestProto>() {
       private final AtomicBoolean isClosed = new AtomicBoolean(false);
+      private final RandomAccessBlockFile blockFile = new RandomAccessBlockFile();
+
+      boolean close() {
+        if (isClosed.compareAndSet(false, true)) {
+          blockFile.close();
+          return true;
+        }
+        return false;
+      }
 
       @Override
       public void onNext(ContainerCommandRequestProto request) {
@@ -105,7 +117,7 @@ public void onNext(ContainerCommandRequestProto request) {
 
         try {
           if (request.getCmdType() == Type.ReadBlock) {
-            dispatcher.streamDataReadOnly(request, responseObserver, null);
+            dispatcher.streamDataReadOnly(request, responseObserver, blockFile, context);
           } else {
             final ContainerCommandResponseProto resp = dispatcher.dispatch(request, context);
             responseObserver.onNext(resp);
             responseObserver.onNext(resp);
@@ -113,7 +125,7 @@ public void onNext(ContainerCommandRequestProto request) {
         } catch (Throwable e) {
           LOG.error("Got exception when processing"
                     + " ContainerCommandRequestProto {}", request, e);
-          isClosed.set(true);
+          close();
           responseObserver.onError(e);
         } finally {
           zeroCopyMessageMarshaller.release(request);
@@ -125,13 +137,20 @@ public void onNext(ContainerCommandRequestProto request) {
 
       @Override
       public void onError(Throwable t) {
+        close();
+        if (t instanceof StatusRuntimeException) {
+          if (((StatusRuntimeException) t).getStatus().getCode() == Status.Code.CANCELLED) {
+            return;
+          }
+        }
+
         // for now we just log a msg
         LOG.error("ContainerCommand send on error. Exception: ", t);
       }
 
       @Override
       public void onCompleted() {
-        if (isClosed.compareAndSet(false, true)) {
+        if (close()) {
           LOG.debug("ContainerCommand send completed");
           responseObserver.onCompleted();
         }
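
The per-stream RandomAccessBlockFile above is closed exactly once, whichever of onNext (exception path), onError, or onCompleted fires first. A minimal sketch of that close-once pattern, assuming a generic Closeable resource (names are illustrative, not from the patch):

    // Close the shared resource at most once across competing callbacks.
    final AtomicBoolean isClosed = new AtomicBoolean(false);

    boolean closeOnce(Closeable resource) {
      if (isClosed.compareAndSet(false, true)) {  // only the first caller wins
        try {
          resource.close();
        } catch (IOException e) {
          // best-effort cleanup; a real implementation would log this
        }
        return true;
      }
      return false;  // some other path already closed it
    }
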
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 2f4177b2d06..60224078625 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -60,6 +60,8 @@
 import static org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient.createSingleNodePipeline;
 import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.DEFAULT_LAYOUT;
 import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
+import static org.apache.ratis.util.Preconditions.assertSame;
+import static org.apache.ratis.util.Preconditions.assertTrue;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -70,9 +72,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -80,7 +80,6 @@
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -101,6 +100,7 @@
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
@@ -114,6 +114,7 @@
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.RandomAccessBlockFile;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
@@ -2058,7 +2059,7 @@ public void deleteUnreferenced(Container container, long localID)
   @Override
   public ContainerCommandResponseProto readBlock(
       ContainerCommandRequestProto request, Container kvContainer,
-      DispatcherContext dispatcherContext,
+      RandomAccessBlockFile blockFile,
       StreamObserver<ContainerCommandResponseProto> streamObserver) {
 
     if (kvContainer.getContainerData().getLayoutVersion() != FILE_PER_BLOCK) {
@@ -2074,67 +2075,121 @@ public ContainerCommandResponseProto readBlock(
       return malformedRequest(request);
     }
     try {
-      ReadBlockRequestProto readBlock = request.getReadBlock();
-
-      BlockID blockID = BlockID.getFromProtobuf(readBlock.getBlockID());
-      // This is a new api the block should always be checked.
-      BlockUtils.verifyReplicaIdx(kvContainer, blockID);
-      BlockUtils.verifyBCSId(kvContainer, blockID);
-
-      File blockFile = FILE_PER_BLOCK.getChunkFile(kvContainer.getContainerData(), blockID, "unused");
-
-      BlockData blockData = getBlockManager().getBlock(kvContainer, blockID);
-      List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
-      // To get the chunksize, check the first chunk. Either there is only 1 chunk and its the largest, or there are
-      // multiple chunks and they are all the same size except the last one.
-      long bytesPerChunk = chunkInfos.get(0).getLen();
-      // The bytes per checksum is stored in the checksum data of each chunk, so check the first chunk as they all
-      // must be the same.
-      ContainerProtos.ChecksumType checksumType = chunkInfos.get(0).getChecksumData().getType();
-      ChecksumData checksumData = null;
-      int bytesPerChecksum = STREAMING_BYTES_PER_CHUNK;
-      if (checksumType == ContainerProtos.ChecksumType.NONE) {
-        checksumData = new ChecksumData(checksumType, 0);
-      } else {
-        bytesPerChecksum = chunkInfos.get(0).getChecksumData().getBytesPerChecksum();
-      }
-      // We have to align the read to checksum boundaries, so whatever offset is requested, we have to move back to the
-      // previous checksum boundary.
-      // eg if bytesPerChecksum is 512, and the requested offset is 600, we have to move back to 512.
-      // If the checksum type is NONE, we don't have to do this, but using no checksums should be rare in practice and
-      // it simplifies the code to always do this.
-      long adjustedOffset = readBlock.getOffset() - readBlock.getOffset() % bytesPerChecksum;
-      try (RandomAccessFile file = new RandomAccessFile(blockFile, "r");
-           FileChannel channel = file.getChannel()) {
-        ByteBuffer buffer = ByteBuffer.allocate(bytesPerChecksum);
-        channel.position(adjustedOffset);
-        while (channel.read(buffer) != -1) {
-          buffer.flip();
-          if (checksumType != ContainerProtos.ChecksumType.NONE) {
-            // As the checksums are stored "chunk by chunk", we need to figure out which chunk we start reading from,
-            // and its offset to pull out the correct checksum bytes for each read.
-            int chunkIndex = (int) (adjustedOffset / bytesPerChunk);
-            int chunkOffset = (int) (adjustedOffset % bytesPerChunk);
-            int checksumIndex = chunkOffset / bytesPerChecksum;
-            ByteString checksum = blockData.getChunks().get(chunkIndex).getChecksumData().getChecksums(checksumIndex);
-            checksumData = new ChecksumData(checksumType, bytesPerChecksum, Collections.singletonList(checksum));
-          }
-          streamObserver.onNext(getReadBlockResponse(request, checksumData, buffer, adjustedOffset));
-          buffer.clear();
-
-          adjustedOffset += bytesPerChecksum;
-        }
-      }
+      readBlockImpl(request, blockFile, kvContainer, streamObserver, false);
       // TODO metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen());
     } catch (StorageContainerException ex) {
       responseProto = ContainerUtils.logAndReturnError(LOG, ex, request);
     } catch (IOException ioe) {
-      responseProto = ContainerUtils.logAndReturnError(LOG,
-          new StorageContainerException("Read Block failed", ioe, IO_EXCEPTION), request);
+      final StorageContainerException sce = new StorageContainerException(
+          "Failed to readBlock " + request.getReadBlock(), ioe, IO_EXCEPTION);
+      responseProto = ContainerUtils.logAndReturnError(LOG, sce, request);
+    } catch (Exception e) {
+      final StorageContainerException sce = new StorageContainerException(
+          "Failed to readBlock " + request.getReadBlock(), e, CONTAINER_INTERNAL_ERROR);
+      LOG.error("", sce);
+      responseProto = ContainerUtils.logAndReturnError(LOG, sce, request);
     }
     return responseProto;
   }
 
+  private void readBlockImpl(ContainerCommandRequestProto request, RandomAccessBlockFile blockFile,
+      Container kvContainer, StreamObserver<ContainerCommandResponseProto> streamObserver, boolean verifyChecksum)
+      throws IOException {
+    final ReadBlockRequestProto readBlock = request.getReadBlock();
+    int responseDataSize = readBlock.getResponseDataSize();
+    if (responseDataSize == 0) {
+      responseDataSize = 1 << 20;
+    }
+
+    final BlockID blockID = BlockID.getFromProtobuf(readBlock.getBlockID());
+    if (!blockFile.isOpen()) {
+      final File file = FILE_PER_BLOCK.getChunkFile(kvContainer.getContainerData(), blockID, "unused");
+      blockFile.open(file);
+    }
+
+    // This is a new api the block should always be checked.
+    BlockUtils.verifyReplicaIdx(kvContainer, blockID);
+    BlockUtils.verifyBCSId(kvContainer, blockID);
+
+    final BlockData blockData = getBlockManager().getBlock(kvContainer, blockID);
+    final List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+    final int bytesPerChunk = Math.toIntExact(chunkInfos.get(0).getLen());
+    final ChecksumType checksumType = chunkInfos.get(0).getChecksumData().getType();
+    ChecksumData checksumData = null;
+    int bytesPerChecksum = STREAMING_BYTES_PER_CHUNK;
+    if (checksumType == ContainerProtos.ChecksumType.NONE) {
+      checksumData = new ChecksumData(checksumType, 0);
+    } else {
+      bytesPerChecksum = chunkInfos.get(0).getChecksumData().getBytesPerChecksum();
+    }
+    // We have to align the read to checksum boundaries, so whatever offset is requested, we have to move back to the
+    // previous checksum boundary.
+    // eg if bytesPerChecksum is 512, and the requested offset is 600, we have to move back to 512.
+    // If the checksum type is NONE, we don't have to do this, but using no checksums should be rare in practice and
+    // it simplifies the code to always do this.
+    long adjustedOffset = readBlock.getOffset() - readBlock.getOffset() % bytesPerChecksum;
+
+    final ByteBuffer buffer = ByteBuffer.allocate(responseDataSize);
+    blockFile.position(adjustedOffset);
+    int totalDataLength = 0;
+    int numResponses = 0;
+    for (boolean shouldRead = true; totalDataLength < readBlock.getLength() && shouldRead;) {
+      shouldRead = blockFile.read(buffer);
+      buffer.flip();
+      final int readLength = buffer.remaining();
+      assertTrue(readLength > 0, () -> "readLength = " + readLength + " <= 0");
+      if (checksumType != ContainerProtos.ChecksumType.NONE) {
+        final List<ByteString> checksums = getChecksums(adjustedOffset, readLength,
+            bytesPerChunk, bytesPerChecksum, chunkInfos);
+        LOG.debug("Read {} at adjustedOffset {}, readLength {}, bytesPerChunk {}, bytesPerChecksum {}",
+            readBlock, adjustedOffset, readLength, bytesPerChunk, bytesPerChecksum);
+        checksumData = new ChecksumData(checksumType, bytesPerChecksum, checksums);
+        if (verifyChecksum) {
+          Checksum.verifyChecksum(buffer.duplicate(), checksumData, 0);
+        }
+      }
+      final ContainerCommandResponseProto response = getReadBlockResponse(
+          request, checksumData, buffer, adjustedOffset);
+      final int dataLength = response.getReadBlock().getData().size();
+      LOG.debug("server onNext response {}: dataLength={}, numChecksums={}",
+          numResponses, dataLength, response.getReadBlock().getChecksumData().getChecksumsList().size());
+      streamObserver.onNext(response);
+      buffer.clear();
+
+      adjustedOffset += readLength;
+      totalDataLength += dataLength;
+      numResponses++;
+    }
+  }
+
+  static List<ByteString> getChecksums(long blockOffset, int readLength, int bytesPerChunk, int bytesPerChecksum,
+      final List<ContainerProtos.ChunkInfo> chunks) {
+    assertSame(0, blockOffset % bytesPerChecksum, "blockOffset % bytesPerChecksum");
+    final int numChecksums = readLength / bytesPerChecksum;
+    final List<ByteString> checksums = new ArrayList<>(numChecksums);
+    for (int i = 0; i < numChecksums; i++) {
+      // As the checksums are stored "chunk by chunk", we need to figure out which chunk we start reading from,
+      // and its offset to pull out the correct checksum bytes for each read.
+      final int n = i * bytesPerChecksum;
+      final long offset = blockOffset + n;
+      final int c = Math.toIntExact(offset / bytesPerChunk);
+      final int chunkOffset = Math.toIntExact(offset % bytesPerChunk);
+      final int csi = chunkOffset / bytesPerChecksum;
+
+      assertTrue(c < chunks.size(),
+          () -> "chunkIndex = " + c + " >= chunk.size()" + chunks.size());
+      final ContainerProtos.ChunkInfo chunk = chunks.get(c);
+      assertSame(bytesPerChunk, chunk.getLen(), "bytesPerChunk");
+      final ContainerProtos.ChecksumData checksumDataProto = chunks.get(c).getChecksumData();
+      assertSame(bytesPerChecksum, checksumDataProto.getBytesPerChecksum(), "bytesPerChecksum");
+      final List<ByteString> checksumsList = checksumDataProto.getChecksumsList();
+      assertTrue(csi < checksumsList.size(),
+          () -> "checksumIndex = " + csi + " >= checksumsList.size()" + checksumsList.size());
+      checksums.add(checksumsList.get(csi));
+    }
+    return checksums;
+  }
+
   @Override
   public void addFinalizedBlock(Container container, long localID) {
     KeyValueContainer keyValueContainer = (KeyValueContainer)container;
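
The alignment and checksum-index arithmetic used by readBlockImpl and getChecksums can be illustrated with the numbers from the comment above (a standalone sketch; the values are examples only):

    // Example: 1 MB chunks, a checksum every 512 bytes, client asks for offset 600.
    final int bytesPerChunk = 1 << 20;
    final int bytesPerChecksum = 512;
    final long requestedOffset = 600;

    // Align back to the previous checksum boundary: 600 -> 512.
    final long adjustedOffset = requestedOffset - requestedOffset % bytesPerChecksum;

    // Locate the checksum covering adjustedOffset.
    final int chunkIndex = Math.toIntExact(adjustedOffset / bytesPerChunk);   // 0
    final int chunkOffset = Math.toIntExact(adjustedOffset % bytesPerChunk);  // 512
    final int checksumIndex = chunkOffset / bytesPerChecksum;                 // 1

    System.out.printf("adjustedOffset=%d chunkIndex=%d checksumIndex=%d%n",
        adjustedOffset, chunkIndex, checksumIndex);
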
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 6b4d8f1bd7f..cb2b1fb27f3 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -401,6 +401,8 @@ message ListBlockResponseProto {
 message ReadBlockRequestProto {
   required DatanodeBlockID blockID = 1;
   required uint64 offset = 2;
+  optional uint64 length = 3;
+  optional uint32 responseDataSize = 4;
 }
 
 message ReadBlockResponseProto {
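
With the two new optional fields, a client can bound both the total bytes returned (length) and the size of each streamed response (responseDataSize). A hedged client-side sketch using the generated protobuf builder (the datanodeBlockID value is assumed to be built elsewhere):

    // Request up to 4 MB starting at offset 0, in responses of at most 1 MB each.
    ContainerProtos.ReadBlockRequestProto readBlock =
        ContainerProtos.ReadBlockRequestProto.newBuilder()
            .setBlockID(datanodeBlockID)     // DatanodeBlockID built by the caller
            .setOffset(0)
            .setLength(4L << 20)
            .setResponseDataSize(1 << 20)
            .build();

If responseDataSize is left unset, readBlockImpl above falls back to a 1 MB response buffer.
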
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
index bb66a303155..ab46d2fd218 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
@@ -31,13 +31,33 @@
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.container.common.transport.server.GrpcXceiverService;
 import org.apache.hadoop.ozone.om.TestBucket;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
 /**
  * Tests {@link StreamBlockInputStream}.
  */
 public class TestStreamBlockInputStream extends TestInputStreamBase {
+  private static final Logger LOG = LoggerFactory.getLogger(TestStreamBlockInputStream.class);
+
+  {
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("com"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ipc"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.server.http"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.container"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.ha"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.safemode"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.container.common"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.om"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.ratis"), Level.ERROR);
+    GenericTestUtils.setLogLevel(GrpcXceiverService.class, Level.ERROR);
+  }
+
   /**
    * Run the tests as a single test method to avoid needing a new mini-cluster
    * for each test.
@@ -46,6 +66,53 @@ public class TestStreamBlockInputStream extends TestInputStreamBase {
   private byte[] inputData;
   private TestBucket bucket;
 
+  @Test
+  void testReadKey() throws Exception {
+    try (MiniOzoneCluster cluster = newCluster()) {
+      cluster.waitForClusterToBeReady();
+
+      LOG.info("cluster ready");
+
+      OzoneConfiguration conf = cluster.getConf();
+      OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+      clientConfig.setStreamReadBlock(true);
+      OzoneConfiguration copy = new OzoneConfiguration(conf);
+      copy.setFromObject(clientConfig);
+      String keyName = getNewKeyName();
+      try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
+        bucket = TestBucket.newBuilder(client).build();
+        inputData = bucket.writeRandomBytes(keyName, DATA_LENGTH);
+        LOG.info("writeRandomBytes {} bytes", inputData.length);
+
+        for (int i = 1; i <= 10; i++) {
+          runTestReadKey(keyName, DATA_LENGTH / i);
+        }
+
+        for (int n = 4; n <= 16 << 10; n <<= 2) {
+          runTestReadKey(keyName, n << 10); // 4kB
+        }
+      }
+    }
+  }
+
+  private void runTestReadKey(String key, int bufferSize) throws Exception {
+    LOG.info("---------------------------------------------------------");
+    LOG.info("read {} bytes with bufferSize {}", DATA_LENGTH, bufferSize);
+    // Read the data fully into a large enough byte array
+    final byte[] buffer = new byte[bufferSize];
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+      int pos = 0;
+      for (; pos < DATA_LENGTH;) {
+        final int read = keyInputStream.read(buffer, 0, buffer.length);
+        for (int i = 0; i < read; i++) {
+          assertEquals(inputData[pos + i], buffer[i], "pos=" + pos + ", i=" + i);
+        }
+        pos += read;
+      }
+      assertEquals(DATA_LENGTH, pos);
+    }
+  }
+
   @Test
   void testAll() throws Exception {
     try (MiniOzoneCluster cluster = newCluster()) {
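
The tests above turn on streaming reads by flipping the OzoneClientConfig flag before creating the RPC client. A condensed sketch of that setup, mirroring testReadKey (the cluster variable comes from the surrounding test):

    // Copy the cluster config, enable streaming block reads, then build a client.
    OzoneConfiguration conf = cluster.getConf();
    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    clientConfig.setStreamReadBlock(true);
    OzoneConfiguration copy = new OzoneConfiguration(conf);
    copy.setFromObject(clientConfig);
    try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
      // reads through KeyInputStream now go through StreamBlockInputStream
    }
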
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
new file mode 100644
index 00000000000..134a87bf2ad
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.OutputStream;
+import java.security.MessageDigest;
+import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.ozone.ClientConfigForTesting;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.om.TestBucket;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.jupiter.api.Test;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+/**
+ * Tests {@link StreamBlockInputStream}.
+ */
+public class TestStreamRead {
+  private TestBucket bucket;
+
+  {
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("com"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ipc"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.server.http"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.container"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.ha"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.scm.safemode"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.hdds.utils"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.container.common"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.om"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.ratis"), Level.ERROR);
+    GenericTestUtils.setLogLevel(LoggerFactory.getLogger(CodecBuffer.class), Level.ERROR);
+  }
+
+  static final int CHUNK_SIZE = 1 << 20;          // 1MB
+  static final int FLUSH_SIZE = 2 * CHUNK_SIZE;       // 2MB
+  static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;   // 4MB
+
+  static final int BLOCK_SIZE = 128 << 20;
+  static final SizeInBytes KEY_SIZE = SizeInBytes.valueOf("256M");
+
+  static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
+    final OzoneConfiguration conf = new OzoneConfiguration();
+
+    OzoneClientConfig config = conf.getObject(OzoneClientConfig.class);
+    config.setBytesPerChecksum(bytesPerChecksum);
+    conf.setFromObject(config);
+
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 5);
+    conf.setQuietMode(true);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64, StorageUnit.MB);
+
+    ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
+        .setBlockSize(BLOCK_SIZE)
+        .setChunkSize(CHUNK_SIZE)
+        .setStreamBufferFlushSize(FLUSH_SIZE)
+        .setStreamBufferMaxSize(MAX_FLUSH_SIZE)
+        .applyTo(conf);
+
+    return MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+  }
+
+  @Test
+  void testReadKey512() throws Exception {
+    final SizeInBytes bytesPerChecksum = SizeInBytes.valueOf(512);
+    runTestReadKey(KEY_SIZE, bytesPerChecksum);
+  }
+
+  @Test
+  void testReadKey16k() throws Exception {
+    final SizeInBytes bytesPerChecksum = SizeInBytes.valueOf("16k");
+    runTestReadKey(KEY_SIZE, bytesPerChecksum);
+  }
+
+  @Test
+  void testReadKey256k() throws Exception {
+    final SizeInBytes bytesPerChecksum = SizeInBytes.valueOf("256k");
+    runTestReadKey(KEY_SIZE, bytesPerChecksum);
+  }
+
+  void runTestReadKey(SizeInBytes keySize, SizeInBytes bytesPerChecksum) throws Exception {
+    try (MiniOzoneCluster cluster = newCluster(bytesPerChecksum.getSizeInt())) {
+      cluster.waitForClusterToBeReady();
+
+      System.out.println("cluster ready");
+
+      OzoneConfiguration conf = cluster.getConf();
+      OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+      clientConfig.setStreamReadBlock(true);
+      OzoneConfiguration copy = new OzoneConfiguration(conf);
+      copy.setFromObject(clientConfig);
+
+      final int n = 10;
+      final SizeInBytes writeBufferSize = SizeInBytes.valueOf("8MB");
+      final SizeInBytes[] readBufferSizes = {
+          SizeInBytes.valueOf("4k"),
+          SizeInBytes.valueOf("1M"),
+          SizeInBytes.valueOf("8M"),
+          SizeInBytes.valueOf("32M"),
+      };
+
+      try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
+        bucket = TestBucket.newBuilder(client).build();
+
+        for (int i = 0; i < n; i++) {
+          final String keyName = "key" + i;
+          System.out.println("---------------------------------------------------------");
+          System.out.printf("%s with %s bytes and %s bytesPerChecksum%n",
+              keyName, keySize, bytesPerChecksum);
+
+          final String md5 = createKey(bucket.delegate(), keyName, keySize, writeBufferSize);
+          for (SizeInBytes readBufferSize : readBufferSizes) {
+            runTestReadKey(keyName, keySize, readBufferSize, null);
+            runTestReadKey(keyName, keySize, readBufferSize, md5);
+          }
+        }
+      }
+    }
+  }
+
+  static void print(String name, long keySizeByte, long elapsedNanos, SizeInBytes bufferSize, String computedMD5) {
+    final double keySizeMb = keySizeByte * 1.0 / (1 << 20);
+    final double elapsedSeconds = elapsedNanos / 1_000_000_000.0;
+    System.out.printf("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f MB, md5=%s)%n",
+        name, keySizeMb / elapsedSeconds, elapsedSeconds, bufferSize, keySizeMb, computedMD5);
+  }
+
+  static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize, SizeInBytes bufferSize)
+      throws Exception {
+    final byte[] buffer = new byte[bufferSize.getSizeInt()];
+    ThreadLocalRandom.current().nextBytes(buffer);
+
+    final long keySizeByte = keySize.getSize();
+    final long startTime = System.nanoTime();
+    try (OutputStream stream = bucket.createStreamKey(keyName, keySizeByte,
+        RatisReplicationConfig.getInstance(THREE), Collections.emptyMap())) {
+      for (long pos = 0; pos < keySizeByte;) {
+        final int writeSize = Math.toIntExact(Math.min(buffer.length, keySizeByte - pos));
+        stream.write(buffer, 0, writeSize);
+        pos += writeSize;
+      }
+    }
+    final long elapsedNanos = System.nanoTime() - startTime;
+
+    final MessageDigest md5 = MessageDigest.getInstance("MD5");
+    for (long pos = 0; pos < keySizeByte;) {
+      final int writeSize = Math.toIntExact(Math.min(buffer.length, keySizeByte - pos));
+      md5.update(buffer, 0, writeSize);
+      pos += writeSize;
+    }
+
+    final String computedMD5 = StringUtils.bytes2Hex(md5.digest());
+    print("createStreamKey", keySizeByte, elapsedNanos, bufferSize, computedMD5);
+    return computedMD5;
+  }
+
+  private void runTestReadKey(String keyName, SizeInBytes keySize, SizeInBytes bufferSize, String expectedMD5)
+      throws Exception {
+    final long keySizeByte = keySize.getSize();
+    final MessageDigest md5 = MessageDigest.getInstance("MD5");
+    // Read the data fully into a large enough byte array
+    final byte[] buffer = new byte[bufferSize.getSizeInt()];
+    final long startTime = System.nanoTime();
+    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
+      int pos = 0;
+      for (; pos < keySizeByte;) {
+        final int read = keyInputStream.read(buffer, 0, buffer.length);
+        if (read == -1) {
+          break;
+        }
+
+        if (expectedMD5 != null) {
+          md5.update(buffer, 0, read);
+        }
+        pos += read;
+      }
+      assertEquals(keySizeByte, pos);
+    }
+    final long elapsedNanos = System.nanoTime() - startTime;
+
+    final String computedMD5;
+    if (expectedMD5 == null) {
+      computedMD5 = null;
+    } else {
+      computedMD5 = StringUtils.bytes2Hex(md5.digest());
+      assertEquals(expectedMD5, computedMD5);
+    }
+    print("readStreamKey", keySizeByte, elapsedNanos, bufferSize, computedMD5);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

