This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a5a8431f740 [SPARK-54137][SQL][CONNECT] Remove redundant 
observed-metrics responses
0a5a8431f740 is described below

commit 0a5a8431f740ab1c06b9da6527da5efdb6c02fe2
Author: Yihong He <[email protected]>
AuthorDate: Tue Dec 23 08:41:04 2025 +0900

    [SPARK-54137][SQL][CONNECT] Remove redundant observed-metrics responses
    
    ### What changes were proposed in this pull request?
    
    This PR removes the observed metrics response generation from SparkConnectPlanExecution in the Spark Connect server, since [creating the observed metrics response in ExecuteThreadRunner](https://github.com/apache/spark/blob/master/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala#L232-L240) is sufficient.
    
    **Changes**:
    - Removed the call to createObservedMetricsResponse() in the main query 
execution flow after generating metrics response
    - Removed the private method createObservedMetricsResponse() that was 
responsible for creating observed metrics responses from query execution
    
    ### Why are the changes needed?
    
    The createObservedMetricsResponse() method is being removed because:
    1. **The response is always empty**: In Spark Connect, [all observations must be registered first using executeHolder.observations](https://github.com/apache/spark/blob/ac5fce4e8bb43ab3ecd7cee8abd19c8bc80b156f/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L244-L255), and the method specifically filters out already-registered observations (!executeHolder.observations.contains(name)). This means the collected observed metrics will always be empty.
    2. **Misleading coverage**: The method only handles plans in the query 
execution path but doesn't cover commands, which can also produce observed 
metrics. This incomplete coverage could be misleading to users expecting 
observed metrics from commands. Removing this code simplifies the execution 
flow and eliminates potential confusion about observed metrics handling.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Since the method always returns an empty response due to the 
observation registration requirement, removing it has no functional impact on 
users.
    
    ### How was this patch tested?
    
    `build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite"`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor 2.1.47
    
    Closes #53445 from heyihong/SPARK-54137.
    
    Authored-by: Yihong He <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../execution/SparkConnectPlanExecution.scala      | 24 ----------------------
 1 file changed, 24 deletions(-)

diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
index 74a55c698636..7dad774eed0f 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala
@@ -87,10 +87,6 @@ private[execution] class 
SparkConnectPlanExecution(executeHolder: ExecuteHolder)
         responseObserver.onNext(createSchemaResponse(request.getSessionId, 
dataframe.schema))
         processAsArrowBatches(dataframe, responseObserver, executeHolder)
         
responseObserver.onNext(MetricGenerator.createMetricsResponse(sessionHolder, 
dataframe))
-        createObservedMetricsResponse(
-          request.getSessionId,
-          executeHolder.allObservationAndPlanIds,
-          dataframe).foreach(responseObserver.onNext)
       case proto.Plan.OpTypeCase.COMMAND =>
         val command = request.getPlan.getCommand
         planner.transformCommand(command) match {
@@ -326,26 +322,6 @@ private[execution] class 
SparkConnectPlanExecution(executeHolder: ExecuteHolder)
       .setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
       .build()
   }
-
-  private def createObservedMetricsResponse(
-      sessionId: String,
-      observationAndPlanIds: Map[String, Long],
-      dataframe: DataFrame): Option[ExecutePlanResponse] = {
-    val observedMetrics = dataframe.queryExecution.observedMetrics.collect {
-      case (name, row) if !executeHolder.observations.contains(name) =>
-        val values = SparkConnectPlanExecution.toObservedMetricsValues(row)
-        name -> values
-    }
-    if (observedMetrics.nonEmpty) {
-      Some(
-        SparkConnectPlanExecution
-          .createObservedMetricsResponse(
-            sessionId,
-            sessionHolder.serverSessionId,
-            observationAndPlanIds,
-            observedMetrics))
-    } else None
-  }
 }
 
 object SparkConnectPlanExecution {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to