This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 58d1b6595ff HDDS-13974. Use the same gRPC stream for reading the same block. (#9369) Contributed by Stephen O'Donnell
58d1b6595ff is described below
commit 58d1b6595ff1bab9bb49d7bba9acabf83fd576c7
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Nov 25 17:35:04 2025 -0800
HDDS-13974. Use the same gRPC stream for reading the same block. (#9369)
Contributed by Stephen O'Donnell
---
.../apache/hadoop/hdds/scm/XceiverClientGrpc.java | 179 ++---
.../hdds/scm/storage/BlockExtendedInputStream.java | 97 +++
.../hadoop/hdds/scm/storage/BlockInputStream.java | 93 +--
.../hdds/scm/storage/StreamBlockInputStream.java | 792 +++++++--------------
.../client/io/BlockInputStreamFactoryImpl.java | 8 +-
.../scm/storage/DummyStreamBlockInputStream.java | 143 ----
.../hdds/scm/storage/TestBlockInputStream.java | 4 +-
.../scm/storage/TestStreamBlockInputStream.java | 460 +++++++-----
.../hadoop/hdds/scm/StreamingReadResponse.java | 46 ++
.../hadoop/hdds/scm/StreamingReaderSpi.java} | 23 +-
.../apache/hadoop/hdds/scm/XceiverClientSpi.java | 9 +
.../ContainerCommandResponseBuilders.java | 20 +-
.../hdds/scm/storage/ContainerProtocolCalls.java | 37 +-
.../container/common/impl/HddsDispatcher.java | 16 +-
.../common/interfaces/ContainerDispatcher.java | 3 +-
.../ozone/container/keyvalue/KeyValueHandler.java | 136 ++--
.../container/keyvalue/TestKeyValueHandler.java | 93 ---
.../src/main/proto/DatanodeClientProtocol.proto | 6 +-
.../rpc/read/TestStreamBlockInputStream.java | 244 ++-----
19 files changed, 939 insertions(+), 1470 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 09e01593feb..b6a3d00f010 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -44,8 +44,6 @@
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import
org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
import
org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -65,6 +63,7 @@
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger;
@@ -386,25 +385,9 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(
});
}
- private XceiverClientReply sendCommandWithRetry(
- ContainerCommandRequestProto request, List<Validator> validators)
- throws IOException {
- ContainerCommandResponseProto responseProto = null;
- IOException ioException = null;
-
- // In case of an exception or an error, we will try to read from the
- // datanodes in the pipeline in a round-robin fashion.
- XceiverClientReply reply = new XceiverClientReply(null);
+ private List<DatanodeDetails> sortDatanodes(ContainerCommandRequestProto
request) throws IOException {
List<DatanodeDetails> datanodeList = null;
-
- DatanodeBlockID blockID = null;
- if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
- blockID = request.getGetBlock().getBlockID();
- } else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
- blockID = request.getReadChunk().getBlockID();
- } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
- blockID = request.getGetSmallFile().getBlock().getBlockID();
- }
+ DatanodeBlockID blockID = getRequestBlockID(request);
if (blockID != null) {
if (request.getCmdType() != ContainerProtos.Type.ReadChunk) {
@@ -442,6 +425,33 @@ private XceiverClientReply sendCommandWithRetry(
if (!allInService) {
datanodeList = sortDatanodeByOperationalState(datanodeList);
}
+ return datanodeList;
+ }
+
+ private static DatanodeBlockID
getRequestBlockID(ContainerCommandRequestProto request) {
+ DatanodeBlockID blockID = null;
+ if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
+ blockID = request.getGetBlock().getBlockID();
+ } else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
+ blockID = request.getReadChunk().getBlockID();
+ } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
+ blockID = request.getGetSmallFile().getBlock().getBlockID();
+ } else if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
+ blockID = request.getReadBlock().getBlockID();
+ }
+ return blockID;
+ }
+
+ private XceiverClientReply sendCommandWithRetry(
+ ContainerCommandRequestProto request, List<Validator> validators)
+ throws IOException {
+ ContainerCommandResponseProto responseProto = null;
+ IOException ioException = null;
+
+ // In case of an exception or an error, we will try to read from the
+ // datanodes in the pipeline in a round-robin fashion.
+ XceiverClientReply reply = new XceiverClientReply(null);
+ List<DatanodeDetails> datanodeList = sortDatanodes(request);
for (DatanodeDetails dn : datanodeList) {
try {
@@ -453,11 +463,7 @@ private XceiverClientReply sendCommandWithRetry(
// sendCommandAsyncCall will create a new channel and async stub
// in case these don't exist for the specific datanode.
reply.addDatanode(dn);
- if (request.getCmdType() == ContainerProtos.Type.ReadBlock) {
- responseProto = sendCommandReadBlock(request,
dn).getResponse().get();
- } else {
- responseProto = sendCommandAsync(request, dn).getResponse().get();
- }
+ responseProto = sendCommandAsync(request, dn).getResponse().get();
if (validators != null && !validators.isEmpty()) {
for (Validator validator : validators) {
validator.accept(request, responseProto);
@@ -510,6 +516,66 @@ private XceiverClientReply sendCommandWithRetry(
}
}
+ /**
+ * Starts a streaming read operation, intended to read entire blocks from the datanodes. This method expects a
+ * {@link StreamingReaderSpi} to be passed in, which will be used to receive the streamed data from the datanode.
+ * Upon successfully starting the streaming read, a {@link StreamingReadResponse} is set on the passed observer,
+ * which contains information about the datanode used for the read, and the request observer that can be used to
+ * manage the stream (e.g., to cancel it if needed). A semaphore is acquired to limit the number of concurrent
+ * streaming reads, so upon successful return of this method the caller must call
+ * {@link #completeStreamRead(StreamingReadResponse)} to release the semaphore once the streaming read is complete.
+ * @param request The container command request to initiate the streaming read.
+ * @param streamObserver The observer that will handle the streamed responses.
+ * @throws IOException if the streaming read cannot be started on any available datanode.
+ * @throws InterruptedException if interrupted while acquiring the concurrency semaphore.
+ */
+ @Override
+ public void streamRead(ContainerCommandRequestProto request,
+ StreamingReaderSpi streamObserver) throws IOException,
InterruptedException {
+ List<DatanodeDetails> datanodeList = sortDatanodes(request);
+ IOException lastException = null;
+ for (DatanodeDetails dn : datanodeList) {
+ try {
+ checkOpen(dn);
+ semaphore.acquire();
+ XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
+ if (stub == null) {
+ throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Executing command {} on datanode {}",
processForDebug(request), dn);
+ }
+ StreamObserver<ContainerCommandRequestProto> requestObserver = stub
+ .withDeadlineAfter(timeout, TimeUnit.SECONDS)
+ .send(streamObserver);
+ streamObserver.setStreamingReadResponse(new StreamingReadResponse(dn,
+ (ClientCallStreamObserver<ContainerCommandRequestProto>)
requestObserver));
+ requestObserver.onNext(request);
+ requestObserver.onCompleted();
+ return;
+ } catch (IOException e) {
+ LOG.error("Failed to start streaming read to DataNode {}", dn, e);
+ semaphore.release();
+ lastException = e;
+ }
+ }
+ if (lastException != null) {
+ throw lastException;
+ } else {
+ throw new IOException("Failed to start streaming read to any available
DataNodes");
+ }
+ }
+
+ /**
+ * This method should be called to indicate the end of a streaming read. Its primary purpose is to release the
+ * semaphore acquired when starting the streaming read, but it may also be used to update any metrics or debug
+ * logs as needed.
+ */
+ @Override
+ public void completeStreamRead(StreamingReadResponse streamingReadResponse) {
+ semaphore.release();
+ }
+
private static List<DatanodeDetails> sortDatanodeByOperationalState(
List<DatanodeDetails> datanodeList) {
List<DatanodeDetails> sortedDatanodeList = new ArrayList<>(datanodeList);
@@ -629,69 +695,6 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
return new XceiverClientReply(replyFuture);
}
- public XceiverClientReply sendCommandReadBlock(
- ContainerCommandRequestProto request, DatanodeDetails dn)
- throws IOException, InterruptedException {
-
- CompletableFuture<ContainerCommandResponseProto> future =
- new CompletableFuture<>();
- ContainerCommandResponseProto.Builder response =
- ContainerCommandResponseProto.newBuilder();
- ContainerProtos.ReadBlockResponseProto.Builder readBlock =
- ContainerProtos.ReadBlockResponseProto.newBuilder();
- checkOpen(dn);
- DatanodeID dnId = dn.getID();
- Type cmdType = request.getCmdType();
- semaphore.acquire();
- long requestTime = System.currentTimeMillis();
- metrics.incrPendingContainerOpsMetrics(cmdType);
-
- final StreamObserver<ContainerCommandRequestProto> requestObserver =
- asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS)
- .send(new StreamObserver<ContainerCommandResponseProto>() {
- @Override
- public void onNext(
- ContainerCommandResponseProto responseProto) {
- if (responseProto.getResult() == Result.SUCCESS) {
- readBlock.addReadChunk(responseProto.getReadChunk());
- } else {
- future.complete(
- ContainerCommandResponseProto.newBuilder(responseProto)
- .setCmdType(Type.ReadBlock).build());
- }
- }
-
- @Override
- public void onError(Throwable t) {
- future.completeExceptionally(t);
- metrics.decrPendingContainerOpsMetrics(cmdType);
- metrics.addContainerOpsLatency(
- cmdType, Time.monotonicNow() - requestTime);
- semaphore.release();
- }
-
- @Override
- public void onCompleted() {
- if (readBlock.getReadChunkCount() > 0) {
- future.complete(response.setReadBlock(readBlock)
-
.setCmdType(Type.ReadBlock).setResult(Result.SUCCESS).build());
- }
- if (!future.isDone()) {
- future.completeExceptionally(new IOException(
- "Stream completed but no reply for request " +
- processForDebug(request)));
- }
- metrics.decrPendingContainerOpsMetrics(cmdType);
- metrics.addContainerOpsLatency(
- cmdType, System.currentTimeMillis() - requestTime);
- semaphore.release();
- }
- });
- requestObserver.onNext(request);
- requestObserver.onCompleted();
- return new XceiverClientReply(future);
- }
-
private synchronized void checkOpen(DatanodeDetails dn)
throws IOException {
if (closed) {
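
For illustration, a minimal caller-side sketch of the streamRead/completeStreamRead contract described in the Javadoc above. Only streamRead, completeStreamRead and StreamingReadResponse come from this patch; the observer type MyStreamingReader and its getResponse() accessor are hypothetical stand-ins:

    // Hypothetical sketch: start a streaming block read and always release the
    // concurrency semaphore acquired by streamRead() once the stream is finished.
    MyStreamingReader observer = new MyStreamingReader();   // implements StreamingReaderSpi
    xceiverClient.streamRead(readBlockRequest, observer);   // picks a datanode, acquires the semaphore
    try {
      // consume the streamed ContainerCommandResponseProto messages via the observer
    } finally {
      // mandatory: releases the semaphore slot taken by streamRead()
      xceiverClient.completeStreamRead(observer.getResponse());  // getResponse() is hypothetical
    }
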
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
index 6753f600a91..dc47a9edaca 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
@@ -17,7 +17,20 @@
package org.apache.hadoop.hdds.scm.storage;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.thirdparty.io.grpc.Status;
/**
* Abstract class used as an interface for input streams related to Ozone
@@ -26,6 +39,8 @@
public abstract class BlockExtendedInputStream extends ExtendedInputStream
implements PartInputStream {
+ private static final org.slf4j.Logger LOG =
org.slf4j.LoggerFactory.getLogger(BlockExtendedInputStream.class);
+
public abstract BlockID getBlockID();
@Override
@@ -38,4 +53,86 @@ public long getRemaining() {
@Override
public abstract long getPos();
+
+ protected Pipeline setPipeline(Pipeline pipeline) throws IOException {
+ if (pipeline == null) {
+ return null;
+ }
+ long replicaIndexes =
pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();
+
+ if (replicaIndexes > 1) {
+ throw new IOException(String.format("Pipeline: %s has nodes containing
different replica indexes.",
+ pipeline));
+ }
+
+ // irrespective of the container state, we will always read via Standalone
protocol.
+ boolean okForRead = pipeline.getType() ==
HddsProtos.ReplicationType.STAND_ALONE
+ || pipeline.getType() == HddsProtos.ReplicationType.EC;
+ return okForRead ? pipeline : pipeline.copyForRead();
+ }
+
+ protected boolean shouldRetryRead(IOException cause, RetryPolicy
retryPolicy, int retries) throws IOException {
+ RetryPolicy.RetryAction retryAction;
+ try {
+ retryAction = retryPolicy.shouldRetry(cause, retries, 0, true);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
+ if (retryAction.delayMillis > 0) {
+ try {
+ LOG.debug("Retry read after {}ms", retryAction.delayMillis);
+ Thread.sleep(retryAction.delayMillis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ String msg = "Interrupted: action=" + retryAction.action + ", retry
policy=" + retryPolicy;
+ throw new IOException(msg, e);
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ protected RetryPolicy getReadRetryPolicy(OzoneClientConfig config) {
+ return HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
+ TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
+ }
+
+ protected void refreshBlockInfo(IOException cause, BlockID blockID,
AtomicReference<Pipeline> pipelineRef,
+ AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef,
Function<BlockID, BlockLocationInfo> refreshFunction)
+ throws IOException {
+ LOG.info("Attempting to update pipeline and block token for block {} from
pipeline {}: {}",
+ blockID, pipelineRef.get().getId(), cause.getMessage());
+ if (refreshFunction != null) {
+ LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
+ BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
+ if (blockLocationInfo == null) {
+ LOG.warn("No new block location info for block {}", blockID);
+ } else {
+ pipelineRef.set(setPipeline(blockLocationInfo.getPipeline()));
+ LOG.info("New pipeline for block {}: {}", blockID,
blockLocationInfo.getPipeline());
+
+ tokenRef.set(blockLocationInfo.getToken());
+ if (blockLocationInfo.getToken() != null) {
+ OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
+ tokenId.readFromByteArray(tokenRef.get().getIdentifier());
+ LOG.info("A new token is added for block {}. Expiry: {}", blockID,
+ Instant.ofEpochMilli(tokenId.getExpiryDate()));
+ }
+ }
+ } else {
+ throw cause;
+ }
+ }
+
+ /**
+ * Check if this exception is because datanodes are not reachable.
+ */
+ protected boolean isConnectivityIssue(IOException ex) {
+ return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
+ }
+
}
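
The retry helpers pulled up into BlockExtendedInputStream are intended to be shared by the concrete block streams. A simplified sketch of the pattern the subclasses follow (readFromDatanode() is a placeholder for the actual read attempt):

    // Sketch of the shared retry pattern, assuming the protected helpers above.
    RetryPolicy policy = getReadRetryPolicy(config);
    int retries = 0;
    while (true) {
      try {
        return readFromDatanode();                      // placeholder for the real read call
      } catch (IOException e) {
        // shouldRetryRead() sleeps for the policy delay and returns true when another attempt is allowed
        if (!shouldRetryRead(e, policy, ++retries)) {
          throw e;
        }
        if (isConnectivityIssue(e)) {
          // the datanode is unreachable: re-fetch the pipeline and block token before retrying
          refreshBlockInfo(e, blockID, pipelineRef, tokenRef, refreshFunction);
        }
      }
    }
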
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index c9731bcc461..6f6b513422f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -22,11 +22,9 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
@@ -35,19 +33,16 @@
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.security.token.Token;
-import org.apache.ratis.thirdparty.io.grpc.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,14 +118,12 @@ public BlockInputStream(
this.blockInfo = blockInfo;
this.blockID = blockInfo.getBlockID();
this.length = blockInfo.getLength();
- setPipeline(pipeline);
+ pipelineRef.set(setPipeline(pipeline));
tokenRef.set(token);
this.verifyChecksum = config.isChecksumVerify();
this.xceiverClientFactory = xceiverClientFactory;
this.refreshFunction = refreshFunction;
- this.retryPolicy =
- HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
- TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
+ this.retryPolicy = getReadRetryPolicy(config);
}
// only for unit tests
@@ -182,7 +175,7 @@ public synchronized void initialize() throws IOException {
}
catchEx = ex;
}
- } while (shouldRetryRead(catchEx));
+ } while (shouldRetryRead(catchEx, retryPolicy, ++retries));
if (chunks == null) {
throw catchEx;
@@ -215,37 +208,8 @@ public synchronized void initialize() throws IOException {
}
}
- /**
- * Check if this exception is because datanodes are not reachable.
- */
- private boolean isConnectivityIssue(IOException ex) {
- return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
- }
-
private void refreshBlockInfo(IOException cause) throws IOException {
- LOG.info("Attempting to update pipeline and block token for block {} from
pipeline {}: {}",
- blockID, pipelineRef.get().getId(), cause.getMessage());
- if (refreshFunction != null) {
- LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
- BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
- if (blockLocationInfo == null) {
- LOG.warn("No new block location info for block {}", blockID);
- } else {
- setPipeline(blockLocationInfo.getPipeline());
- LOG.info("New pipeline for block {}: {}", blockID,
- blockLocationInfo.getPipeline());
-
- tokenRef.set(blockLocationInfo.getToken());
- if (blockLocationInfo.getToken() != null) {
- OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
- tokenId.readFromByteArray(tokenRef.get().getIdentifier());
- LOG.info("A new token is added for block {}. Expiry: {}",
- blockID, Instant.ofEpochMilli(tokenId.getExpiryDate()));
- }
- }
- } else {
- throw cause;
- }
+ refreshBlockInfo(cause, blockID, pipelineRef, tokenRef, refreshFunction);
}
/**
@@ -277,26 +241,6 @@ protected BlockData getBlockDataUsingClient() throws
IOException {
return response.getBlockData();
}
- private void setPipeline(Pipeline pipeline) throws IOException {
- if (pipeline == null) {
- return;
- }
- long replicaIndexes =
pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();
-
- if (replicaIndexes > 1) {
- throw new IOException(String.format("Pipeline: %s has nodes containing
different replica indexes.",
- pipeline));
- }
-
- // irrespective of the container state, we will always read via Standalone
- // protocol.
- boolean okForRead =
- pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
- || pipeline.getType() == HddsProtos.ReplicationType.EC;
- Pipeline readPipeline = okForRead ? pipeline : pipeline.copyForRead();
- pipelineRef.set(readPipeline);
- }
-
private static void validate(ContainerCommandResponseProto response)
throws IOException {
if (!response.hasGetBlock()) {
@@ -382,14 +326,14 @@ protected synchronized int
readWithStrategy(ByteReaderStrategy strategy)
} catch (SCMSecurityException ex) {
throw ex;
} catch (StorageContainerException e) {
- if (shouldRetryRead(e)) {
+ if (shouldRetryRead(e, retryPolicy, ++retries)) {
handleReadError(e);
continue;
} else {
throw e;
}
} catch (IOException ex) {
- if (shouldRetryRead(ex)) {
+ if (shouldRetryRead(ex, retryPolicy, ++retries)) {
if (isConnectivityIssue(ex)) {
handleReadError(ex);
} else {
@@ -573,31 +517,6 @@ private synchronized void storePosition() {
blockPosition = getPos();
}
- private boolean shouldRetryRead(IOException cause) throws IOException {
- RetryPolicy.RetryAction retryAction;
- try {
- retryAction = retryPolicy.shouldRetry(cause, ++retries, 0, true);
- } catch (IOException e) {
- throw e;
- } catch (Exception e) {
- throw new IOException(e);
- }
- if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
- if (retryAction.delayMillis > 0) {
- try {
- LOG.debug("Retry read after {}ms", retryAction.delayMillis);
- Thread.sleep(retryAction.delayMillis);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- String msg = "Interrupted: action=" + retryAction.action + ", retry
policy=" + retryPolicy;
- throw new IOException(msg, e);
- }
- }
- return true;
- }
- return false;
- }
-
private void handleReadError(IOException cause) throws IOException {
releaseClient();
final List<ChunkInputStream> inputStreams = this.chunkStreams;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
index 3db7fd8f660..cb2f80ca7c8 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
@@ -17,46 +17,36 @@
package org.apache.hadoop.hdds.scm.storage;
-import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
-import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamingReadResponse;
+import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
-import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.security.token.Token;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,31 +56,26 @@
*/
public class StreamBlockInputStream extends BlockExtendedInputStream
implements Seekable, CanUnbuffer, ByteBufferReadable {
- private static final Logger LOG =
- LoggerFactory.getLogger(StreamBlockInputStream.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamBlockInputStream.class);
+ private static final int EOF = -1;
+ private static final Throwable CANCELLED_EXCEPTION = new
Throwable("Cancelled by client");
+
private final BlockID blockID;
private final long blockLength;
- private final AtomicReference<Pipeline> pipelineRef =
- new AtomicReference<>();
- private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
- new AtomicReference<>();
+ private final AtomicReference<Pipeline> pipelineRef = new
AtomicReference<>();
+ private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
new AtomicReference<>();
private XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
- private List<Long> bufferOffsets;
- private int bufferIndex;
- private long blockPosition = -1;
- private List<ByteBuffer> buffers;
- // Checks if the StreamBlockInputStream has already read data from the
container.
- private boolean allocated = false;
- private long bufferOffsetWrtBlockData;
- private long buffersSize;
- private static final int EOF = -1;
- private final List<XceiverClientSpi.Validator> validators;
+ private ByteBuffer buffer;
+ private long position = 0;
+ private boolean initialized = false;
+ private StreamingReader streamingReader;
+
private final boolean verifyChecksum;
private final Function<BlockID, BlockLocationInfo> refreshFunction;
private final RetryPolicy retryPolicy;
- private int retries;
+ private int retries = 0;
public StreamBlockInputStream(
BlockID blockID, long length, Pipeline pipeline,
@@ -100,17 +85,12 @@ public StreamBlockInputStream(
OzoneClientConfig config) throws IOException {
this.blockID = blockID;
this.blockLength = length;
- setPipeline(pipeline);
+ pipelineRef.set(setPipeline(pipeline));
tokenRef.set(token);
this.xceiverClientFactory = xceiverClientFactory;
- this.validators = ContainerProtocolCalls.toValidatorList(
- (request, response) -> validateBlock(response));
this.verifyChecksum = config.isChecksumVerify();
+ this.retryPolicy = getReadRetryPolicy(config);
this.refreshFunction = refreshFunction;
- this.retryPolicy =
- HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
- TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
-
}
@Override
@@ -125,170 +105,54 @@ public long getLength() {
@Override
public synchronized long getPos() {
- if (blockLength == 0) {
- return 0;
- }
- if (blockPosition >= 0) {
- return blockPosition;
- }
-
- if (buffersHaveData()) {
- // BufferOffset w.r.t to BlockData + BufferOffset w.r.t buffers +
- // Position of current Buffer
- return bufferOffsetWrtBlockData + bufferOffsets.get(bufferIndex) +
- buffers.get(bufferIndex).position();
- }
- if (allocated && !dataRemainingInBlock()) {
- Preconditions.checkState(
- bufferOffsetWrtBlockData + buffersSize == blockLength,
- "EOF detected but not at the last byte of the chunk");
- return blockLength;
- }
- if (buffersAllocated()) {
- return bufferOffsetWrtBlockData + buffersSize;
- }
- return 0;
+ return position;
}
@Override
public synchronized int read() throws IOException {
- int dataout = EOF;
- int len = 1;
- int available;
- while (len > 0) {
- try {
- acquireClient();
- available = prepareRead(1);
- retries = 0;
- } catch (SCMSecurityException ex) {
- throw ex;
- } catch (StorageContainerException e) {
- handleStorageContainerException(e);
- continue;
- } catch (IOException ioe) {
- handleIOException(ioe);
- continue;
- }
- if (available == EOF) {
- // There is no more data in the chunk stream. The buffers should have
- // been released by now
- Preconditions.checkState(buffers == null);
- } else {
- dataout = Byte.toUnsignedInt(buffers.get(bufferIndex).get());
- }
-
- len -= available;
- if (bufferEOF()) {
- releaseBuffers(bufferIndex);
- }
+ checkOpen();
+ if (!dataAvailableToRead()) {
+ return EOF;
}
-
-
- return dataout;
-
-
+ position++;
+ return buffer.get();
}
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
- // According to the JavaDocs for InputStream, it is recommended that
- // subclasses provide an override of bulk read if possible for performance
- // reasons. In addition to performance, we need to do it for correctness
- // reasons. The Ozone REST service uses PipedInputStream and
- // PipedOutputStream to relay HTTP response data between a Jersey thread
and
- // a Netty thread. It turns out that PipedInputStream/PipedOutputStream
- // have a subtle dependency (bug?) on the wrapped stream providing separate
- // implementations of single-byte read and bulk read. Without this, get
key
- // responses might close the connection before writing all of the bytes
- // advertised in the Content-Length.
- if (b == null) {
- throw new NullPointerException();
- }
- if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- }
- if (len == 0) {
- return 0;
- }
- int total = 0;
- int available;
- while (len > 0) {
- try {
- acquireClient();
- available = prepareRead(len);
- retries = 0;
- } catch (SCMSecurityException ex) {
- throw ex;
- } catch (StorageContainerException e) {
- handleStorageContainerException(e);
- continue;
- } catch (IOException ioe) {
- handleIOException(ioe);
- continue;
- }
- if (available == EOF) {
- // There is no more data in the block stream. The buffers should have
- // been released by now
- Preconditions.checkState(buffers == null);
- return total != 0 ? total : EOF;
- }
- buffers.get(bufferIndex).get(b, off + total, available);
- len -= available;
- total += available;
+ ByteBuffer tmpBuffer = ByteBuffer.wrap(b, off, len);
+ return read(tmpBuffer);
+ }
- if (bufferEOF()) {
- releaseBuffers(bufferIndex);
+ @Override
+ public synchronized int read(ByteBuffer targetBuf) throws IOException {
+ checkOpen();
+ int read = 0;
+ while (targetBuf.hasRemaining()) {
+ if (!dataAvailableToRead()) {
+ break;
}
+ int toCopy = Math.min(buffer.remaining(), targetBuf.remaining());
+ ByteBuffer tmpBuf = buffer.duplicate();
+ tmpBuf.limit(tmpBuf.position() + toCopy);
+ targetBuf.put(tmpBuf);
+ buffer.position(tmpBuf.position());
+ position += toCopy;
+ read += toCopy;
}
- return total;
-
+ return read > 0 ? read : EOF;
}
- @Override
- public synchronized int read(ByteBuffer byteBuffer) throws IOException {
- if (byteBuffer == null) {
- throw new NullPointerException();
- }
- int len = byteBuffer.remaining();
- if (len == 0) {
- return 0;
+ private boolean dataAvailableToRead() throws IOException {
+ if (position >= blockLength) {
+ return false;
}
- int total = 0;
- int available;
- while (len > 0) {
- try {
- acquireClient();
- available = prepareRead(len);
- retries = 0;
- } catch (SCMSecurityException ex) {
- throw ex;
- } catch (StorageContainerException e) {
- handleStorageContainerException(e);
- continue;
- } catch (IOException ioe) {
- handleIOException(ioe);
- continue;
- }
- if (available == EOF) {
- // There is no more data in the block stream. The buffers should have
- // been released by now
- Preconditions.checkState(buffers == null);
- return total != 0 ? total : EOF;
- }
- ByteBuffer readBuf = buffers.get(bufferIndex);
- ByteBuffer tmpBuf = readBuf.duplicate();
- tmpBuf.limit(tmpBuf.position() + available);
- byteBuffer.put(tmpBuf);
- readBuf.position(tmpBuf.position());
-
- len -= available;
- total += available;
-
- if (bufferEOF()) {
- releaseBuffers(bufferIndex);
- }
+ initialize();
+ if (buffer == null || buffer.remaining() == 0) {
+ int loaded = fillBuffer();
+ return loaded != EOF;
}
- return total;
+ return true;
}
@Override
@@ -298,62 +162,44 @@ protected int readWithStrategy(ByteReaderStrategy
strategy) throws IOException {
@Override
public synchronized void seek(long pos) throws IOException {
- if (pos == 0 && blockLength == 0) {
- // It is possible for length and pos to be zero in which case
- // seek should return instead of throwing exception
- return;
+ checkOpen();
+ if (pos < 0) {
+ throw new IOException("Cannot seek to negative offset");
}
- if (pos < 0 || pos > blockLength) {
- throw new EOFException("EOF encountered at pos: " + pos + " for block: "
+ blockID);
+ if (pos > blockLength) {
+ throw new IOException("Cannot seek after the end of the block");
}
-
- if (buffersHavePosition(pos)) {
- // The bufferPosition is w.r.t the current block.
- // Adjust the bufferIndex and position to the seeked position.
- adjustBufferPosition(pos - bufferOffsetWrtBlockData);
- } else {
- blockPosition = pos;
+ if (pos == position) {
+ return;
}
+ closeStream();
+ position = pos;
}
@Override
+ // The Seekable interface indicates that seekToNewSource should seek to a new source of the data,
+ // i.e. a different datanode. This is not supported for now.
public synchronized boolean seekToNewSource(long l) throws IOException {
return false;
}
@Override
public synchronized void unbuffer() {
- blockPosition = getPos();
releaseClient();
- releaseBuffers();
}
- private void setPipeline(Pipeline pipeline) throws IOException {
- if (pipeline == null) {
- return;
- }
- long replicaIndexes =
pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count();
-
- if (replicaIndexes > 1) {
- throw new IOException(String.format("Pipeline: %s has nodes containing
different replica indexes.",
- pipeline));
+ private void closeStream() {
+ if (streamingReader != null) {
+ streamingReader.cancel();
+ streamingReader = null;
}
-
- // irrespective of the container state, we will always read via Standalone
- // protocol.
- boolean okForRead =
- pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
- || pipeline.getType() == HddsProtos.ReplicationType.EC;
- Pipeline readPipeline = okForRead ? pipeline :
pipeline.copyForRead().toBuilder()
- .setReplicationConfig(StandaloneReplicationConfig.getInstance(
- getLegacyFactor(pipeline.getReplicationConfig())))
- .build();
- pipelineRef.set(readPipeline);
+ initialized = false;
+ buffer = null;
}
protected synchronized void checkOpen() throws IOException {
if (xceiverClientFactory == null) {
- throw new IOException("StreamBlockInputStream has been closed.");
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED + " Block: "
+ blockID);
}
}
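
Note that the new seek() above is lazy: it only cancels the current gRPC stream and records the target position, and the next read re-opens a stream from that offset (see initialize() in the following hunk). For example, as a sketch:

    // stream is a StreamBlockInputStream currently positioned at 0
    stream.seek(4 * 1024 * 1024);   // closes any open ReadBlock stream, records position = 4 MiB
    int b = stream.read();          // lazily starts a new streaming read beginning at the 4 MiB offset
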
@@ -364,384 +210,234 @@ protected synchronized void acquireClient() throws
IOException {
try {
xceiverClient =
xceiverClientFactory.acquireClientForReadData(pipeline);
} catch (IOException ioe) {
- LOG.warn("Failed to acquire client for pipeline {}, block {}",
- pipeline, blockID);
+ LOG.warn("Failed to acquire client for pipeline {}, block {}",
pipeline, blockID);
throw ioe;
}
}
}
- private synchronized int prepareRead(int len) throws IOException {
- for (;;) {
- if (blockPosition >= 0) {
- if (buffersHavePosition(blockPosition)) {
- // The current buffers have the seeked position. Adjust the buffer
- // index and position to point to the buffer position.
- adjustBufferPosition(blockPosition - bufferOffsetWrtBlockData);
- } else {
- // Read a required block data to fill the buffers with seeked
- // position data
- readDataFromContainer(len);
- }
+ private void initialize() throws IOException {
+ if (initialized) {
+ return;
+ }
+ while (true) {
+ try {
+ acquireClient();
+ streamingReader = new StreamingReader();
+ ContainerProtocolCalls.readBlock(xceiverClient, position, blockID,
tokenRef.get(),
+ pipelineRef.get().getReplicaIndexes(), streamingReader);
+ initialized = true;
+ return;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ handleExceptions(new IOException("Interrupted", ie));
+ } catch (IOException ioe) {
+ handleExceptions(ioe);
}
- if (buffersHaveData()) {
- // Data is available from buffers
- ByteBuffer bb = buffers.get(bufferIndex);
- return Math.min(len, bb.remaining());
- } else if (dataRemainingInBlock()) {
- // There is more data in the block stream which has not
- // been read into the buffers yet.
- readDataFromContainer(len);
+ }
+ }
+
+ private void handleExceptions(IOException cause) throws IOException {
+ if (cause instanceof StorageContainerException ||
isConnectivityIssue(cause)) {
+ if (shouldRetryRead(cause, retryPolicy, retries++)) {
+ releaseClient();
+ refreshBlockInfo(cause);
+ LOG.warn("Refreshing block data to read block {} due to {}", blockID,
cause.getMessage());
} else {
- // All available input from this block stream has been consumed.
- return EOF;
+ throw cause;
}
+ } else {
+ throw cause;
}
+ }
-
+ private int fillBuffer() throws IOException {
+ if (!streamingReader.hasNext()) {
+ return EOF;
+ }
+ buffer = streamingReader.readNext();
+ return buffer == null ? EOF : buffer.limit();
}
- private boolean buffersHavePosition(long pos) {
- // Check if buffers have been allocated
- if (buffersAllocated()) {
- // Check if the current buffers cover the input position
- // Released buffers should not be considered when checking if position
- // is available
- return pos >= bufferOffsetWrtBlockData +
- bufferOffsets.get(0) &&
- pos < bufferOffsetWrtBlockData + buffersSize;
+ protected synchronized void releaseClient() {
+ if (xceiverClientFactory != null && xceiverClient != null) {
+ closeStream();
+ xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
+ xceiverClient = null;
}
- return false;
}
- /**
- * Check if the buffers have been allocated data and false otherwise.
- */
- @VisibleForTesting
- protected boolean buffersAllocated() {
- return buffers != null && !buffers.isEmpty();
+ @Override
+ public synchronized void close() throws IOException {
+ releaseClient();
+ xceiverClientFactory = null;
}
- /**
- * Adjust the buffers position to account for seeked position and/ or
checksum
- * boundary reads.
- * @param bufferPosition the position to which the buffers must be advanced
- */
- private void adjustBufferPosition(long bufferPosition) {
- // The bufferPosition is w.r.t the current buffers.
- // Adjust the bufferIndex and position to the seeked bufferPosition.
- bufferIndex = Collections.binarySearch(bufferOffsets, bufferPosition);
- // bufferIndex is negative if bufferPosition isn't found in bufferOffsets
- // count (bufferIndex = -bufferIndex - 2) to get bufferPosition is between
which offsets.
- if (bufferIndex < 0) {
- bufferIndex = -bufferIndex - 2;
- }
+ private void refreshBlockInfo(IOException cause) throws IOException {
+ refreshBlockInfo(cause, blockID, pipelineRef, tokenRef, refreshFunction);
+ }
- buffers.get(bufferIndex).position(
- (int) (bufferPosition - bufferOffsets.get(bufferIndex)));
-
- // Reset buffers > bufferIndex to position 0. We do this to reset any
- // previous reads/ seeks which might have updated any buffer position.
- // For buffers < bufferIndex, we do not need to reset the position as it
- // not required for this read. If a seek was done to a position in the
- // previous indices, the buffer position reset would be performed in the
- // seek call.
- for (int i = bufferIndex + 1; i < buffers.size(); i++) {
- buffers.get(i).position(0);
+ private synchronized void releaseStreamResources(StreamingReadResponse
response) {
+ if (xceiverClient != null) {
+ xceiverClient.completeStreamRead(response);
}
-
- // Reset the blockPosition as chunk stream has been initialized i.e. the
- // buffers have been allocated.
- blockPosition = -1;
}
/**
- * Reads full or partial Chunk from DN Container based on the current
- * position of the ChunkInputStream, the number of bytes of data to read
- * and the checksum boundaries.
- * If successful, then the read data in saved in the buffers so that
- * subsequent read calls can utilize it.
- * @param len number of bytes of data to be read
- * @throws IOException if there is an I/O error while performing the call
- * to Datanode
+ * Implementation of a StreamObserver used to received and buffer streaming
GRPC reads.
*/
- private synchronized void readDataFromContainer(int len) throws IOException {
- // index of first byte to be read from the block
- long startByteIndex;
- if (blockPosition >= 0) {
- // If seek operation was called to advance the buffer position, the
- // chunk should be read from that position onwards.
- startByteIndex = blockPosition;
- } else {
- // Start reading the block from the last blockPosition onwards.
- startByteIndex = bufferOffsetWrtBlockData + buffersSize;
- }
+ public class StreamingReader implements StreamingReaderSpi {
- // bufferOffsetWrtChunkData and buffersSize are updated after the data
- // is read from Container and put into the buffers, but if read fails
- // and is retried, we need the previous position. Position is reset after
- // successful read in adjustBufferPosition()
- blockPosition = getPos();
- bufferOffsetWrtBlockData = readData(startByteIndex, len);
- long tempOffset = 0L;
- buffersSize = 0L;
- bufferOffsets = new ArrayList<>(buffers.size());
- for (ByteBuffer buffer : buffers) {
- bufferOffsets.add(tempOffset);
- tempOffset += buffer.limit();
- buffersSize += buffer.limit();
+ private final BlockingQueue<ContainerProtos.ReadBlockResponseProto>
responseQueue = new LinkedBlockingQueue<>(1);
+ private final AtomicBoolean completed = new AtomicBoolean(false);
+ private final AtomicBoolean failed = new AtomicBoolean(false);
+ private final AtomicBoolean semaphoreReleased = new AtomicBoolean(false);
+ private final AtomicReference<Throwable> error = new AtomicReference<>();
+ private volatile StreamingReadResponse response;
+ public boolean hasNext() {
+ return !responseQueue.isEmpty() || !completed.get();
}
- bufferIndex = 0;
- allocated = true;
- adjustBufferPosition(startByteIndex - bufferOffsetWrtBlockData);
- }
+ public ByteBuffer readNext() throws IOException {
+ if (failed.get()) {
+ Throwable cause = error.get();
+ throw new IOException("Streaming read failed", cause);
+ }
- @VisibleForTesting
- protected long readData(long startByteIndex, long len)
- throws IOException {
- Pipeline pipeline = pipelineRef.get();
- buffers = new ArrayList<>();
- ReadBlockResponseProto response =
- ContainerProtocolCalls.readBlock(xceiverClient, startByteIndex,
- len, blockID, validators, tokenRef.get(),
pipeline.getReplicaIndexes(), verifyChecksum);
- List<ReadChunkResponseProto> readBlocks = response.getReadChunkList();
-
- for (ReadChunkResponseProto readBlock : readBlocks) {
- if (readBlock.hasDataBuffers()) {
- buffers.addAll(BufferUtils.getReadOnlyByteBuffers(
- readBlock.getDataBuffers().getBuffersList()));
- } else {
- throw new IOException("Unexpected error while reading chunk data " +
- "from container. No data returned.");
+ if (completed.get() && responseQueue.isEmpty()) {
+ return null; // Stream ended
}
- }
- return response.getReadChunk(0)
- .getChunkData().getOffset();
- }
- /**
- * Check if the buffers have any data remaining between the current
- * position and the limit.
- */
- private boolean buffersHaveData() {
- boolean hasData = false;
- if (buffersAllocated()) {
- int buffersLen = buffers.size();
- while (bufferIndex < buffersLen) {
- ByteBuffer buffer = buffers.get(bufferIndex);
- if (buffer != null && buffer.hasRemaining()) {
- // current buffer has data
- hasData = true;
- break;
+ ReadBlockResponseProto readBlock;
+ try {
+ readBlock = responseQueue.poll(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for response", e);
+ }
+ if (readBlock == null) {
+ if (failed.get()) {
+ Throwable cause = error.get();
+ throw new IOException("Streaming read failed", cause);
+ } else if (completed.get()) {
+ return null; // Stream ended
} else {
- if (bufferIndex < buffersLen - 1) {
- // move to next available buffer
- ++bufferIndex;
- Preconditions.checkState(bufferIndex < buffers.size());
- } else {
- // no more buffers remaining
- break;
- }
+ throw new IOException("Timed out waiting for response");
}
}
+ // The server always returns data starting from the last checksum boundary. Therefore, if the reader position is
+ // ahead of the offset we received from the server, we need to adjust the buffer position accordingly.
+ // If the reader position is behind the received offset, the data arrived out of order and is reported as an error.
+ ByteBuffer buf = readBlock.getData().asReadOnlyByteBuffer();
+ long blockOffset = readBlock.getOffset();
+ long pos = getPos();
+ if (pos < blockOffset) {
+ // This should not happen, and if it does, we have a bug.
+ throw new IOException("Received data out of order. Position is " + pos
+ " but received data at "
+ + blockOffset);
+ }
+ if (pos > readBlock.getOffset()) {
+ int offset = (int)(pos - readBlock.getOffset());
+ buf.position(offset);
+ }
+ return buf;
}
- return hasData;
- }
-
- /**
- * Check if there is more data in the chunk which has not yet been read
- * into the buffers.
- */
- private boolean dataRemainingInBlock() {
- long bufferPos;
- if (blockPosition >= 0) {
- bufferPos = blockPosition;
- } else {
- bufferPos = bufferOffsetWrtBlockData + buffersSize;
+ private void releaseResources() {
+ boolean wasNotYetComplete = semaphoreReleased.getAndSet(true);
+ if (wasNotYetComplete) {
+ releaseStreamResources(response);
+ }
}
- return bufferPos < blockLength;
- }
-
- /**
- * Check if current buffer had been read till the end.
- */
- private boolean bufferEOF() {
- return allocated && buffersAllocated() &&
!buffers.get(bufferIndex).hasRemaining();
- }
-
- /**
- * Release the buffers upto the given index.
- * @param releaseUptoBufferIndex bufferIndex (inclusive) upto which the
- * buffers must be released
- */
- private void releaseBuffers(int releaseUptoBufferIndex) {
- int buffersLen = buffers.size();
- if (releaseUptoBufferIndex == buffersLen - 1) {
- // Before releasing all the buffers, if block EOF is not reached, then
- // blockPosition should be set to point to the last position of the
- // buffers. This should be done so that getPos() can return the current
- // block position
- blockPosition = bufferOffsetWrtBlockData +
- bufferOffsets.get(releaseUptoBufferIndex) +
- buffers.get(releaseUptoBufferIndex).capacity();
- // Release all the buffers
- releaseBuffers();
- } else {
- buffers = buffers.subList(releaseUptoBufferIndex + 1, buffersLen);
- bufferOffsets = bufferOffsets.subList(
- releaseUptoBufferIndex + 1, buffersLen);
- bufferIndex = 0;
+ @Override
+ public void onNext(ContainerProtos.ContainerCommandResponseProto
containerCommandResponseProto) {
+ try {
+ ReadBlockResponseProto readBlock =
containerCommandResponseProto.getReadBlock();
+ ByteBuffer data = readBlock.getData().asReadOnlyByteBuffer();
+ if (verifyChecksum) {
+ ChecksumData checksumData =
ChecksumData.getFromProtoBuf(readBlock.getChecksumData());
+ Checksum.verifyChecksum(data, checksumData, 0);
+ }
+ offerToQueue(readBlock);
+ } catch (OzoneChecksumException e) {
+ LOG.warn("Checksum verification failed for block {} from datanode {}",
+ getBlockID(), response.getDatanodeDetails(), e);
+ cancelDueToError(e);
+ }
}
- }
-
- /**
- * If EOF is reached, release the buffers.
- */
- private void releaseBuffers() {
- buffers = null;
- bufferIndex = 0;
- // We should not reset bufferOffsetWrtBlockData and buffersSize here
- // because when getPos() is called we use these
- // values and determine whether chunk is read completely or not.
- }
- protected synchronized void releaseClient() {
- if (xceiverClientFactory != null && xceiverClient != null) {
- xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
- xceiverClient = null;
+ @Override
+ public void onError(Throwable throwable) {
+ if (throwable instanceof StatusRuntimeException) {
+ if (((StatusRuntimeException)
throwable).getStatus().getCode().name().equals("CANCELLED")) {
+ // This is expected when the client cancels the stream.
+ setCompleted();
+ }
+ } else {
+ setFailed(throwable);
+ }
+ releaseResources();
}
- }
-
- private void validateBlock(
- ContainerProtos.ContainerCommandResponseProto response
- ) throws IOException {
- ReadBlockResponseProto readBlock = response.getReadBlock();
- for (ReadChunkResponseProto readChunk : readBlock.getReadChunkList()) {
- List<ByteString> byteStrings;
+ @Override
+ public void onCompleted() {
+ setCompleted();
+ releaseResources();
+ }
- ContainerProtos.ChunkInfo chunkInfo =
- readChunk.getChunkData();
- if (chunkInfo.getLen() <= 0) {
- throw new IOException("Failed to get chunk: chunkName == "
- + chunkInfo.getChunkName() + "len == " + chunkInfo.getLen());
- }
- byteStrings = readChunk.getDataBuffers().getBuffersList();
- long buffersLen = BufferUtils.getBuffersLen(byteStrings);
- if (buffersLen != chunkInfo.getLen()) {
- // Bytes read from chunk should be equal to chunk size.
- throw new OzoneChecksumException(String.format(
- "Inconsistent read for chunk=%s len=%d bytesRead=%d",
- chunkInfo.getChunkName(), chunkInfo.getLen(),
- buffersLen));
+ /**
+ * By calling cancel, the client will send a cancel signal to the server,
which will stop sending more data and
+ * cause the onError() to be called in this observer with a CANCELLED
exception.
+ */
+ public void cancel() {
+ if (response != null && response.getRequestObserver() != null) {
+ response.getRequestObserver().cancel("Cancelled by client",
CANCELLED_EXCEPTION);
+ setCompleted();
+ releaseResources();
}
+ }
-
- if (verifyChecksum) {
- ChecksumData checksumData = ChecksumData.getFromProtoBuf(
- chunkInfo.getChecksumData());
- int startIndex = (int) readChunk.getChunkData().getOffset() /
checksumData.getBytesPerChecksum();
-
- // ChecksumData stores checksum for each 'numBytesPerChecksum'
- // number of bytes in a list. Compute the index of the first
- // checksum to match with the read data
-
- Checksum.verifyChecksum(byteStrings, checksumData, startIndex);
+ public void cancelDueToError(Throwable exception) {
+ if (response != null && response.getRequestObserver() != null) {
+ response.getRequestObserver().onError(exception);
+ setFailed(exception);
+ releaseResources();
}
}
- }
-
- @VisibleForTesting
- protected synchronized void setBuffers(List<ByteBuffer> buffers) {
- this.buffers = buffers;
- }
- private boolean shouldRetryRead(IOException cause) throws IOException {
- RetryPolicy.RetryAction retryAction;
- try {
- retryAction = retryPolicy.shouldRetry(cause, ++retries, 0, true);
- } catch (IOException e) {
- throw e;
- } catch (Exception e) {
- throw new IOException(e);
+ private void setFailed(Throwable throwable) {
+ if (completed.get()) {
+ throw new IllegalArgumentException("Cannot mark a completed stream as
failed");
+ }
+ failed.set(true);
+ error.set(throwable);
}
- return retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
- }
- @VisibleForTesting
- public boolean isVerifyChecksum() {
- return verifyChecksum;
- }
+ private void setCompleted() {
+ if (!failed.get()) {
+ completed.set(true);
+ }
+ }
- private void refreshBlockInfo(IOException cause) throws IOException {
- LOG.info("Attempting to update pipeline and block token for block {} from
pipeline {}: {}",
- blockID, pipelineRef.get().getId(), cause.getMessage());
- if (refreshFunction != null) {
- LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
- BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
- if (blockLocationInfo == null) {
- LOG.warn("No new block location info for block {}", blockID);
- } else {
- setPipeline(blockLocationInfo.getPipeline());
- LOG.info("New pipeline for block {}: {}", blockID,
- blockLocationInfo.getPipeline());
-
- tokenRef.set(blockLocationInfo.getToken());
- if (blockLocationInfo.getToken() != null) {
- OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
- tokenId.readFromByteArray(tokenRef.get().getIdentifier());
- LOG.info("A new token is added for block {}. Expiry: {}",
- blockID, Instant.ofEpochMilli(tokenId.getExpiryDate()));
+ private void offerToQueue(ReadBlockResponseProto item) {
+ while (!completed.get() && !failed.get()) {
+ try {
+ if (responseQueue.offer(item, 100, TimeUnit.MILLISECONDS)) {
+ return;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
}
}
- } else {
- throw cause;
}
- }
-
- @VisibleForTesting
- public synchronized ByteBuffer[] getCachedBuffers() {
- return buffers == null ? null :
- BufferUtils.getReadOnlyByteBuffers(buffers.toArray(new ByteBuffer[0]));
- }
- /**
- * Check if this exception is because datanodes are not reachable.
- */
- private boolean isConnectivityIssue(IOException ex) {
- return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
- }
-
- @Override
- public synchronized void close() throws IOException {
- releaseClient();
- releaseBuffers();
- xceiverClientFactory = null;
- }
-
- private void handleStorageContainerException(StorageContainerException e)
throws IOException {
- if (shouldRetryRead(e)) {
- releaseClient();
- refreshBlockInfo(e);
- } else {
- throw e;
+ @Override
+ public void setStreamingReadResponse(StreamingReadResponse
streamingReadResponse) {
+ response = streamingReadResponse;
}
}
- private void handleIOException(IOException ioe) throws IOException {
- if (shouldRetryRead(ioe)) {
- if (isConnectivityIssue(ioe)) {
- releaseClient();
- refreshBlockInfo(ioe);
- } else {
- releaseClient();
- }
- } else {
- throw ioe;
- }
- }
}
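
To make the position adjustment in StreamingReader.readNext() concrete: the datanode starts each reply at a checksum boundary, so the client may need to skip bytes it has already consumed. A worked sketch (the 4096-byte boundary is only an example value):

    // Suppose the reader position is 5000, and the datanode replies with a buffer
    // whose offset within the block is 4096 (the preceding checksum boundary).
    java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(4096); // stands in for readBlock.getData()
    long pos = 5000;          // getPos() in readNext()
    long blockOffset = 4096;  // readBlock.getOffset()
    // pos < blockOffset would mean out-of-order data and is surfaced as an IOException.
    // Otherwise skip the (pos - blockOffset) = 904 bytes already consumed, so the next
    // byte handed to the caller corresponds to block position 5000.
    buf.position((int) (pos - blockOffset));
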
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 9249abaf42a..2765e2a00a8 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -90,9 +90,8 @@ public BlockExtendedInputStream create(ReplicationConfig
repConfig,
blockInfo, xceiverFactory, refreshFunction,
ecBlockStreamFactory, config);
} else if (config.isStreamReadBlock() &&
allDataNodesSupportStreamBlock(pipeline)) {
- return new StreamBlockInputStream(
- blockInfo.getBlockID(), blockInfo.getLength(),
- pipeline, token, xceiverFactory, refreshFunction, config);
+ return new StreamBlockInputStream(blockInfo.getBlockID(),
blockInfo.getLength(), pipeline, token, xceiverFactory,
+ refreshFunction, config);
} else {
return new BlockInputStream(blockInfo,
pipeline, token, xceiverFactory, refreshFunction,
@@ -104,8 +103,7 @@ private boolean allDataNodesSupportStreamBlock(Pipeline
pipeline) {
// return true only if all DataNodes in the pipeline are on a version
// that supports reading a block by streaming chunks.
for (DatanodeDetails dn : pipeline.getNodes()) {
- if (dn.getCurrentVersion() <
- STREAM_BLOCK_SUPPORT.toProtoValue()) {
+ if (dn.getCurrentVersion() < STREAM_BLOCK_SUPPORT.toProtoValue()) {
return false;
}
}
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java
deleted file mode 100644
index e141845954d..00000000000
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyStreamBlockInputStream.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.storage;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import org.apache.hadoop.hdds.client.BlockID;
-import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.utils.BufferUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-
-/**
- * A dummy StreamBlockInputStream to mock read block call to DN.
- */
-class DummyStreamBlockInputStream extends StreamBlockInputStream {
-
- private final List<ByteString> readByteBuffers = new ArrayList<>();
- private final List<ChunkInfo> chunks;
- private final long[] chunkOffsets;
- private final Map<String, byte[]> chunkDataMap;
-
- @SuppressWarnings("parameternumber")
- DummyStreamBlockInputStream(
- BlockID blockId,
- long blockLen,
- Pipeline pipeline,
- Token<OzoneBlockTokenIdentifier> token,
- XceiverClientFactory xceiverClientManager,
- Function<BlockID, BlockLocationInfo> refreshFunction,
- OzoneClientConfig config,
- List<ChunkInfo> chunks,
- Map<String, byte[]> chunkDataMap) throws IOException {
- super(blockId, blockLen, pipeline, token, xceiverClientManager,
- refreshFunction, config);
- this.chunks = chunks;
- this.chunkDataMap = chunkDataMap;
- chunkOffsets = new long[chunks.size()];
- long temp = 0;
- for (int i = 0; i < chunks.size(); i++) {
- chunkOffsets[i] = temp;
- temp += chunks.get(i).getLen();
- }
- }
-
- @Override
- protected synchronized void checkOpen() throws IOException {
- // No action needed
- }
-
- @Override
- protected void acquireClient() {
- // No action needed
- }
-
- @Override
- protected void releaseClient() {
- // no-op
- }
-
- @Override
- protected long readData(long offset, long len) {
- int chunkIndex = Arrays.binarySearch(chunkOffsets, offset);
- if (chunkIndex < 0) {
- chunkIndex = -chunkIndex - 2;
- }
- ChunkInfo chunkInfo = chunks.get(chunkIndex);
- readByteBuffers.clear();
- long chunkOffset = offset - chunkInfo.getOffset();
- if (isVerifyChecksum()) {
- ChecksumData checksumData = ChecksumData.getFromProtoBuf(
- chunkInfo.getChecksumData());
- int bytesPerChecksum = checksumData.getBytesPerChecksum();
- chunkOffset = (chunkOffset / bytesPerChecksum) * bytesPerChecksum;
- }
- long bufferOffsetWrtBlockDataData = chunkOffsets[chunkIndex] + chunkOffset;
- while (len > 0) {
- ChunkInfo currentChunk = chunks.get(chunkIndex);
- int bufferCapacity =
currentChunk.getChecksumData().getBytesPerChecksum();
- long chunkLen = currentChunk.getLen();
- long remainingToRead = Math.min(chunkLen, len);
- if (isVerifyChecksum()) {
- if (len < chunkLen) {
- final ChecksumData checksumData =
ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
- final long endByteIndex = len - 1;
- final int bytesPerChecksum = checksumData.getBytesPerChecksum();
- remainingToRead = (endByteIndex / bytesPerChecksum + 1) *
bytesPerChecksum;
- } else {
- remainingToRead = chunkLen;
- }
- }
-
- long bufferLen;
- while (remainingToRead > 0) {
- if (remainingToRead < bufferCapacity) {
- bufferLen = remainingToRead;
- } else {
- bufferLen = bufferCapacity;
- }
- ByteString byteString =
ByteString.copyFrom(chunkDataMap.get(chunks.get(chunkIndex).getChunkName()),
- (int) chunkOffset, (int) bufferLen);
-
- readByteBuffers.add(byteString);
-
- chunkOffset += bufferLen;
- remainingToRead -= bufferLen;
- len -= bufferLen;
- }
- chunkOffset = 0;
- chunkIndex++;
- }
- setBuffers(BufferUtils.getReadOnlyByteBuffers(readByteBuffers));
- return bufferOffsetWrtBlockDataData;
- }
-
- public List<ByteString> getReadByteBuffers() {
- return readByteBuffers;
- }
-}
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index 2e9a84cad42..1b5e7667e84 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -267,8 +267,8 @@ public void testSeekAndRead() throws Exception {
@Test
public void testRefreshPipelineFunction() throws Exception {
- LogCapturer logCapturer = LogCapturer.captureLogs(BlockInputStream.class);
- GenericTestUtils.setLogLevel(BlockInputStream.class, Level.DEBUG);
+ LogCapturer logCapturer =
LogCapturer.captureLogs(BlockExtendedInputStream.class);
+ GenericTestUtils.setLogLevel(BlockExtendedInputStream.class, Level.DEBUG);
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
AtomicBoolean isRefreshed = new AtomicBoolean();
createChunkList(5);
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
index 81cb6d4d62c..b2cb3fb865c 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java
@@ -17,248 +17,338 @@
package org.apache.hadoop.hdds.scm.storage;
-import static org.assertj.core.api.Assertions.assertThat;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
-
-import com.google.common.primitives.Bytes;
-import java.io.EOFException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.function.Function;
+import java.util.stream.Stream;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
-import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamingReadResponse;
+import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.StatusException;
+import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.invocation.InvocationOnMock;
/**
 * Tests for {@link StreamBlockInputStream}'s functionality.
*/
public class TestStreamBlockInputStream {
- private int blockSize;
- private static final int CHUNK_SIZE = 100;
- private static final int BYTES_PER_CHECKSUM = 20;
- private static final Random RANDOM = new Random();
- private DummyStreamBlockInputStream blockStream;
- private byte[] blockData;
- private List<ChunkInfo> chunks;
- private Map<String, byte[]> chunkDataMap;
+ private static final int BYTES_PER_CHECKSUM = 1024;
+ private static final int BLOCK_SIZE = 1024;
+ private StreamBlockInputStream blockStream;
+ private final OzoneConfiguration conf = new OzoneConfiguration();
+ private XceiverClientFactory xceiverClientFactory;
+ private XceiverClientGrpc xceiverClient;
private Checksum checksum;
- private BlockID blockID;
- private static final String CHUNK_NAME = "chunk-";
- private OzoneConfiguration conf = new OzoneConfiguration();
+ private ChecksumData checksumData;
+ private byte[] data;
+ private
ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto>
requestObserver;
+ private Function<BlockID, BlockLocationInfo> refreshFunction;
@BeforeEach
public void setup() throws Exception {
+ Token<OzoneBlockTokenIdentifier> token = mock(Token.class);
+ when(token.encodeToUrlString()).thenReturn("url");
+
+ Set<HddsProtos.BlockTokenSecretProto.AccessModeProto> modes =
+
Collections.singleton(HddsProtos.BlockTokenSecretProto.AccessModeProto.READ);
+ OzoneBlockTokenIdentifier tokenIdentifier = new
OzoneBlockTokenIdentifier("owner", new BlockID(1, 1),
+ modes, Time.monotonicNow() + 10000, 10);
+ tokenIdentifier.setSecretKeyId(UUID.randomUUID());
+ when(token.getIdentifier()).thenReturn(tokenIdentifier.getBytes());
+ Pipeline pipeline = MockPipeline.createSingleNodePipeline();
+
+ BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class);
+ when(blockLocationInfo.getPipeline()).thenReturn(pipeline);
+ when(blockLocationInfo.getToken()).thenReturn(token);
+
+ xceiverClient = mock(XceiverClientGrpc.class);
+ when(xceiverClient.getPipeline()).thenReturn(pipeline);
+ xceiverClientFactory = mock(XceiverClientFactory.class);
+ when(xceiverClientFactory.acquireClientForReadData(any()))
+ .thenReturn(xceiverClient);
+ requestObserver = mock(ClientCallStreamObserver.class);
+
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setStreamReadBlock(true);
- blockID = new BlockID(new ContainerBlockID(1, 1));
+ clientConfig.setMaxReadRetryCount(1);
+ refreshFunction = mock(Function.class);
+ when(refreshFunction.apply(any())).thenReturn(blockLocationInfo);
+ BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
checksum = new Checksum(ChecksumType.CRC32, BYTES_PER_CHECKSUM);
- createChunkList(5);
-
- Pipeline pipeline = MockPipeline.createSingleNodePipeline();
- blockStream = new DummyStreamBlockInputStream(blockID, blockSize, pipeline,
- null, null, mock(Function.class), clientConfig, chunks, chunkDataMap);
+ createDataAndChecksum();
+ blockStream = new StreamBlockInputStream(blockID, BLOCK_SIZE, pipeline,
+ token, xceiverClientFactory, refreshFunction, clientConfig);
}
- /**
- * Create a mock list of chunks. The first n-1 chunks of length CHUNK_SIZE
- * and the last chunk with length CHUNK_SIZE/2.
- */
- private void createChunkList(int numChunks)
- throws Exception {
-
- chunks = new ArrayList<>(numChunks);
- chunkDataMap = new HashMap<>();
- blockData = new byte[0];
- int i, chunkLen;
- byte[] byteData;
- String chunkName;
-
- for (i = 0; i < numChunks; i++) {
- chunkName = CHUNK_NAME + i;
- chunkLen = CHUNK_SIZE;
- if (i == numChunks - 1) {
- chunkLen = CHUNK_SIZE / 2;
+ @AfterEach
+ public void teardown() {
+ if (blockStream != null) {
+ try {
+ blockStream.close();
+ } catch (IOException e) {
+ // ignore
}
- byteData = generateRandomData(chunkLen);
- ChunkInfo chunkInfo = ChunkInfo.newBuilder()
- .setChunkName(chunkName)
- .setOffset(0)
- .setLen(chunkLen)
- .setChecksumData(checksum.computeChecksum(
- byteData, 0, chunkLen).getProtoBufMessage())
- .build();
-
- chunkDataMap.put(chunkName, byteData);
- chunks.add(chunkInfo);
-
- blockSize += chunkLen;
- blockData = Bytes.concat(blockData, byteData);
}
}
- static byte[] generateRandomData(int length) {
- byte[] bytes = new byte[length];
- RANDOM.nextBytes(bytes);
- return bytes;
+ @Test
+ public void testCloseStreamReleasesResources() throws IOException,
InterruptedException {
+ setupSuccessfulRead();
+ assertEquals(data[0], blockStream.read());
+ blockStream.close();
+ // Verify that cancel() was called on the requestObserver mock
+ verify(requestObserver).cancel(any(), any());
+ // Verify that release() was called on the xceiverClient mock
+ verify(xceiverClientFactory).releaseClientForReadData(xceiverClient,
false);
+ verify(xceiverClient, times(1)).completeStreamRead(any());
}
- /**
- * Match readData with the chunkData byte-wise.
- * @param readData Data read through ChunkInputStream
- * @param inputDataStartIndex first index (inclusive) in chunkData to compare
- * with read data
- * @param length the number of bytes of data to match starting from
- * inputDataStartIndex
- */
- private void matchWithInputData(byte[] readData, int inputDataStartIndex,
- int length) {
- for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
- assertEquals(blockData[i], readData[i - inputDataStartIndex], "i: " + i);
- }
+ @Test
+ public void testUnbufferReleasesResourcesAndResumesFromLastPosition() throws
IOException, InterruptedException {
+ setupSuccessfulRead();
+ assertEquals(data[0], blockStream.read());
+ assertEquals(1, blockStream.getPos());
+ blockStream.unbuffer();
+ // Verify that cancel() was called on the requestObserver mock
+ verify(requestObserver).cancel(any(), any());
+ // Verify that release() was called on the xceiverClient mock
+ verify(xceiverClientFactory).releaseClientForReadData(xceiverClient,
false);
+ verify(xceiverClient, times(1)).completeStreamRead(any());
+ // The next read should "rebuffer" and continue from the last position
+ assertEquals(data[1], blockStream.read());
+ assertEquals(2, blockStream.getPos());
}
- private void matchWithInputData(List<ByteString> byteStrings,
- int inputDataStartIndex, int length) {
- int offset = inputDataStartIndex;
- int totalBufferLen = 0;
- for (ByteString byteString : byteStrings) {
- int bufferLen = byteString.size();
- matchWithInputData(byteString.toByteArray(), offset, bufferLen);
- offset += bufferLen;
- totalBufferLen += bufferLen;
- }
- assertEquals(length, totalBufferLen);
+ @Test
+ public void testSeekReleasesTheStreamAndStartsFromNewPosition() throws
IOException, InterruptedException {
+ setupSuccessfulRead();
+ assertEquals(data[0], blockStream.read());
+ blockStream.seek(100);
+ assertEquals(100, blockStream.getPos());
+ // Verify that cancel() was called on the requestObserver mock
+ verify(requestObserver).cancel(any(), any());
+ verify(xceiverClient, times(1)).completeStreamRead(any());
+ // The xceiverClient should not be released
+ verify(xceiverClientFactory, never())
+ .releaseClientForReadData(xceiverClient, false);
+
+ assertEquals(data[100], blockStream.read());
+ assertEquals(101, blockStream.getPos());
}
- /**
- * Seek to a position and verify through getPos().
- */
- private void seekAndVerify(int pos) throws Exception {
- blockStream.seek(pos);
- assertEquals(pos, blockStream.getPos(),
- "Current position of buffer does not match with the sought position");
+ @Test
+ public void testErrorThrownIfStreamReturnsError() throws IOException,
InterruptedException {
+ // Note the error will only be thrown when the buffer needs to be
refilled. In this case, as it is the first
+ // read it will try to fill the buffer and encounter the error, but a
reader could continue reading until the
+ // buffer is exhausted before seeing the error.
+ doAnswer((InvocationOnMock invocation) -> {
+ StreamingReaderSpi streamObserver = invocation.getArgument(1);
+ StreamingReadResponse resp =
+ new
StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(),
requestObserver);
+ streamObserver.setStreamingReadResponse(resp);
+ streamObserver.onError(new IOException("Test induced error"));
+ return null;
+ }).when(xceiverClient).streamRead(any(), any());
+ assertThrows(IOException.class, () -> blockStream.read());
+ verify(xceiverClient, times(0)).completeStreamRead(any());
}
@Test
- public void testFullChunkRead() throws Exception {
- byte[] b = new byte[blockSize];
- int numBytesRead = blockStream.read(b, 0, blockSize);
- assertEquals(blockSize, numBytesRead);
- matchWithInputData(b, 0, blockSize);
+ public void seekOutOfBounds() throws IOException, InterruptedException {
+ setupSuccessfulRead();
+ assertThrows(IOException.class, () -> blockStream.seek(-1));
+ assertThrows(IOException.class, () -> blockStream.seek(BLOCK_SIZE + 1));
}
@Test
- public void testPartialChunkRead() throws Exception {
- int len = blockSize / 2;
- byte[] b = new byte[len];
-
- int numBytesRead = blockStream.read(b, 0, len);
- assertEquals(len, numBytesRead);
- matchWithInputData(b, 0, len);
-
- // To read block data from index 0 to 225 (len = 225), we need to read
- // chunk from offset 0 to 240 as the checksum boundary is at every 20
- // bytes. Verify that 60 bytes of chunk data are read and stored in the
- // buffers. Since checksum boundary is at every 20 bytes, there should be
- // 240/20 number of buffers.
- matchWithInputData(blockStream.getReadByteBuffers(), 0, 240);
+ public void readPastEOFReturnsEOF() throws IOException, InterruptedException
{
+ setupSuccessfulRead();
+ blockStream.seek(BLOCK_SIZE);
+ // Ensure the stream is at EOF even after two attempts to read
+ assertEquals(-1, blockStream.read());
+ assertEquals(-1, blockStream.read());
+ assertEquals(BLOCK_SIZE, blockStream.getPos());
}
@Test
- public void testSeek() throws Exception {
- seekAndVerify(0);
- EOFException eofException = assertThrows(EOFException.class, () ->
seekAndVerify(blockSize + 1));
- assertThat(eofException).hasMessage("EOF encountered at pos: " +
(blockSize + 1) + " for block: " + blockID);
-
- // Seek before read should update the BlockInputStream#blockPosition
- seekAndVerify(25);
-
- // Read from the sought position.
- // Reading from index 25 to 54 should result in the BlockInputStream
- // copying chunk data from index 20 to 59 into the buffers (checksum
- // boundaries).
- byte[] b = new byte[30];
- int numBytesRead = blockStream.read(b, 0, 30);
- assertEquals(30, numBytesRead);
- matchWithInputData(b, 25, 30);
- matchWithInputData(blockStream.getReadByteBuffers(), 20, 40);
-
- // After read, the position of the blockStream is evaluated from the
- // buffers and the chunkPosition should be reset to -1.
-
- // Only the last BYTES_PER_CHECKSUM will be cached in the buffers as
- // buffers are released after each checksum boundary is read. So the
- // buffers should contain data from index 40 to 59.
- // Seek to a position within the cached buffers. BlockPosition should
- // still not be used to set the position.
- seekAndVerify(45);
-
- // Seek to a position outside the current cached buffers. In this case, the
- // chunkPosition should be updated to the seeked position.
- seekAndVerify(75);
-
- // Read upto checksum boundary should result in all the buffers being
- // released and hence chunkPosition updated with current position of chunk.
- seekAndVerify(25);
- b = new byte[15];
- numBytesRead = blockStream.read(b, 0, 15);
- assertEquals(15, numBytesRead);
- matchWithInputData(b, 25, 15);
+ public void ensureExceptionThrownForReadAfterClosed() throws IOException,
InterruptedException {
+ setupSuccessfulRead();
+ blockStream.close();
+ ByteBuffer byteBuffer = ByteBuffer.allocate(10);
+ byte[] byteArray = new byte[10];
+ assertThrows(IOException.class, () -> blockStream.read());
+ assertThrows(IOException.class, () -> {
+ // Findbugs complains about ignored return value without this :(
+ int r = blockStream.read(byteArray, 0, 10);
+ });
+ assertThrows(IOException.class, () -> blockStream.read(byteBuffer));
+ assertThrows(IOException.class, () -> blockStream.seek(10));
+ }
+
+ @ParameterizedTest
+ @MethodSource("exceptionsTriggeringRefresh")
+ public void testRefreshFunctionCalledForAllDNsBadOnInitialize(IOException
thrown)
+ throws IOException, InterruptedException {
+ // In this case, if the first attempt to connect to any of the DNs fails,
it should retry by refreshing the pipeline
+
+ doAnswer((InvocationOnMock invocation) -> {
+ throw thrown;
+ }).doAnswer((InvocationOnMock invocation) -> {
+ StreamingReaderSpi streamObserver = invocation.getArgument(1);
+ StreamingReadResponse resp =
+ new
StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(),
requestObserver);
+ streamObserver.setStreamingReadResponse(resp);
+ streamObserver.onNext(createChunkResponse(false));
+ streamObserver.onCompleted();
+ return null;
+ }).when(xceiverClient).streamRead(any(), any());
+ blockStream.read();
+ verify(refreshFunction, times(1)).apply(any());
+ }
+
+ @ParameterizedTest
+ @MethodSource("exceptionsNotTriggeringRefresh")
+ public void testRefreshNotCalledForAllDNsBadOnInitialize(IOException thrown)
+ throws IOException, InterruptedException {
+    // In this case the failure should not trigger a pipeline refresh; the
exception should simply be rethrown to the caller.
+ doAnswer((InvocationOnMock invocation) -> {
+ throw thrown;
+ }).when(xceiverClient).streamRead(any(), any());
+ assertThrows(IOException.class, () -> blockStream.read());
+ verify(refreshFunction, times(0)).apply(any());
}
@Test
- public void testSeekAndRead() throws Exception {
- // Seek to a position and read data
- seekAndVerify(50);
- byte[] b1 = new byte[20];
- int numBytesRead = blockStream.read(b1, 0, 20);
- assertEquals(20, numBytesRead);
- matchWithInputData(b1, 50, 20);
-
- // Next read should start from the position of the last read + 1 i.e. 70
- byte[] b2 = new byte[20];
- numBytesRead = blockStream.read(b2, 0, 20);
- assertEquals(20, numBytesRead);
- matchWithInputData(b2, 70, 20);
-
- byte[] b3 = new byte[20];
- seekAndVerify(80);
- numBytesRead = blockStream.read(b3, 0, 20);
- assertEquals(20, numBytesRead);
- matchWithInputData(b3, 80, 20);
+ public void testExceptionThrownAfterRetriesExhausted() throws IOException,
InterruptedException {
+    // The first attempt fails, the pipeline is refreshed, and the retry also
fails, so the exception is thrown once retries are exhausted.
+ doAnswer((InvocationOnMock invocation) -> {
+ throw new StorageContainerException(CONTAINER_NOT_FOUND);
+ }).when(xceiverClient).streamRead(any(), any());
+
+ assertThrows(IOException.class, () -> blockStream.read());
+ verify(refreshFunction, times(1)).apply(any());
}
@Test
- public void testUnbuffered() throws Exception {
- byte[] b1 = new byte[20];
- int numBytesRead = blockStream.read(b1, 0, 20);
- assertEquals(20, numBytesRead);
- matchWithInputData(b1, 0, 20);
+ public void testInvalidChecksumThrowsException() throws IOException,
InterruptedException {
+ doAnswer((InvocationOnMock invocation) -> {
+ StreamingReaderSpi streamObserver = invocation.getArgument(1);
+ StreamingReadResponse resp =
+ new
StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(),
requestObserver);
+ streamObserver.setStreamingReadResponse(resp);
+ streamObserver.onNext(createChunkResponse(true));
+ streamObserver.onCompleted();
+ return null;
+ }).when(xceiverClient).streamRead(any(), any());
+ assertThrows(IOException.class, () -> blockStream.read());
+ }
- blockStream.unbuffer();
+ private void createDataAndChecksum() throws OzoneChecksumException {
+ data = new byte[BLOCK_SIZE];
+ new SecureRandom().nextBytes(data);
+ checksumData = checksum.computeChecksum(data);
+ }
+
+ private void setupSuccessfulRead() throws IOException, InterruptedException {
+ doAnswer((InvocationOnMock invocation) -> {
+ StreamingReaderSpi streamObserver = invocation.getArgument(1);
+ StreamingReadResponse resp =
+ new
StreamingReadResponse(MockDatanodeDetails.randomDatanodeDetails(),
requestObserver);
+ streamObserver.setStreamingReadResponse(resp);
+ streamObserver.onNext(createChunkResponse(false));
+ streamObserver.onCompleted();
+ return null;
+ }).when(xceiverClient).streamRead(any(), any());
+ }
+
+ private ContainerProtos.ContainerCommandResponseProto
createChunkResponse(boolean invalidChecksum) {
+ ContainerProtos.ReadBlockResponseProto response = invalidChecksum ?
+ createInValidChecksumResponse() : createValidResponse();
- assertFalse(blockStream.buffersAllocated());
+ return ContainerProtos.ContainerCommandResponseProto.newBuilder()
+ .setCmdType(ContainerProtos.Type.ReadBlock)
+ .setReadBlock(response)
+ .setResult(ContainerProtos.Result.SUCCESS)
+ .build();
+ }
+
+ private ContainerProtos.ReadBlockResponseProto createValidResponse() {
+ return ContainerProtos.ReadBlockResponseProto.newBuilder()
+ .setChecksumData(checksumData.getProtoBufMessage())
+ .setData(ByteString.copyFrom(data))
+ .setOffset(0)
+ .build();
+ }
+
+ private ContainerProtos.ReadBlockResponseProto
createInValidChecksumResponse() {
+ byte[] invalidData = new byte[data.length];
+ System.arraycopy(data, 0, invalidData, 0, data.length);
+ // Corrupt the data
+ invalidData[0] = (byte) (invalidData[0] + 1);
+ return ContainerProtos.ReadBlockResponseProto.newBuilder()
+ .setChecksumData(checksumData.getProtoBufMessage())
+ .setData(ByteString.copyFrom(invalidData))
+ .setOffset(0)
+ .build();
+ }
+
+ private static Stream<Arguments> exceptionsTriggeringRefresh() {
+ return Stream.of(
+ Arguments.of(new StorageContainerException(CONTAINER_NOT_FOUND)),
+ Arguments.of(new IOException(new ExecutionException(
+ new StatusException(Status.UNAVAILABLE))))
+ );
+ }
- // Next read should start from the position of the last read + 1 i.e. 20
- byte[] b2 = new byte[20];
- numBytesRead = blockStream.read(b2, 0, 20);
- assertEquals(20, numBytesRead);
- matchWithInputData(b2, 20, 20);
+ private static Stream<Arguments> exceptionsNotTriggeringRefresh() {
+ return Stream.of(
+ Arguments.of(new SCMSecurityException("Security problem")),
+ Arguments.of(new OzoneChecksumException("checksum missing")),
+ Arguments.of(new IOException("Some random exception."))
+ );
}
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java
new file mode 100644
index 00000000000..ea8694cd8b7
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReadResponse.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver;
+
+/**
+ * Streaming read response holding datanode details and
+ * request observer to send read requests.
+ */
+public class StreamingReadResponse {
+
+ private final DatanodeDetails dn;
+ private final
ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto>
requestObserver;
+
+ public StreamingReadResponse(DatanodeDetails dn,
+ ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto>
requestObserver) {
+ this.dn = dn;
+ this.requestObserver = requestObserver;
+ }
+
+ public DatanodeDetails getDatanodeDetails() {
+ return dn;
+ }
+
+ public
ClientCallStreamObserver<ContainerProtos.ContainerCommandRequestProto>
getRequestObserver() {
+ return requestObserver;
+ }
+}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReaderSpi.java
similarity index 62%
copy from
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
copy to
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReaderSpi.java
index 6753f600a91..0206253784c 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/StreamingReaderSpi.java
@@ -15,27 +15,16 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdds.scm.storage;
+package org.apache.hadoop.hdds.scm;
-import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
/**
- * Abstract class used as an interface for input streams related to Ozone
- * blocks.
+ * SPI implemented by streaming readers to receive streamed responses and the streaming read handle.
*/
-public abstract class BlockExtendedInputStream extends ExtendedInputStream
- implements PartInputStream {
+public interface StreamingReaderSpi extends
StreamObserver<ContainerProtos.ContainerCommandResponseProto> {
- public abstract BlockID getBlockID();
+ void setStreamingReadResponse(StreamingReadResponse streamingReadResponse);
- @Override
- public long getRemaining() {
- return getLength() - getPos();
- }
-
- @Override
- public abstract long getLength();
-
- @Override
- public abstract long getPos();
}
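
StreamingReaderSpi is just a StreamObserver over ContainerCommandResponseProto plus a hook that hands the reader the StreamingReadResponse handle once the stream is open. Below is a minimal sketch of an implementation; it is not part of this commit, the class name is invented, and the queueing is only one possible way to hand responses to a reader thread (StreamBlockInputStream in the client module does the real wiring).

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.StreamingReadResponse;
import org.apache.hadoop.hdds.scm.StreamingReaderSpi;

/** Sketch only, not part of this commit: a queue-backed StreamingReaderSpi. */
class QueueingStreamingReader implements StreamingReaderSpi {
  private final BlockingQueue<ContainerProtos.ContainerCommandResponseProto> queue =
      new LinkedBlockingQueue<>();
  private volatile StreamingReadResponse handle;
  private volatile Throwable error;
  private volatile boolean done;

  @Override
  public void setStreamingReadResponse(StreamingReadResponse streamingReadResponse) {
    // Keeps the handle so the caller can later cancel or complete the stream.
    this.handle = streamingReadResponse;
  }

  @Override
  public void onNext(ContainerProtos.ContainerCommandResponseProto response) {
    queue.offer(response);     // one ReadBlock response per streamed buffer
  }

  @Override
  public void onError(Throwable t) {
    error = t;                 // surfaced to the reader on its next poll
  }

  @Override
  public void onCompleted() {
    done = true;               // datanode finished streaming the block
  }

  StreamingReadResponse getHandle() {
    return handle;
  }
}
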
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 35e65271bb1..f1bf7a8ef85 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -144,6 +144,15 @@ public ContainerCommandResponseProto sendCommand(
}
}
+ public void streamRead(ContainerCommandRequestProto request,
+ StreamingReaderSpi streamObserver) throws IOException,
InterruptedException {
+ throw new UnsupportedOperationException("Stream read is not supported");
+ }
+
+ public void completeStreamRead(StreamingReadResponse streamingReadResponse) {
+ throw new UnsupportedOperationException("Stream read is not supported");
+ }
+
public static IOException getIOExceptionForSendCommand(
ContainerCommandRequestProto request, Exception e) {
return new IOException("Failed to execute command "
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index ededa8d070b..3fc97e97ffa 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkResponseProto;
+import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBufferToByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
@@ -335,19 +336,16 @@ public static ContainerCommandResponseProto
getReadChunkResponse(
}
public static ContainerCommandResponseProto getReadBlockResponse(
- ContainerCommandRequestProto request, DatanodeBlockID blockID,
- ChunkInfo chunkInfo, ChunkBufferToByteString data,
- Function<ByteBuffer, ByteString> byteBufferToByteString) {
+ ContainerCommandRequestProto request, ChecksumData checksumData,
ByteBuffer data, long offset) {
+
+ ContainerProtos.ReadBlockResponseProto response =
ContainerProtos.ReadBlockResponseProto.newBuilder()
+ .setChecksumData(checksumData.getProtoBufMessage())
+ .setData(ByteString.copyFrom(data))
+ .setOffset(offset)
+ .build();
- ReadChunkResponseProto.Builder response;
- response = ReadChunkResponseProto.newBuilder()
- .setChunkData(chunkInfo)
- .setDataBuffers(DataBuffers.newBuilder()
- .addAllBuffers(data.toByteStringList(byteBufferToByteString))
- .build())
- .setBlockID(blockID);
return getSuccessResponseBuilder(request)
- .setReadChunk(response)
+ .setReadBlock(response)
.build();
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 1117d29c1aa..c6e5d75b5ca 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -56,13 +56,13 @@
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockRequestProto;
-import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadBlockResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.hdds.scm.StreamingReaderSpi;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
@@ -911,23 +911,18 @@ public static List<Validator> toValidatorList(Validator
validator) {
*
* @param xceiverClient client to perform call
   * @param offset offset within the block to start reading from
- * @param len length of data to read
* @param blockID ID of the block
- * @param validators functions to validate the response
* @param token a token for this block (may be null)
- * @return container protocol read chunk response
* @throws IOException if there is an I/O error while performing the call
*/
@SuppressWarnings("checkstyle:ParameterNumber")
- public static ContainerProtos.ReadBlockResponseProto readBlock(
- XceiverClientSpi xceiverClient, long offset, long len, BlockID blockID,
- List<Validator> validators, Token<? extends TokenIdentifier> token,
- Map<DatanodeDetails, Integer> replicaIndexes, boolean verifyChecksum)
throws IOException {
+ public static void readBlock(
+ XceiverClientSpi xceiverClient, long offset, BlockID blockID, Token<?
extends TokenIdentifier> token,
+ Map<DatanodeDetails, Integer> replicaIndexes, StreamingReaderSpi
streamObserver)
+ throws IOException, InterruptedException {
final ReadBlockRequestProto.Builder readBlockRequest =
ReadBlockRequestProto.newBuilder()
- .setOffset(offset)
- .setVerifyChecksum(verifyChecksum)
- .setLen(len);
+ .setOffset(offset);
final ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadBlock)
.setContainerID(blockID.getContainerID());
@@ -935,18 +930,14 @@ public static ContainerProtos.ReadBlockResponseProto
readBlock(
builder.setEncodedToken(token.encodeToUrlString());
}
- return tryEachDatanode(xceiverClient.getPipeline(),
- d -> readBlock(xceiverClient,
- validators, blockID, builder, readBlockRequest, d, replicaIndexes),
- d -> toErrorMessage(blockID, d));
+ readBlock(xceiverClient, blockID, builder, readBlockRequest,
xceiverClient.getPipeline().getFirstNode(),
+ replicaIndexes, streamObserver);
}
- private static ReadBlockResponseProto readBlock(XceiverClientSpi
xceiverClient,
- List<Validator> validators,
BlockID blockID,
-
ContainerCommandRequestProto.Builder builder,
-
ReadBlockRequestProto.Builder readBlockBuilder,
- DatanodeDetails datanode,
- Map<DatanodeDetails,
Integer> replicaIndexes) throws IOException {
+ private static void readBlock(XceiverClientSpi xceiverClient,
+ BlockID blockID, ContainerCommandRequestProto.Builder builder,
ReadBlockRequestProto.Builder readBlockBuilder,
+ DatanodeDetails datanode, Map<DatanodeDetails, Integer> replicaIndexes,
+ StreamingReaderSpi streamObserver) throws IOException,
InterruptedException {
final DatanodeBlockID.Builder datanodeBlockID =
blockID.getDatanodeBlockIDProtobufBuilder();
int replicaIndex = replicaIndexes.getOrDefault(datanode, 0);
if (replicaIndex > 0) {
@@ -956,8 +947,6 @@ private static ReadBlockResponseProto
readBlock(XceiverClientSpi xceiverClient,
final ContainerCommandRequestProto request = builder
.setDatanodeUuid(datanode.getUuidString())
.setReadBlock(readBlockBuilder).build();
- ContainerCommandResponseProto response =
- xceiverClient.sendCommand(request, validators);
- return response.getReadBlock();
+ xceiverClient.streamRead(request, streamObserver);
}
}
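
With the new signature, ContainerProtocolCalls.readBlock() no longer returns a response; it opens a stream against the first node in the pipeline and delivers ReadBlock responses to the supplied StreamingReaderSpi. The following sketch shows the call sequence a caller might use; it is not part of this commit, it reuses the hypothetical QueueingStreamingReader from the earlier sketch, and error handling, retries and pipeline failover are omitted.

import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.StreamingReadResponse;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;

/** Sketch only, not part of this commit: driving the streaming read API. */
final class StreamingReadSketch {
  static void streamWholeBlock(XceiverClientSpi client, BlockID blockID)
      throws IOException, InterruptedException {
    QueueingStreamingReader observer = new QueueingStreamingReader();
    // Opens a single gRPC stream for the block starting at offset 0; a null
    // token is allowed, and an empty replica-index map suits Ratis pipelines.
    ContainerProtocolCalls.readBlock(client, 0L, blockID, null,
        Collections.emptyMap(), observer);
    // ... consume streamed responses from the observer ...
    // On close, seek or unbuffer, release the stream via the handle.
    StreamingReadResponse handle = observer.getHandle();
    if (handle != null) {
      client.completeStreamRead(handle);
    }
  }
}
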
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index e6be4a490b4..6dba6abf9d0 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -818,8 +818,8 @@ public StateMachine.DataChannel getStreamDataChannel(
@Override
public void streamDataReadOnly(ContainerCommandRequestProto msg,
- StreamObserver<ContainerCommandResponseProto>
streamObserver,
- DispatcherContext dispatcherContext) {
+ StreamObserver<ContainerCommandResponseProto> streamObserver,
+ DispatcherContext dispatcherContext) {
Type cmdType = msg.getCmdType();
String traceID = msg.getTraceID();
Span span = TracingUtil.importAndCreateSpan(cmdType.toString(), traceID);
@@ -829,8 +829,7 @@ public void streamDataReadOnly(ContainerCommandRequestProto
msg,
try (UncheckedAutoCloseable ignored = protocolMetrics.measure(cmdType)) {
Preconditions.checkNotNull(msg);
if (LOG.isTraceEnabled()) {
- LOG.trace("Command {}, trace ID: {}.", msg.getCmdType(),
- traceID);
+ LOG.trace("Command {}, trace ID: {}.", msg.getCmdType(), traceID);
}
PerformanceStringBuilder perf = new PerformanceStringBuilder();
@@ -849,20 +848,17 @@ public void
streamDataReadOnly(ContainerCommandRequestProto msg,
ContainerProtos.Result.CONTAINER_MISSING);
}
if (container == null) {
- throw new StorageContainerException(
- "ContainerID " + containerID + " does not exist",
+ throw new StorageContainerException("ContainerID " + containerID + "
does not exist",
ContainerProtos.Result.CONTAINER_NOT_FOUND);
}
ContainerType containerType = getContainerType(container);
Handler handler = getHandler(containerType);
if (handler == null) {
- throw new StorageContainerException("Invalid " +
- "ContainerType " + containerType,
+ throw new StorageContainerException("Invalid " + "ContainerType " +
containerType,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
}
perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime);
- responseProto = handler.readBlock(
- msg, container, dispatcherContext, streamObserver);
+ responseProto = handler.readBlock(msg, container, dispatcherContext,
streamObserver);
long oPLatencyMS = Time.monotonicNow() - startTime;
metrics.incContainerOpsLatencies(cmdType, oPLatencyMS);
if (responseProto == null) {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index d6c33455008..8a4a675187d 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -98,7 +98,6 @@ default void streamDataReadOnly(
ContainerCommandRequestProto msg,
StreamObserver<ContainerCommandResponseProto> streamObserver,
DispatcherContext dispatcherContext) {
- throw new UnsupportedOperationException(
- "streamDataReadOnly not supported.");
+ throw new UnsupportedOperationException("streamDataReadOnly not
supported.");
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index ecf13cf12af..2f4177b2d06 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -59,6 +59,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
import static
org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient.createSingleNodePipeline;
import static
org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.DEFAULT_LAYOUT;
+import static
org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -69,7 +70,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -77,6 +80,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -88,7 +92,6 @@
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
@@ -124,6 +127,7 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.ChunkBufferToByteString;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
@@ -176,6 +180,7 @@ public class KeyValueHandler extends Handler {
private static final Logger LOG = LoggerFactory.getLogger(
KeyValueHandler.class);
+ private static final int STREAMING_BYTES_PER_CHUNK = 1024 * 64;
private final BlockManager blockManager;
private final ChunkManager chunkManager;
@@ -2055,110 +2060,81 @@ public ContainerCommandResponseProto readBlock(
ContainerCommandRequestProto request, Container kvContainer,
DispatcherContext dispatcherContext,
StreamObserver<ContainerCommandResponseProto> streamObserver) {
+
+ if (kvContainer.getContainerData().getLayoutVersion() != FILE_PER_BLOCK) {
+ return ContainerUtils.logAndReturnError(LOG,
+ new StorageContainerException("Only File Per Block is supported",
IO_EXCEPTION), request);
+ }
+
ContainerCommandResponseProto responseProto = null;
if (!request.hasReadBlock()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Malformed Read Block request. trace ID: {}",
- request.getTraceID());
+ LOG.debug("Malformed Read Block request. trace ID: {}",
request.getTraceID());
}
return malformedRequest(request);
}
try {
ReadBlockRequestProto readBlock = request.getReadBlock();
- BlockID blockID = BlockID.getFromProtobuf(
- readBlock.getBlockID());
+ BlockID blockID = BlockID.getFromProtobuf(readBlock.getBlockID());
      // This is a new API, so the block should always be checked.
BlockUtils.verifyReplicaIdx(kvContainer, blockID);
BlockUtils.verifyBCSId(kvContainer, blockID);
+ File blockFile =
FILE_PER_BLOCK.getChunkFile(kvContainer.getContainerData(), blockID, "unused");
+
BlockData blockData = getBlockManager().getBlock(kvContainer, blockID);
List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
- long blockOffset = 0;
- int chunkIndex = -1;
- long chunkOffset = 0;
- long offset = readBlock.getOffset();
- for (int i = 0; i < chunkInfos.size(); i++) {
- final long chunkLen = chunkInfos.get(i).getLen();
- blockOffset += chunkLen;
- if (blockOffset > offset) {
- chunkIndex = i;
- chunkOffset = offset - blockOffset + chunkLen;
- break;
- }
- }
- Preconditions.checkState(chunkIndex >= 0);
-
- if (dispatcherContext == null) {
- dispatcherContext = DispatcherContext.getHandleReadBlock();
- }
-
- ChunkBufferToByteString data;
+      // To get the chunk size, check the first chunk. Either there is only 1
chunk and it is the largest, or there are
+ // multiple chunks and they are all the same size except the last one.
+ long bytesPerChunk = chunkInfos.get(0).getLen();
+ // The bytes per checksum is stored in the checksum data of each chunk,
so check the first chunk as they all
+ // must be the same.
+ ContainerProtos.ChecksumType checksumType =
chunkInfos.get(0).getChecksumData().getType();
+ ChecksumData checksumData = null;
+ int bytesPerChecksum = STREAMING_BYTES_PER_CHUNK;
+ if (checksumType == ContainerProtos.ChecksumType.NONE) {
+ checksumData = new ChecksumData(checksumType, 0);
+ } else {
+ bytesPerChecksum =
chunkInfos.get(0).getChecksumData().getBytesPerChecksum();
+ }
+ // We have to align the read to checksum boundaries, so whatever offset
is requested, we have to move back to the
+ // previous checksum boundary.
+ // eg if bytesPerChecksum is 512, and the requested offset is 600, we
have to move back to 512.
+ // If the checksum type is NONE, we don't have to do this, but using no
checksums should be rare in practice and
+ // it simplifies the code to always do this.
+ long adjustedOffset = readBlock.getOffset() - readBlock.getOffset() %
bytesPerChecksum;
+ try (RandomAccessFile file = new RandomAccessFile(blockFile, "r");
+ FileChannel channel = file.getChannel()) {
+ ByteBuffer buffer = ByteBuffer.allocate(bytesPerChecksum);
+ channel.position(adjustedOffset);
+ while (channel.read(buffer) != -1) {
+ buffer.flip();
+ if (checksumType != ContainerProtos.ChecksumType.NONE) {
+ // As the checksums are stored "chunk by chunk", we need to figure
out which chunk we start reading from,
+ // and its offset to pull out the correct checksum bytes for each
read.
+ int chunkIndex = (int) (adjustedOffset / bytesPerChunk);
+ int chunkOffset = (int) (adjustedOffset % bytesPerChunk);
+ int checksumIndex = chunkOffset / bytesPerChecksum;
+ ByteString checksum =
blockData.getChunks().get(chunkIndex).getChecksumData().getChecksums(checksumIndex);
+ checksumData = new ChecksumData(checksumType, bytesPerChecksum,
Collections.singletonList(checksum));
+ }
+ streamObserver.onNext(getReadBlockResponse(request, checksumData,
buffer, adjustedOffset));
+ buffer.clear();
- long len = readBlock.getLen();
- long adjustedChunkOffset, adjustedChunkLen;
- do {
- ContainerProtos.ChunkInfo chunk = chunkInfos.get(chunkIndex);
- if (readBlock.getVerifyChecksum()) {
- Pair<Long, Long> adjustedOffsetAndLength =
- computeChecksumBoundaries(chunk, chunkOffset, len);
- adjustedChunkOffset = adjustedOffsetAndLength.getLeft();
- adjustedChunkLen = adjustedOffsetAndLength.getRight();
- adjustedChunkOffset += chunk.getOffset();
- } else {
- adjustedChunkOffset = chunkOffset;
- adjustedChunkLen = Math.min(
- chunk.getLen() + chunk.getOffset() - chunkOffset, len);
+ adjustedOffset += bytesPerChecksum;
}
-
- ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
- ContainerProtos.ChunkInfo.newBuilder(chunk)
- .setOffset(adjustedChunkOffset)
- .setLen(adjustedChunkLen).build());
- BlockUtils.verifyReplicaIdx(kvContainer, blockID);
- BlockUtils.verifyBCSId(kvContainer, blockID);
- data = getChunkManager().readChunk(
- kvContainer, blockID, chunkInfo, dispatcherContext);
-
- Preconditions.checkNotNull(data, "Chunk data is null");
- streamObserver.onNext(
- getReadBlockResponse(request,
- blockData.getProtoBufMessage().getBlockID(),
- chunkInfo.getProtoBufMessage(),
- data, byteBufferToByteString));
- len -= adjustedChunkLen + adjustedChunkOffset - chunkOffset;
- chunkOffset = 0;
- chunkIndex++;
- } while (len > 0);
-
- metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen());
+ }
+ // TODO metrics.incContainerBytesStats(Type.ReadBlock,
readBlock.getLen());
} catch (StorageContainerException ex) {
responseProto = ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ioe) {
responseProto = ContainerUtils.logAndReturnError(LOG,
- new StorageContainerException("Read Block failed", ioe,
IO_EXCEPTION),
- request);
+ new StorageContainerException("Read Block failed", ioe,
IO_EXCEPTION), request);
}
return responseProto;
}
- private Pair<Long, Long> computeChecksumBoundaries(
- ContainerProtos.ChunkInfo chunkInfo, long startByteIndex, long dataLen) {
-
- int bytesPerChecksum = chunkInfo.getChecksumData().getBytesPerChecksum();
-
- // index of the last byte to be read from chunk, inclusively.
- final long endByteIndex = startByteIndex + dataLen - 1;
-
- long adjustedChunkOffset = (startByteIndex / bytesPerChecksum)
- * bytesPerChecksum; // inclusive
- final long endIndex = ((endByteIndex / bytesPerChecksum) + 1)
- * bytesPerChecksum; // exclusive
- long adjustedChunkLen =
- Math.min(endIndex, chunkInfo.getLen()) - adjustedChunkOffset;
- return Pair.of(adjustedChunkOffset, adjustedChunkLen);
- }
-
@Override
public void addFinalizedBlock(Container container, long localID) {
KeyValueContainer keyValueContainer = (KeyValueContainer)container;
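
The new readBlock() path above drops the old chunk-walking loop: it aligns the requested offset back to the previous checksum boundary, streams one bytesPerChecksum-sized buffer at a time from the FILE_PER_BLOCK file, and picks the single matching checksum out of the per-chunk checksum lists. The arithmetic is small but easy to get wrong, so here is a worked sketch with assumed sizes (4 MB chunks, 512-byte checksums, a requested offset of 600, matching the example in the handler's comment); it is not part of this commit.

/** Sketch only, not part of this commit: the offset arithmetic used by readBlock(). */
public final class ReadBlockOffsetMath {
  public static void main(String[] args) {
    long bytesPerChunk = 4L * 1024 * 1024;  // assumed chunk size from the first chunk
    int bytesPerChecksum = 512;             // assumed checksum interval
    long requestedOffset = 600;             // offset asked for by the client

    // Move back to the previous checksum boundary so the first buffer can be verified.
    long adjustedOffset = requestedOffset - requestedOffset % bytesPerChecksum; // 512

    // Locate the checksum for this buffer inside the per-chunk checksum lists.
    int chunkIndex = (int) (adjustedOffset / bytesPerChunk);   // 0: still in the first chunk
    int chunkOffset = (int) (adjustedOffset % bytesPerChunk);  // 512 bytes into that chunk
    int checksumIndex = chunkOffset / bytesPerChecksum;        // checksum #1 of that chunk

    System.out.printf("adjustedOffset=%d chunkIndex=%d checksumIndex=%d%n",
        adjustedOffset, chunkIndex, checksumIndex);
  }
}
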
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index efede65e524..6afee1c5d77 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -42,10 +42,8 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMostOnce;
-import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -55,11 +53,9 @@
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -72,29 +68,22 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData;
-import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
-import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
-import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -105,28 +94,22 @@
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner;
import org.apache.hadoop.util.Time;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.GenericTestUtils.LogCapturer;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,10 +127,7 @@ public class TestKeyValueHandler {
private Path dbFile;
private static final long DUMMY_CONTAINER_ID = 9999;
- private static final long LOCAL_ID = 1;
private static final String DUMMY_PATH = "dummy/dir/doesnt/exist";
- private static final long CHUNK_SIZE = 1024 * 1024; // 1MB
- private static final long BYTES_PER_CHECKSUM = 256 * 1024;
private static final String DATANODE_UUID = UUID.randomUUID().toString();
private static final String CLUSTER_ID = UUID.randomUUID().toString();
@@ -958,77 +938,4 @@ private KeyValueHandler createKeyValueHandler(Path path)
throws IOException {
return kvHandler;
}
-
- @Test
- public void testReadBlock() throws IOException {
-
- StreamObserver<ContainerCommandResponseProto> streamObserver =
mock(StreamObserver.class);
- KeyValueContainer container = mock(KeyValueContainer.class);
- final KeyValueHandler kvHandler = new KeyValueHandler(new
OzoneConfiguration(),
- UUID.randomUUID().toString(), mock(ContainerSet.class),
mock(VolumeSet.class), mock(ContainerMetrics.class),
- mock(IncrementalReportSender.class),
mock(ContainerChecksumTreeManager.class));
- final KeyValueHandler keyValueHandler = spy(kvHandler);
- DispatcherContext dispatcherContext = mock(DispatcherContext.class);
-
- List<ContainerProtos.ChunkInfo> chunkInfoList = new ArrayList<>();
- BlockData blockData = new BlockData(new BlockID(1, 1));
- for (int i = 0; i < 4; i++) {
- chunkInfoList.add(ContainerProtos.ChunkInfo
- .newBuilder()
- .setOffset(0)
- .setLen(CHUNK_SIZE)
- .setChecksumData(
- ChecksumData.newBuilder().setBytesPerChecksum((int)
BYTES_PER_CHECKSUM)
- .setType(ChecksumType.CRC32).build())
- .setChunkName("chunkName" + i)
- .build());
- }
- blockData.setChunks(chunkInfoList);
-
- try (MockedStatic<BlockUtils> blockUtils = mockStatic(BlockUtils.class)) {
- BlockManager blockManager = mock(BlockManager.class);
- ChunkManager chunkManager = mock(ChunkManager.class);
- when(keyValueHandler.getBlockManager()).thenReturn(blockManager);
- when(keyValueHandler.getChunkManager()).thenReturn(chunkManager);
- when(blockManager.getBlock(any(), any())).thenReturn(blockData);
- ChunkBuffer data = ChunkBuffer.wrap(ByteBuffer.allocate(0));
- when(chunkManager.readChunk(any(), any(),
- any(), any()))
- .thenReturn(data);
- testReadBlock(0, 1, keyValueHandler, dispatcherContext,
- streamObserver, container);
- testReadBlock(0, CHUNK_SIZE + 1, keyValueHandler, dispatcherContext,
- streamObserver, container);
- testReadBlock(CHUNK_SIZE / 2, 2 * CHUNK_SIZE, keyValueHandler,
dispatcherContext,
- streamObserver, container);
- }
- }
-
- private static ContainerCommandRequestProto readBlockRequest(
- long offset, long length) {
- return ContainerCommandRequestProto.newBuilder()
- .setCmdType(Type.ReadBlock)
- .setReadBlock(
- ContainerProtos.ReadBlockRequestProto.newBuilder()
- .setBlockID(
- ContainerProtos.DatanodeBlockID.newBuilder()
- .setContainerID(DUMMY_CONTAINER_ID)
- .setLocalID(LOCAL_ID))
- .setOffset(offset)
- .setLen(length)
- .setVerifyChecksum(true))
- .setContainerID(DUMMY_CONTAINER_ID)
- .setDatanodeUuid(UUID.randomUUID().toString())
- .build();
- }
-
- private static void testReadBlock(
- long offset, long length, KeyValueHandler keyValueHandler,
DispatcherContext dispatcherContext,
- StreamObserver<ContainerCommandResponseProto> streamObserver,
KeyValueContainer container) {
- int responseCount = (int) (((offset + length - 1) / CHUNK_SIZE) + 1 -
(offset / CHUNK_SIZE));
- ContainerCommandRequestProto requestProto = readBlockRequest(offset,
length);
- keyValueHandler.readBlock(requestProto, container, dispatcherContext,
streamObserver);
- verify(streamObserver, times(responseCount)).onNext(any());
- clearInvocations(streamObserver);
- }
}
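
For reference, the deleted testReadBlock helper above derived the expected number of
streamed onNext() calls from the chunk boundaries a read spans. A quick worked check
of that arithmetic, written as a standalone sketch; CHUNK_SIZE and the formula come
from the deleted test, and the variable names are illustrative only:

    // responseCount = ((offset + length - 1) / CHUNK_SIZE) + 1 - (offset / CHUNK_SIZE)
    long chunkSize = 1024 * 1024;          // 1 MB, as in the removed constant
    long offset = chunkSize / 2;           // start in the middle of chunk 0
    long length = 2 * chunkSize;           // read spans chunks 0, 1 and 2
    long responses = ((offset + length - 1) / chunkSize) + 1 - (offset / chunkSize);
    // responses == 3: one streamed response per chunk touched by the read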
diff --git
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index c5548244560..6b4d8f1bd7f 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -401,12 +401,12 @@ message ListBlockResponseProto {
message ReadBlockRequestProto {
required DatanodeBlockID blockID = 1;
required uint64 offset = 2;
- required uint64 len = 3;
- required bool verifyChecksum = 4;
}
message ReadBlockResponseProto {
- repeated ReadChunkResponseProto readChunk = 1;
+ required ChecksumData checksumData = 1;
+ required uint64 offset = 2;
+ required bytes data = 3;
}
message EchoRequestProto {
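
The reshaped ReadBlock messages above drop len and verifyChecksum from the request and
make every streamed response carry a single slice of data together with its checksum
metadata. A minimal, hedged sketch of a client-side request under the new schema; the
builder calls follow the generated protobuf API, and containerId, localId and offset
are placeholder values:

    ContainerProtos.ReadBlockRequestProto readBlock =
        ContainerProtos.ReadBlockRequestProto.newBuilder()
            .setBlockID(ContainerProtos.DatanodeBlockID.newBuilder()
                .setContainerID(containerId)
                .setLocalID(localId))
            .setOffset(offset)             // len and verifyChecksum no longer exist
            .build();
    // Each streamed ReadBlockResponseProto then exposes
    // getChecksumData(), getOffset() and getData() for one slice.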
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
index 80ae5118467..bb66a303155 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
@@ -18,24 +18,21 @@
package org.apache.hadoop.ozone.client.rpc.read;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.storage.StreamBlockInputStream;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
-import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
-import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.om.TestBucket;
+import org.junit.jupiter.api.Test;
/**
* Tests {@link StreamBlockInputStream}.
@@ -45,8 +42,12 @@ public class TestStreamBlockInputStream extends
TestInputStreamBase {
* Run the tests as a single test method to avoid needing a new mini-cluster
* for each test.
*/
- @ContainerLayoutTestInfo.ContainerTest
- void testAll(ContainerLayoutVersion layout) throws Exception {
+ private static final int DATA_LENGTH = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
+ private byte[] inputData;
+ private TestBucket bucket;
+
+ @Test
+ void testAll() throws Exception {
try (MiniOzoneCluster cluster = newCluster()) {
cluster.waitForClusterToBeReady();
@@ -55,14 +56,22 @@ void testAll(ContainerLayoutVersion layout) throws
Exception {
clientConfig.setStreamReadBlock(true);
OzoneConfiguration copy = new OzoneConfiguration(conf);
copy.setFromObject(clientConfig);
+ String keyName = getNewKeyName();
try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
- updateConfig(layout);
- TestBucket bucket = TestBucket.newBuilder(client).build();
-
- testBlockReadBuffers(bucket);
- testBufferRelease(bucket);
- testCloseReleasesBuffers(bucket);
- testReadEmptyBlock(bucket);
+ bucket = TestBucket.newBuilder(client).build();
+ inputData = bucket.writeRandomBytes(keyName, DATA_LENGTH);
+ testReadKeyFully(keyName);
+ testSeek(keyName);
+ testReadEmptyBlock();
+ }
+ keyName = getNewKeyName();
+ clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
+ copy.setFromObject(clientConfig);
+ try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
+ bucket = TestBucket.newBuilder(client).build();
+ inputData = bucket.writeRandomBytes(keyName, DATA_LENGTH);
+ testReadKeyFully(keyName);
+ testSeek(keyName);
}
}
}
@@ -71,175 +80,66 @@ void testAll(ContainerLayoutVersion layout) throws
Exception {
* Test to verify that data read from blocks is stored in a list of buffers
* with max capacity equal to the bytes per checksum.
*/
- private void testBlockReadBuffers(TestBucket bucket) throws Exception {
- String keyName = getNewKeyName();
- int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
- byte[] inputData = bucket.writeRandomBytes(keyName, dataLength);
-
- try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-
- StreamBlockInputStream block0Stream =
- (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
-
-
- // To read 1 byte of chunk data, ChunkInputStream should get one full
- // checksum boundary worth of data from Container and store it in
buffers.
- IOUtils.readFully(block0Stream, new byte[1]);
- checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
- // Read > checksum boundary of data from chunk0
- int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
- byte[] readData = readDataFromBlock(block0Stream, 0, readDataLen);
- bucket.validateData(inputData, 0, readData);
-
- // The first checksum boundary size of data was already existing in the
- // ChunkStream buffers. Once that data is read, the next checksum
- // boundary size of data will be fetched again to read the remaining
data.
- // Hence, there should be 1 checksum boundary size of data stored in the
- // ChunkStreams buffers at the end of the read.
- checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
- // Seek to a position in the third checksum boundary (so that current
- // buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM
- // bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of
- // data being read into the buffers. There should be 2 buffers in the
- // stream but the first buffer should be released after it is read
- // and the second buffer should have BYTES_PER_CHECKSUM capacity.
- int offset = 2 * BYTES_PER_CHECKSUM + 1;
- readData = readDataFromBlock(block0Stream, offset, readDataLen);
- bucket.validateData(inputData, offset, readData);
- checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
-
- // Read the full chunk data -1 and verify that all chunk data is read
into
- // buffers. We read CHUNK_SIZE - 1 as otherwise all the buffers will be
- // released once all chunk data is read.
- readData = readDataFromBlock(block0Stream, 0, CHUNK_SIZE - 1);
- bucket.validateData(inputData, 0, readData);
- checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
- // Read the last byte of chunk and verify that the buffers are released.
- IOUtils.readFully(block0Stream, new byte[1]);
- assertNull(block0Stream.getCachedBuffers(),
- "ChunkInputStream did not release buffers after reaching EOF.");
+ private void testReadKeyFully(String key) throws Exception {
+ // Read the data fully into a large enough byte array
+ try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+ byte[] readData = new byte[DATA_LENGTH];
+ int totalRead = keyInputStream.read(readData, 0, DATA_LENGTH);
+ assertEquals(DATA_LENGTH, totalRead);
+ for (int i = 0; i < DATA_LENGTH; i++) {
+ assertEquals(inputData[i], readData[i],
+ "Read data is not same as written data at index " + i);
+ }
}
- }
-
- private void testCloseReleasesBuffers(TestBucket bucket) throws Exception {
- String keyName = getNewKeyName();
- bucket.writeRandomBytes(keyName, CHUNK_SIZE);
-
- try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
- StreamBlockInputStream block0Stream =
- (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
-
- readDataFromBlock(block0Stream, 0, 1);
- assertNotNull(block0Stream.getCachedBuffers());
-
- block0Stream.close();
-
- assertNull(block0Stream.getCachedBuffers());
+ // Read the data 1 byte at a time
+ try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+ for (int i = 0; i < DATA_LENGTH; i++) {
+ int b = keyInputStream.read();
+ assertEquals(inputData[i], (byte) b,
+ "Read data is not same as written data at index " + i);
+ }
}
- }
-
- /**
- * Test that ChunkInputStream buffers are released as soon as the last byte
- * of the buffer is read.
- */
- private void testBufferRelease(TestBucket bucket) throws Exception {
- String keyName = getNewKeyName();
- byte[] inputData = bucket.writeRandomBytes(keyName, CHUNK_SIZE);
-
- try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-
- StreamBlockInputStream block0Stream =
- (StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
-
- // Read checksum boundary - 1 bytes of data
- int readDataLen = BYTES_PER_CHECKSUM - 1;
- byte[] readData = readDataFromBlock(block0Stream, 0, readDataLen);
- bucket.validateData(inputData, 0, readData);
-
- // There should be 1 byte of data remaining in the buffer which is not
- // yet read. Hence, the buffer should not be released.
- checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
- assertEquals(1, block0Stream.getCachedBuffers()[0].remaining());
-
- // Reading the last byte in the buffer should result in all the buffers
- // being released.
- readData = readDataFromBlock(block0Stream, 1);
- bucket.validateData(inputData, readDataLen, readData);
- assertNull(block0Stream.getCachedBuffers(),
- "Chunk stream buffers not released after last byte is read");
-
- // Read more data to get the data till the next checksum boundary.
- readDataLen = BYTES_PER_CHECKSUM / 2;
- readDataFromBlock(block0Stream, readDataLen);
- // There should be one buffer and the buffer should not be released as
- // there is data pending to be read from the buffer
- checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
- ByteBuffer lastCachedBuffer = block0Stream.getCachedBuffers()[0];
- assertEquals(BYTES_PER_CHECKSUM - readDataLen,
- lastCachedBuffer.remaining());
-
- // Read more than the remaining data in buffer (but less than the next
- // checksum boundary).
- int position = (int) block0Stream.getPos();
- readDataLen = lastCachedBuffer.remaining() + BYTES_PER_CHECKSUM / 2;
- readData = readDataFromBlock(block0Stream, readDataLen);
- bucket.validateData(inputData, position, readData);
- // After reading the remaining data in the buffer, the buffer should be
- // released and next checksum size of data must be read into the buffers
- checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
- // Verify that the previously cached buffer is released by comparing it
- // with the current cached buffer
- assertNotEquals(lastCachedBuffer,
- block0Stream.getCachedBuffers()[0]);
+ // Read the data into a large enough ByteBuffer
+ try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+ ByteBuffer readBuf = ByteBuffer.allocate(DATA_LENGTH);
+ int totalRead = keyInputStream.read(readBuf);
+ assertEquals(DATA_LENGTH, totalRead);
+ readBuf.flip();
+ for (int i = 0; i < DATA_LENGTH; i++) {
+ assertEquals(inputData[i], readBuf.get(),
+ "Read data is not same as written data at index " + i);
+ }
}
}
- private byte[] readDataFromBlock(StreamBlockInputStream
streamBlockInputStream,
- int offset, int readDataLength) throws
IOException {
- byte[] readData = new byte[readDataLength];
- streamBlockInputStream.seek(offset);
- IOUtils.readFully(streamBlockInputStream, readData);
- return readData;
- }
-
- private byte[] readDataFromBlock(StreamBlockInputStream
streamBlockInputStream,
- int readDataLength) throws IOException {
- byte[] readData = new byte[readDataLength];
- IOUtils.readFully(streamBlockInputStream, readData);
- return readData;
- }
-
- /**
- * Verify number of buffers and their capacities.
- * @param buffers chunk stream buffers
- */
- private void checkBufferSizeAndCapacity(ByteBuffer[] buffers) {
- assertEquals(1, buffers.length,
- "ChunkInputStream does not have expected number of " +
- "ByteBuffers");
- for (ByteBuffer buffer : buffers) {
- assertEquals(BYTES_PER_CHECKSUM, buffer.capacity(),
- "ChunkInputStream ByteBuffer capacity is wrong");
+ private void testSeek(String key) throws IOException {
+ java.util.Random random = new java.util.Random();
+ try (KeyInputStream keyInputStream = bucket.getKeyInputStream(key)) {
+ for (int i = 0; i < 100; i++) {
+ int position = random.nextInt(DATA_LENGTH);
+ keyInputStream.seek(position);
+ int b = keyInputStream.read();
+ assertEquals(inputData[position], (byte) b, "Read data is not the same as
written data at index " + position);
+ }
+ StreamBlockInputStream blockStream = (StreamBlockInputStream)
keyInputStream.getPartStreams().get(0);
+ long length = blockStream.getLength();
+ blockStream.seek(10);
+ long position = blockStream.getPos();
+ assertThrows(IOException.class, () -> blockStream.seek(length + 1),
+ "Seek beyond block length should throw exception");
+ assertThrows(IOException.class, () -> blockStream.seek(-1),
+ "Seeking to a negative position should throw exception");
+ assertEquals(position, blockStream.getPos(),
+ "Position should not change after failed seek attempts");
}
}
- private void testReadEmptyBlock(TestBucket bucket) throws Exception {
+ private void testReadEmptyBlock() throws Exception {
String keyName = getNewKeyName();
- int dataLength = 10;
bucket.writeRandomBytes(keyName, 0);
-
try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-
- byte[] readData = new byte[dataLength];
assertTrue(keyInputStream.getPartStreams().isEmpty());
- IOUtils.read(keyInputStream, readData);
- for (byte b : readData) {
- assertEquals((byte) 0, b);
- }
+ assertEquals(-1, keyInputStream.read());
}
}
}
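
As the reworked integration test shows, streamed block reads are switched on purely
through client configuration. A minimal sketch of opting in, mirroring the calls used
in testAll above; fetching OzoneClientConfig from the OzoneConfiguration is an
assumption about the surrounding setup:

    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
    clientConfig.setStreamReadBlock(true);   // enable streamed block reads
    // Optionally skip checksum verification, as the second half of testAll does:
    // clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
    OzoneConfiguration copy = new OzoneConfiguration(conf);
    copy.setFromObject(clientConfig);
    try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
      // KeyInputStream reads now go through StreamBlockInputStream per block.
    }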
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]