This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c9ae4ab7ead2 [SPARK-53999][CORE] Native KQueue Transport support on
BSD/MacOS
c9ae4ab7ead2 is described below
commit c9ae4ab7ead2482fe13955fb6d3ba777755a07b6
Author: Kent Yao <[email protected]>
AuthorDate: Thu Oct 23 10:36:52 2025 -0700
[SPARK-53999][CORE] Native KQueue Transport support on BSD/MacOS
### What changes were proposed in this pull request?
This PR adds native KQUEUE support (via JNI) for the transport layer, which is
used by procedures such as shuffle, file, and RPC.
### Why are the changes needed?
To provide feature parity between Linux (which already has native EPOLL) and MacOS/BSD platforms.
### Does this PR introduce _any_ user-facing change?
Yes, a new `KQUEUE` option for io.mode (e.g. `spark.shuffle.io.mode`).
### How was this patch tested?
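New unit tests: ShuffleNettySuite is now abstract, with NIO, EPOLL, and KQUEUE subclasses; the EPOLL and KQUEUE suites are ignored on unsupported operating systems.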
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #52703 from yaooqinn/SPARK-53999.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../java/org/apache/spark/network/util/IOMode.java | 15 ++++++++--
.../org/apache/spark/network/util/NettyUtils.java | 6 ++++
.../scala/org/apache/spark/ShuffleNettySuite.scala | 32 ++++++++++++++++++++--
3 files changed, 48 insertions(+), 5 deletions(-)
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java
b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java
index 6b208d95bbfb..6ab401b9a0d5 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java
@@ -19,9 +19,18 @@ package org.apache.spark.network.util;
/**
* Selector for which form of low-level IO we should use.
- * NIO is always available, while EPOLL is only available on Linux.
- * AUTO is used to select EPOLL if it's available, or NIO otherwise.
*/
public enum IOMode {
- NIO, EPOLL
+ /**
+ * Java NIO (Selector), cross-platform portable
+ */
+ NIO,
+ /**
+ * Native EPOLL via JNI, Linux only
+ */
+ EPOLL,
+ /**
+ * Native KQUEUE via JNI, MacOS/BSD only
+ */
+ KQUEUE
}
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 2dd1c8f2e4a7..da4b3109bbe1 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -26,6 +26,9 @@ import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueServerSocketChannel;
+import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
@@ -68,6 +71,7 @@ public class NettyUtils {
return switch (mode) {
case NIO -> new NioEventLoopGroup(numThreads, threadFactory);
case EPOLL -> new EpollEventLoopGroup(numThreads, threadFactory);
+ case KQUEUE -> new KQueueEventLoopGroup(numThreads, threadFactory);
};
}
@@ -76,6 +80,7 @@ public class NettyUtils {
return switch (mode) {
case NIO -> NioSocketChannel.class;
case EPOLL -> EpollSocketChannel.class;
+ case KQUEUE -> KQueueSocketChannel.class;
};
}
@@ -84,6 +89,7 @@ public class NettyUtils {
return switch (mode) {
case NIO -> NioServerSocketChannel.class;
case EPOLL -> EpollServerSocketChannel.class;
+ case KQUEUE -> KQueueServerSocketChannel.class;
};
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
index 378a36184513..18a8453d60be 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -17,14 +17,42 @@
package org.apache.spark
-import org.scalatest.BeforeAndAfterAll
+import org.scalactic.source.Position
+import org.scalatest.{BeforeAndAfterAll, Tag}
-class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
+import org.apache.spark.network.util.IOMode
+import org.apache.spark.util.Utils
+
+abstract class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with Netty shuffle
mode.
+ def ioMode: IOMode = IOMode.NIO
+ def shouldRunTests: Boolean = true
override def beforeAll(): Unit = {
super.beforeAll()
conf.set("spark.shuffle.blockTransferService", "netty")
+ conf.set("spark.shuffle.io.mode", ioMode.toString)
+ }
+
+ override protected def test(testName: String, testTags: Tag*)(testBody: =>
Any)(
+ implicit pos: Position): Unit = {
+ if (!shouldRunTests) {
+ ignore(s"$testName [disabled on ${Utils.osName} with $ioMode]")(testBody)
+ } else {
+ super.test(testName, testTags: _*) {testBody}
+ }
}
}
+
+class ShuffleNettyNioSuite extends ShuffleNettySuite
+
+class ShuffleNettyEpollSuite extends ShuffleNettySuite {
+ override def shouldRunTests: Boolean = Utils.isLinux
+ override def ioMode: IOMode = IOMode.EPOLL
+}
+
+class ShuffleNettyKQueueSuite extends ShuffleNettySuite {
+ override def shouldRunTests: Boolean = Utils.isMac
+ override def ioMode: IOMode = IOMode.KQUEUE
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]