This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f1f0ec323d HDDS-12007. BlockDataStreamOutput should only send one 
PutBlock during close. (#7645)
f1f0ec323d is described below

commit f1f0ec323d3e5cb3b895c082aad3a02dd67303f6
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Jan 9 00:29:36 2025 -0800

    HDDS-12007. BlockDataStreamOutput should only send one PutBlock during 
close. (#7645)
---
 .../hdds/scm/storage/BlockDataStreamOutput.java    |  4 ++
 .../server/ratis/ContainerStateMachine.java        | 40 +------------
 .../keyvalue/impl/KeyValueStreamDataChannel.java   | 65 ++++------------------
 .../impl/TestKeyValueStreamDataChannel.java        | 65 +++++++++++++++++++++-
 4 files changed, 81 insertions(+), 93 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 8c2883a437..342fcaba9a 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -410,6 +410,10 @@ public class BlockDataStreamOutput implements 
ByteBufferStreamOutput {
     waitFuturesComplete();
     final BlockData blockData = containerBlockData.build();
     if (close) {
+      // HDDS-12007 changed datanodes to ignore the following PutBlock request.
+      // However, clients still have to send it for maintaining compatibility.
+      // Otherwise, new clients won't send a PutBlock.
+      // Then, old datanodes will fail since they expect a PutBlock.
       final ContainerCommandRequestProto putBlockRequest
           = ContainerProtocolCalls.getPutBlockRequest(
               xceiverClient.getPipeline(), blockData, true, tokenString);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 23be4138b6..a032531162 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -524,21 +524,6 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     return response;
   }
 
-  private CompletableFuture<ContainerCommandResponseProto> link(
-      ContainerCommandRequestProto requestProto, LogEntryProto entry) {
-    return CompletableFuture.supplyAsync(() -> {
-      final DispatcherContext context = DispatcherContext
-          .newBuilder(DispatcherContext.Op.STREAM_LINK)
-          .setTerm(entry.getTerm())
-          .setLogIndex(entry.getIndex())
-          .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
-          .setContainer2BCSIDMap(container2BCSIDMap)
-          .build();
-
-      return dispatchCommand(requestProto, context);
-    }, executor);
-  }
-
   private CompletableFuture<Message> writeStateMachineData(
       ContainerCommandRequestProto requestProto, long entryIndex, long term,
       long startTime) {
@@ -689,29 +674,8 @@ public class ContainerStateMachine extends 
BaseStateMachine {
 
     final KeyValueStreamDataChannel kvStreamDataChannel =
         (KeyValueStreamDataChannel) dataChannel;
-
-    final ContainerCommandRequestProto request =
-        kvStreamDataChannel.getPutBlockRequest();
-
-    return link(request, entry).whenComplete((response, e) -> {
-      if (e != null) {
-        LOG.warn("Failed to link logEntry {} for request {}",
-            TermIndex.valueOf(entry), request, e);
-      }
-      if (response != null) {
-        final ContainerProtos.Result result = response.getResult();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{} to link logEntry {} for request {}, response: {}",
-              result, TermIndex.valueOf(entry), request, response);
-        }
-        if (result == ContainerProtos.Result.SUCCESS) {
-          kvStreamDataChannel.setLinked();
-          return;
-        }
-      }
-      // failed to link, cleanup
-      kvStreamDataChannel.cleanUp();
-    });
+    kvStreamDataChannel.setLinked();
+    return CompletableFuture.completedFuture(null);
   }
 
   private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
index 7500860229..52838aff2e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -20,14 +20,11 @@ package org.apache.hadoop.ozone.container.keyvalue.impl;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.util.ReferenceCountedObject;
 import org.slf4j.Logger;
@@ -36,9 +33,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * This class is used to get the DataChannel for streaming.
@@ -53,8 +48,6 @@ public class KeyValueStreamDataChannel extends 
StreamDataChannelBase {
 
   private final Buffers buffers = new Buffers(
       BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX);
-  private final AtomicReference<ContainerCommandRequestProto> putBlockRequest
-      = new AtomicReference<>();
   private final AtomicBoolean closed = new AtomicBoolean();
 
   KeyValueStreamDataChannel(File file, ContainerData containerData,
@@ -90,7 +83,7 @@ public class KeyValueStreamDataChannel extends 
StreamDataChannelBase {
     return src.get().remaining();
   }
 
-  private static void writeFully(ByteBuffer b, WriteMethod writeMethod)
+  static void writeFully(ByteBuffer b, WriteMethod writeMethod)
       throws IOException {
     while (b.remaining() > 0) {
       final int written = writeMethod.applyAsInt(b);
@@ -100,11 +93,6 @@ public class KeyValueStreamDataChannel extends 
StreamDataChannelBase {
     }
   }
 
-  public ContainerCommandRequestProto getPutBlockRequest() {
-    return Objects.requireNonNull(putBlockRequest.get(),
-        () -> "putBlockRequest == null, " + this);
-  }
-
   void assertOpen() throws IOException {
     if (closed.get()) {
       throw new IOException("Already closed: " + this);
@@ -115,7 +103,7 @@ public class KeyValueStreamDataChannel extends 
StreamDataChannelBase {
   public void close() throws IOException {
     if (closed.compareAndSet(false, true)) {
       try {
-        putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel));
+        writeBuffers();
       } finally {
         super.close();
       }
@@ -130,22 +118,23 @@ public class KeyValueStreamDataChannel extends 
StreamDataChannelBase {
     }
   }
 
-  static ContainerCommandRequestProto closeBuffers(
-      Buffers buffers, WriteMethod writeMethod) throws IOException {
+  /**
+   * Write the data in {@link #buffers} to the channel.
+   * Note that the PutBlock proto at the end is ignored; see HDDS-12007.
+   */
+  private void writeBuffers() throws IOException {
     final ReferenceCountedObject<ByteBuf> ref = buffers.pollAll();
     final ByteBuf buf = ref.retain();
-    final ContainerCommandRequestProto putBlockRequest;
     try {
-      putBlockRequest = readPutBlockRequest(buf);
+      setEndIndex(buf);
       // write the remaining data
-      writeFully(buf.nioBuffer(), writeMethod);
+      writeFully(buf.nioBuffer(), super::writeFileChannel);
     } finally {
       ref.release();
     }
-    return putBlockRequest;
   }
 
-  private static int readProtoLength(ByteBuf b, int lengthIndex) {
+  static int readProtoLength(ByteBuf b, int lengthIndex) {
     final int readerIndex = b.readerIndex();
     LOG.debug("{}, lengthIndex = {}, readerIndex = {}",
         b, lengthIndex, readerIndex);
@@ -158,8 +147,8 @@ public class KeyValueStreamDataChannel extends 
StreamDataChannelBase {
     return b.nioBuffer().getInt();
   }
 
-  static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b)
-      throws IOException {
+  /** Set end index to the proto index in order to ignore the proto. */
+  static void setEndIndex(ByteBuf b) {
     //   readerIndex   protoIndex   lengthIndex    readerIndex+readableBytes
     //         V            V             V                              V
     // format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---|
@@ -168,37 +157,7 @@ public class KeyValueStreamDataChannel extends 
StreamDataChannelBase {
     final int protoLength = readProtoLength(b.duplicate(), lengthIndex);
     final int protoIndex = lengthIndex - protoLength;
 
-    final ContainerCommandRequestProto proto;
-    try {
-      proto = readPutBlockRequest(b.slice(protoIndex, 
protoLength).nioBuffer());
-    } catch (Throwable t) {
-      RatisHelper.debug(b, "catch", LOG);
-      throw new IOException("Failed to readPutBlockRequest from " + b
-          + ": readerIndex=" + readerIndex
-          + ", protoIndex=" + protoIndex
-          + ", protoLength=" + protoLength
-          + ", lengthIndex=" + lengthIndex, t);
-    }
-
     // set index for reading data
     b.writerIndex(protoIndex);
-
-    return proto;
-  }
-
-  private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b)
-      throws IOException {
-    RatisHelper.debug(b, "readPutBlockRequest", LOG);
-    final ByteString byteString = ByteString.copyFrom(b);
-
-    final ContainerCommandRequestProto request =
-        ContainerCommandRequestMessage.toProto(byteString, null);
-
-    if (!request.hasPutBlock()) {
-      throw new StorageContainerException(
-          "Malformed PutBlock request. trace ID: " + request.getTraceID(),
-          ContainerProtos.Result.MALFORMED_REQUEST);
-    }
-    return request;
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
index e6067e5c56..99793a0201 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
@@ -22,10 +22,14 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.ClientVersion;
 import 
org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.io.StandardWriteOption;
@@ -58,9 +62,8 @@ import java.util.concurrent.ThreadLocalRandom;
 import static 
org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX;
 import static 
org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.executePutBlockClose;
 import static 
org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.getProtoLength;
-import static 
org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.closeBuffers;
-import static 
org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.readPutBlockRequest;
 import static 
org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeBuffers;
+import static 
org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeFully;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -106,6 +109,49 @@ public class TestKeyValueStreamDataChannel {
     assertEquals(PUT_BLOCK_PROTO, proto);
   }
 
+  static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b) throws 
IOException {
+    //   readerIndex   protoIndex   lengthIndex    readerIndex+readableBytes
+    //         V            V             V                              V
+    // format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---|
+    final int readerIndex = b.readerIndex();
+    final int lengthIndex = readerIndex + b.readableBytes() - 4;
+    final int protoLength = 
KeyValueStreamDataChannel.readProtoLength(b.duplicate(), lengthIndex);
+    final int protoIndex = lengthIndex - protoLength;
+
+    final ContainerCommandRequestProto proto;
+    try {
+      proto = readPutBlockRequest(b.slice(protoIndex, 
protoLength).nioBuffer());
+    } catch (Throwable t) {
+      RatisHelper.debug(b, "catch", LOG);
+      throw new IOException("Failed to readPutBlockRequest from " + b
+          + ": readerIndex=" + readerIndex
+          + ", protoIndex=" + protoIndex
+          + ", protoLength=" + protoLength
+          + ", lengthIndex=" + lengthIndex, t);
+    }
+
+    // set index for reading data
+    b.writerIndex(protoIndex);
+
+    return proto;
+  }
+
+  private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b)
+      throws IOException {
+    RatisHelper.debug(b, "readPutBlockRequest", LOG);
+    final ByteString byteString = ByteString.copyFrom(b);
+
+    final ContainerCommandRequestProto request =
+        ContainerCommandRequestMessage.toProto(byteString, null);
+
+    if (!request.hasPutBlock()) {
+      throw new StorageContainerException(
+          "Malformed PutBlock request. trace ID: " + request.getTraceID(),
+          Result.MALFORMED_REQUEST);
+    }
+    return request;
+  }
+
   @Test
   public void testBuffers() throws Exception {
     final ExecutorService executor = Executors.newFixedThreadPool(32);
@@ -230,6 +276,21 @@ public class TestKeyValueStreamDataChannel {
           new Reply(true, 0, putBlockRequest));
     }
 
+    static ContainerCommandRequestProto closeBuffers(
+        Buffers buffers, WriteMethod writeMethod) throws IOException {
+      final ReferenceCountedObject<ByteBuf> ref = buffers.pollAll();
+      final ByteBuf buf = ref.retain();
+      final ContainerCommandRequestProto putBlockRequest;
+      try {
+        putBlockRequest = readPutBlockRequest(buf);
+        // write the remaining data
+        writeFully(buf.nioBuffer(), writeMethod);
+      } finally {
+        ref.release();
+      }
+      return putBlockRequest;
+    }
+
     @Override
     public CompletableFuture<DataStreamReply> writeAsync(
         FilePositionCount filePositionCount, WriteOption... writeOptions) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to