This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7c9456f6a7c6 [SPARK-55699][SS][FOLLOWUP] Inconsistent reading of
LowLatencyClock when used together with ManualClock
7c9456f6a7c6 is described below
commit 7c9456f6a7c60cf878078fa589efea839b908515
Author: Yuchen Liu <[email protected]>
AuthorDate: Mon Mar 2 07:18:49 2026 +0900
[SPARK-55699][SS][FOLLOWUP] Inconsistent reading of LowLatencyClock when
used together with ManualClock
### What changes were proposed in this pull request?
In the previous [PR](https://github.com/apache/spark/pull/54497), we updated
the signature of `nextWithTimeout` in `SupportsRealTimeRead`. However, there
was a bug introduced in `LowLatencyMemoryStream` where we compared a millisecond
timestamp and a nanosecond timestamp directly without conversion.
This PR fixes this issue, and renames the parameters to better prevent such
issues from happening.
### Why are the changes needed?
**Context of the previous PR:** There was an issue that RTM tests that use a
manual clock might get stuck, because manual clock advancement may happen right
in between `LowLatencyReaderWrap` getting the reference time, and
`nextWithTimeout` in each source getting the start time. `nextWithTimeout` may
use the already advanced time to time its wait. When this happens, since the
manual clock may only be advanced once by the test, `nextWithTimeout` may never
return, causing a test timeout. This [...]
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54550 from eason-yuchen-liu/fixLowLatencyClockInconsistency2.
Authored-by: Yuchen Liu
<[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala | 2 +-
.../spark/sql/connector/read/streaming/SupportsRealTimeRead.java | 6 +++---
.../sql/execution/streaming/sources/LowLatencyMemoryStream.scala | 7 +++----
3 files changed, 7 insertions(+), 8 deletions(-)
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 77ebcb04f2f7..36aaf3e76ff9 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -105,7 +105,7 @@ private case class KafkaBatchPartitionReader(
}
override def nextWithTimeout(
- startTime: java.lang.Long, timeoutMs: java.lang.Long): RecordStatus = {
+ startTimeMs: java.lang.Long, timeoutMs: java.lang.Long): RecordStatus = {
if (!iteratorForRealTimeMode.isDefined) {
logInfo(s"Getting a new kafka consuming iterator for
${offsetRange.topicPartition} " +
s"starting from ${nextOffset}, timeoutMs ${timeoutMs}")
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
index 1e939c14cdc9..5542781f333a 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsRealTimeRead.java
@@ -78,12 +78,12 @@ public interface SupportsRealTimeRead<T> extends
PartitionReader<T> {
* Alternative function to be called than next(), that proceed to the next
record. The different
* from next() is that, if there is no more records, the call needs to
keep waiting until
* the timeout.
- * @param startTime the base time (milliseconds) the was used to calculate
the timeout.
+ * @param startTimeMs the base time (milliseconds) the was used to
calculate the timeout.
* Sources should use it as the reference time to start
waiting for the next
* record instead of getting the latest time from
LowLatencyClock.
- * @param timeout if no result is available after this timeout
(milliseconds), return
+ * @param timeoutMs if no result is available after this timeout
(milliseconds), return
* @return {@link RecordStatus} describing whether a record is available
and its arrival time
* @throws IOException
*/
- RecordStatus nextWithTimeout(Long startTime, Long timeout) throws
IOException;
+ RecordStatus nextWithTimeout(Long startTimeMs, Long timeoutMs) throws
IOException;
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
index bc8f51c95861..50bff9735f62 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
@@ -278,21 +278,20 @@ class LowLatencyMemoryStreamPartitionReader(
throw new IllegalStateException("Task context was not set!")
}
override def nextWithTimeout(
- startTime: java.lang.Long, timeout: java.lang.Long): RecordStatus = {
+ startTimeMs: java.lang.Long, timeoutMs: java.lang.Long): RecordStatus = {
// SPARK-55699: Use the reference time passed in by the caller instead of
getting the latest
// time from LowLatencyClock, to avoid inconsistent reading when
LowLatencyClock is a
// manual clock.
- val startReadTime = startTime
var elapsedTimeMs = 0L
current = getRecordWithTimestamp
while (current.isEmpty) {
val POLL_TIME = 10L
- if (elapsedTimeMs >= timeout) {
+ if (elapsedTimeMs >= timeoutMs) {
return RecordStatus.newStatusWithoutArrivalTime(false)
}
Thread.sleep(POLL_TIME)
current = getRecordWithTimestamp
- elapsedTimeMs = (clock.nanoTime() - startReadTime) / 1000 / 1000
+ elapsedTimeMs = clock.getTimeMillis() - startTimeMs
}
currentOffset += 1
RecordStatus.newStatusWithArrivalTimeMs(current.get._2)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]