This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f1f0ec323d HDDS-12007. BlockDataStreamOutput should only send one
PutBlock during close. (#7645)
f1f0ec323d is described below
commit f1f0ec323d3e5cb3b895c082aad3a02dd67303f6
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Jan 9 00:29:36 2025 -0800
HDDS-12007. BlockDataStreamOutput should only send one PutBlock during
close. (#7645)
---
.../hdds/scm/storage/BlockDataStreamOutput.java | 4 ++
.../server/ratis/ContainerStateMachine.java | 40 +------------
.../keyvalue/impl/KeyValueStreamDataChannel.java | 65 ++++------------------
.../impl/TestKeyValueStreamDataChannel.java | 65 +++++++++++++++++++++-
4 files changed, 81 insertions(+), 93 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 8c2883a437..342fcaba9a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -410,6 +410,10 @@ public class BlockDataStreamOutput implements
ByteBufferStreamOutput {
waitFuturesComplete();
final BlockData blockData = containerBlockData.build();
if (close) {
+ // HDDS-12007 changed datanodes to ignore the following PutBlock request.
+ // However, clients still have to send it for maintaining compatibility.
+ // Otherwise, new clients won't send a PutBlock.
+ // Then, old datanodes will fail since they expect a PutBlock.
final ContainerCommandRequestProto putBlockRequest
= ContainerProtocolCalls.getPutBlockRequest(
xceiverClient.getPipeline(), blockData, true, tokenString);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 23be4138b6..a032531162 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -524,21 +524,6 @@ public class ContainerStateMachine extends
BaseStateMachine {
return response;
}
- private CompletableFuture<ContainerCommandResponseProto> link(
- ContainerCommandRequestProto requestProto, LogEntryProto entry) {
- return CompletableFuture.supplyAsync(() -> {
- final DispatcherContext context = DispatcherContext
- .newBuilder(DispatcherContext.Op.STREAM_LINK)
- .setTerm(entry.getTerm())
- .setLogIndex(entry.getIndex())
- .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
- .setContainer2BCSIDMap(container2BCSIDMap)
- .build();
-
- return dispatchCommand(requestProto, context);
- }, executor);
- }
-
private CompletableFuture<Message> writeStateMachineData(
ContainerCommandRequestProto requestProto, long entryIndex, long term,
long startTime) {
@@ -689,29 +674,8 @@ public class ContainerStateMachine extends
BaseStateMachine {
final KeyValueStreamDataChannel kvStreamDataChannel =
(KeyValueStreamDataChannel) dataChannel;
-
- final ContainerCommandRequestProto request =
- kvStreamDataChannel.getPutBlockRequest();
-
- return link(request, entry).whenComplete((response, e) -> {
- if (e != null) {
- LOG.warn("Failed to link logEntry {} for request {}",
- TermIndex.valueOf(entry), request, e);
- }
- if (response != null) {
- final ContainerProtos.Result result = response.getResult();
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} to link logEntry {} for request {}, response: {}",
- result, TermIndex.valueOf(entry), request, response);
- }
- if (result == ContainerProtos.Result.SUCCESS) {
- kvStreamDataChannel.setLinked();
- return;
- }
- }
- // failed to link, cleanup
- kvStreamDataChannel.cleanUp();
- });
+ kvStreamDataChannel.setLinked();
+ return CompletableFuture.completedFuture(null);
}
private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
index 7500860229..52838aff2e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -20,14 +20,11 @@ package org.apache.hadoop.ozone.container.keyvalue.impl;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.util.ReferenceCountedObject;
import org.slf4j.Logger;
@@ -36,9 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
/**
* This class is used to get the DataChannel for streaming.
@@ -53,8 +48,6 @@ public class KeyValueStreamDataChannel extends
StreamDataChannelBase {
private final Buffers buffers = new Buffers(
BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX);
- private final AtomicReference<ContainerCommandRequestProto> putBlockRequest
- = new AtomicReference<>();
private final AtomicBoolean closed = new AtomicBoolean();
KeyValueStreamDataChannel(File file, ContainerData containerData,
@@ -90,7 +83,7 @@ public class KeyValueStreamDataChannel extends
StreamDataChannelBase {
return src.get().remaining();
}
- private static void writeFully(ByteBuffer b, WriteMethod writeMethod)
+ static void writeFully(ByteBuffer b, WriteMethod writeMethod)
throws IOException {
while (b.remaining() > 0) {
final int written = writeMethod.applyAsInt(b);
@@ -100,11 +93,6 @@ public class KeyValueStreamDataChannel extends
StreamDataChannelBase {
}
}
- public ContainerCommandRequestProto getPutBlockRequest() {
- return Objects.requireNonNull(putBlockRequest.get(),
- () -> "putBlockRequest == null, " + this);
- }
-
void assertOpen() throws IOException {
if (closed.get()) {
throw new IOException("Already closed: " + this);
@@ -115,7 +103,7 @@ public class KeyValueStreamDataChannel extends
StreamDataChannelBase {
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
- putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel));
+ writeBuffers();
} finally {
super.close();
}
@@ -130,22 +118,23 @@ public class KeyValueStreamDataChannel extends
StreamDataChannelBase {
}
}
- static ContainerCommandRequestProto closeBuffers(
- Buffers buffers, WriteMethod writeMethod) throws IOException {
+ /**
+ * Write the data in {@link #buffers} to the channel.
+ * Note that the PutBlock proto at the end is ignored; see HDDS-12007.
+ */
+ private void writeBuffers() throws IOException {
final ReferenceCountedObject<ByteBuf> ref = buffers.pollAll();
final ByteBuf buf = ref.retain();
- final ContainerCommandRequestProto putBlockRequest;
try {
- putBlockRequest = readPutBlockRequest(buf);
+ setEndIndex(buf);
// write the remaining data
- writeFully(buf.nioBuffer(), writeMethod);
+ writeFully(buf.nioBuffer(), super::writeFileChannel);
} finally {
ref.release();
}
- return putBlockRequest;
}
- private static int readProtoLength(ByteBuf b, int lengthIndex) {
+ static int readProtoLength(ByteBuf b, int lengthIndex) {
final int readerIndex = b.readerIndex();
LOG.debug("{}, lengthIndex = {}, readerIndex = {}",
b, lengthIndex, readerIndex);
@@ -158,8 +147,8 @@ public class KeyValueStreamDataChannel extends
StreamDataChannelBase {
return b.nioBuffer().getInt();
}
- static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b)
- throws IOException {
+ /** Set end index to the proto index in order to ignore the proto. */
+ static void setEndIndex(ByteBuf b) {
// readerIndex protoIndex lengthIndex readerIndex+readableBytes
// V V V V
// format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---|
@@ -168,37 +157,7 @@ public class KeyValueStreamDataChannel extends
StreamDataChannelBase {
final int protoLength = readProtoLength(b.duplicate(), lengthIndex);
final int protoIndex = lengthIndex - protoLength;
- final ContainerCommandRequestProto proto;
- try {
- proto = readPutBlockRequest(b.slice(protoIndex, protoLength).nioBuffer());
- } catch (Throwable t) {
- RatisHelper.debug(b, "catch", LOG);
- throw new IOException("Failed to readPutBlockRequest from " + b
- + ": readerIndex=" + readerIndex
- + ", protoIndex=" + protoIndex
- + ", protoLength=" + protoLength
- + ", lengthIndex=" + lengthIndex, t);
- }
-
// set index for reading data
b.writerIndex(protoIndex);
-
- return proto;
- }
-
- private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b)
- throws IOException {
- RatisHelper.debug(b, "readPutBlockRequest", LOG);
- final ByteString byteString = ByteString.copyFrom(b);
-
- final ContainerCommandRequestProto request =
- ContainerCommandRequestMessage.toProto(byteString, null);
-
- if (!request.hasPutBlock()) {
- throw new StorageContainerException(
- "Malformed PutBlock request. trace ID: " + request.getTraceID(),
- ContainerProtos.Result.MALFORMED_REQUEST);
- }
- return request;
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
index e6067e5c56..99793a0201 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
@@ -22,10 +22,14 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
@@ -58,9 +62,8 @@ import java.util.concurrent.ThreadLocalRandom;
import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX;
import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.executePutBlockClose;
import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.getProtoLength;
-import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.closeBuffers;
-import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.readPutBlockRequest;
import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeBuffers;
+import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeFully;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -106,6 +109,49 @@ public class TestKeyValueStreamDataChannel {
assertEquals(PUT_BLOCK_PROTO, proto);
}
+ static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b) throws IOException {
+ // readerIndex protoIndex lengthIndex readerIndex+readableBytes
+ // V V V V
+ // format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---|
+ final int readerIndex = b.readerIndex();
+ final int lengthIndex = readerIndex + b.readableBytes() - 4;
+ final int protoLength = KeyValueStreamDataChannel.readProtoLength(b.duplicate(), lengthIndex);
+ final int protoIndex = lengthIndex - protoLength;
+
+ final ContainerCommandRequestProto proto;
+ try {
+ proto = readPutBlockRequest(b.slice(protoIndex, protoLength).nioBuffer());
+ } catch (Throwable t) {
+ RatisHelper.debug(b, "catch", LOG);
+ throw new IOException("Failed to readPutBlockRequest from " + b
+ + ": readerIndex=" + readerIndex
+ + ", protoIndex=" + protoIndex
+ + ", protoLength=" + protoLength
+ + ", lengthIndex=" + lengthIndex, t);
+ }
+
+ // set index for reading data
+ b.writerIndex(protoIndex);
+
+ return proto;
+ }
+
+ private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b)
+ throws IOException {
+ RatisHelper.debug(b, "readPutBlockRequest", LOG);
+ final ByteString byteString = ByteString.copyFrom(b);
+
+ final ContainerCommandRequestProto request =
+ ContainerCommandRequestMessage.toProto(byteString, null);
+
+ if (!request.hasPutBlock()) {
+ throw new StorageContainerException(
+ "Malformed PutBlock request. trace ID: " + request.getTraceID(),
+ Result.MALFORMED_REQUEST);
+ }
+ return request;
+ }
+
@Test
public void testBuffers() throws Exception {
final ExecutorService executor = Executors.newFixedThreadPool(32);
@@ -230,6 +276,21 @@ public class TestKeyValueStreamDataChannel {
new Reply(true, 0, putBlockRequest));
}
+ static ContainerCommandRequestProto closeBuffers(
+ Buffers buffers, WriteMethod writeMethod) throws IOException {
+ final ReferenceCountedObject<ByteBuf> ref = buffers.pollAll();
+ final ByteBuf buf = ref.retain();
+ final ContainerCommandRequestProto putBlockRequest;
+ try {
+ putBlockRequest = readPutBlockRequest(buf);
+ // write the remaining data
+ writeFully(buf.nioBuffer(), writeMethod);
+ } finally {
+ ref.release();
+ }
+ return putBlockRequest;
+ }
+
@Override
public CompletableFuture<DataStreamReply> writeAsync(
FilePositionCount filePositionCount, WriteOption... writeOptions) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]