This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new bad3e166aa3 HDDS-14035. StreamRead: Positioned-read should not do
pre-read. (#9425)
bad3e166aa3 is described below
commit bad3e166aa3e4e8d07526a8ccba0cefeab5119e9
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Dec 4 10:14:24 2025 -0800
HDDS-14035. StreamRead: Positioned-read should not do pre-read. (#9425)
---
.../hadoop/hdds/scm/storage/ByteBufferReader.java | 21 ++--
.../hdds/scm/storage/ExtendedInputStream.java | 26 +++--
.../hdds/scm/storage/MultipartInputStream.java | 39 ++++++-
.../hdds/scm/storage/StreamBlockInputStream.java | 120 ++++++++++++++-------
.../rpc/read/TestStreamBlockInputStream.java | 55 ++++++++++
.../ozone/client/rpc/read/TestStreamRead.java | 24 ++---
.../apache/hadoop/fs/ozone/OzoneFSInputStream.java | 8 ++
7 files changed, 220 insertions(+), 73 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferReader.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferReader.java
index f4b493144e5..387126028b1 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferReader.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferReader.java
@@ -17,11 +17,12 @@
package org.apache.hadoop.hdds.scm.storage;
-import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.Objects;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.ratis.util.Preconditions;
/**
* An {@link ByteReaderStrategy} implementation which supports ByteBuffer as
the
@@ -32,18 +33,22 @@ public class ByteBufferReader implements ByteReaderStrategy
{
private int targetLen;
public ByteBufferReader(ByteBuffer buf) {
- if (buf == null) {
- throw new NullPointerException();
- }
- this.readBuf = buf;
+ this.readBuf = Objects.requireNonNull(buf, "buf == null");
this.targetLen = buf.remaining();
}
+ ByteBuffer getBuffer() {
+ return readBuf;
+ }
+
+ int readImpl(InputStream inputStream) throws IOException {
+ return Preconditions.assertInstanceOf(inputStream,
ByteBufferReadable.class)
+ .read(readBuf);
+ }
+
@Override
public int readFromBlock(InputStream is, int numBytesToRead) throws
IOException {
- Preconditions.checkArgument(is != null);
- Preconditions.checkArgument(is instanceof ByteBufferReadable);
// change buffer limit
int bufferLimit = readBuf.limit();
if (numBytesToRead < targetLen) {
@@ -51,7 +56,7 @@ public int readFromBlock(InputStream is, int numBytesToRead)
throws
}
int numBytesRead;
try {
- numBytesRead = ((ByteBufferReadable)is).read(readBuf);
+ numBytesRead = readImpl(is);
} finally {
// restore buffer limit
if (numBytesToRead < targetLen) {
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
index 0ff7f1fdd78..75e483ad55e 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
@@ -36,6 +36,17 @@ public abstract class ExtendedInputStream extends InputStream
protected static final int EOF = -1;
+ /**
+ * Positioned read.
+ *
+ * @param position the starting position of the read.
+ * @param buffer the buffer for storing the data.
+ * @return true iff positioned read is supported in this implementation.
+ */
+ public boolean readFully(long position, ByteBuffer buffer) throws
IOException {
+ return false;
+ }
+
@Override
public synchronized int read() throws IOException {
byte[] buf = new byte[1];
@@ -47,19 +58,16 @@ public synchronized int read() throws IOException {
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
- ByteReaderStrategy strategy = new ByteArrayReader(b, off, len);
- int bufferLen = strategy.getTargetLength();
- if (bufferLen == 0) {
- return 0;
- }
- return readWithStrategy(strategy);
+ return read(new ByteArrayReader(b, off, len));
}
@Override
public synchronized int read(ByteBuffer byteBuffer) throws IOException {
- ByteReaderStrategy strategy = new ByteBufferReader(byteBuffer);
- int bufferLen = strategy.getTargetLength();
- if (bufferLen == 0) {
+ return read(new ByteBufferReader(byteBuffer));
+ }
+
+ public synchronized int read(ByteReaderStrategy strategy) throws IOException
{
+ if (strategy.getTargetLength() == 0) {
return 0;
}
return readWithStrategy(strategy);
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
index e48b704aade..a28658b1ebb 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
@@ -18,13 +18,16 @@
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.ratis.util.Preconditions;
/**
* A stream for accessing multipart streams.
@@ -36,6 +39,7 @@ public class MultipartInputStream extends ExtendedInputStream
{
// List of PartInputStream, one for each part of the key
private final List<? extends PartInputStream> partStreams;
+ private final boolean isStreamBlockInputStream;
// partOffsets[i] stores the index of the first data byte in
// partStream w.r.t the whole key data.
@@ -58,11 +62,11 @@ public class MultipartInputStream extends
ExtendedInputStream {
public MultipartInputStream(String keyName,
List<? extends PartInputStream> inputStreams) {
-
- Preconditions.checkNotNull(inputStreams);
+ Objects.requireNonNull(inputStreams, "inputStreams == null");
this.key = keyName;
- this.partStreams = inputStreams;
+ this.partStreams = Collections.unmodifiableList(inputStreams);
+ this.isStreamBlockInputStream = !inputStreams.isEmpty() &&
inputStreams.get(0) instanceof StreamBlockInputStream;
// Calculate and update the partOffsets
this.partOffsets = new long[inputStreams.size()];
@@ -70,6 +74,9 @@ public MultipartInputStream(String keyName,
long streamLength = 0L;
for (PartInputStream partInputStream : inputStreams) {
this.partOffsets[i++] = streamLength;
+ if (isStreamBlockInputStream) {
+ Preconditions.assertInstanceOf(partInputStream,
StreamBlockInputStream.class);
+ }
streamLength += partInputStream.getLength();
}
this.length = streamLength;
@@ -78,7 +85,7 @@ public MultipartInputStream(String keyName,
@Override
protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
throws IOException {
- Preconditions.checkArgument(strategy != null);
+ Objects.requireNonNull(strategy, "strategy == null");
checkOpen();
int totalReadLen = 0;
@@ -176,6 +183,28 @@ public synchronized void seek(long pos) throws IOException
{
prevPartIndex = partIndex;
}
+ @Override
+ public boolean readFully(long position, ByteBuffer buffer) throws
IOException {
+ if (!isStreamBlockInputStream) {
+ return false;
+ }
+
+ final long oldPos = getPos();
+ seek(position);
+ try {
+ read(new ByteBufferReader(buffer) {
+ @Override
+ int readImpl(InputStream inputStream) throws IOException {
+ return Preconditions.assertInstanceOf(inputStream,
StreamBlockInputStream.class)
+ .readFully(getBuffer(), false);
+ }
+ });
+ } finally {
+ seek(oldPos);
+ }
+ return true;
+ }
+
public synchronized void initialize() throws IOException {
// Pre-check that the stream has not been intialized already
if (initialized) {
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
index 0d1e8ff1003..72e5d11edc4 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java
@@ -19,6 +19,7 @@
import static org.apache.ratis.thirdparty.io.grpc.Status.Code.CANCELLED;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
@@ -27,13 +28,11 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.lang3.NotImplementedException;
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -51,6 +50,7 @@
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
import org.apache.ratis.util.Preconditions;
@@ -61,11 +61,13 @@
* An {@link java.io.InputStream} called from KeyInputStream to read a block
from the
* container.
*/
-public class StreamBlockInputStream extends BlockExtendedInputStream
- implements Seekable, CanUnbuffer, ByteBufferReadable {
+public class StreamBlockInputStream extends BlockExtendedInputStream {
private static final Logger LOG =
LoggerFactory.getLogger(StreamBlockInputStream.class);
private static final int EOF = -1;
+ private static final AtomicInteger STREAM_ID = new AtomicInteger(0);
+ private static final AtomicInteger READER_ID = new AtomicInteger(0);
+ private final String name = "stream" + STREAM_ID.getAndIncrement();
private final BlockID blockID;
private final long blockLength;
private final int responseDataSize = 1 << 20; // 1 MB
@@ -119,7 +121,7 @@ public synchronized long getPos() {
@Override
public synchronized int read() throws IOException {
checkOpen();
- if (!dataAvailableToRead(1)) {
+ if (!dataAvailableToRead(1, true)) {
return EOF;
}
position++;
@@ -134,10 +136,14 @@ public synchronized int read(byte[] b, int off, int len)
throws IOException {
@Override
public synchronized int read(ByteBuffer targetBuf) throws IOException {
+ return readFully(targetBuf, true);
+ }
+
+ synchronized int readFully(ByteBuffer targetBuf, boolean preRead) throws
IOException {
checkOpen();
int read = 0;
while (targetBuf.hasRemaining()) {
- if (!dataAvailableToRead(targetBuf.remaining())) {
+ if (!dataAvailableToRead(targetBuf.remaining(), preRead)) {
break;
}
int toCopy = Math.min(buffer.remaining(), targetBuf.remaining());
@@ -151,7 +157,7 @@ public synchronized int read(ByteBuffer targetBuf) throws
IOException {
return read > 0 ? read : EOF;
}
- private synchronized boolean dataAvailableToRead(int length) throws
IOException {
+ private synchronized boolean dataAvailableToRead(int length, boolean
preRead) throws IOException {
if (position >= blockLength) {
return false;
}
@@ -160,7 +166,7 @@ private synchronized boolean dataAvailableToRead(int
length) throws IOException
if (bufferHasRemaining()) {
return true;
}
- buffer = streamingReader.read(length);
+ buffer = streamingReader.read(length, preRead);
return bufferHasRemaining();
}
@@ -180,11 +186,12 @@ public synchronized void seek(long pos) throws
IOException {
throw new IOException("Cannot seek to negative offset");
}
if (pos > blockLength) {
- throw new IOException("Cannot seek after the end of the block");
+ throw new EOFException("Failed to seek to position " + pos + " > block
length = " + blockLength);
}
if (pos == position) {
return;
}
+ LOG.debug("{}: seek {} -> {}", this, position, pos);
closeStream();
position = pos;
requestedLength = pos;
@@ -204,6 +211,7 @@ public synchronized void unbuffer() {
private synchronized void closeStream() {
if (streamingReader != null) {
+ LOG.debug("Closing {}", streamingReader);
streamingReader.onCompleted();
streamingReader = null;
}
@@ -252,13 +260,14 @@ private synchronized void initialize() throws IOException
{
}
}
- synchronized void readBlock(int length) throws IOException {
+ synchronized void readBlock(int length, boolean preRead) throws IOException {
final long required = position + length - requestedLength;
- final long readLength = required + preReadSize;
+ final long preReadLength = preRead ? preReadSize : 0;
+ final long readLength = required + preReadLength;
if (readLength > 0) {
LOG.debug("position {}, length {}, requested {}, diff {}, readLength {},
preReadSize={}",
- position, length, requestedLength, required, readLength,
preReadSize);
+ position, length, requestedLength, required, readLength,
preReadLength);
readBlockImpl(readLength);
requestedLength += readLength;
}
@@ -314,10 +323,16 @@ private synchronized void releaseStreamResources() {
}
}
+ @Override
+ public String toString() {
+ return name;
+ }
+
/**
* Implementation of a StreamObserver used to received and buffer streaming
GRPC reads.
*/
public class StreamingReader implements StreamingReaderSpi {
+ private final String name = StreamBlockInputStream.this.name + "-reader" +
READER_ID.getAndIncrement();
/** Response queue: poll is blocking while offer is non-blocking. */
private final BlockingQueue<ReadBlockResponseProto> responseQueue = new
LinkedBlockingQueue<>();
@@ -336,7 +351,11 @@ void checkError() throws IOException {
}
}
- ReadBlockResponseProto poll() throws IOException {
+ ReadBlockResponseProto poll(int timeout, TimeUnit timeoutUnit) throws
IOException {
+ final long timeoutNanos = timeoutUnit.toNanos(timeout);
+ final long startTime = System.nanoTime();
+ final long pollTimeoutNanos = Math.min(timeoutNanos / 10, 100_000_000);
+
while (true) {
checkError();
if (future.isDone()) {
@@ -345,7 +364,7 @@ ReadBlockResponseProto poll() throws IOException {
final ReadBlockResponseProto proto;
try {
- proto = responseQueue.poll(1, TimeUnit.SECONDS);
+ proto = responseQueue.poll(pollTimeoutNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting for response", e);
@@ -353,46 +372,53 @@ ReadBlockResponseProto poll() throws IOException {
if (proto != null) {
return proto;
}
+
+ final long elapsedNanos = System.nanoTime() - startTime;
+ if (elapsedNanos >= timeoutNanos) {
+ setFailedAndThrow(new TimeoutIOException(
+ "Timed out " + timeout + " " + timeoutUnit + " waiting for
response"));
+ return null;
+ }
}
}
- private ByteBuffer read(int length) throws IOException {
+ private ByteBuffer read(int length, boolean preRead) throws IOException {
checkError();
if (future.isDone()) {
return null; // Stream ended
}
- readBlock(length);
+ readBlock(length, preRead);
while (true) {
final ByteBuffer buf = readFromQueue();
- if (buf.hasRemaining()) {
+ if (buf != null && buf.hasRemaining()) {
return buf;
}
}
}
ByteBuffer readFromQueue() throws IOException {
- final ReadBlockResponseProto readBlock = poll();
+ final ReadBlockResponseProto readBlock = poll(10, TimeUnit.SECONDS);
// The server always returns data starting from the last checksum
boundary. Therefore if the reader position is
// ahead of the position we received from the server, we need to adjust
the buffer position accordingly.
// If the reader position is behind
- ByteBuffer buf = readBlock.getData().asReadOnlyByteBuffer();
- long blockOffset = readBlock.getOffset();
- long pos = getPos();
+ final ByteString data = readBlock.getData();
+ final ByteBuffer dataBuffer = data.asReadOnlyByteBuffer();
+ final long blockOffset = readBlock.getOffset();
+ final long pos = getPos();
if (pos < blockOffset) {
// This should not happen, and if it does, we have a bug.
- throw new IOException("Received data out of order. Position is " + pos
+ " but received data at "
- + blockOffset);
+ setFailedAndThrow(new IllegalStateException(
+ this + ": out of order, position " + pos + " < block offset " +
blockOffset));
}
- if (pos > readBlock.getOffset()) {
- int offset = (int)(pos - readBlock.getOffset());
- if (offset > buf.limit()) {
- offset = buf.limit();
- }
- buf.position(offset);
+ final long offset = pos - blockOffset;
+ if (offset > 0) {
+ dataBuffer.position(Math.toIntExact(Math.min(offset,
dataBuffer.limit())));
}
- return buf;
+ LOG.debug("{}: return response positon {}, length {} (block offset {},
length {})",
+ name, pos, dataBuffer.remaining(), blockOffset, data.size());
+ return dataBuffer;
}
private void releaseResources() {
@@ -448,20 +474,32 @@ StreamingReadResponse getResponse() {
return response.get();
}
- private void setFailed(Throwable throwable) {
+ private <T extends Throwable> void setFailedAndThrow(T throwable) throws T
{
+ if (setFailed(throwable)) {
+ throw throwable;
+ }
+ }
+
+ private boolean setFailed(Throwable throwable) {
final boolean completed = future.completeExceptionally(throwable);
if (!completed) {
- LOG.warn("Already failed: suppressed ", throwable);
+ LOG.warn("{}: Already completed, suppress ", this, throwable);
}
+ return completed;
}
private void setCompleted() {
final boolean changed = future.complete(null);
- if (!changed) {
+ if (changed) {
+ LOG.debug("{} setCompleted success", this);
+ } else {
try {
future.get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.warn("Failed to setCompleted", e);
+ LOG.debug("{} Failed to setCompleted: Already completed", this);
+ } catch (InterruptedException e) {
+ LOG.warn("{}: Interrupted setCompleted", this, e);
+ } catch (ExecutionException e) {
+ LOG.warn("{}: Failed to setCompleted: already completed
exceptionally", this, e);
}
}
@@ -471,8 +509,9 @@ private void setCompleted() {
private void offerToQueue(ReadBlockResponseProto item) {
if (LOG.isDebugEnabled()) {
final ContainerProtos.ChecksumData checksumData =
item.getChecksumData();
- LOG.debug("offerToQueue {} bytes, numChecksums {},
bytesPerChecksum={}",
- item.getData().size(), checksumData.getChecksumsList().size(),
checksumData.getBytesPerChecksum());
+ LOG.debug("{}: enqueue response offset {}, length {}, numChecksums {},
bytesPerChecksum={}",
+ name, item.getOffset(), item.getData().size(),
+ checksumData.getChecksumsList().size(),
checksumData.getBytesPerChecksum());
}
final boolean offered = responseQueue.offer(item);
Preconditions.assertTrue(offered, () -> "Failed to offer " + item);
@@ -483,5 +522,10 @@ public void setStreamingReadResponse(StreamingReadResponse
streamingReadResponse
final boolean set = response.compareAndSet(null, streamingReadResponse);
Preconditions.assertTrue(set, () -> "Failed to set
streamingReadResponse");
}
+
+ @Override
+ public String toString() {
+ return name;
+ }
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
index ad8929bdfb8..44b753210d9 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
@@ -56,7 +56,13 @@ public class TestStreamBlockInputStream extends
TestInputStreamBase {
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.container.common"),
Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ozone.om"),
Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.ratis"),
Level.ERROR);
+
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("BackgroundPipelineScrubber"),
Level.ERROR);
+
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("ExpiredContainerReplicaOpScrubber"),
Level.ERROR);
+
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("SCMHATransactionMonitor"),
Level.ERROR);
GenericTestUtils.setLogLevel(GrpcXceiverService.class, Level.ERROR);
+
+//
GenericTestUtils.setLogLevel(LoggerFactory.getLogger(StreamBlockInputStream.class),
Level.TRACE);
+//
GenericTestUtils.setLogLevel(LoggerFactory.getLogger(XceiverClientGrpc.class),
Level.TRACE);
}
/**
@@ -94,6 +100,8 @@ void runTestReadKey(int keyLength, boolean randomReadOffset,
OzoneConfiguration
LOG.info("---------------------------------------------------------");
LOG.info("writeRandomBytes {} bytes", inputData.length);
+ runTestPositionedRead(keyName, ByteBuffer.wrap(new
byte[inputData.length]));
+
for (int i = 1; i <= 10; i++) {
runTestReadKey(keyName, keyLength / i, randomReadOffset, keyLength);
}
@@ -129,6 +137,53 @@ private void runTestReadKey(String key, int bufferSize,
boolean randomReadOffset
}
}
+ void runTestPositionedRead(String key, ByteBuffer buffer) throws Exception {
+ try (KeyInputStream in = bucket.getKeyInputStream(key)) {
+ runTestPositionedRead(buffer, in, 0, 0);
+ runTestPositionedRead(buffer, in, 0, 1);
+ runTestPositionedRead(buffer, in, inputData.length, 0);
+ runTestPositionedRead(buffer, in, inputData.length - 1, 1);
+ for (int i = 0; i < 5; i++) {
+ runTestPositionedRead(buffer, in);
+ }
+ }
+ }
+
+ void runTestPositionedRead(ByteBuffer buffer, KeyInputStream in) throws
Exception {
+ final int position = ThreadLocalRandom.current().nextInt(inputData.length
- 1);
+ runTestPositionedRead(buffer, in, position, 0);
+ runTestPositionedRead(buffer, in, position, 1);
+ final int n = 2 + ThreadLocalRandom.current().nextInt(inputData.length - 1
- position);
+ runTestPositionedRead(buffer, in, position, n);
+ }
+
+ void runTestPositionedRead(ByteBuffer buffer, KeyInputStream in, int pos,
int length) throws Exception {
+ LOG.info("runTestPositionedRead: position={}, length={}", pos, length);
+ assertTrue(pos + length <= inputData.length);
+ buffer = buffer.duplicate();
+
+ // seek and read
+ buffer.position(0).limit(length);
+ in.seek(pos);
+ while (buffer.hasRemaining()) {
+ in.read(buffer);
+ }
+ assertData(pos, length, buffer);
+
+ // positioned read
+ buffer.position(0).limit(length);
+ in.readFully(pos, buffer);
+ assertData(pos, length, buffer);
+ }
+
+ void assertData(int pos, int length, ByteBuffer buffer) {
+ buffer.flip();
+ assertEquals(length, buffer.remaining());
+ for (int i = 0; i < length; i++) {
+ assertEquals(inputData[pos + i], buffer.get(i), "pos=" + pos + ", i=" +
i);
+ }
+ }
+
@Test
void testAll() throws Exception {
try (MiniOzoneCluster cluster = newCluster()) {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
index 601246f0752..29a43876630 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamRead.java
@@ -17,7 +17,7 @@
package org.apache.hadoop.ozone.client.rpc.read;
-import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.OutputStream;
@@ -50,8 +50,6 @@
* Tests {@link StreamBlockInputStream}.
*/
public class TestStreamRead {
- private TestBucket bucket;
-
{
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("com"), Level.ERROR);
GenericTestUtils.setLogLevel(LoggerFactory.getLogger("org.apache.hadoop.ipc"),
Level.ERROR);
@@ -70,8 +68,8 @@ public class TestStreamRead {
static final int FLUSH_SIZE = 2 * CHUNK_SIZE; // 2MB
static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE; // 4MB
- static final int BLOCK_SIZE = 128 << 20;
- static final SizeInBytes KEY_SIZE = SizeInBytes.valueOf("256M");
+ static final int BLOCK_SIZE = 64 << 20;
+ static final SizeInBytes KEY_SIZE = SizeInBytes.valueOf("128M");
static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
final OzoneConfiguration conf = new OzoneConfiguration();
@@ -93,7 +91,7 @@ static MiniOzoneCluster newCluster(int bytesPerChecksum)
throws Exception {
.applyTo(conf);
return MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(3)
+ .setNumDatanodes(1)
.build();
}
@@ -127,7 +125,7 @@ void runTestReadKey(SizeInBytes keySize, SizeInBytes
bytesPerChecksum) throws Ex
OzoneConfiguration copy = new OzoneConfiguration(conf);
copy.setFromObject(clientConfig);
- final int n = 10;
+ final int n = 5;
final SizeInBytes writeBufferSize = SizeInBytes.valueOf("8MB");
final SizeInBytes[] readBufferSizes = {
SizeInBytes.valueOf("32M"),
@@ -137,7 +135,7 @@ void runTestReadKey(SizeInBytes keySize, SizeInBytes
bytesPerChecksum) throws Ex
};
try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
- bucket = TestBucket.newBuilder(client).build();
+ final TestBucket bucket = TestBucket.newBuilder(client).build();
for (int i = 0; i < n; i++) {
final String keyName = "key" + i;
@@ -147,8 +145,8 @@ void runTestReadKey(SizeInBytes keySize, SizeInBytes
bytesPerChecksum) throws Ex
final String md5 = createKey(bucket.delegate(), keyName, keySize,
writeBufferSize);
for (SizeInBytes readBufferSize : readBufferSizes) {
- runTestReadKey(keyName, keySize, readBufferSize, null);
- runTestReadKey(keyName, keySize, readBufferSize, md5);
+ runTestReadKey(keyName, keySize, readBufferSize, null, bucket);
+ runTestReadKey(keyName, keySize, readBufferSize, md5, bucket);
}
}
}
@@ -170,7 +168,7 @@ static String createKey(OzoneBucket bucket, String keyName,
SizeInBytes keySize,
final long keySizeByte = keySize.getSize();
final long startTime = System.nanoTime();
try (OutputStream stream = bucket.createStreamKey(keyName, keySizeByte,
- RatisReplicationConfig.getInstance(THREE), Collections.emptyMap())) {
+ RatisReplicationConfig.getInstance(ONE), Collections.emptyMap())) {
for (long pos = 0; pos < keySizeByte;) {
final int writeSize = Math.toIntExact(Math.min(buffer.length,
keySizeByte - pos));
stream.write(buffer, 0, writeSize);
@@ -191,8 +189,8 @@ static String createKey(OzoneBucket bucket, String keyName,
SizeInBytes keySize,
return computedMD5;
}
- private void runTestReadKey(String keyName, SizeInBytes keySize, SizeInBytes
bufferSize, String expectedMD5)
- throws Exception {
+ private void runTestReadKey(String keyName, SizeInBytes keySize, SizeInBytes
bufferSize, String expectedMD5,
+ TestBucket bucket) throws Exception {
final long keySizeByte = keySize.getSize();
final MessageDigest md5 = MessageDigest.getInstance("MD5");
// Read the data fully into a large enough byte array
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
index e4133ae57a5..e640c1e6d17 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.hadoop.hdds.scm.storage.ExtendedInputStream;
import org.apache.hadoop.hdds.tracing.TracingUtil;
/**
@@ -168,6 +169,13 @@ public int read(long position, ByteBuffer buf) throws
IOException {
if (!buf.hasRemaining()) {
return 0;
}
+ if (inputStream instanceof ExtendedInputStream) {
+ final int remainingBeforeRead = buf.remaining();
+ if (((ExtendedInputStream) inputStream).readFully(position, buf)) {
+ return remainingBeforeRead - buf.remaining();
+ }
+ }
+
long oldPos = this.getPos();
int bytesRead;
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]