This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8c635a0fa558 [SPARK-44259][CONNECT][TESTS] Make `connect-client-jvm` pass on Java 21 except `RemoteSparkSession`-based tests
8c635a0fa558 is described below
commit 8c635a0fa5584b35d6dd2e5fb774a2a8de7201a2
Author: yangjie01 <[email protected]>
AuthorDate: Fri Jun 30 17:30:20 2023 -0700
[SPARK-44259][CONNECT][TESTS] Make `connect-client-jvm` pass on Java 21 except `RemoteSparkSession`-based tests
### What changes were proposed in this pull request?
This PR ignores all tests that inherit `RemoteSparkSession` by default on Java 21, by overriding the `test` function in `RemoteSparkSession`. These are all Arrow-based tests, because Spark Connect uses the Arrow data format for RPC communication. On Java 21 they currently fail as follows:
```
23/06/30 11:45:41 ERROR SparkConnectService: Error during: execute. UserId: . SessionId: e7479b73-d02c-47e9-85c8-40b3e9315561.
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
    at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
    at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
    at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
    at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:237)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.$anonfun$next$3(ArrowConverters.scala:174)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1487)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.next(ArrowConverters.scala:181)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.next(ArrowConverters.scala:128)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.sql.connect.service.SparkConnectStreamHandler$.processAsArrowBatches(SparkConnectStreamHandler.scala:178)
    at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handlePlan(SparkConnectStreamHandler.scala:104)
    at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.$anonfun$handle$1(SparkConnectStreamHandler.scala:86)
    at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.$anonfun$handle$1$adapted(SparkConnectStreamHandler.scala:53)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$3(SessionHolder.scala:152)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:857)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:152)
    at org.apache.spark.JobArtifactSet$.withActive(JobArtifactSet.scala:109)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContext$1(SessionHolder.scala:122)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:209)
    at org.apache.spark.sql.connect.service.SessionHolder.withContext(SessionHolder.scala:121)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:151)
    at org.apache.spark.sql.connect.service.SessionHolder.withSessionBasedPythonPaths(SessionHolder.scala:137)
    at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:150)
    at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:53)
    at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:166)
    at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:584)
    at org.sparkproject.connect.grpc.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
    at org.sparkproject.connect.grpc.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:346)
    at org.sparkproject.connect.grpc.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:860)
    at org.sparkproject.connect.grpc.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.sparkproject.connect.grpc.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
```
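For context, the `UnsupportedOperationException` above comes from Arrow's `MemoryUtil`, which reflectively looks up the package-private `java.nio.DirectByteBuffer(long, int)` constructor at class-load time. Below is a minimal sketch of the same kind of reflective access (an illustration only, not Arrow's actual source):
```scala
// Probe the constructor Arrow's MemoryUtil relies on. Under strong
// encapsulation (JDK 16+), setAccessible fails unless the JVM is started
// with --add-opens java.base/java.nio=ALL-UNNAMED, which is why Arrow
// reports the constructor as "not available" on Java 21.
object DirectBufferProbe {
  def main(args: Array[String]): Unit = {
    try {
      val cls = Class.forName("java.nio.DirectByteBuffer")
      val ctor = cls.getDeclaredConstructor(classOf[Long], classOf[Int])
      ctor.setAccessible(true) // throws InaccessibleObjectException on Java 21
      println("DirectByteBuffer(long, int) is available")
    } catch {
      case e: Throwable => println(s"DirectByteBuffer(long, int) not available: $e")
    }
  }
}
```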
All of the ignored tests are related to https://github.com/apache/arrow/issues/35053, so we should wait for an upgrade to a new Arrow version and then re-enable them for Java 21. The following TODO JIRA was created for that:
- Re-enable Arrow-based connect tests in Java 21: https://issues.apache.org/jira/browse/SPARK-44121
### Why are the changes needed?
So that the Java 21 daily test can monitor the other, non-Arrow-based tests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass GitHub Actions
- Manual tests with Java 21:
```
java -version
openjdk version "21-ea" 2023-09-19
OpenJDK Runtime Environment Zulu21+65-CA (build 21-ea+26)
OpenJDK 64-Bit Server VM Zulu21+65-CA (build 21-ea+26, mixed mode, sharing)
```
```
build/sbt "connect-client-jvm/test" -Phive
```
```
[info] Run completed in 4 seconds, 640 milliseconds.
[info] Total number of tests run: 846
[info] Suites: completed 22, aborted 0
[info] Tests: succeeded 846, failed 0, canceled 167, ignored 1, pending 0
[info] All tests passed.
```
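Note that the 167 canceled tests are the `RemoteSparkSession`-based suites: ScalaTest's `assume` throws `TestCanceledException` when its condition is false, so the runner counts the test as canceled rather than failed. A minimal sketch of the pattern, with a hypothetical suite name:
```scala
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical suite illustrating the skip pattern used in this patch:
// a false assume(...) cancels the test instead of failing it, so it shows
// up under "canceled" in the run summary.
class CancelOnJava21DemoSuite extends AnyFunSuite {
  test("runs only on Java 17 or lower") {
    val specVersion = System.getProperty("java.specification.version")
    assume(!specVersion.startsWith("21"), "Arrow-based test skipped on Java 21")
    assert(1 + 1 == 2) // real test body would go here
  }
}
```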
Closes #41805 from LuciferYang/SPARK-44259.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../connect/client/util/RemoteSparkSession.scala | 86 +++++++++++++---------
1 file changed, 52 insertions(+), 34 deletions(-)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala
index e05828606d09..8d84dffc9d5b 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala
@@ -21,7 +21,9 @@ import java.util.concurrent.TimeUnit
import scala.io.Source
-import org.scalatest.BeforeAndAfterAll
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
+import org.scalactic.source.Position
+import org.scalatest.{BeforeAndAfterAll, Tag}
import sys.process._
import org.apache.spark.sql.SparkSession
@@ -170,41 +172,44 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
protected lazy val serverPort: Int = port
override def beforeAll(): Unit = {
- super.beforeAll()
- SparkConnectServerUtils.start()
- spark = SparkSession
- .builder()
- .client(SparkConnectClient.builder().port(serverPort).build())
- .create()
-
- // Retry and wait for the server to start
- val stop = System.nanoTime() + TimeUnit.MINUTES.toNanos(1) // ~1 min
- var sleepInternalMs = TimeUnit.SECONDS.toMillis(1) // 1s with * 2 backoff
- var success = false
- val error = new RuntimeException(s"Failed to start the test server on port $serverPort.")
-
- while (!success && System.nanoTime() < stop) {
- try {
- // Run a simple query to verify the server is really up and ready
- val result = spark
- .sql("select val from (values ('Hello'), ('World')) as t(val)")
- .collect()
- assert(result.length == 2)
- success = true
- debug("Spark Connect Server is up.")
- } catch {
- // ignored the error
- case e: Throwable =>
- error.addSuppressed(e)
- Thread.sleep(sleepInternalMs)
- sleepInternalMs *= 2
+ // TODO(SPARK-44121) Remove this check condition
+ if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) {
+ super.beforeAll()
+ SparkConnectServerUtils.start()
+ spark = SparkSession
+ .builder()
+ .client(SparkConnectClient.builder().port(serverPort).build())
+ .create()
+
+ // Retry and wait for the server to start
+ val stop = System.nanoTime() + TimeUnit.MINUTES.toNanos(1) // ~1 min
+ var sleepInternalMs = TimeUnit.SECONDS.toMillis(1) // 1s with * 2 backoff
+ var success = false
+ val error = new RuntimeException(s"Failed to start the test server on port $serverPort.")
+
+ while (!success && System.nanoTime() < stop) {
+ try {
+ // Run a simple query to verify the server is really up and ready
+ val result = spark
+ .sql("select val from (values ('Hello'), ('World')) as t(val)")
+ .collect()
+ assert(result.length == 2)
+ success = true
+ debug("Spark Connect Server is up.")
+ } catch {
+ // ignored the error
+ case e: Throwable =>
+ error.addSuppressed(e)
+ Thread.sleep(sleepInternalMs)
+ sleepInternalMs *= 2
+ }
}
- }
- // Throw error if failed
- if (!success) {
- debug(error)
- throw error
+ // Throw error if failed
+ if (!success) {
+ debug(error)
+ throw error
+ }
}
}
@@ -217,4 +222,17 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
spark = null
super.afterAll()
}
+
+ /**
+ * SPARK-44259: override test function to skip `RemoteSparkSession-based`
tests as default, we
+ * should delete this function after SPARK-44121 is completed.
+ */
+ override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+     pos: Position): Unit = {
+ super.test(testName, testTags: _*) {
+ // TODO(SPARK-44121) Re-enable Arrow-based connect tests in Java 21
+ assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
+ testFun
+ }
+ }
}
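For reference, the Java-version gate used in the patch comes from Apache Commons Lang 3 (`commons-lang3` 3.12+ is needed for `JavaVersion.JAVA_17`). A standalone sketch of the same check:
```scala
import org.apache.commons.lang3.{JavaVersion, SystemUtils}

// Mirrors the gate in the patch: true on Java 8 through 17, false on
// Java 21, deciding whether the Arrow-based test setup should run.
object JavaVersionGate {
  def main(args: Array[String]): Unit = {
    val arrowTestsEnabled = SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)
    println(s"Arrow-based Connect tests enabled: $arrowTestsEnabled")
  }
}
```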
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]