This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch HDDS-10685
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-10685 by this push:
new 625a6ad85c HDDS-11651. Support short-circuit read channel for freon rk command (#8316)
625a6ad85c is described below
commit 625a6ad85cd17d5f67fd264bad2d36c3f3da90a8
Author: Sammi Chen <[email protected]>
AuthorDate: Tue May 13 01:43:22 2025 +0800
HDDS-11651. Support short-circuit read channel for freon rk command (#8316)
---
hadoop-ozone/dist/pom.xml | 2 +-
.../dist/src/main/compose/ozone/short-circuit.yaml | 25 ++++++++++++++++
.../src/main/compose/ozone/test-short-circuit.sh | 35 ++++++++++++++++++++++
hadoop-ozone/dist/src/main/compose/testlib.sh | 1 +
.../src/main/smoketest/freon/read-write-key.robot | 7 +++++
.../hadoop/ozone/freon/RandomKeyGenerator.java | 34 +++++++++++++++++++++
6 files changed, 103 insertions(+), 1 deletion(-)
diff --git a/hadoop-ozone/dist/pom.xml b/hadoop-ozone/dist/pom.xml
index 46d50f0ec9..ff9402560e 100644
--- a/hadoop-ozone/dist/pom.xml
+++ b/hadoop-ozone/dist/pom.xml
@@ -25,7 +25,7 @@
<name>Apache Ozone Distribution</name>
<properties>
<!-- suffix appended to Ozone version to get Docker image version -->
- <docker.ozone-runner.version>20241216-1-jdk21</docker.ozone-runner.version>
+ <docker.ozone-runner.version>20250410-1-jdk21</docker.ozone-runner.version>
<docker.ozone-testkr5b.image>ghcr.io/apache/ozone-testkrb5:20241129-1</docker.ozone-testkr5b.image>
<docker.ozone.image>apache/ozone</docker.ozone.image>
<docker.ozone.image.flavor>-rocky</docker.ozone.image.flavor>
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/short-circuit.yaml b/hadoop-ozone/dist/src/main/compose/ozone/short-circuit.yaml
new file mode 100644
index 0000000000..1baa4b4a01
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/compose/ozone/short-circuit.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+x-short-circuit-config:
+ &short-circuit-config
+ environment:
+ - OZONE-SITE.XML_ozone.client.read.short-circuit=true
+ - OZONE-SITE.XML_ozone.domain.socket.path=/opt/ozone_dn_socket
+
+services:
+ datanode:
+ <<: *short-circuit-config
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/test-short-circuit.sh b/hadoop-ozone/dist/src/main/compose/ozone/test-short-circuit.sh
new file mode 100755
index 0000000000..bec702c212
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/compose/ozone/test-short-circuit.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#suite:misc
+
+COMPOSE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+export COMPOSE_DIR
+
+export SECURITY_ENABLED=false
+export OZONE_REPLICATION_FACTOR=3
+export SHORT_CIRCUIT_READ_ENABLED=true
+
+# shellcheck source=/dev/null
+source "$COMPOSE_DIR/../testlib.sh"
+
+export COMPOSE_FILE=docker-compose.yaml:short-circuit.yaml
+
+start_docker_env 3
+
+execute_robot_test datanode freon/read-write-key.robot
+
diff --git a/hadoop-ozone/dist/src/main/compose/testlib.sh b/hadoop-ozone/dist/src/main/compose/testlib.sh
index 8888127ee9..e26dc3ce49 100755
--- a/hadoop-ozone/dist/src/main/compose/testlib.sh
+++ b/hadoop-ozone/dist/src/main/compose/testlib.sh
@@ -249,6 +249,7 @@ execute_robot_test(){
-v OM_SERVICE_ID:"${OM_SERVICE_ID:-om}" \
-v OZONE_DIR:"${OZONE_DIR}" \
-v SECURITY_ENABLED:"${SECURITY_ENABLED}" \
+ -v SHORT_CIRCUIT_READ_ENABLED:"${SHORT_CIRCUIT_READ_ENABLED:-false}" \
-v SCM:"${SCM}" \
${ARGUMENTS[@]} --log NONE --report NONE --output "$OUTPUT_PATH" \
"$SMOKETEST_DIR_INSIDE/$TEST"
diff --git a/hadoop-ozone/dist/src/main/smoketest/freon/read-write-key.robot b/hadoop-ozone/dist/src/main/smoketest/freon/read-write-key.robot
index f98fdc1950..fa6104c4c7 100644
--- a/hadoop-ozone/dist/src/main/smoketest/freon/read-write-key.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/freon/read-write-key.robot
@@ -62,3 +62,10 @@ Run 50 % of read-key tasks, 40 % list-key tasks and 10 % of write-key tasks for
${result} = Execute ozone freon ockrw -n ${keysCount} -t
10 --percentage-read 50 --percentage-list 40 -r 100 -v voltest -b buckettest -p
performanceTest
Should contain ${result} Successful executions:
${keysCount}
+Run rk with key validation through short-circuit channel
+ Pass Execution If '${SHORT_CIRCUIT_READ_ENABLED}' == 'false' Skip
when short-circuit read is disabled
+
+ ${keysCount} = BuiltIn.Set Variable 10
+ ${result} = Execute ozone freon rk --numOfVolumes 1
--numOfBuckets 1 --numOfKeys ${keysCount} --keySize 1MB
--replication-type=RATIS --factor=THREE --validate-writes
--validation-channel=short-circuit
+ Should contain ${result} Status: Success
+ Should contain ${result} XceiverClientShortCircuit
is created for pipeline
diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java
index 3c2e364668..6de2bbe394 100644
--- a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java
+++ b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageSize;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -161,6 +162,11 @@ public final class RandomKeyGenerator implements Callable<Void>, FreonSubcommand
defaultValue = "1")
private int numOfValidateThreads = 1;
+ @Option(names = {"--validation-channel"},
+ description = "grpc or short-circuit.",
+ defaultValue = "grpc")
+ private String validationChannel = "grpc";
+
@Option(
names = {"--buffer-size", "--bufferSize"},
description = "Specifies the buffer size while writing. Full name " +
@@ -206,6 +212,7 @@ public final class RandomKeyGenerator implements Callable<Void>, FreonSubcommand
private AtomicLong bucketCreationTime;
private AtomicLong keyCreationTime;
private AtomicLong keyWriteTime;
+ private AtomicLong keyReadTime;
private AtomicLong totalBytesWritten;
@@ -289,6 +296,21 @@ public Void call() throws Exception {
+ HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA + " is set to false.");
validateWrites = false;
}
+ OzoneClientConfig clientConfig =
ozoneConfiguration.getObject(OzoneClientConfig.class);
+ if (validationChannel.equalsIgnoreCase("grpc")) {
+ clientConfig.setShortCircuit(false);
+ ozoneConfiguration.setFromObject(clientConfig);
+ } else if (validationChannel.equalsIgnoreCase("short-circuit")) {
+ boolean shortCircuit = clientConfig.isShortCircuitEnabled();
+ if (!shortCircuit) {
+ LOG.error("Short-circuit read is not enabled");
+ return null;
+ }
+ } else {
+ LOG.error("'--validate-channel={}' is not supported", validationChannel);
+ return null;
+ }
+
init(ozoneConfiguration);
replicationConfig = replication.fromParamsOrConfig(ozoneConfiguration);
@@ -331,6 +353,7 @@ public Void call() throws Exception {
totalWritesValidated = new AtomicLong();
writeValidationSuccessCount = new AtomicLong();
writeValidationFailureCount = new AtomicLong();
+ keyReadTime = new AtomicLong();
validationQueue = new LinkedBlockingQueue<>();
validateExecutor = Executors.newFixedThreadPool(numOfValidateThreads);
@@ -361,11 +384,13 @@ public Void call() throws Exception {
} else {
progressbar.shutdown();
}
+ LOG.info("Data generation is completed");
if (validateExecutor != null) {
while (!validationQueue.isEmpty()) {
Thread.sleep(CHECK_INTERVAL_MILLIS);
}
+ LOG.info("Data validation is completed");
validateExecutor.shutdown();
validateExecutor.awaitTermination(Integer.MAX_VALUE,
TimeUnit.MILLISECONDS);
@@ -491,6 +516,13 @@ void printStats(PrintStream out) {
writeValidationSuccessCount);
out.println("Unsuccessful validation: " +
writeValidationFailureCount);
+
+ long averageKeyReadTime =
+ TimeUnit.NANOSECONDS.toMillis(keyReadTime.get()) /
numOfValidateThreads;
+ String prettyAverageKeyReadTime = DurationFormatUtils
+ .formatDuration(averageKeyReadTime, DURATION_FORMAT);
+ out.println(
+ "Average Time spent in key read and validation: " +
prettyAverageKeyReadTime);
}
out.println("Total Execution time: " + execTime);
out.println("***************************************************");
@@ -1184,9 +1216,11 @@ public void run() {
KeyValidate kv = validationQueue.poll(5, TimeUnit.SECONDS);
if (kv != null) {
OzoneInputStream is = kv.bucket.readKey(kv.keyName);
+ long keyReadStart = System.nanoTime();
dig.getMessageDigest().reset();
byte[] curDigest = dig.digest(is);
totalWritesValidated.getAndIncrement();
+ keyReadTime.getAndAdd(System.nanoTime() - keyReadStart);
if (MessageDigest.isEqual(kv.digest, curDigest)) {
writeValidationSuccessCount.getAndIncrement();
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]