This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a7b864fcdc7e [SPARK-51440][SS] classify the NPE when null topic field 
value is in kafka message data and there is no topic option
a7b864fcdc7e is described below

commit a7b864fcdc7e2fbf872675cd3829389ce3130a26
Author: huanliwang-db <huanli.w...@databricks.com>
AuthorDate: Tue Mar 11 16:01:17 2025 +0900

    [SPARK-51440][SS] classify the NPE when null topic field value is in kafka 
message data and there is no topic option
    
    ### What changes were proposed in this pull request?
    
    Currently a NullPointerException is thrown when a row written to Kafka has
    a null topic field value and no `topic` option is set. This change
    introduces a new Kafka error condition, KAFKA_NULL_TOPIC_IN_DATA, and
    throws a classified KafkaIllegalStateException instead.
    
    ### Why are the changes needed?
    
    Error classification gives a better user experience: a classified error
    condition instead of an unclassified NullPointerException.
    
    ### Does this PR introduce _any_ user-facing change?
    
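    Yes. Writing data with a null topic field value and no `topic` option now
    fails with the classified error KAFKA_NULL_TOPIC_IN_DATA instead of a
    NullPointerException.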
    
    ### How was this patch tested?
    
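    Modified the existing unit test in KafkaSinkSuite ("batch - null topic
    field value, and no topic option") to expect the new error condition.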
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    N/A
    
    Closes #50214 from huanliwang-db/kafka-error.
    
    Authored-by: huanliwang-db <huanli.w...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/kafka-error-conditions.json         | 5 +++++
 .../scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala    | 6 ++++++
 .../scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala     | 4 ++--
 .../scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala     | 9 ++++++---
 4 files changed, 19 insertions(+), 5 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json 
b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
index 42905c06ca66..d6a7aa19d030 100644
--- 
a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
+++ 
b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
@@ -37,6 +37,11 @@
       "Specified: <specifiedPartitions> Assigned: <assignedPartitions>"
     ]
   },
+  "KAFKA_NULL_TOPIC_IN_DATA": {
+    "message" : [
+      "The Kafka message data sent to the producer contains a null topic. Use 
the `topic` option for setting a default topic."
+    ]
+  },
   "KAFKA_DATA_LOSS" : {
     "message" : [
       "Some data may have been lost because they are not available in Kafka 
any more;",
diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
index a31fc56bf892..a6eb13e68c19 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
@@ -177,6 +177,12 @@ object KafkaExceptions {
         "specifiedPartitions" -> specifiedPartitions.toString,
         "assignedPartitions" -> assignedPartitions.toString))
   }
+
+  def nullTopicInData(): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_NULL_TOPIC_IN_DATA",
+      messageParameters = Map.empty)
+  }
 }
 
 /**
diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
index e8f98262a897..83663386856d 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.header.internals.RecordHeader
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
UnsafeProjection}
+import org.apache.spark.sql.kafka010.KafkaExceptions.nullTopicInData
 import org.apache.spark.sql.kafka010.producer.{CachedKafkaProducer, 
InternalKafkaProducerPool}
 import org.apache.spark.sql.types.BinaryType
 
@@ -95,8 +96,7 @@ private[kafka010] abstract class KafkaRowWriter(
     val key = projectedRow.getBinary(1)
     val value = projectedRow.getBinary(2)
     if (topic == null) {
-      throw new NullPointerException(s"null topic present in the data. Use the 
" +
-        s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default 
topic.")
+      throw nullTopicInData()
     }
     val partition: Integer =
       if (projectedRow.isNullAt(4)) null else projectedRow.getInt(4)
diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 5566785c4d56..82edba59995e 100644
--- 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.Cluster
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkConf, SparkContext, SparkException, TestUtils}
+import org.apache.spark.{SparkConf, SparkContext, TestUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.streaming.{MemoryStream, 
MemoryStreamBase}
@@ -491,14 +491,17 @@ abstract class KafkaSinkBatchSuiteBase extends 
KafkaSinkSuiteBase {
 
   test("batch - null topic field value, and no topic option") {
     val df = Seq[(String, String)](null.asInstanceOf[String] -> 
"1").toDF("topic", "value")
-    val ex = intercept[SparkException] {
+    val ex = intercept[KafkaIllegalStateException] {
       df.write
         .format("kafka")
         .option("kafka.bootstrap.servers", testUtils.brokerAddress)
         .mode("append")
         .save()
     }
-    TestUtils.assertExceptionMsg(ex, "null topic present in the data")
+    checkError(
+      exception = ex,
+      condition = "KAFKA_NULL_TOPIC_IN_DATA"
+    )
   }
 
   protected def testUnsupportedSaveModes(msg: (SaveMode) => Seq[String]): Unit 
= {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to