This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c93d40d2f0e [SPARK-44594][SS] Remove redundant method parameter in kafka connector
c93d40d2f0e is described below

commit c93d40d2f0e9ad55286ffecc1d678d1aa86ac4b6
Author: zhaomin <[email protected]>
AuthorDate: Mon Oct 16 07:51:29 2023 +0800

    [SPARK-44594][SS] Remove redundant method parameter in kafka connector
    
    ### What changes were proposed in this pull request?
    
    There are redundant parameters in org.apache.spark.sql.kafka010.KafkaWriter#validateQuery and org.apache.spark.sql.kafka010.KafkaWriter#write; they can be removed.
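    
    For reference, a sketch of the `KafkaWriter` signatures after this change, as they appear in the diff below (the unused `kafkaParameters` argument is dropped from `validateQuery`, and the unused `sparkSession` argument is dropped from `write`):
    
    ```scala
    // Signatures only, taken from KafkaWriter.scala after this patch:
    def validateQuery(
        schema: Seq[Attribute],
        topic: Option[String] = None): Unit
    
    def write(
        queryExecution: QueryExecution,
        kafkaParameters: ju.Map[String, Object],
        topic: Option[String] = None): Unit
    
    // Call sites are updated accordingly, e.g. in KafkaSink.addBatch:
    // KafkaWriter.write(data.queryExecution, executorKafkaParams, topic)
    ```
    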
    ### Why are the changes needed?
    
    They are not used; removing them makes the code more concise.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Existing tests cover it.
    
    Closes #42198 from zhaomin1423/fix_kafka.
    
    Authored-by: zhaomin <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
---
 .../main/scala/org/apache/spark/sql/kafka010/KafkaBatchWrite.scala | 2 +-
 .../src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala   | 7 +++----
 .../scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala  | 5 ++---
 .../scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala  | 2 +-
 .../src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala | 6 ++----
 5 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchWrite.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchWrite.scala
index 002da3c5132..03612bf40c0 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchWrite.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchWrite.scala
@@ -39,7 +39,7 @@ private[kafka010] class KafkaBatchWrite(
     schema: StructType)
   extends BatchWrite {
 
-  validateQuery(DataTypeUtils.toAttributes(schema), producerParams, topic)
+  validateQuery(DataTypeUtils.toAttributes(schema), topic)
 
   override def createBatchWriterFactory(info: PhysicalWriteInfo): KafkaBatchWriterFactory =
     KafkaBatchWriterFactory(topic, producerParams, schema)
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
index 08914d82fff..6ab4e91c53b 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
@@ -20,11 +20,10 @@ package org.apache.spark.sql.kafka010
 import java.{util => ju}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.streaming.Sink
 
 private[kafka010] class KafkaSink(
-    sqlContext: SQLContext,
     executorKafkaParams: ju.Map[String, Object],
     topic: Option[String]) extends Sink with Logging {
   @volatile private var latestBatchId = -1L
@@ -35,8 +34,8 @@ private[kafka010] class KafkaSink(
     if (batchId <= latestBatchId) {
       logInfo(s"Skipping already committed batch $batchId")
     } else {
-      KafkaWriter.write(sqlContext.sparkSession,
-        data.queryExecution, executorKafkaParams, topic)
+      KafkaWriter.write(data.queryExecution,
+        executorKafkaParams, topic)
       latestBatchId = batchId
     }
   }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 27ebfaad617..9da1fe0280e 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -161,7 +161,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
     val defaultTopic = caseInsensitiveParameters.get(TOPIC_OPTION_KEY).map(_.trim)
     val specifiedKafkaParams = kafkaParamsForProducer(caseInsensitiveParameters)
-    new KafkaSink(sqlContext, specifiedKafkaParams, defaultTopic)
+    new KafkaSink(specifiedKafkaParams, defaultTopic)
   }
 
   override def createRelation(
@@ -179,8 +179,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
     val topic = caseInsensitiveParameters.get(TOPIC_OPTION_KEY).map(_.trim)
     val specifiedKafkaParams = kafkaParamsForProducer(caseInsensitiveParameters)
-    KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution, specifiedKafkaParams,
-      topic)
+    KafkaWriter.write(data.queryExecution, specifiedKafkaParams, topic)
 
     /* This method is suppose to return a relation that reads the data that was written.
      * We cannot support this for Kafka. Therefore, in order to make things consistent,
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
index 1fdf1b9293d..da8c9c33bb1 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
@@ -40,7 +40,7 @@ private[kafka010] class KafkaStreamingWrite(
     schema: StructType)
   extends StreamingWrite {
 
-  validateQuery(DataTypeUtils.toAttributes(schema), producerParams, topic)
+  validateQuery(DataTypeUtils.toAttributes(schema), topic)
 
   override def createStreamingWriterFactory(
       info: PhysicalWriteInfo): KafkaStreamWriterFactory =
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index 92c51416f48..d1c4386e486 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import java.{util => ju}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -48,7 +48,6 @@ private[kafka010] object KafkaWriter extends Logging {
 
   def validateQuery(
       schema: Seq[Attribute],
-      kafkaParameters: ju.Map[String, Object],
       topic: Option[String] = None): Unit = {
     try {
       topicExpression(schema, topic)
@@ -62,12 +61,11 @@ private[kafka010] object KafkaWriter extends Logging {
   }
 
   def write(
-      sparkSession: SparkSession,
       queryExecution: QueryExecution,
       kafkaParameters: ju.Map[String, Object],
       topic: Option[String] = None): Unit = {
     val schema = queryExecution.analyzed.output
-    validateQuery(schema, kafkaParameters, topic)
+    validateQuery(schema, topic)
     queryExecution.toRdd.foreachPartition { iter =>
       val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
       Utils.tryWithSafeFinally(block = writeTask.execute(iter))(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
