This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 2b5cd7dc59e8 [SPARK-52096][SQL][SS] Reclassify kafka source offset assertion error
2b5cd7dc59e8 is described below

commit 2b5cd7dc59e852e9b60fdd896ccfbcfb275b7071
Author: Yuchen Liu <170372783+eason-yuchen-...@users.noreply.github.com>
AuthorDate: Thu May 15 22:26:27 2025 +0900

[SPARK-52096][SQL][SS] Reclassify kafka source offset assertion error

### What changes were proposed in this pull request?

This PR reclassifies the assertion error raised when the start offset of the Kafka source is greater than the end offset into either a user error or a Kafka internal error. Several cases can lead to this error; here they are, in the order in which we can catch them:

1. If the user specifies startingOffset and endingOffset in a comparable way (either by index or by timestamp), we can compare them before starting the query.
2. If the user provides startingOffset and endingOffset but they are initially incomparable, we should compare them as soon as they are resolved by contacting the Kafka brokers.
3. If the user sets startingOffset to some value and endingOffset to latest, we should compare them after the query has started and the latest offset has been fetched.
4. The Kafka topic-partition is broken.

```
java.lang.AssertionError: assertion failed: Beginning offset 49041276637 is after the ending offset 48774253518 for topic xxx partition 1. You either provided an invalid fromOffset, or the Kafka topic has been damaged
  at scala.Predef$.assert(Predef.scala:223)
  at org.apache.spark.sql.kafka010.KafkaSourceRDD.compute(KafkaSourceRDD.scala:79)
  at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
```

### Why are the changes needed?

Most of the time this assertion error is triggered by user error, and we have the information to make that explicit. This PR uses that information to separate user errors from system errors.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Adds unit tests in `KafkaRelationSuiteBase`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50866 from eason-yuchen-liu/kafkaOffsetErrorMsg.

Authored-by: Yuchen Liu <170372783+eason-yuchen-...@users.noreply.github.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../resources/error/kafka-error-conditions.json    |  25 ++++-
 .../spark/sql/kafka010/KafkaExceptions.scala       |  71 +++++++++++++
 .../sql/kafka010/KafkaOffsetReaderAdmin.scala      |  12 ++-
 .../sql/kafka010/KafkaOffsetReaderConsumer.scala   |  12 ++-
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  59 +++++++++++
 .../apache/spark/sql/kafka010/KafkaSourceRDD.scala |  21 ++--
 .../spark/sql/kafka010/KafkaRelationSuite.scala    | 113 +++++++++++++++++++++
 7 files changed, 304 insertions(+), 9 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
index d6a7aa19d030..4260d96654d7 100644
--- a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
+++ b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
@@ -5,6 +5,12 @@
       "topic-partitions for pre-fetched offset: <tpsForPrefetched>, topic-partitions for end offset: <tpsForEndOffset>."
     ]
   },
+  "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_START_OFFSET_AND_END_OFFSET" : {
+    "message" : [
+      "The specified start offset and end offset should have the same topic-partitions.",
+      "Topic-partitions for start offset: <tpsForStartOffset>, topic-partitions for end offset: <tpsForEndOffset>."
+    ]
+  },
   "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED" : {
     "message" : [
       "For Kafka data source with Trigger.AvailableNow, end offset should have lower or equal offset per each topic partition than pre-fetched offset. The error could be transient - restart your query, and report if you still see the same issue.",
@@ -87,7 +93,8 @@
     },
     "PARTITION_OFFSET_CHANGED" : {
       "message" : [
-        "Partition <topicPartition> offset was changed from <prevOffset> to <newOffset>."
+        "Partition <topicPartition> offset was changed from <prevOffset> to <newOffset>.",
+        "This could be either 1) a user error that the start offset is set beyond available offset when starting query, or 2) the kafka topic-partition is deleted and re-created."
       ]
     },
     "START_OFFSET_RESET" : {
@@ -97,5 +104,21 @@
       }
     },
     "sqlState" : "22000"
+  },
+  "RESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET" : {
+    "message" : [
+      "The resolved start offset <startOffset> is greater than the resolved end offset <endOffset> for topic <topic> partition <partition>.",
+      "Please investigate in this order:",
+      "1. Check whether start offset is set beyond the end offset or the latest available offset after resolution. Possible inputs are:",
+      "  (a) Start offset is set to a timestamp, and the end offset is set to an index of offset.",
+      "  (b) Start offset is set to an index of offset, and the end offset is set to a timestamp.",
+      "  (c) Start offset is set, and the end offset is set to latest.",
+      "2. Check whether the Kafka topic-partition is deleted and re-created."
+    ]
+  },
+  "UNRESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET" : {
+    "message" : [
+      "The specified start <offsetType> <startOffset> is greater than the end <offsetType> <endOffset> for topic <topic> partition <partition>."
+    ]
+  }
 }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
index a6eb13e68c19..156bb71d777d 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
@@ -183,6 +183,57 @@ object KafkaExceptions {
       errorClass = "KAFKA_NULL_TOPIC_IN_DATA",
       messageParameters = Map.empty)
   }
+
+  def unmatchedTopicPartitionsBetweenOffsets(
+      startOffset: Set[TopicPartition],
+      endOffset: Set[TopicPartition]): KafkaIllegalArgumentException = {
+    new KafkaIllegalArgumentException(
+      errorClass = "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_START_OFFSET_AND_END_OFFSET",
+      messageParameters = Map(
+        "tpsForStartOffset" -> startOffset.mkString(", "),
+        "tpsForEndOffset" -> endOffset.mkString(", ")))
+  }
+
+  def unresolvedStartOffsetGreaterThanEndOffset(
+      startOffset: Long,
+      endOffset: Long,
+      topicPartition: TopicPartition): KafkaIllegalArgumentException = {
+    new KafkaIllegalArgumentException(
+      errorClass = "UNRESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET",
+      messageParameters = Map(
+        "offsetType" -> "offset",
+        "startOffset" -> startOffset.toString,
+        "endOffset" -> endOffset.toString,
+        "topic" -> topicPartition.topic,
+        "partition" -> topicPartition.partition.toString))
+  }
+
+  def unresolvedStartTimestampGreaterThanEndTimestamp(
+      startOffset: Long,
+      endOffset: Long,
+      topicPartition: TopicPartition): KafkaIllegalArgumentException = {
+    new KafkaIllegalArgumentException(
+      errorClass = "UNRESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET",
+      messageParameters = Map(
+        "offsetType" -> "timestamp",
+        "startOffset" -> startOffset.toString,
+        "endOffset" -> endOffset.toString,
+        "topic" -> topicPartition.topic,
+        "partition" -> topicPartition.partition.toString))
+  }
+
+  def resolvedStartOffsetGreaterThanEndOffset(
+      startOffset: Long,
+      endOffset: Long,
+      topicPartition: TopicPartition): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "RESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET",
+      messageParameters = Map(
+        "startOffset" -> startOffset.toString,
+        "endOffset" -> endOffset.toString,
+        "topic" -> topicPartition.topic,
+        "partition" -> topicPartition.partition.toString))
+  }
 }
 
 /**
@@ -204,3 +255,23 @@ private[kafka010] class KafkaIllegalStateException(
 
   override def getCondition: String = errorClass
 }
+
+/**
+ * Illegal argument exception thrown with an error class.
+ */
+private[kafka010] class KafkaIllegalArgumentException(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    cause: Throwable = null)
+  extends IllegalArgumentException(
+    KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
+      errorClass, messageParameters), cause)
+  with SparkThrowable {
+
+  override def getSqlState: String =
+    KafkaExceptionsHelper.errorClassesJsonReader.getSqlState(errorClass)
+
+  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava
+
+  override def getCondition: String = errorClass
+}
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index bfc6139bdb72..86cec6fc041c 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -401,6 +401,14 @@ private[kafka010] class KafkaOffsetReaderAdmin(
           throw new IllegalStateException(s"$tp doesn't have a from offset")
         )
       val untilOffset = untilPartitionOffsets(tp)
+
+      KafkaSourceProvider.checkStartOffsetNotGreaterThanEndOffset(
+        fromOffset,
+        untilOffset,
+        tp,
+        KafkaExceptions.resolvedStartOffsetGreaterThanEndOffset
+      )
+
       KafkaOffsetRange(tp, fromOffset, untilOffset, None)
     }.toSeq
 
@@ -504,7 +512,9 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       if (untilOffset < fromOffset) {
         reportDataLoss(
           s"Partition $tp's offset was changed from " +
-            s"$fromOffset to $untilOffset, some data may have been missed",
+            s"$fromOffset to $untilOffset. This could be either 1) a user error that the start " +
+            "offset is set beyond available offset when starting query, or 2) the kafka " +
+            "topic-partition is deleted and re-created.",
           () => KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
       }
 
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index 7aadde7218f5..fb06797d1fe3 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -450,6 +450,14 @@ private[kafka010] class KafkaOffsetReaderConsumer(
             // fromPartitionOffsets
             throw new IllegalStateException(s"$tp doesn't have a from offset"))
       val untilOffset = untilPartitionOffsets(tp)
+
+      KafkaSourceProvider.checkStartOffsetNotGreaterThanEndOffset(
+        fromOffset,
+        untilOffset,
+        tp,
+        KafkaExceptions.resolvedStartOffsetGreaterThanEndOffset
+      )
+
       KafkaOffsetRange(tp, fromOffset, untilOffset, None)
     }.toSeq
 
@@ -553,7 +561,9 @@ private[kafka010] class KafkaOffsetReaderConsumer(
       if (untilOffset < fromOffset) {
         reportDataLoss(
           s"Partition $tp's offset was changed from " +
-            s"$fromOffset to $untilOffset, some data may have been missed",
+            s"$fromOffset to $untilOffset. This could be either 1) a user error that the start " +
+            "offset is set beyond available offset when starting query, or 2) the kafka " +
+            "topic-partition is deleted and re-created.",
           () => KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
       }
 
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 4cb9fa8df805..82ad75e028af 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -24,6 +24,7 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
@@ -141,6 +142,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       LatestOffsetRangeLimit)
     assert(endingRelationOffsets != EarliestOffsetRangeLimit)
 
+    checkOffsetLimitValidity(startingRelationOffsets, endingRelationOffsets)
+
     val includeHeaders = caseInsensitiveParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean
 
     new KafkaRelation(
@@ -463,6 +466,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY,
       ENDING_OFFSETS_OPTION_KEY,
       LatestOffsetRangeLimit)
+    checkOffsetLimitValidity(startingRelationOffsets, endingRelationOffsets)
+
     new KafkaBatch(
       strategy(caseInsensitiveOptions),
       caseInsensitiveOptions,
@@ -608,6 +613,60 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   private val serClassName = classOf[ByteArraySerializer].getName
   private val deserClassName = classOf[ByteArrayDeserializer].getName
 
+  def checkStartOffsetNotGreaterThanEndOffset(
+      startOffset: Long,
+      endOffset: Long,
+      topicPartition: TopicPartition,
+      exception: (Long, Long, TopicPartition) => Exception): Unit = {
+    // earliest or latest offsets are negative and should not be compared
+    if (startOffset > endOffset && startOffset >= 0 && endOffset >= 0) {
+      throw exception(startOffset, endOffset, topicPartition)
+    }
+  }
+
+  def checkOffsetLimitValidity(
+      startOffset: KafkaOffsetRangeLimit,
+      endOffset: KafkaOffsetRangeLimit): Unit = {
+    startOffset match {
+      case start: SpecificOffsetRangeLimit if endOffset.isInstanceOf[SpecificOffsetRangeLimit] =>
+        val end = endOffset.asInstanceOf[SpecificOffsetRangeLimit]
+        if (start.partitionOffsets.keySet != end.partitionOffsets.keySet) {
+          throw KafkaExceptions.unmatchedTopicPartitionsBetweenOffsets(
+            start.partitionOffsets.keySet, end.partitionOffsets.keySet
+          )
+        }
+        start.partitionOffsets.foreach {
+          case (tp, startOffset) =>
+            checkStartOffsetNotGreaterThanEndOffset(
+              startOffset,
+              end.partitionOffsets(tp),
+              tp,
+              KafkaExceptions.unresolvedStartOffsetGreaterThanEndOffset
+            )
+        }
+
+      case start: SpecificTimestampRangeLimit
+          if endOffset.isInstanceOf[SpecificTimestampRangeLimit] =>
+        val end = endOffset.asInstanceOf[SpecificTimestampRangeLimit]
+        if (start.topicTimestamps.keySet != end.topicTimestamps.keySet) {
+          throw KafkaExceptions.unmatchedTopicPartitionsBetweenOffsets(
+            start.topicTimestamps.keySet, end.topicTimestamps.keySet
+          )
+        }
+        start.topicTimestamps.foreach {
+          case (tp, startOffset) =>
+            checkStartOffsetNotGreaterThanEndOffset(
+              startOffset,
+              end.topicTimestamps(tp),
+              tp,
+              KafkaExceptions.unresolvedStartTimestampGreaterThanEndTimestamp
+            )
+        }
+
+      case _ => // do nothing
+    }
+  }
+
   def getKafkaOffsetRangeLimit(
       params: CaseInsensitiveMap[String],
       globalOffsetTimestampOptionKey: String,
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index bc7f8b6b44f9..0f962a29588a 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -21,7 +21,7 @@ import java.{util => ju}
 
 import org.apache.kafka.clients.consumer.ConsumerRecord
 
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
 import org.apache.spark.internal.LogKeys.{FROM_OFFSET, PARTITION_ID, TOPIC}
 import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
@@ -75,11 +75,12 @@ private[kafka010] class KafkaSourceRDD(
       sourcePartition.offsetRange.topicPartition, executorKafkaParams)
     val range = resolveRange(consumer, sourcePartition.offsetRange)
-    assert(
-      range.fromOffset <= range.untilOffset,
-      s"Beginning offset ${range.fromOffset} is after the ending offset ${range.untilOffset} " +
-        s"for topic ${range.topic} partition ${range.partition}. " +
-        "You either provided an invalid fromOffset, or the Kafka topic has been damaged")
+    if (range.fromOffset < 0 || range.untilOffset < 0) {
+      throw SparkException.internalError(
+        s"Should not have negative offsets for topic ${range.topic} partition ${range.partition} " +
+          s"at this point: fromOffset ${range.fromOffset} untilOffset ${range.untilOffset}"
+      )
+    }
     if (range.fromOffset == range.untilOffset) {
       logInfo(log"Beginning offset ${MDC(FROM_OFFSET, range.fromOffset)} is the same as ending " +
         log"offset skipping ${MDC(TOPIC, range.topic)} ${MDC(PARTITION_ID, range.partition)}")
@@ -137,6 +138,14 @@ private[kafka010] class KafkaSourceRDD(
         } else {
           range.untilOffset
         }
+
+        KafkaSourceProvider.checkStartOffsetNotGreaterThanEndOffset(
+          fromOffset,
+          untilOffset,
+          range.topicPartition,
+          KafkaExceptions.resolvedStartOffsetGreaterThanEndOffset
+        )
+
         KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, range.preferredLoc)
       } else {
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 1a884533e818..f99db196ce7c 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -21,6 +21,8 @@ import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.concurrent.duration.DurationInt
+
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 
@@ -616,6 +618,95 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
     assert(df.rdd.collectPartitions().flatMap(_.map(_.getString(0))).toSet ===
       (0 to 30).map(_.toString).toSet)
   }
+
+  test("topic-partition for start offset and end offset must match") {
+    val e = intercept[IllegalArgumentException] {
+      spark.read
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("subscribe", "test")
+        .option(
+          "startingOffsets",
+          """
+            |{"test": {"0": 1, "1": 1, "2": 1}}
+            |""".stripMargin
+        )
+        .option(
+          "endingOffsets",
+          """
+            |{"test": {"0": 0, "1": 0}}
+            |""".stripMargin
+        )
+        .load()
+        .collect()
+    }
+
+    assert(e.getMessage.contains(
+      "The specified start offset and end offset should have the same topic-partitions.")
+    )
+  }
+
+  test("unresolved start offset greater than end offset") {
+    val e = intercept[IllegalArgumentException] {
+      spark.read
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("subscribe", "test")
+        .option(
+          "startingOffsets",
+          """
+            |{"test": {"0": 1, "1": 1, "2": 1}}
+            |""".stripMargin
+        )
+        .option(
+          "endingOffsets",
+          """
+            |{"test": {"0": 0, "1": 0, "2": 0}}
+            |""".stripMargin
+        )
+        .load()
+        .collect()
+    }
+
+    assert(e.getMessage.contains(
+      "The specified start offset 1 is greater than the end offset 0 for"))
+  }
+
+  test("resolved start offset greater than end offset (without latest)") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    val timestamp1 =
+      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(0))(1)._2.timestamp()
+    val timestamp2 =
+      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(1))(1)._2.timestamp()
+    val timestamp3 =
+      testUtils.sendMessages(topic, Seq("0", "0").toArray, Some(2))(1)._2.timestamp()
+    val df = spark.read
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(
+        "startingOffsets",
+        s"""
+           |{"$topic": {"0": 3, "1": 3, "2": 3}}
+           |""".stripMargin
+      )
+      .option(
+        "endingOffsetsByTimestamp",
+        s"""
+           |{"$topic": {"0": $timestamp1, "1": $timestamp2, "2": $timestamp3}}
+           |""".stripMargin
+      )
+      .load()
+
+    eventually(timeout(60.seconds)) {
+      val e = intercept[IllegalStateException] {
+        df.collect()
+      }
+      assert(e.getMessage.contains(
+        "The resolved start offset 3 is greater than the resolved end offset 1 for"))
+    }
+  }
 }
 
 class KafkaRelationSuiteWithAdminV1 extends KafkaRelationSuiteV1 {
@@ -641,6 +732,28 @@ class KafkaRelationSuiteV1 extends KafkaRelationSuiteBase {
       case _: LogicalRelation => true
     }.nonEmpty)
   }
+
+  test("resolved start offset greater than end offset (with latest)") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    val df = spark.read
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(
+        "startingOffsets",
+        s"""
+           |{"$topic": {"0": 1, "1": 1, "2": 1}}
+           |""".stripMargin
+      )
+      .load()
+
+    val e = intercept[IllegalStateException] {
+      df.collect()
+    }
+    assert(e.getMessage.contains(
+      "The resolved start offset 1 is greater than the resolved end offset 0 for"))
+  }
 }
 
 class KafkaRelationSuiteV2 extends KafkaRelationSuiteBase {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
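For readers skimming the patch, a minimal, hypothetical sketch (not part of the commit) of the batch-read scenario that the new `checkOffsetLimitValidity` validation targets. The broker address, topic name, and offset values below are placeholder assumptions; the point is that a start offset specified above the end offset now fails fast on the driver with the `UNRESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET` condition instead of the executor-side `java.lang.AssertionError` quoted in the commit message.

```scala
// Sketch only: assumes spark-sql-kafka-0-10 is on the classpath, a broker is
// reachable at localhost:9092, and a topic named "events" exists with one
// partition. Broker address and topic name are placeholders, not PR values.
import org.apache.spark.sql.SparkSession

object StartAfterEndExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-offset-validation-sketch")
      .master("local[2]")
      .getOrCreate()

    try {
      // Start offset 5 is greater than end offset 2 for partition 0, so after this
      // patch the query is rejected with a KafkaIllegalArgumentException (an
      // IllegalArgumentException) before any Kafka data is read.
      spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "events")
        .option("startingOffsets", """{"events": {"0": 5}}""")
        .option("endingOffsets", """{"events": {"0": 2}}""")
        .load()
        .collect()
    } catch {
      case e: IllegalArgumentException =>
        // Message follows the new kafka-error-conditions.json entry, e.g.
        // "The specified start offset 5 is greater than the end offset 2 for ..."
        println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}
```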