This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b5cd7dc59e8 [SPARK-52096][SQL][SS] Reclassify kafka source offset assertion error
2b5cd7dc59e8 is described below

commit 2b5cd7dc59e852e9b60fdd896ccfbcfb275b7071
Author: Yuchen Liu <170372783+eason-yuchen-...@users.noreply.github.com>
AuthorDate: Thu May 15 22:26:27 2025 +0900

    [SPARK-52096][SQL][SS] Reclassify kafka source offset assertion error
    
    ### What changes were proposed in this pull request?
    
    This PR classifies the assertion error stating that the start offset of the
    Kafka source is greater than the end offset as either a user error or a
    Kafka internal error. Several different cases can lead to this error; here
    is a list, in the order in which we can catch them:
    1. If the user specifies startingOffsets and endingOffsets in a comparable
    way (either both by index or both by timestamp), we can compare them before
    starting the query (see the sketch after the stack trace below).
    2. If the user provides startingOffsets and endingOffsets that are initially
    incomparable, we should compare them as soon as they are resolved by
    contacting the Kafka brokers.
    3. If the user sets startingOffsets to some value and endingOffsets to
    latest, we should compare them after the query has started and the latest
    offset has been fetched.
    4. The Kafka topic-partition is broken.
    An example of the assertion error being reclassified:
    ```
    java.lang.AssertionError: assertion failed: Beginning offset 49041276637 is after the ending offset 48774253518 for topic xxx partition 1. You either provided an invalid fromOffset, or the Kafka topic has been damaged
            at scala.Predef$.assert(Predef.scala:223)
            at org.apache.spark.sql.kafka010.KafkaSourceRDD.compute(KafkaSourceRDD.scala:79)
            at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
            at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
    ```
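
    For illustration, here is a minimal sketch of case 1, modeled on the new test
    added to `KafkaRelationSuite.scala`; the broker address and the topic name
    `test` are placeholders:

    ```
    spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
      .option("subscribe", "test")                           // placeholder topic
      // Both limits are plain offsets, so they are comparable before the query
      // starts, and start offset 1 exceeds end offset 0 on every partition.
      .option("startingOffsets", """{"test": {"0": 1, "1": 1, "2": 1}}""")
      .option("endingOffsets", """{"test": {"0": 0, "1": 0, "2": 0}}""")
      .load()
      .collect()
    // With this change the read fails up front with an IllegalArgumentException:
    // "The specified start offset 1 is greater than the end offset 0 for ..."
    ```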
    
    ### Why are the changes needed?
    
    Most of the time this assertion error is raised due to user error, and we
    have enough information to make that explicit. This PR uses all of the
    available information to separate user errors from system errors.
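
    As a rough sketch of what this classification means for callers (the concrete
    exception classes remain private to the connector; `df`, `handleUserError`,
    and `handleSystemError` are hypothetical placeholders):

    ```
    try {
      df.collect()
    } catch {
      // Rejected before any data is read: the user-supplied start offset/timestamp
      // is provably greater than the end offset/timestamp, or the two limits name
      // different topic-partitions (KafkaIllegalArgumentException).
      case e: IllegalArgumentException => handleUserError(e)
      // Detected only after offsets are resolved against the brokers: either the
      // start offset was set beyond the available offsets, or the topic-partition
      // was deleted and re-created (KafkaIllegalStateException).
      case e: IllegalStateException => handleSystemError(e)
    }
    ```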
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Adds unit tests in `KafkaRelationSuiteBase`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50866 from eason-yuchen-liu/kafkaOffsetErrorMsg.
    
    Authored-by: Yuchen Liu <170372783+eason-yuchen-...@users.noreply.github.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../resources/error/kafka-error-conditions.json    |  25 ++++-
 .../spark/sql/kafka010/KafkaExceptions.scala       |  71 +++++++++++++
 .../sql/kafka010/KafkaOffsetReaderAdmin.scala      |  12 ++-
 .../sql/kafka010/KafkaOffsetReaderConsumer.scala   |  12 ++-
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  59 +++++++++++
 .../apache/spark/sql/kafka010/KafkaSourceRDD.scala |  21 ++--
 .../spark/sql/kafka010/KafkaRelationSuite.scala    | 113 +++++++++++++++++++++
 7 files changed, 304 insertions(+), 9 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
index d6a7aa19d030..4260d96654d7 100644
--- a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
+++ b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
@@ -5,6 +5,12 @@
       "topic-partitions for pre-fetched offset: <tpsForPrefetched>, 
topic-partitions for end offset: <tpsForEndOffset>."
     ]
   },
+  "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_START_OFFSET_AND_END_OFFSET" : {
+    "message" : [
+      "The specified start offset and end offset should have the same 
topic-partitions.",
+      "Topic-partitions for start offset: <tpsForStartOffset>, 
topic-partitions for end offset: <tpsForEndOffset>."
+    ]
+  },
   "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED" : {
     "message" : [
       "For Kafka data source with Trigger.AvailableNow, end offset should have 
lower or equal offset per each topic partition than pre-fetched offset. The 
error could be transient - restart your query, and report if you still see the 
same issue.",
@@ -87,7 +93,8 @@
       },
       "PARTITION_OFFSET_CHANGED" : {
         "message" : [
-          "Partition <topicPartition> offset was changed from <prevOffset> to 
<newOffset>."
+          "Partition <topicPartition> offset was changed from <prevOffset> to 
<newOffset>.",
+          "This could be either 1) a user error that the start offset is set 
beyond available offset when starting query, or 2) the kafka topic-partition is 
deleted and re-created."
         ]
       },
       "START_OFFSET_RESET" : {
@@ -97,5 +104,21 @@
       }
     },
     "sqlState" : "22000"
+  },
+  "RESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET" : {
+    "message" : [
+      "The resolved start offset <startOffset> is greater than the resolved 
end offset <endOffset> for topic <topic> partition <partition>.",
+      "Please investigate in this order:",
+      "1. Check whether start offset is set beyond the end offset or the 
latest available offset after resolution. Possible inputs are:",
+      "  (a) Start offset is set to a timestamp, and the end offset is set to 
an index of offset.",
+      "  (b) Start offset is set to an index of offset, and the end offset is 
set to a timestamp.",
+      "  (c) Start offset is set, and the end offset is set to latest.",
+      "2. Check whether the Kafka topic-partition is deleted and re-created."
+    ]
+  },
+  "UNRESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET" : {
+    "message" : [
+      "The specified start <offsetType> <startOffset> is greater than the end 
<offsetType> <endOffset> for topic <topic> partition <partition>."
+    ]
   }
 }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
index a6eb13e68c19..156bb71d777d 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
@@ -183,6 +183,57 @@ object KafkaExceptions {
       errorClass = "KAFKA_NULL_TOPIC_IN_DATA",
       messageParameters = Map.empty)
   }
+
+  def unmatchedTopicPartitionsBetweenOffsets(
+      startOffset: Set[TopicPartition],
+      endOffset: Set[TopicPartition]): KafkaIllegalArgumentException = {
+    new KafkaIllegalArgumentException(
+      errorClass = 
"MISMATCHED_TOPIC_PARTITIONS_BETWEEN_START_OFFSET_AND_END_OFFSET",
+      messageParameters = Map(
+        "tpsForStartOffset" -> startOffset.mkString(", "),
+        "tpsForEndOffset" -> endOffset.mkString(", ")))
+  }
+
+  def unresolvedStartOffsetGreaterThanEndOffset(
+      startOffset: Long,
+      endOffset: Long,
+      topicPartition: TopicPartition): KafkaIllegalArgumentException = {
+    new KafkaIllegalArgumentException(
+      errorClass = "UNRESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET",
+      messageParameters = Map(
+        "offsetType" -> "offset",
+        "startOffset" -> startOffset.toString,
+        "endOffset" -> endOffset.toString,
+        "topic" -> topicPartition.topic,
+        "partition" -> topicPartition.partition.toString))
+  }
+
+  def unresolvedStartTimestampGreaterThanEndTimestamp(
+      startOffset: Long,
+      endOffset: Long,
+      topicPartition: TopicPartition): KafkaIllegalArgumentException = {
+    new KafkaIllegalArgumentException(
+      errorClass = "UNRESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET",
+      messageParameters = Map(
+        "offsetType" -> "timestamp",
+        "startOffset" -> startOffset.toString,
+        "endOffset" -> endOffset.toString,
+        "topic" -> topicPartition.topic,
+        "partition" -> topicPartition.partition.toString))
+  }
+
+  def resolvedStartOffsetGreaterThanEndOffset(
+      startOffset: Long,
+      endOffset: Long,
+      topicPartition: TopicPartition): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "RESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET",
+      messageParameters = Map(
+        "startOffset" -> startOffset.toString,
+        "endOffset" -> endOffset.toString,
+        "topic" -> topicPartition.topic,
+        "partition" -> topicPartition.partition.toString))
+  }
 }
 
 /**
@@ -204,3 +255,23 @@ private[kafka010] class KafkaIllegalStateException(
 
   override def getCondition: String = errorClass
 }
+
+/**
+ * Illegal argument exception thrown with an error class.
+ */
+private[kafka010] class KafkaIllegalArgumentException(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    cause: Throwable = null)
+  extends IllegalArgumentException(
+    KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
+      errorClass, messageParameters), cause)
+  with SparkThrowable {
+
+  override def getSqlState: String =
+    KafkaExceptionsHelper.errorClassesJsonReader.getSqlState(errorClass)
+
+  override def getMessageParameters: java.util.Map[String, String] = 
messageParameters.asJava
+
+  override def getCondition: String = errorClass
+}
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index bfc6139bdb72..86cec6fc041c 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -401,6 +401,14 @@ private[kafka010] class KafkaOffsetReaderAdmin(
         throw new IllegalStateException(s"$tp doesn't have a from offset")
       )
       val untilOffset = untilPartitionOffsets(tp)
+
+      KafkaSourceProvider.checkStartOffsetNotGreaterThanEndOffset(
+        fromOffset,
+        untilOffset,
+        tp,
+        KafkaExceptions.resolvedStartOffsetGreaterThanEndOffset
+      )
+
       KafkaOffsetRange(tp, fromOffset, untilOffset, None)
     }.toSeq
 
@@ -504,7 +512,9 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       if (untilOffset < fromOffset) {
         reportDataLoss(
           s"Partition $tp's offset was changed from " +
-            s"$fromOffset to $untilOffset, some data may have been missed",
+            s"$fromOffset to $untilOffset. This could be either 1) a user 
error that the start " +
+            "offset is set beyond available offset when starting query, or 2) 
the kafka " +
+            "topic-partition is deleted and re-created.",
           () =>
             KafkaExceptions.partitionOffsetChanged(tp, fromOffset, 
untilOffset))
       }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index 7aadde7218f5..fb06797d1fe3 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -450,6 +450,14 @@ private[kafka010] class KafkaOffsetReaderConsumer(
         // fromPartitionOffsets
         throw new IllegalStateException(s"$tp doesn't have a from offset"))
       val untilOffset = untilPartitionOffsets(tp)
+
+      KafkaSourceProvider.checkStartOffsetNotGreaterThanEndOffset(
+        fromOffset,
+        untilOffset,
+        tp,
+        KafkaExceptions.resolvedStartOffsetGreaterThanEndOffset
+      )
+
       KafkaOffsetRange(tp, fromOffset, untilOffset, None)
     }.toSeq
 
@@ -553,7 +561,9 @@ private[kafka010] class KafkaOffsetReaderConsumer(
       if (untilOffset < fromOffset) {
         reportDataLoss(
           s"Partition $tp's offset was changed from " +
-            s"$fromOffset to $untilOffset, some data may have been missed",
+            s"$fromOffset to $untilOffset. This could be either 1) a user 
error that the start " +
+            "offset is set beyond available offset when starting query, or 2) 
the kafka " +
+            "topic-partition is deleted and re-created.",
           () =>
             KafkaExceptions.partitionOffsetChanged(tp, fromOffset, 
untilOffset))
       }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 4cb9fa8df805..82ad75e028af 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -24,6 +24,7 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
ByteArraySerializer}
 
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
@@ -141,6 +142,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       LatestOffsetRangeLimit)
     assert(endingRelationOffsets != EarliestOffsetRangeLimit)
 
+    checkOffsetLimitValidity(startingRelationOffsets, endingRelationOffsets)
+
     val includeHeaders = caseInsensitiveParameters.getOrElse(INCLUDE_HEADERS, 
"false").toBoolean
 
     new KafkaRelation(
@@ -463,6 +466,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_OPTION_KEY,
         LatestOffsetRangeLimit)
 
+      checkOffsetLimitValidity(startingRelationOffsets, endingRelationOffsets)
+
       new KafkaBatch(
         strategy(caseInsensitiveOptions),
         caseInsensitiveOptions,
@@ -608,6 +613,60 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   private val serClassName = classOf[ByteArraySerializer].getName
   private val deserClassName = classOf[ByteArrayDeserializer].getName
 
+  def checkStartOffsetNotGreaterThanEndOffset(
+      startOffset: Long,
+      endOffset: Long,
+      topicPartition: TopicPartition,
+      exception: (Long, Long, TopicPartition) => Exception): Unit = {
+    // earliest or latest offsets are negative and should not be compared
+    if (startOffset > endOffset && startOffset >= 0 && endOffset >= 0) {
+      throw exception(startOffset, endOffset, topicPartition)
+    }
+  }
+
+  def checkOffsetLimitValidity(
+      startOffset: KafkaOffsetRangeLimit,
+      endOffset: KafkaOffsetRangeLimit): Unit = {
+    startOffset match {
+      case start: SpecificOffsetRangeLimit if 
endOffset.isInstanceOf[SpecificOffsetRangeLimit] =>
+        val end = endOffset.asInstanceOf[SpecificOffsetRangeLimit]
+        if (start.partitionOffsets.keySet != end.partitionOffsets.keySet) {
+          throw KafkaExceptions.unmatchedTopicPartitionsBetweenOffsets(
+            start.partitionOffsets.keySet, end.partitionOffsets.keySet
+          )
+        }
+        start.partitionOffsets.foreach {
+          case (tp, startOffset) =>
+            checkStartOffsetNotGreaterThanEndOffset(
+              startOffset,
+              end.partitionOffsets(tp),
+              tp,
+              KafkaExceptions.unresolvedStartOffsetGreaterThanEndOffset
+            )
+        }
+
+      case start: SpecificTimestampRangeLimit
+        if endOffset.isInstanceOf[SpecificTimestampRangeLimit] =>
+        val end = endOffset.asInstanceOf[SpecificTimestampRangeLimit]
+        if (start.topicTimestamps.keySet != end.topicTimestamps.keySet) {
+          throw KafkaExceptions.unmatchedTopicPartitionsBetweenOffsets(
+            start.topicTimestamps.keySet, end.topicTimestamps.keySet
+          )
+        }
+        start.topicTimestamps.foreach {
+          case (tp, startOffset) =>
+            checkStartOffsetNotGreaterThanEndOffset(
+              startOffset,
+              end.topicTimestamps(tp),
+              tp,
+              KafkaExceptions.unresolvedStartTimestampGreaterThanEndTimestamp
+            )
+        }
+
+      case _ =>  // do nothing
+    }
+  }
+
   def getKafkaOffsetRangeLimit(
       params: CaseInsensitiveMap[String],
       globalOffsetTimestampOptionKey: String,
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index bc7f8b6b44f9..0f962a29588a 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -21,7 +21,7 @@ import java.{util => ju}
 
 import org.apache.kafka.clients.consumer.ConsumerRecord
 
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
 import org.apache.spark.internal.LogKeys.{FROM_OFFSET, PARTITION_ID, TOPIC}
 import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
@@ -75,11 +75,12 @@ private[kafka010] class KafkaSourceRDD(
       sourcePartition.offsetRange.topicPartition, executorKafkaParams)
 
     val range = resolveRange(consumer, sourcePartition.offsetRange)
-    assert(
-      range.fromOffset <= range.untilOffset,
-      s"Beginning offset ${range.fromOffset} is after the ending offset 
${range.untilOffset} " +
-        s"for topic ${range.topic} partition ${range.partition}. " +
-        "You either provided an invalid fromOffset, or the Kafka topic has 
been damaged")
+    if (range.fromOffset < 0 || range.untilOffset < 0) {
+      throw SparkException.internalError(
+        s"Should not have negative offsets for topic ${range.topic} partition 
${range.partition} " +
+          s"at this point: fromOffset ${range.fromOffset} untilOffset 
${range.untilOffset}"
+      )
+    }
     if (range.fromOffset == range.untilOffset) {
       logInfo(log"Beginning offset ${MDC(FROM_OFFSET, range.fromOffset)} is 
the same as ending " +
         log"offset skipping ${MDC(TOPIC, range.topic)} ${MDC(PARTITION_ID, 
range.partition)}")
@@ -137,6 +138,14 @@ private[kafka010] class KafkaSourceRDD(
       } else {
         range.untilOffset
       }
+
+      KafkaSourceProvider.checkStartOffsetNotGreaterThanEndOffset(
+        fromOffset,
+        untilOffset,
+        range.topicPartition,
+        KafkaExceptions.resolvedStartOffsetGreaterThanEndOffset
+      )
+
       KafkaOffsetRange(range.topicPartition,
         fromOffset, untilOffset, range.preferredLoc)
     } else {
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 1a884533e818..f99db196ce7c 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -21,6 +21,8 @@ import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.concurrent.duration.DurationInt
+
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 
@@ -616,6 +618,95 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
     assert(df.rdd.collectPartitions().flatMap(_.map(_.getString(0))).toSet
       === (0 to 30).map(_.toString).toSet)
   }
+
+  test("topic-partition for start offset and end offset must match") {
+    val e = intercept[IllegalArgumentException] {
+      spark.read
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("subscribe", "test")
+        .option(
+          "startingOffsets",
+          """
+            |{"test": {"0": 1, "1": 1, "2": 1}}
+            |""".stripMargin
+        )
+        .option(
+          "endingOffsets",
+          """
+            |{"test": {"0": 0, "1": 0}}
+            |""".stripMargin
+        )
+        .load()
+        .collect()
+    }
+
+    assert(e.getMessage.contains(
+      "The specified start offset and end offset should have the same 
topic-partitions.")
+    )
+  }
+
+  test("unresolved start offset greater than end offset") {
+    val e = intercept[IllegalArgumentException] {
+      spark.read
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("subscribe", "test")
+        .option(
+          "startingOffsets",
+          """
+            |{"test": {"0": 1, "1": 1, "2": 1}}
+            |""".stripMargin
+        )
+        .option(
+          "endingOffsets",
+          """
+            |{"test": {"0": 0, "1": 0, "2": 0}}
+            |""".stripMargin
+        )
+        .load()
+        .collect()
+    }
+
+    assert(e.getMessage.contains(
+      "The specified start offset 1 is greater than the end offset 0 for"))
+  }
+
+  test("resolved start offset greater than end offset (without latest)") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    val timestamp1 =
+      testUtils.sendMessages(topic, Seq("0", "0").toArray, 
Some(0))(1)._2.timestamp()
+    val timestamp2 =
+      testUtils.sendMessages(topic, Seq("0", "0").toArray, 
Some(1))(1)._2.timestamp()
+    val timestamp3 =
+      testUtils.sendMessages(topic, Seq("0", "0").toArray, 
Some(2))(1)._2.timestamp()
+    val df = spark.read
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(
+        "startingOffsets",
+        s"""
+          |{"$topic": {"0": 3, "1": 3, "2": 3}}
+          |""".stripMargin
+      )
+      .option(
+        "endingOffsetsByTimestamp",
+        s"""
+          |{"$topic": {"0": $timestamp1, "1": $timestamp2, "2": $timestamp3}}
+          |""".stripMargin
+      )
+      .load()
+
+    eventually(timeout(60.seconds)) {
+      val e = intercept[IllegalStateException] {
+        df.collect()
+      }
+      assert(e.getMessage.contains(
+        "The resolved start offset 3 is greater than the resolved end offset 1 
for"))
+    }
+  }
 }
 
 class KafkaRelationSuiteWithAdminV1 extends KafkaRelationSuiteV1 {
@@ -641,6 +732,28 @@ class KafkaRelationSuiteV1 extends KafkaRelationSuiteBase {
       case _: LogicalRelation => true
     }.nonEmpty)
   }
+
+  test("resolved start offset greater than end offset (with latest)") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    val df = spark.read
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(
+        "startingOffsets",
+        s"""
+           |{"$topic": {"0": 1, "1": 1, "2": 1}}
+           |""".stripMargin
+      )
+      .load()
+
+    val e = intercept[IllegalStateException] {
+      df.collect()
+    }
+    assert(e.getMessage.contains(
+      "The resolved start offset 1 is greater than the resolved end offset 0 
for"))
+  }
 }
 
 class KafkaRelationSuiteV2 extends KafkaRelationSuiteBase {

