This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c93d40d2f0e [SPARK-44594][SS] Remove redundant method parameter in kafka connector
c93d40d2f0e is described below
commit c93d40d2f0e9ad55286ffecc1d678d1aa86ac4b6
Author: zhaomin <[email protected]>
AuthorDate: Mon Oct 16 07:51:29 2023 +0800
[SPARK-44594][SS] Remove redundant method parameter in kafka connector
### What changes were proposed in this pull request?
There are redundant parameters in
org.apache.spark.sql.kafka010.KafkaWriter#validateQuery and
org.apache.spark.sql.kafka010.KafkaWriter#write; this PR removes them.
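For reference, the simplified KafkaWriter signatures after this change, excerpted from the diff below, are:
```scala
// validateQuery no longer takes the unused producer parameter map.
def validateQuery(
    schema: Seq[Attribute],
    topic: Option[String] = None): Unit

// write no longer takes a SparkSession; the QueryExecution is sufficient.
def write(
    queryExecution: QueryExecution,
    kafkaParameters: ju.Map[String, Object],
    topic: Option[String] = None): Unit
```
Call sites in KafkaBatchWrite, KafkaSink, KafkaSourceProvider, and KafkaStreamingWrite are updated accordingly, and KafkaSink also drops its now-unused sqlContext constructor parameter.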
### Why are the changes needed?
These parameters are unused; removing them makes the code more concise.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests cover this change.
Closes #42198 from zhaomin1423/fix_kafka.
Authored-by: zhaomin <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../main/scala/org/apache/spark/sql/kafka010/KafkaBatchWrite.scala | 2 +-
.../src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala | 7 +++----
.../scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala | 5 ++---
.../scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala | 2 +-
.../src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala | 6 ++----
5 files changed, 9 insertions(+), 13 deletions(-)
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchWrite.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchWrite.scala
index 002da3c5132..03612bf40c0 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchWrite.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchWrite.scala
@@ -39,7 +39,7 @@ private[kafka010] class KafkaBatchWrite(
schema: StructType)
extends BatchWrite {
- validateQuery(DataTypeUtils.toAttributes(schema), producerParams, topic)
+ validateQuery(DataTypeUtils.toAttributes(schema), topic)
override def createBatchWriterFactory(info: PhysicalWriteInfo): KafkaBatchWriterFactory =
KafkaBatchWriterFactory(topic, producerParams, schema)
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
index 08914d82fff..6ab4e91c53b 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala
@@ -20,11 +20,10 @@ package org.apache.spark.sql.kafka010
import java.{util => ju}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink
private[kafka010] class KafkaSink(
- sqlContext: SQLContext,
executorKafkaParams: ju.Map[String, Object],
topic: Option[String]) extends Sink with Logging {
@volatile private var latestBatchId = -1L
@@ -35,8 +34,8 @@ private[kafka010] class KafkaSink(
if (batchId <= latestBatchId) {
logInfo(s"Skipping already committed batch $batchId")
} else {
- KafkaWriter.write(sqlContext.sparkSession,
- data.queryExecution, executorKafkaParams, topic)
+ KafkaWriter.write(data.queryExecution,
+ executorKafkaParams, topic)
latestBatchId = batchId
}
}
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 27ebfaad617..9da1fe0280e 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -161,7 +161,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
val defaultTopic =
caseInsensitiveParameters.get(TOPIC_OPTION_KEY).map(_.trim)
val specifiedKafkaParams = kafkaParamsForProducer(caseInsensitiveParameters)
- new KafkaSink(sqlContext, specifiedKafkaParams, defaultTopic)
+ new KafkaSink(specifiedKafkaParams, defaultTopic)
}
override def createRelation(
@@ -179,8 +179,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
val topic = caseInsensitiveParameters.get(TOPIC_OPTION_KEY).map(_.trim)
val specifiedKafkaParams = kafkaParamsForProducer(caseInsensitiveParameters)
- KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution, specifiedKafkaParams,
- topic)
+ KafkaWriter.write(data.queryExecution, specifiedKafkaParams, topic)
/* This method is suppose to return a relation that reads the data that was written.
* We cannot support this for Kafka. Therefore, in order to make things consistent,
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
index 1fdf1b9293d..da8c9c33bb1 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
@@ -40,7 +40,7 @@ private[kafka010] class KafkaStreamingWrite(
schema: StructType)
extends StreamingWrite {
- validateQuery(DataTypeUtils.toAttributes(schema), producerParams, topic)
+ validateQuery(DataTypeUtils.toAttributes(schema), topic)
override def createStreamingWriterFactory(
info: PhysicalWriteInfo): KafkaStreamWriterFactory =
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index 92c51416f48..d1c4386e486 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
import java.{util => ju}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -48,7 +48,6 @@ private[kafka010] object KafkaWriter extends Logging {
def validateQuery(
schema: Seq[Attribute],
- kafkaParameters: ju.Map[String, Object],
topic: Option[String] = None): Unit = {
try {
topicExpression(schema, topic)
@@ -62,12 +61,11 @@ private[kafka010] object KafkaWriter extends Logging {
}
def write(
- sparkSession: SparkSession,
queryExecution: QueryExecution,
kafkaParameters: ju.Map[String, Object],
topic: Option[String] = None): Unit = {
val schema = queryExecution.analyzed.output
- validateQuery(schema, kafkaParameters, topic)
+ validateQuery(schema, topic)
queryExecution.toRdd.foreachPartition { iter =>
val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]