This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f8f0429c706 [SPARK-42096][CONNECT] Some code cleanup for `connect` module
f8f0429c706 is described below
commit f8f0429c706f8abded9c69a16acc98ce52793c3c
Author: yangjie01 <[email protected]>
AuthorDate: Thu Jan 19 19:46:23 2023 -0800
[SPARK-42096][CONNECT] Some code cleanup for `connect` module
### What changes were proposed in this pull request?
This PR aims to do some code cleanup for the `connect` module (a simplified illustration of each category follows the list):
- weaken the access scope of functions (or vals) that are currently only used inside their class from `public` to `private` (they can be changed back to `public` when required)
- remove a redundant collection conversion
- remove a redundant type conversion
- remove a redundant trailing semicolon
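For illustration only, here is a minimal Scala sketch of these cleanup categories. The names below (`Registry`, `lookup`, `store`, `toJavaList`, `passThrough`, `CacheKey`) are invented for this example and do not appear in the patch; the real changes are exactly those shown in the diff.

```scala
import scala.jdk.CollectionConverters._

object Registry {
  private val store = Map("a" -> "1")

  // 1. Access scope: `lookup` is only called inside this object, so it is
  //    narrowed from public to `private` (it can be widened again if needed).
  private def lookup(key: String): Option[String] = store.get(key)

  def resolve(key: String): String = lookup(key).getOrElse("")

  // 2. Redundant collection conversion: `cols` is already a Seq, so the
  //    former `cols.toSeq.asJava` is equivalent to plain `cols.asJava`.
  def toJavaList(cols: Seq[String]): java.util.List[String] = cols.asJava

  // 3. Redundant type conversion: when the static type is already the target
  //    type, a cast such as `value.asInstanceOf[String]` can simply be dropped.
  def passThrough(value: String): String = value

  // 4. Redundant trailing semicolon: Scala does not require it.
  private type CacheKey = (String, String)
}
```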
### Why are the changes needed?
Code cleanup keeps the `connect` module concise and narrows its public surface to what is actually used.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions.
Closes #39620 from LuciferYang/code-cleanup-connnect.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/connect/client/SparkConnectClient.scala | 2 +-
.../scala/org/apache/spark/sql/connect/dsl/package.scala | 4 ++--
.../sql/connect/planner/LiteralValueProtoConverter.scala | 2 +-
.../spark/sql/connect/planner/SparkConnectPlanner.scala | 12 ++++++------
.../spark/sql/connect/service/SparkConnectService.scala | 4 ++--
.../sql/connect/service/SparkConnectStreamHandler.scala | 14 +++++++-------
6 files changed, 19 insertions(+), 19 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 87682fdd700..8ad84631531 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -94,7 +94,7 @@ object SparkConnectClient {
this
}
- object URIParams {
+ private object URIParams {
val PARAM_USER_ID = "user_id"
val PARAM_USE_SSL = "use_ssl"
val PARAM_TOKEN = "token"
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 9032ea1dffd..1c98162c76e 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -294,7 +294,7 @@ package object dsl {
proto.NAFill
.newBuilder()
.setInput(logicalPlan)
- .addAllCols(cols.toSeq.asJava)
+ .addAllCols(cols.asJava)
.addAllValues(Seq(toConnectProtoValue(value)).asJava)
.build())
.build()
@@ -615,7 +615,7 @@ package object dsl {
.build()
}
- def createDefaultSortField(col: String): Expression.SortOrder = {
+ private def createDefaultSortField(col: String): Expression.SortOrder = {
Expression.SortOrder
.newBuilder()
.setNullOrdering(Expression.SortOrder.NullOrdering.SORT_NULLS_FIRST)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala
index a29a640e7b8..38cce28ad6f 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala
@@ -107,7 +107,7 @@ object LiteralValueProtoConverter {
lit.getLiteralTypeCase match {
case proto.Expression.Literal.LiteralTypeCase.STRING => lit.getString
- case _ => toCatalystExpression(lit).asInstanceOf[expressions.Literal].value
+ case _ => toCatalystExpression(lit).value
}
}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 56956e7fff1..3d63558eb3e 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -58,7 +58,7 @@ final case class InvalidCommandInput(
extends Exception(message, cause)
class SparkConnectPlanner(session: SparkSession) {
- lazy val pythonExec =
+ private lazy val pythonExec =
sys.env.getOrElse("PYSPARK_PYTHON",
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
// The root of the query plan is a relation and we apply the transformations
to it.
@@ -123,7 +123,7 @@ class SparkConnectPlanner(session: SparkSession) {
}
}
- def transformRelationPlugin(extension: ProtoAny): LogicalPlan = {
+ private def transformRelationPlugin(extension: ProtoAny): LogicalPlan = {
SparkConnectPluginRegistry.relationRegistry
// Lazily traverse the collection.
.view
@@ -745,7 +745,7 @@ class SparkConnectPlanner(session: SparkSession) {
}
}
- def transformExpressionPlugin(extension: ProtoAny): Expression = {
+ private def transformExpressionPlugin(extension: ProtoAny): Expression = {
SparkConnectPluginRegistry.expressionRegistry
// Lazily traverse the collection.
.view
@@ -1337,7 +1337,7 @@ class SparkConnectPlanner(session: SparkSession) {
*
* @param cf
*/
- def handleCreateScalarFunction(cf: proto.CreateScalarFunction): Unit = {
+ private def handleCreateScalarFunction(cf: proto.CreateScalarFunction): Unit = {
val function = SimplePythonFunction(
cf.getSerializedFunction.toByteArray,
Maps.newHashMap(),
@@ -1357,7 +1357,7 @@ class SparkConnectPlanner(session: SparkSession) {
session.udf.registerPython(cf.getPartsList.asScala.head, udf)
}
- def handleCreateViewCommand(createView: proto.CreateDataFrameViewCommand): Unit = {
+ private def handleCreateViewCommand(createView: proto.CreateDataFrameViewCommand): Unit = {
val viewType = if (createView.getIsGlobal) GlobalTempView else LocalTempView
val tableIdentifier =
@@ -1392,7 +1392,7 @@ class SparkConnectPlanner(session: SparkSession) {
*
* @param writeOperation
*/
- def handleWriteOperation(writeOperation: proto.WriteOperation): Unit = {
+ private def handleWriteOperation(writeOperation: proto.WriteOperation): Unit = {
// Transform the input plan into the logical plan.
val planner = new SparkConnectPlanner(session)
val plan = planner.transformRelation(writeOperation.getInput)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 61f035630f7..9b05d90f1e3 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -222,7 +222,7 @@ object SparkConnectService {
// Type alias for the SessionCacheKey. Right now this is a String but allows us to switch to a
// different or complex type easily.
- private type SessionCacheKey = (String, String);
+ private type SessionCacheKey = (String, String)
private var server: Server = _
@@ -259,7 +259,7 @@ object SparkConnectService {
/**
* Starts the GRPC Serivce.
*/
- def startGRPCService(): Unit = {
+ private def startGRPCService(): Unit = {
val debugMode =
SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
val sb = NettyServerBuilder
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 9c1a8ca4dc4..cff3a451ceb 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -52,14 +52,14 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
}
}
- def handlePlan(session: SparkSession, request: ExecutePlanRequest): Unit = {
+ private def handlePlan(session: SparkSession, request: ExecutePlanRequest): Unit = {
// Extract the plan from the request and convert it to a logical plan
val planner = new SparkConnectPlanner(session)
val dataframe = Dataset.ofRows(session, planner.transformRelation(request.getPlan.getRoot))
processAsArrowBatches(request.getClientId, dataframe)
}
- def processAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = {
+ private def processAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = {
val spark = dataframe.sparkSession
val schema = dataframe.schema
val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
@@ -167,7 +167,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
}
}
- def sendMetricsToResponse(clientId: String, rows: DataFrame): ExecutePlanResponse = {
+ private def sendMetricsToResponse(clientId: String, rows: DataFrame): ExecutePlanResponse = {
// Send a last batch with the metrics
ExecutePlanResponse
.newBuilder()
@@ -176,7 +176,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
.build()
}
- def handleCommand(session: SparkSession, request: ExecutePlanRequest): Unit = {
+ private def handleCommand(session: SparkSession, request: ExecutePlanRequest): Unit = {
val command = request.getPlan.getCommand
val planner = new SparkConnectPlanner(session)
planner.process(command)
@@ -209,17 +209,17 @@ object MetricGenerator extends AdaptiveSparkPlanHelper {
b.build()
}
- def transformChildren(p: SparkPlan): Seq[ExecutePlanResponse.Metrics.MetricObject] = {
+ private def transformChildren(p: SparkPlan): Seq[ExecutePlanResponse.Metrics.MetricObject] = {
allChildren(p).flatMap(c => transformPlan(c, p.id))
}
- def allChildren(p: SparkPlan): Seq[SparkPlan] = p match {
+ private def allChildren(p: SparkPlan): Seq[SparkPlan] = p match {
case a: AdaptiveSparkPlanExec => Seq(a.executedPlan)
case s: QueryStageExec => Seq(s.plan)
case _ => p.children
}
- def transformPlan(
+ private def transformPlan(
p: SparkPlan,
parentId: Int): Seq[ExecutePlanResponse.Metrics.MetricObject] = {
val mv = p.metrics.map(m =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]