This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch HDDS-10685
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-10685 by this push:
     new 625a6ad85c HDDS-11651. Support short-circuit read channel for freon rk 
command (#8316)
625a6ad85c is described below

commit 625a6ad85cd17d5f67fd264bad2d36c3f3da90a8
Author: Sammi Chen <[email protected]>
AuthorDate: Tue May 13 01:43:22 2025 +0800

    HDDS-11651. Support short-circuit read channel for freon rk command (#8316)
---
 hadoop-ozone/dist/pom.xml                          |  2 +-
 .../dist/src/main/compose/ozone/short-circuit.yaml | 25 ++++++++++++++++
 .../src/main/compose/ozone/test-short-circuit.sh   | 35 ++++++++++++++++++++++
 hadoop-ozone/dist/src/main/compose/testlib.sh      |  1 +
 .../src/main/smoketest/freon/read-write-key.robot  |  7 +++++
 .../hadoop/ozone/freon/RandomKeyGenerator.java     | 34 +++++++++++++++++++++
 6 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/hadoop-ozone/dist/pom.xml b/hadoop-ozone/dist/pom.xml
index 46d50f0ec9..ff9402560e 100644
--- a/hadoop-ozone/dist/pom.xml
+++ b/hadoop-ozone/dist/pom.xml
@@ -25,7 +25,7 @@
   <name>Apache Ozone Distribution</name>
   <properties>
     <!-- suffix appended to Ozone version to get Docker image version -->
-    <docker.ozone-runner.version>20241216-1-jdk21</docker.ozone-runner.version>
+    <docker.ozone-runner.version>20250410-1-jdk21</docker.ozone-runner.version>
     
<docker.ozone-testkr5b.image>ghcr.io/apache/ozone-testkrb5:20241129-1</docker.ozone-testkr5b.image>
     <docker.ozone.image>apache/ozone</docker.ozone.image>
     <docker.ozone.image.flavor>-rocky</docker.ozone.image.flavor>
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/short-circuit.yaml 
b/hadoop-ozone/dist/src/main/compose/ozone/short-circuit.yaml
new file mode 100644
index 0000000000..1baa4b4a01
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/compose/ozone/short-circuit.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+x-short-circuit-config:
+  &short-circuit-config
+  environment:
+    - OZONE-SITE.XML_ozone.client.read.short-circuit=true
+    - OZONE-SITE.XML_ozone.domain.socket.path=/opt/ozone_dn_socket
+
+services:
+  datanode:
+    <<: *short-circuit-config
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/test-short-circuit.sh 
b/hadoop-ozone/dist/src/main/compose/ozone/test-short-circuit.sh
new file mode 100755
index 0000000000..bec702c212
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/compose/ozone/test-short-circuit.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#suite:misc
+
+COMPOSE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+export COMPOSE_DIR
+
+export SECURITY_ENABLED=false
+export OZONE_REPLICATION_FACTOR=3
+export SHORT_CIRCUIT_READ_ENABLED=true
+
+# shellcheck source=/dev/null
+source "$COMPOSE_DIR/../testlib.sh"
+
+export COMPOSE_FILE=docker-compose.yaml:short-circuit.yaml
+
+start_docker_env 3
+
+execute_robot_test datanode freon/read-write-key.robot
+
diff --git a/hadoop-ozone/dist/src/main/compose/testlib.sh 
b/hadoop-ozone/dist/src/main/compose/testlib.sh
index 8888127ee9..e26dc3ce49 100755
--- a/hadoop-ozone/dist/src/main/compose/testlib.sh
+++ b/hadoop-ozone/dist/src/main/compose/testlib.sh
@@ -249,6 +249,7 @@ execute_robot_test(){
       -v OM_SERVICE_ID:"${OM_SERVICE_ID:-om}" \
       -v OZONE_DIR:"${OZONE_DIR}" \
       -v SECURITY_ENABLED:"${SECURITY_ENABLED}" \
+      -v SHORT_CIRCUIT_READ_ENABLED:"${SHORT_CIRCUIT_READ_ENABLED:-false}" \
       -v SCM:"${SCM}" \
       ${ARGUMENTS[@]} --log NONE --report NONE --output "$OUTPUT_PATH" \
       "$SMOKETEST_DIR_INSIDE/$TEST"
diff --git a/hadoop-ozone/dist/src/main/smoketest/freon/read-write-key.robot 
b/hadoop-ozone/dist/src/main/smoketest/freon/read-write-key.robot
index f98fdc1950..fa6104c4c7 100644
--- a/hadoop-ozone/dist/src/main/smoketest/freon/read-write-key.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/freon/read-write-key.robot
@@ -62,3 +62,10 @@ Run 50 % of read-key tasks, 40 % list-key tasks and 10 % of 
write-key tasks for
     ${result} =        Execute          ozone freon ockrw -n ${keysCount} -t 
10 --percentage-read 50 --percentage-list 40 -r 100 -v voltest -b buckettest -p 
performanceTest
                        Should contain   ${result}   Successful executions: 
${keysCount}
 
+Run rk with key validation through short-circuit channel
+    Pass Execution If   '${SHORT_CIRCUIT_READ_ENABLED}' == 'false'    Skip 
when short-circuit read is disabled
+
+    ${keysCount} =     BuiltIn.Set Variable   10
+    ${result} =        Execute          ozone freon rk --numOfVolumes 1 
--numOfBuckets 1 --numOfKeys ${keysCount} --keySize 1MB 
--replication-type=RATIS --factor=THREE --validate-writes 
--validation-channel=short-circuit
+                       Should contain   ${result}   Status: Success
+                       Should contain   ${result}   XceiverClientShortCircuit 
is created for pipeline
diff --git 
a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java
 
b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java
index 3c2e364668..6de2bbe394 100644
--- 
a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java
+++ 
b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageSize;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -161,6 +162,11 @@ public final class RandomKeyGenerator implements 
Callable<Void>, FreonSubcommand
       defaultValue = "1")
   private int numOfValidateThreads = 1;
 
+  @Option(names = {"--validation-channel"},
+      description = "grpc or short-circuit.",
+      defaultValue = "grpc")
+  private String validationChannel = "grpc";
+
   @Option(
       names = {"--buffer-size", "--bufferSize"},
       description = "Specifies the buffer size while writing. Full name " +
@@ -206,6 +212,7 @@ public final class RandomKeyGenerator implements 
Callable<Void>, FreonSubcommand
   private AtomicLong bucketCreationTime;
   private AtomicLong keyCreationTime;
   private AtomicLong keyWriteTime;
+  private AtomicLong keyReadTime;
 
   private AtomicLong totalBytesWritten;
 
@@ -289,6 +296,21 @@ public Void call() throws Exception {
           + HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA + " is set to false.");
       validateWrites = false;
     }
+    OzoneClientConfig clientConfig = 
ozoneConfiguration.getObject(OzoneClientConfig.class);
+    if (validationChannel.equalsIgnoreCase("grpc")) {
+      clientConfig.setShortCircuit(false);
+      ozoneConfiguration.setFromObject(clientConfig);
+    } else if (validationChannel.equalsIgnoreCase("short-circuit")) {
+      boolean shortCircuit = clientConfig.isShortCircuitEnabled();
+      if (!shortCircuit) {
+        LOG.error("Short-circuit read is not enabled");
+        return null;
+      }
+    } else {
+      LOG.error("'--validate-channel={}' is not supported", validationChannel);
+      return null;
+    }
+
     init(ozoneConfiguration);
 
     replicationConfig = replication.fromParamsOrConfig(ozoneConfiguration);
@@ -331,6 +353,7 @@ public Void call() throws Exception {
       totalWritesValidated = new AtomicLong();
       writeValidationSuccessCount = new AtomicLong();
       writeValidationFailureCount = new AtomicLong();
+      keyReadTime = new AtomicLong();
 
       validationQueue = new LinkedBlockingQueue<>();
       validateExecutor = Executors.newFixedThreadPool(numOfValidateThreads);
@@ -361,11 +384,13 @@ public Void call() throws Exception {
     } else {
       progressbar.shutdown();
     }
+    LOG.info("Data generation is completed");
 
     if (validateExecutor != null) {
       while (!validationQueue.isEmpty()) {
         Thread.sleep(CHECK_INTERVAL_MILLIS);
       }
+      LOG.info("Data validation is completed");
       validateExecutor.shutdown();
       validateExecutor.awaitTermination(Integer.MAX_VALUE,
           TimeUnit.MILLISECONDS);
@@ -491,6 +516,13 @@ void printStats(PrintStream out) {
           writeValidationSuccessCount);
       out.println("Unsuccessful validation: " +
           writeValidationFailureCount);
+
+      long averageKeyReadTime =
+          TimeUnit.NANOSECONDS.toMillis(keyReadTime.get()) / 
numOfValidateThreads;
+      String prettyAverageKeyReadTime = DurationFormatUtils
+          .formatDuration(averageKeyReadTime, DURATION_FORMAT);
+      out.println(
+          "Average Time spent in key read and validation: " + 
prettyAverageKeyReadTime);
     }
     out.println("Total Execution time: " + execTime);
     out.println("***************************************************");
@@ -1184,9 +1216,11 @@ public void run() {
           KeyValidate kv = validationQueue.poll(5, TimeUnit.SECONDS);
           if (kv != null) {
             OzoneInputStream is = kv.bucket.readKey(kv.keyName);
+            long keyReadStart = System.nanoTime();
             dig.getMessageDigest().reset();
             byte[] curDigest = dig.digest(is);
             totalWritesValidated.getAndIncrement();
+            keyReadTime.getAndAdd(System.nanoTime() - keyReadStart);
             if (MessageDigest.isEqual(kv.digest, curDigest)) {
               writeValidationSuccessCount.getAndIncrement();
             } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to