This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-10685
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 508e2eaa504df13c5e693216aee27b0b28811b6f
Author: Sammi Chen <[email protected]>
AuthorDate: Wed Nov 6 12:50:00 2024 +0800

    HDDS-11622. Support domain socket creation (#7397)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  |  54 ++++
 .../apache/hadoop/hdds/scm/storage/DomainPeer.java | 111 +++++++++
 .../hdds/scm/storage/DomainSocketFactory.java      | 272 +++++++++++++++++++++
 3 files changed, 437 insertions(+)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index e31a2942cb..b962e74576 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -39,6 +39,60 @@ public class OzoneClientConfig {
   private static final Logger LOG =
       LoggerFactory.getLogger(OzoneClientConfig.class);
 
+  /** Default value of the {@code read.short-circuit} switch (off). */
+  public static final boolean OZONE_READ_SHORT_CIRCUIT_DEFAULT = false;
+  /** Configuration key naming the UNIX domain socket path. */
+  public static final String OZONE_DOMAIN_SOCKET_PATH = "ozone.domain.socket.path";
+  /** Default value of {@link #OZONE_DOMAIN_SOCKET_PATH}. */
+  public static final String OZONE_DOMAIN_SOCKET_PATH_DEFAULT = "/var/lib/ozone/dn_socket";
+  /** Key prefix shared by the short-circuit read settings in this class. */
+  public static final String SHORT_CIRCUIT_PREFIX = "read.short-circuit.";
+  // NOTE(review): not referenced in this class; presumably the protocol
+  // version exchanged over the domain socket — confirm against its users.
+  public static final short DATA_TRANSFER_VERSION = 28;
+  // NOTE(review): not referenced in this class; presumably a magic byte that
+  // marks domain-socket requests — confirm against its users.
+  public static final byte DATA_TRANSFER_MAGIC_CODE = 99;
+
+  /**
+   * Master switch for short-circuit reads; starts out as
+   * {@link #OZONE_READ_SHORT_CIRCUIT_DEFAULT}.
+   */
+  @Config(key = "read.short-circuit",
+      type = ConfigType.BOOLEAN,
+      defaultValue = "false",
+      tags = { ConfigTag.CLIENT, ConfigTag.DATANODE },
+      description = "Whether read short-circuit is enabled or not")
+  private boolean shortCircuitEnabled = OZONE_READ_SHORT_CIRCUIT_DEFAULT;
+
+  /** Reader/writer buffer size in bytes; the default equals 128KB. */
+  @Config(key = SHORT_CIRCUIT_PREFIX + "buffer.size",
+      type = ConfigType.SIZE,
+      defaultValue = "128KB",
+      tags = { ConfigTag.CLIENT, ConfigTag.DATANODE },
+      description = "Buffer size of reader/writer.")
+  private int shortCircuitBufferSize = 128 * 1024;
+
+  /**
+   * How long, in seconds, short-circuit reads stay disabled after an
+   * unexpected domain socket IO error. The default equals ten minutes.
+   */
+  @Config(key = SHORT_CIRCUIT_PREFIX + "disable.interval",
+      type = ConfigType.LONG,
+      defaultValue = "600",
+      tags = { ConfigTag.CLIENT },
+      description = "If some unknown IO error happens on Domain socket read, "
+          + "short circuit read will be disabled temporarily "
+          + "for this period of time(seconds).")
+  private long shortCircuitReadDisableInterval = 600;
+
+  /**
+   * Reports whether short-circuit reads are switched on.
+   *
+   * @return the current value of the short-circuit switch
+   */
+  public boolean isShortCircuitEnabled() {
+    return this.shortCircuitEnabled;
+  }
+
+  /**
+   * Turns short-circuit reads on or off.
+   *
+   * @param enabled the new value of the short-circuit switch
+   */
+  public void setShortCircuit(boolean enabled) {
+    this.shortCircuitEnabled = enabled;
+  }
+
+
+  /**
+   * Returns the short-circuit reader/writer buffer size.
+   *
+   * @return the buffer size in bytes
+   */
+  public int getShortCircuitBufferSize() {
+    return this.shortCircuitBufferSize;
+  }
+
+  /**
+   * Overrides the short-circuit reader/writer buffer size.
+   *
+   * @param size the new buffer size in bytes
+   */
+  public void setShortCircuitBufferSize(int size) {
+    shortCircuitBufferSize = size;
+  }
+
+  /**
+   * Returns how long short-circuit reads stay disabled after an IO error.
+   *
+   * @return the disable interval in seconds
+   */
+  public long getShortCircuitReadDisableInterval() {
+    return this.shortCircuitReadDisableInterval;
+  }
+
+  /**
+   * Overrides how long short-circuit reads stay disabled after an IO error.
+   *
+   * @param value the new disable interval in seconds
+   */
+  public void setShortCircuitReadDisableInterval(long value) {
+    this.shortCircuitReadDisableInterval = value;
+  }
+
   /**
    * Enum for indicating what mode to use when combining chunk and block
    * checksums to define an aggregate FileChecksum. This should be considered
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainPeer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainPeer.java
new file mode 100644
index 0000000000..3fcebb7f0b
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainPeer.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * Represents a peer that we communicate with by using blocking I/O
+ * on a UNIX domain socket.
+ *
+ * <p>The input stream, output stream and channel are taken from the wrapped
+ * {@link DomainSocket} once, when the peer is constructed.
+ */
+public class DomainPeer implements Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(DomainPeer.class);
+
+  private final DomainSocket socket;
+  private final OutputStream out;
+  private final InputStream in;
+  private final ReadableByteChannel channel;
+
+  /**
+   * Wraps the given domain socket.
+   *
+   * @param socket the domain socket to communicate over
+   */
+  public DomainPeer(DomainSocket socket) {
+    this.socket = socket;
+    this.out = socket.getOutputStream();
+    this.in = socket.getInputStream();
+    this.channel = socket.getChannel();
+  }
+
+  /**
+   * Returns the socket's channel, exposed as a {@link ReadableByteChannel}.
+   */
+  public ReadableByteChannel getInputStreamChannel() {
+    return channel;
+  }
+
+  /**
+   * Sets the receive timeout of the underlying socket.
+   *
+   * @param timeoutMs the timeout in milliseconds
+   * @throws IOException if the socket attribute cannot be set
+   */
+  public void setReadTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs);
+  }
+
+  /**
+   * Returns the receive buffer size of the underlying socket.
+   *
+   * @throws IOException if the socket attribute cannot be read
+   */
+  public int getReceiveBufferSize() throws IOException {
+    return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+  }
+
+  /**
+   * Sets the send timeout of the underlying socket.
+   *
+   * @param timeoutMs the timeout in milliseconds
+   * @throws IOException if the socket attribute cannot be set
+   */
+  public void setWriteTimeout(int timeoutMs) throws IOException {
+    socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs);
+  }
+
+  /** Returns true once the underlying socket is no longer open. */
+  public boolean isClosed() {
+    return !socket.isOpen();
+  }
+
+  /**
+   * Closes the underlying domain socket.
+   *
+   * <p>Logged at DEBUG rather than INFO so that each closed peer does not
+   * add a line to the client log.
+   *
+   * @throws IOException if closing the socket fails
+   */
+  @Override
+  public void close() throws IOException {
+    socket.close();
+    LOG.debug("{} is closed", socket);
+  }
+
+  /** Returns a description of the remote end built from the socket. */
+  public String getRemoteAddressString() {
+    return "unix:{" + socket.toString() + "}";
+  }
+
+  /** Returns a fixed placeholder; domain sockets have no local address. */
+  public String getLocalAddressString() {
+    return "<local>";
+  }
+
+  /** Returns the socket's input stream. */
+  public InputStream getInputStream() throws IOException {
+    return in;
+  }
+
+  /** Returns the socket's output stream. */
+  public OutputStream getOutputStream() throws IOException {
+    return out;
+  }
+
+  @Override
+  public String toString() {
+    return "DomainPeer(" + getRemoteAddressString() + ")";
+  }
+
+  /** Returns the wrapped domain socket. */
+  public DomainSocket getDomainSocket() {
+    return socket;
+  }
+
+  /**
+   * Reports whether the channel is secure; always true for domain sockets.
+   */
+  public boolean hasSecureChannel() {
+    //
+    // Communication over domain sockets is assumed to be secure, since it
+    // doesn't pass over any network. We also carefully control the privileges
+    // that can be used on the domain socket inode and its parent directories.
+    // See validateSocketPathSecurity0 in
+    // org.apache.hadoop.net.unix.DomainSocket for details.
+    //
+    // So unless you are running as root or as the user who launches the
+    // service, you cannot launch a man-in-the-middle attack on UNIX domain
+    // socket traffic.
+    //
+    return true;
+  }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainSocketFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainSocketFactory.java
new file mode 100644
index 0000000000..e62e2a6bfd
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/DomainSocketFactory.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *  A factory to help create DomainSocket.
+ *
+ * <p>A single shared instance is obtained through {@link #getInstance}.
+ * Besides creating sockets, it records a {@link PathState} per effective
+ * socket path. A path whose connection fails is marked DISABLED and is
+ * re-enabled by a background timer after
+ * {@link OzoneClientConfig#getShortCircuitReadDisableInterval()} seconds.
+ */
+public final class DomainSocketFactory implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DomainSocketFactory.class);
+
+  /**
+   *  Domain socket path state.
+   */
+  public enum PathState {
+    NOT_CONFIGURED(false),
+    DISABLED(false),
+    VALID(true);
+
+    private final boolean usableForShortCircuit;
+
+    PathState(boolean usableForShortCircuit) {
+      this.usableForShortCircuit = usableForShortCircuit;
+    }
+
+    public boolean getUsableForShortCircuit() {
+      return usableForShortCircuit;
+    }
+  }
+
+  /**
+   *  Domain socket path together with its current state.
+   */
+  public static class PathInfo {
+    // Path-less placeholders, returned when no concrete path applies.
+    private static final PathInfo NOT_CONFIGURED =
+        new PathInfo("", PathState.NOT_CONFIGURED);
+    private static final PathInfo DISABLED =
+        new PathInfo("", PathState.DISABLED);
+
+    private final String path;
+    private final PathState state;
+
+    PathInfo(String path, PathState state) {
+      this.path = path;
+      this.state = state;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public PathState getPathState() {
+      return state;
+    }
+
+    @Override
+    public String toString() {
+      return "PathInfo{path=" + path + ", state=" + state + "}";
+    }
+  }
+
+  public static final String FEATURE = "short-circuit reads";
+  public static final String FEATURE_FLAG = "SC";
+  private static boolean nativeLibraryLoaded = false;
+  private static String nativeLibraryLoadFailureReason;
+  private final long pathExpireMills;
+  private final ConcurrentHashMap<String, PathInfo> pathMap;
+  // Only created when the feature is enabled; null otherwise.
+  private Timer timer;
+  private boolean isEnabled = false;
+  // Null when short-circuit reads are disabled by configuration.
+  private String domainSocketPath;
+
+  static {
+    // Try to load native hadoop library and set fallback flag appropriately
+    if (SystemUtils.IS_OS_WINDOWS) {
+      nativeLibraryLoadFailureReason = "UNIX Domain sockets are not available on Windows.";
+    } else {
+      LOG.info("Trying to load the custom-built native-hadoop library...");
+      try {
+        System.loadLibrary("hadoop");
+        LOG.info("Loaded the native-hadoop library");
+        nativeLibraryLoaded = true;
+      } catch (Throwable t) {
+        // Ignore failure to continue
+        LOG.info("Failed to load native-hadoop with error: {}", t.toString());
+        LOG.info("java.library.path={}", System.getProperty("java.library.path"));
+        nativeLibraryLoadFailureReason = "libhadoop cannot be loaded.";
+      }
+
+      if (!nativeLibraryLoaded) {
+        LOG.warn("Unable to load native-hadoop library for your platform... " +
+            "using builtin-java classes where applicable");
+      }
+    }
+  }
+
+  private static volatile DomainSocketFactory instance = null;
+
+  /**
+   * Returns the shared factory, creating it from {@code conf} on first use.
+   * Later calls return the existing instance and ignore {@code conf}.
+   */
+  public static DomainSocketFactory getInstance(ConfigurationSource conf) {
+    if (instance == null) {
+      synchronized (DomainSocketFactory.class) {
+        if (instance == null) {
+          instance = new DomainSocketFactory(conf);
+        }
+      }
+    }
+    return instance;
+  }
+
+  private DomainSocketFactory(ConfigurationSource conf) {
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    long startTime = System.nanoTime();
+    pathExpireMills = clientConfig.getShortCircuitReadDisableInterval() * 1000;
+    pathMap = new ConcurrentHashMap<>();
+
+    if (!clientConfig.isShortCircuitEnabled()) {
+      LOG.info(FEATURE + " is disabled.");
+      // No socket path is read in this case, so nothing is recorded:
+      // ConcurrentHashMap rejects null keys.
+      return;
+    }
+
+    domainSocketPath = conf.get(OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH,
+        OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH_DEFAULT);
+    if (domainSocketPath.isEmpty()) {
+      throw new IllegalArgumentException(FEATURE + " is enabled but "
+          + OzoneClientConfig.OZONE_DOMAIN_SOCKET_PATH + " is not set.");
+    }
+    if (!nativeLibraryLoaded) {
+      LOG.warn(FEATURE + " cannot be used because " + nativeLibraryLoadFailureReason);
+      pathMap.put(domainSocketPath, PathInfo.DISABLED);
+      return;
+    }
+
+    isEnabled = true;
+    // Daemon thread, so that a factory that is never closed cannot keep the
+    // JVM alive.
+    timer = new Timer(DomainSocketFactory.class.getSimpleName() + "-Timer", true);
+    pathMap.put(domainSocketPath, new PathInfo(domainSocketPath, PathState.VALID));
+    LOG.info(FEATURE + " is enabled within {} ns.", System.nanoTime() - startTime);
+  }
+
+  /** Returns whether short-circuit reads are enabled and usable at all. */
+  public boolean isServiceEnabled() {
+    return isEnabled;
+  }
+
+  /**
+   * Returns whether short-circuit reads can currently be attempted. The
+   * feature must be enabled, and the configured socket path must not be
+   * temporarily disabled. A path with no recorded state (e.g. after
+   * {@link #clearPathMap()}) is treated as usable, which matches
+   * {@link #getPathInfo(InetSocketAddress)}.
+   */
+  public boolean isServiceReady() {
+    if (!isEnabled) {
+      return false;
+    }
+    PathInfo status = pathMap.get(domainSocketPath);
+    return status == null || status.getPathState() == PathState.VALID;
+  }
+
+  /**
+   * Get information about a domain socket path. Caller must make sure that addr is a local address.
+   *
+   * @param addr         The local inet address to use.
+   * @return             Information about the socket path. When the path is
+   *                     usable, its {@link PathInfo#getPath()} is the
+   *                     effective (port-substituted) socket path.
+   */
+  public PathInfo getPathInfo(InetSocketAddress addr) {
+    if (!isEnabled) {
+      return PathInfo.NOT_CONFIGURED;
+    }
+
+    if (!isServiceReady()) {
+      return PathInfo.DISABLED;
+    }
+
+    String escapedPath = DomainSocket.getEffectivePath(domainSocketPath, addr.getPort());
+    // Atomically register the path as VALID unless a state is already known,
+    // and always return the value that is actually stored in the map.
+    return pathMap.computeIfAbsent(escapedPath,
+        path -> new PathInfo(path, PathState.VALID));
+  }
+
+  /**
+   * Create DomainSocket for addr. Caller must make sure that addr is a local address.
+   *
+   * <p>On failure, the effective socket path is disabled for
+   * {@link #getPathExpireMills()} ms.
+   *
+   * @return the connected socket, or null when short-circuit reads are
+   *         unavailable or this socket path is temporarily disabled
+   * @throws IOException if connecting or configuring the socket fails
+   */
+  public DomainSocket createSocket(int readTimeoutMs, int writeTimeoutMs,
+      InetSocketAddress addr) throws IOException {
+    if (!isEnabled || !isServiceReady()) {
+      return null;
+    }
+    String escapedPath = DomainSocket.getEffectivePath(domainSocketPath, addr.getPort());
+    PathInfo current = pathMap.get(escapedPath);
+    if (current != null && !current.getPathState().getUsableForShortCircuit()) {
+      // This socket path failed recently and its disable interval has not
+      // expired yet.
+      return null;
+    }
+
+    boolean success = false;
+    DomainSocket sock = null;
+    long startTime = System.nanoTime();
+    try {
+      sock = DomainSocket.connect(escapedPath);
+      sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, readTimeoutMs);
+      sock.setAttribute(DomainSocket.SEND_TIMEOUT, writeTimeoutMs);
+      success = true;
+      LOG.debug("{} is created within {} ns", sock, System.nanoTime() - startTime);
+    } catch (IOException e) {
+      LOG.error("Failed to create DomainSocket", e);
+      throw e;
+    } finally {
+      if (!success) {
+        if (sock != null) {
+          IOUtils.closeQuietly(sock);
+        }
+        // Keep the real path in the stored state so callers of getPathInfo
+        // never see an empty path for this address.
+        pathMap.put(escapedPath, new PathInfo(escapedPath, PathState.DISABLED));
+        LOG.error("{} is disabled for {} ms due to current failure", escapedPath, pathExpireMills);
+        schedulePathEnable(escapedPath, pathExpireMills);
+      }
+    }
+    return sock;
+  }
+
+  /**
+   * Temporarily disables short-circuit reads on the configured socket path.
+   * The path is re-enabled after {@link #getPathExpireMills()} ms. This is a
+   * no-op when the feature is not enabled.
+   */
+  public void disableShortCircuit() {
+    if (!isEnabled) {
+      return;
+    }
+    pathMap.put(domainSocketPath, new PathInfo(domainSocketPath, PathState.DISABLED));
+    schedulePathEnable(domainSocketPath, pathExpireMills);
+  }
+
+  /**
+   * Schedules {@code path} to be marked VALID again after {@code delayMills}.
+   * If the factory has no timer, or the timer was cancelled by
+   * {@link #close()}, the path simply stays disabled.
+   */
+  private void schedulePathEnable(String path, long delayMills) {
+    Timer currentTimer = timer;
+    if (currentTimer == null) {
+      return;
+    }
+    try {
+      currentTimer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          pathMap.put(path, new PathInfo(path, PathState.VALID));
+        }
+      }, delayMills);
+    } catch (IllegalStateException e) {
+      LOG.warn("Cannot schedule re-enabling of {}: {}", path, e.getMessage());
+    }
+  }
+
+  @VisibleForTesting
+  public void clearPathMap() {
+    pathMap.clear();
+  }
+
+  /** Returns how long, in ms, a failed socket path stays disabled. */
+  public long getPathExpireMills() {
+    return pathExpireMills;
+  }
+
+  /** Cancels the re-enable timer, if one was created. */
+  @Override
+  public void close() {
+    if (timer != null) {
+      timer.cancel();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to