This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9269a0bfed56 [SPARK-49525][SS][CONNECT] Minor log improvement to 
Server Side Streaming Query ListenerBus Listener
9269a0bfed56 is described below

commit 9269a0bfed56429e999269dfdfd89aefcb1b7261
Author: Wei Liu <[email protected]>
AuthorDate: Fri Sep 6 15:10:41 2024 +0900

    [SPARK-49525][SS][CONNECT] Minor log improvement to Server Side Streaming 
Query ListenerBus Listener
    
    ### What changes were proposed in this pull request?
    
    Change the log level of onQueryStarted and onQueryTerminated from `logDebug`
    to `logInfo`. These messages are useful for debugging because they confirm
    that the events are actually being fired from the server. The change adds
    little logging overhead, since queries start and end far less often than
    `onQueryProgress` fires.
    
    ### Why are the changes needed?
    
    Debug improvement
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    No tests needed; this change only affects log output.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48002 from WweiL/SPARK-49525-listener-bus-improvement.
    
    Authored-by: Wei Liu <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../spark/sql/connect/planner/SparkConnectPlanner.scala     | 13 ++++++++-----
 .../connect/service/SparkConnectListenerBusListener.scala   |  8 +++++---
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b6abab6ef766..bb6d52308c19 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -40,7 +40,7 @@ import 
org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult
 import org.apache.spark.connect.proto.Parse.ParseFormat
 import 
org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance
 import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{DATAFRAME_ID, SESSION_ID}
 import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, 
TaskResourceProfile, TaskResourceRequest}
 import org.apache.spark.sql.{Dataset, Encoders, ForeachWriter, Observation, 
RelationalGroupedDataset, Row, SparkSession}
@@ -3052,10 +3052,13 @@ class SparkConnectPlanner(
       
sessionHolder.streamingServersideListenerHolder.streamingQueryStartedEventCache.remove(
         query.runId.toString))
     queryStartedEvent.foreach {
-      logDebug(
-        s"[SessionId: $sessionId][UserId: $userId][operationId: " +
-          s"${executeHolder.operationId}][query id: ${query.id}][query runId: 
${query.runId}] " +
-          s"Adding QueryStartedEvent to response")
+      logInfo(
+        log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
+          log"[UserId: ${MDC(LogKeys.USER_ID, userId)}] " +
+          log"[operationId: ${MDC(LogKeys.OPERATION_ID, 
executeHolder.operationId)}] " +
+          log"[query id: ${MDC(LogKeys.QUERY_ID, query.id)}]" +
+          log"[query runId: ${MDC(LogKeys.QUERY_RUN_ID, query.runId)}] " +
+          log"Adding QueryStartedEvent to response")
       e => resultBuilder.setQueryStartedEventJson(e.json)
     }
 
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala
index 5b2205757648..7a0c067ab430 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala
@@ -160,9 +160,11 @@ private[sql] class SparkConnectListenerBusListener(
   }
 
   override def onQueryTerminated(event: 
StreamingQueryListener.QueryTerminatedEvent): Unit = {
-    logDebug(
-      s"[SessionId: ${sessionHolder.sessionId}][UserId: 
${sessionHolder.userId}] " +
-        s"Sending QueryTerminatedEvent to client, id: ${event.id} runId: 
${event.runId}.")
+    logInfo(
+      log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)}]" +
+        log"[UserId: ${MDC(LogKeys.USER_ID, sessionHolder.userId)}] " +
+        log"Sending QueryTerminatedEvent to client, id: 
${MDC(LogKeys.QUERY_ID, event.id)} " +
+        log"runId: ${MDC(LogKeys.QUERY_RUN_ID, event.runId)}.")
     send(event.json, StreamingQueryEventType.QUERY_TERMINATED_EVENT)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to