This is an automated email from the ASF dual-hosted git repository.
sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1692b553fb94 [SPARK-53728][SDP] Print PipelineEvent Message with Error
In Test
1692b553fb94 is described below
commit 1692b553fb94d2e63c014b5a8cda841c22e1c4d3
Author: Jacky Wang <[email protected]>
AuthorDate: Mon Sep 29 07:41:01 2025 -0700
[SPARK-53728][SDP] Print PipelineEvent Message with Error In Test
### What changes were proposed in this pull request?
Pulling some changes from #51644
For debugging purposes, print PipelineEvents that carry an error to the console.
Most tests expect the pipeline to succeed, so this makes it easier to see the
error when it happens. Otherwise, developers have to manually add a println to
print the error message contained in the event, which is cumbersome.
### Why are the changes needed?
It makes test failures easier to debug and avoids the need to manually add
println calls when a test fails.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52463 from JiaqiWang18/SPARK-53728-print-event-with-error-in-test.
Authored-by: Jacky Wang <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
---
.../connect/pipelines/PipelineEventSender.scala | 14 +------------
.../sql/pipelines/logging/PipelineEvent.scala | 18 ++++++++++++++++-
.../spark/sql/pipelines/utils/ExecutionTest.scala | 23 ++++++++++++++++++++--
3 files changed, 39 insertions(+), 16 deletions(-)
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
index 4f263ebe372c..5ea8c6f70312 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala
@@ -158,18 +158,6 @@ class PipelineEventSender(
}
private def constructProtoEvent(event: PipelineEvent): proto.PipelineEvent =
{
- val message = if (event.error.nonEmpty) {
- // Returns the message associated with a Throwable and all its causes
- def getExceptionMessages(throwable: Throwable): Seq[String] = {
- throwable.getMessage +:
- Option(throwable.getCause).map(getExceptionMessages).getOrElse(Nil)
- }
- val errorMessages = getExceptionMessages(event.error.get)
- s"""${event.message}
- |Error: ${errorMessages.mkString("\n")}""".stripMargin
- } else {
- event.message
- }
val protoEventBuilder = proto.PipelineEvent
.newBuilder()
.setTimestamp(
@@ -182,7 +170,7 @@ class PipelineEventSender(
.setSeconds(event.timestamp.getTime / 1000)
.setNanos(event.timestamp.getNanos)
.build())
- .setMessage(message)
+ .setMessage(event.messageWithError)
protoEventBuilder.build()
}
}
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineEvent.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineEvent.scala
index 697f225df1f3..0a10ebaa69f7 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineEvent.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineEvent.scala
@@ -39,7 +39,23 @@ case class PipelineEvent(
message: String,
details: EventDetails,
error: Option[Throwable]
-)
+) {
+ /**
+  * Combines the message and error (if any) into a single string.
+  *
+  * When `error` is empty this is just `message`. Otherwise it is `message`,
+  * a newline, then "Error: " followed by the message of the error and of
+  * each of its causes (outermost first), one per line.
+  *
+  * The string is assembled directly instead of with `stripMargin`, which
+  * would run after interpolation and also strip a leading `|` (and any
+  * whitespace before it) from lines of `message` or of an exception message.
+  */
+ def messageWithError: String = error match {
+   case None => message
+   case Some(throwable) =>
+     // The throwable followed by its causes, outermost first. Stops when a
+     // cause repeats (by reference) so a cyclic cause chain cannot recurse
+     // forever.
+     @scala.annotation.tailrec
+     def causeChain(t: Throwable, acc: List[Throwable]): List[Throwable] =
+       if (t == null || acc.exists(_ eq t)) acc.reverse
+       else causeChain(t.getCause, t :: acc)
+     // getMessage may return null; fall back to the class name so the line
+     // still identifies the failure instead of printing "null".
+     val errorMessages = causeChain(throwable, Nil).map { t =>
+       Option(t.getMessage).getOrElse(t.getClass.getName)
+     }
+     s"$message\nError: ${errorMessages.mkString("\n")}"
+ }
+}
/**
* Describes where the event originated from
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala
index 991a47d6b562..6c2c07e57498 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.pipelines.utils
+import org.scalatest.Assertions.fail
+
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.pipelines.common.{FlowStatus, RunState}
@@ -51,17 +53,34 @@ trait TestPipelineUpdateContextMixin {
* @param fullRefreshTables Set of tables to be fully refreshed.
* @param refreshTables Set of tables to be refreshed.
* @param resetCheckpointFlows Set of flows to be reset.
+ * @param failOnErrorEvent Whether to fail the test when an event with an error is received.
*/
case class TestPipelineUpdateContext(
spark: SparkSession,
unresolvedGraph: DataflowGraph,
fullRefreshTables: TableFilter = NoTables,
refreshTables: TableFilter = AllTables,
- resetCheckpointFlows: FlowFilter = AllFlows
+ resetCheckpointFlows: FlowFilter = AllFlows,
+ failOnErrorEvent: Boolean = false
) extends PipelineUpdateContext {
val eventBuffer = new PipelineRunEventBuffer()
- override val eventCallback: PipelineEvent => Unit = eventBuffer.addEvent
+ override val eventCallback: PipelineEvent => Unit = { event =>
+   eventBuffer.addEvent(event)
+   // Echo errored events to the console to ease debugging: most tests
+   // expect the pipeline to succeed, so an error reported here usually
+   // explains a later assertion failure.
+   event.error.foreach { _ =>
+     val errorText = event.messageWithError
+     // scalastyle:off println
+     Seq(
+       "\n=== Received Pipeline Event with Error ===",
+       errorText,
+       "================================="
+     ).foreach(println)
+     // scalastyle:on println
+     // NOTE(review): if this callback runs off the test thread, the exception
+     // thrown by `fail` may not reach the test runner directly — confirm.
+     if (failOnErrorEvent) {
+       fail(s"Pipeline event with error received: $errorText")
+     }
+   }
+ }
override def flowProgressEventLogger: FlowProgressEventLogger = {
new FlowProgressEventLogger(eventCallback = eventCallback)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]