This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new edd21069727 [SPARK-43923][CONNECT][FOLLOW-UP] Propagate extra tags to
SparkListenerConnectOperationFinished
edd21069727 is described below
commit edd210697272c03e3d97a6443a65d0a130353c05
Author: Martin Grund <[email protected]>
AuthorDate: Wed Aug 30 17:35:09 2023 +0200
[SPARK-43923][CONNECT][FOLLOW-UP] Propagate extra tags to
SparkListenerConnectOperationFinished
### What changes were proposed in this pull request?
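The `SparkListenerConnectOperationFinished` message supports passing extra
tags, but `ExecuteEventsManager.postFinished`, the method that posts this
event, did not accept them yet. This patch adds an optional `extraTags`
parameter (defaulting to an empty map) to `postFinished`, propagates it to the
posted event, and adds a test for it.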
### Why are the changes needed?
Compatibility with the message interface: the event already carries extra
tags, but callers of `postFinished` had no way to set them.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test ("SPARK-43923: post finished with extra tags") to
`ExecuteEventsManagerSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #42732 from grundprinzip/SPARK-43923.
Authored-by: Martin Grund <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../spark/sql/connect/service/ExecuteEventsManager.scala | 7 +++++--
.../sql/connect/service/ExecuteEventsManagerSuite.scala | 13 +++++++++++++
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
index 23a67b7292b..9e8a945bcc3 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
@@ -205,7 +205,9 @@ case class ExecuteEventsManager(executeHolder:
ExecuteHolder, clock: Clock) {
* Number of rows that are returned to the user. None is expected when the
operation does not
* return any rows.
*/
- def postFinished(producedRowsCountOpt: Option[Long] = None): Unit = {
+ def postFinished(
+ producedRowsCountOpt: Option[Long] = None,
+ extraTags: Map[String, String] = Map.empty): Unit = {
assertStatus(
List(ExecuteStatus.Started, ExecuteStatus.ReadyForExecution),
ExecuteStatus.Finished)
@@ -217,7 +219,8 @@ case class ExecuteEventsManager(executeHolder:
ExecuteHolder, clock: Clock) {
jobTag,
operationId,
clock.getTimeMillis(),
- producedRowCount))
+ producedRowCount,
+ extraTags))
}
/**
diff --git
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
index 12e67f2c59c..dbe8420eab0 100644
---
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
+++
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala
@@ -184,6 +184,19 @@ class ExecuteEventsManagerSuite
Some(100)))
}
+ test("SPARK-43923: post finished with extra tags") {
+ val events = setupEvents(ExecuteStatus.Started)
+ events.postFinished(Some(100), Map("someEvent" -> "true"))
+
verify(events.executeHolder.sessionHolder.session.sparkContext.listenerBus,
times(1))
+ .post(
+ SparkListenerConnectOperationFinished(
+ events.executeHolder.jobTag,
+ DEFAULT_QUERY_ID,
+ DEFAULT_CLOCK.getTimeMillis(),
+ Some(100),
+ Map("someEvent" -> "true")))
+ }
+
test("SPARK-43923: post closed") {
val events = setupEvents(ExecuteStatus.Finished)
events.postClosed()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]