This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0a5a8431f740 [SPARK-54137][SQL][CONNECT] Remove redundant observed-metrics responses
0a5a8431f740 is described below
commit 0a5a8431f740ab1c06b9da6527da5efdb6c02fe2
Author: Yihong He <[email protected]>
AuthorDate: Tue Dec 23 08:41:04 2025 +0900
[SPARK-54137][SQL][CONNECT] Remove redundant observed-metrics responses
### What changes were proposed in this pull request?
This PR removes the observed-metrics response generation from
SparkConnectPlanExecution in the Spark Connect server, since [creating the
observed metrics response in
ExecuteThreadRunner](https://github.com/apache/spark/blob/master/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala#L232-L240)
is sufficient.
**Changes**:
- Removed the call to createObservedMetricsResponse() from the main query
execution flow, where it ran after the metrics response was generated
- Removed the private method createObservedMetricsResponse(), which built
observed-metrics responses from the query execution
### Why are the changes needed?
The createObservedMetricsResponse() method is being removed because:
1. **The response is always empty**: In Spark Connect, [all observations
must be registered first using
executeHolder.observations](https://github.com/apache/spark/blob/ac5fce4e8bb43ab3ecd7cee8abd19c8bc80b156f/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L244-L255),
and the method specifically filters out already-registered observations
(!executeHolder.observations.contains(name)). This means the collected observed
metrics will always be empty.
2. **Misleading coverage**: The method only handles plans in the query
execution path but doesn't cover commands, which can also produce observed
metrics. This incomplete coverage could be misleading to users expecting
observed metrics from commands. Removing this code simplifies the execution
flow and eliminates potential confusion about observed metrics handling.
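The always-empty behavior described in point 1 can be illustrated with a
minimal, self-contained sketch (hypothetical object and variable names,
simplified types; not the actual server code): because every observation name
is pre-registered, the `!registered.contains(name)` guard rejects every
collected metric and the resulting collection is always empty.

```scala
// Simplified sketch of the removed filtering logic. All names here
// (ObservedMetricsFilterSketch, registered, observedMetrics) are
// hypothetical; the real code works on executeHolder.observations and
// dataframe.queryExecution.observedMetrics.
object ObservedMetricsFilterSketch {
  def main(args: Array[String]): Unit = {
    // In Spark Connect, every observation is registered before execution.
    val registered = Set("obs1", "obs2")
    // Metrics collected from query execution, keyed by observation name.
    val observedMetrics = Map("obs1" -> Seq(1L), "obs2" -> Seq(2L))

    // The removed method kept only unregistered names -- which never exist,
    // so the guard filters out every entry.
    val unregistered = observedMetrics.collect {
      case (name, values) if !registered.contains(name) => name -> values
    }
    assert(unregistered.isEmpty) // hence no response was ever emitted
  }
}
```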
### Does this PR introduce _any_ user-facing change?
No. Since the method always returns an empty response due to the
observation registration requirement, removing it has no functional impact on
users.
### How was this patch tested?
`build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite"`
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor 2.1.47
Closes #53445 from heyihong/SPARK-54137.
Authored-by: Yihong He <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../execution/SparkConnectPlanExecution.scala | 24 ----------------------
1 file changed, 24 deletions(-)
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
index 74a55c698636..7dad774eed0f 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
@@ -87,10 +87,6 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
         responseObserver.onNext(createSchemaResponse(request.getSessionId, dataframe.schema))
         processAsArrowBatches(dataframe, responseObserver, executeHolder)
         responseObserver.onNext(MetricGenerator.createMetricsResponse(sessionHolder, dataframe))
-        createObservedMetricsResponse(
-          request.getSessionId,
-          executeHolder.allObservationAndPlanIds,
-          dataframe).foreach(responseObserver.onNext)
       case proto.Plan.OpTypeCase.COMMAND =>
         val command = request.getPlan.getCommand
         planner.transformCommand(command) match {
@@ -326,26 +322,6 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
       .setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
       .build()
   }
-
-  private def createObservedMetricsResponse(
-      sessionId: String,
-      observationAndPlanIds: Map[String, Long],
-      dataframe: DataFrame): Option[ExecutePlanResponse] = {
-    val observedMetrics = dataframe.queryExecution.observedMetrics.collect {
-      case (name, row) if !executeHolder.observations.contains(name) =>
-        val values = SparkConnectPlanExecution.toObservedMetricsValues(row)
-        name -> values
-    }
-    if (observedMetrics.nonEmpty) {
-      Some(
-        SparkConnectPlanExecution
-          .createObservedMetricsResponse(
-            sessionId,
-            sessionHolder.serverSessionId,
-            observationAndPlanIds,
-            observedMetrics))
-    } else None
-  }
 }

 object SparkConnectPlanExecution {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]