This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f8f0429c706 [SPARK-42096][CONNECT] Some code cleanup for `connect` module
f8f0429c706 is described below

commit f8f0429c706f8abded9c69a16acc98ce52793c3c
Author: yangjie01 <[email protected]>
AuthorDate: Thu Jan 19 19:46:23 2023 -0800

    [SPARK-42096][CONNECT] Some code cleanup for `connect` module
    
    ### What changes were proposed in this pull request?
    This PR aims to do some code cleanup for the `connect` module (a sketch of the patterns follows the list):
    
    - weaken the access scope of functions (or vals) that are currently only
      used inside their class from `public` to `private` (they can be changed
      back to `public` when required)
    - remove a redundant collection conversion
    - remove a redundant type conversion
    - remove a redundant trailing semicolon
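
    As a rough, hypothetical sketch (not taken from this patch; every name below is
    made up for illustration), the kinds of cleanup applied here look like this in
    Scala:

        import scala.collection.JavaConverters._

        object CleanupSketch {
          // 1. Narrow the visibility of members that are only used inside this object.
          private val defaultSuffix = "example"        // was: val defaultSuffix = ...
          private def withSuffix(prefix: String): String = s"$prefix-$defaultSuffix"

          // 2. Drop the redundant collection conversion: `cols` is already a Seq,
          //    so calling `.toSeq` before `.asJava` is a no-op.
          def toJavaList(cols: Seq[String]): java.util.List[String] =
            cols.asJava                                // was: cols.toSeq.asJava

          // 3. Drop the redundant cast: the value is already statically typed as
          //    String, so the extra `asInstanceOf` adds nothing.
          def literalValue(lit: String): String =
            lit                                        // was: lit.asInstanceOf[String]

          def main(args: Array[String]): Unit = {
            println(withSuffix("spark"))               // prints "spark-example"
            println(toJavaList(Seq("a", "b")))         // prints "[a, b]"
          }
        }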
    
    ### Why are the changes needed?
    Cleaning up the `connect` module (narrower member visibility, removal of redundant conversions) keeps the code easier to maintain.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions.
    
    Closes #39620 from LuciferYang/code-cleanup-connnect.
    
    Authored-by: yangjie01 <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/connect/client/SparkConnectClient.scala      |  2 +-
 .../scala/org/apache/spark/sql/connect/dsl/package.scala   |  4 ++--
 .../sql/connect/planner/LiteralValueProtoConverter.scala   |  2 +-
 .../spark/sql/connect/planner/SparkConnectPlanner.scala    | 12 ++++++------
 .../spark/sql/connect/service/SparkConnectService.scala    |  4 ++--
 .../sql/connect/service/SparkConnectStreamHandler.scala    | 14 +++++++-------
 6 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 87682fdd700..8ad84631531 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -94,7 +94,7 @@ object SparkConnectClient {
       this
     }
 
-    object URIParams {
+    private object URIParams {
       val PARAM_USER_ID = "user_id"
       val PARAM_USE_SSL = "use_ssl"
       val PARAM_TOKEN = "token"
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 9032ea1dffd..1c98162c76e 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -294,7 +294,7 @@ package object dsl {
             proto.NAFill
               .newBuilder()
               .setInput(logicalPlan)
-              .addAllCols(cols.toSeq.asJava)
+              .addAllCols(cols.asJava)
               .addAllValues(Seq(toConnectProtoValue(value)).asJava)
               .build())
           .build()
@@ -615,7 +615,7 @@ package object dsl {
           .build()
       }
 
-      def createDefaultSortField(col: String): Expression.SortOrder = {
+      private def createDefaultSortField(col: String): Expression.SortOrder = {
         Expression.SortOrder
           .newBuilder()
           .setNullOrdering(Expression.SortOrder.NullOrdering.SORT_NULLS_FIRST)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala
index a29a640e7b8..38cce28ad6f 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala
@@ -107,7 +107,7 @@ object LiteralValueProtoConverter {
     lit.getLiteralTypeCase match {
       case proto.Expression.Literal.LiteralTypeCase.STRING => lit.getString
 
-      case _ => toCatalystExpression(lit).asInstanceOf[expressions.Literal].value
+      case _ => toCatalystExpression(lit).value
     }
   }
 
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 56956e7fff1..3d63558eb3e 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -58,7 +58,7 @@ final case class InvalidCommandInput(
     extends Exception(message, cause)
 
 class SparkConnectPlanner(session: SparkSession) {
-  lazy val pythonExec =
+  private lazy val pythonExec =
     sys.env.getOrElse("PYSPARK_PYTHON", sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
 
   // The root of the query plan is a relation and we apply the transformations to it.
@@ -123,7 +123,7 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
-  def transformRelationPlugin(extension: ProtoAny): LogicalPlan = {
+  private def transformRelationPlugin(extension: ProtoAny): LogicalPlan = {
     SparkConnectPluginRegistry.relationRegistry
       // Lazily traverse the collection.
       .view
@@ -745,7 +745,7 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
-  def transformExpressionPlugin(extension: ProtoAny): Expression = {
+  private def transformExpressionPlugin(extension: ProtoAny): Expression = {
     SparkConnectPluginRegistry.expressionRegistry
       // Lazily traverse the collection.
       .view
@@ -1337,7 +1337,7 @@ class SparkConnectPlanner(session: SparkSession) {
    *
    * @param cf
    */
-  def handleCreateScalarFunction(cf: proto.CreateScalarFunction): Unit = {
+  private def handleCreateScalarFunction(cf: proto.CreateScalarFunction): Unit = {
     val function = SimplePythonFunction(
       cf.getSerializedFunction.toByteArray,
       Maps.newHashMap(),
@@ -1357,7 +1357,7 @@ class SparkConnectPlanner(session: SparkSession) {
     session.udf.registerPython(cf.getPartsList.asScala.head, udf)
   }
 
-  def handleCreateViewCommand(createView: proto.CreateDataFrameViewCommand): Unit = {
+  private def handleCreateViewCommand(createView: proto.CreateDataFrameViewCommand): Unit = {
     val viewType = if (createView.getIsGlobal) GlobalTempView else LocalTempView
 
     val tableIdentifier =
@@ -1392,7 +1392,7 @@ class SparkConnectPlanner(session: SparkSession) {
    *
    * @param writeOperation
    */
-  def handleWriteOperation(writeOperation: proto.WriteOperation): Unit = {
+  private def handleWriteOperation(writeOperation: proto.WriteOperation): Unit = {
     // Transform the input plan into the logical plan.
     val planner = new SparkConnectPlanner(session)
     val plan = planner.transformRelation(writeOperation.getInput)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 61f035630f7..9b05d90f1e3 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -222,7 +222,7 @@ object SparkConnectService {
 
   // Type alias for the SessionCacheKey. Right now this is a String but allows us to switch to a
   // different or complex type easily.
-  private type SessionCacheKey = (String, String);
+  private type SessionCacheKey = (String, String)
 
   private var server: Server = _
 
@@ -259,7 +259,7 @@ object SparkConnectService {
   /**
    * Starts the GRPC Serivce.
    */
-  def startGRPCService(): Unit = {
+  private def startGRPCService(): Unit = {
     val debugMode = SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
     val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
     val sb = NettyServerBuilder
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 9c1a8ca4dc4..cff3a451ceb 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -52,14 +52,14 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     }
   }
 
-  def handlePlan(session: SparkSession, request: ExecutePlanRequest): Unit = {
+  private def handlePlan(session: SparkSession, request: ExecutePlanRequest): Unit = {
     // Extract the plan from the request and convert it to a logical plan
     val planner = new SparkConnectPlanner(session)
     val dataframe = Dataset.ofRows(session, planner.transformRelation(request.getPlan.getRoot))
     processAsArrowBatches(request.getClientId, dataframe)
   }
 
-  def processAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = {
+  private def processAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = {
     val spark = dataframe.sparkSession
     val schema = dataframe.schema
     val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
@@ -167,7 +167,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     }
   }
 
-  def sendMetricsToResponse(clientId: String, rows: DataFrame): ExecutePlanResponse = {
+  private def sendMetricsToResponse(clientId: String, rows: DataFrame): ExecutePlanResponse = {
     // Send a last batch with the metrics
     ExecutePlanResponse
       .newBuilder()
@@ -176,7 +176,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
       .build()
   }
 
-  def handleCommand(session: SparkSession, request: ExecutePlanRequest): Unit = {
+  private def handleCommand(session: SparkSession, request: ExecutePlanRequest): Unit = {
     val command = request.getPlan.getCommand
     val planner = new SparkConnectPlanner(session)
     planner.process(command)
@@ -209,17 +209,17 @@ object MetricGenerator extends AdaptiveSparkPlanHelper {
     b.build()
   }
 
-  def transformChildren(p: SparkPlan): Seq[ExecutePlanResponse.Metrics.MetricObject] = {
+  private def transformChildren(p: SparkPlan): Seq[ExecutePlanResponse.Metrics.MetricObject] = {
     allChildren(p).flatMap(c => transformPlan(c, p.id))
   }
 
-  def allChildren(p: SparkPlan): Seq[SparkPlan] = p match {
+  private def allChildren(p: SparkPlan): Seq[SparkPlan] = p match {
     case a: AdaptiveSparkPlanExec => Seq(a.executedPlan)
     case s: QueryStageExec => Seq(s.plan)
     case _ => p.children
   }
 
-  def transformPlan(
+  private def transformPlan(
       p: SparkPlan,
       parentId: Int): Seq[ExecutePlanResponse.Metrics.MetricObject] = {
     val mv = p.metrics.map(m =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
