This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 5f291bf07cf KAFKA-20634: Spurious HighWatermarkUpdate failed errors in the group coordinator after partition leadership change (#22444)
5f291bf07cf is described below
commit 5f291bf07cfcf17aab789a27ca3589ae82ca413d
Author: David Jacot <[email protected]>
AuthorDate: Wed Jun 3 17:40:52 2026 +0200
KAFKA-20634: Spurious HighWatermarkUpdate failed errors in the group coordinator after partition leadership change (#22444)
When a `__consumer_offsets` partition transitions to follower, its local
log is truncated and re-replicated from the new leader. The group
coordinator hosting the partition remains active until it is unloaded
asynchronously. During that window, the partition's high watermark
advances again over records that this coordinator did not write, while
the coordinator still holds in-memory state (and pending deferred
operations) for its own records that were truncated and never durably
committed.
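To make the window concrete, here is a hypothetical toy model in Java. None of these types are Kafka classes, and the offsets, broker ids and the "divergence starts at the high watermark" simplification are illustrative assumptions. Broker 0 writes five records as leader, loses leadership with only two of them replicated, truncates its divergent tail and re-replicates the new leader's records. The resulting high watermark then sits past the end of what the coordinator itself wrote.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the race window; none of these types are Kafka classes.
class TruncationWindowSketch {

    // One log record: its offset is its list index, writerId is the broker that appended it.
    record Entry(int writerId) {}

    // What the stale coordinator believes versus what the re-replicated log now says.
    record Outcome(long coordinatorEndOffset, long newHighWatermark, long foreignRecordsBelowHwm) {}

    static Outcome simulate() {
        final int self = 0;
        final int newLeader = 1;
        List<Entry> log = new ArrayList<>();

        // As leader, broker 0 appends offsets 0..4; only 0..1 get replicated, so the HWM is 2.
        for (int i = 0; i < 5; i++) log.add(new Entry(self));
        long highWatermark = 2;

        // The coordinator's in-memory state covers everything it wrote: end offset 5.
        long coordinatorEndOffset = log.size();

        // Broker 0 becomes a follower and truncates its divergent tail (offsets 2..4) ...
        while (log.size() > highWatermark) log.remove(log.size() - 1);
        // ... then re-replicates five records that broker 1 appended as the new leader.
        for (int i = 0; i < 5; i++) log.add(new Entry(newLeader));
        highWatermark = log.size();

        // Records below the new HWM that this broker never wrote.
        long foreign = log.subList(0, (int) highWatermark).stream()
                .filter(e -> e.writerId() != self)
                .count();
        return new Outcome(coordinatorEndOffset, highWatermark, foreign);
    }

    public static void main(String[] args) {
        Outcome o = simulate();
        // A hypothetical invariant: the committed offset must not move past the end of
        // what the coordinator itself wrote.
        if (o.newHighWatermark() > o.coordinatorEndOffset()) {
            System.out.println("New committed offset " + o.newHighWatermark()
                    + " must be less than or equal to " + o.coordinatorEndOffset());
        }
        System.out.println("Records below the HWM not written here: " + o.foreignRecordsBelowHwm());
    }
}
```

In this run the new high watermark is 7 while the coordinator's own end offset is 5, and all five records between offsets 2 and 6 were written by broker 1.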
Applying such a high watermark has two consequences. It can violate the
invariants of the snapshot registry and fail the `HighWatermarkUpdate`
event, logging a spurious error such as "Execution of
HighWatermarkUpdate failed due to New committed offset X of
__consumer_offsets-N must be less than or equal to Y". More importantly,
when it does not fail, it advances the committed offset over the
coordinator's uncommitted state and completes the corresponding deferred
writes with a success response, even though those records were lost. A
client can therefore receive a successful offset-commit acknowledgment
for a commit that is silently dropped once the new coordinator takes
over.
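The silent-loss case can be sketched with a similarly hypothetical model of deferred writes. It assumes, as a simplification, that each pending acknowledgement completes with success as soon as the high watermark reaches the end offset of its record. Error codes are plain strings standing in for Kafka's `Errors` values. For contrast, the sketch also shows what the client gets if that high watermark is never forwarded and the coordinator is later unloaded.

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not the Kafka coordinator runtime. Error codes are plain strings
// standing in for Kafka's Errors values.
class DeferredWriteSketch {

    // Pending acknowledgements keyed by the offset the HWM must reach to complete them.
    private final TreeMap<Long, CompletableFuture<String>> pending = new TreeMap<>();

    CompletableFuture<String> appendDeferred(long endOffset) {
        CompletableFuture<String> ack = new CompletableFuture<>();
        pending.put(endOffset, ack);
        return ack;
    }

    // Acknowledges, with success, every deferred write whose end offset the HWM covers.
    void onHighWatermarkUpdated(long highWatermark) {
        NavigableMap<Long, CompletableFuture<String>> covered = pending.headMap(highWatermark, true);
        covered.values().forEach(ack -> ack.complete("NONE"));
        covered.clear(); // the head map is a view, so this also removes the entries from pending
    }

    // Fails whatever is still pending, e.g. when the coordinator is unloaded.
    void failAll(String error) {
        pending.values().forEach(ack -> ack.complete(error));
        pending.clear();
    }

    // Replays the scenario with or without forwarding the post-truncation HWM.
    static String outcome(boolean forwardHighWatermark) {
        DeferredWriteSketch coordinator = new DeferredWriteSketch();
        // An offset commit is written at offset 3; its ack waits until the HWM reaches 4.
        CompletableFuture<String> ack = coordinator.appendDeferred(4L);
        // Leadership moves: offset 3 is truncated and refilled with the new leader's records,
        // whose replication drives the local HWM to 7.
        if (forwardHighWatermark) {
            coordinator.onHighWatermarkUpdated(7L);
        }
        coordinator.failAll("NOT_COORDINATOR"); // the coordinator is eventually unloaded
        return ack.join();
    }

    public static void main(String[] args) {
        System.out.println("forwarded: " + outcome(true));      // NONE, although the record is lost
        System.out.println("not forwarded: " + outcome(false)); // NOT_COORDINATOR
    }
}
```

With the update forwarded, the client is told "NONE" for a commit whose record no longer exists. Without it, the acknowledgement stays pending until unload and fails with "NOT_COORDINATOR", which the client can retry.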
This patch gates high watermark propagation in
`CoordinatorPartitionWriter.ListenerAdapter` on the partition's
leadership. The adapter stops forwarding high watermark updates once the
partition transitions to follower, is deleted, or fails. The partition
signals these transitions (via `PartitionListener`) before its fetcher
is restarted (see `ReplicaManager#applyDelta`), i.e. before any such
high watermark can be produced, so the coordinator never observes a high
watermark that it should not apply. The pending deferred operations then
remain in place and are failed with `NOT_COORDINATOR` when the
coordinator is unloaded, so clients correctly retry against the new
coordinator.
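The gate itself is small. The following Java sketch mirrors the Scala `ListenerAdapter` in this patch. `PartitionEvents` and `HighWatermarkListener` are hypothetical, simplified stand-ins for `PartitionListener` and `PartitionWriter.Listener`, and topic partitions are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class GatedListenerSketch {

    // Hypothetical stand-in for PartitionWriter.Listener.
    interface HighWatermarkListener {
        void onHighWatermarkUpdated(String tp, long offset);
    }

    // Hypothetical stand-in for the PartitionListener callbacks the adapter overrides.
    interface PartitionEvents {
        void onHighWatermarkUpdated(String tp, long offset);
        void onBecomingFollower(String tp);
        void onFailed(String tp);
        void onDeleted(String tp);
    }

    // Forwards HWM updates only during the leadership tenure in which it was registered.
    static final class GatedAdapter implements PartitionEvents {
        private final HighWatermarkListener delegate;
        // Starts armed; any loss of leadership disarms it, and nothing re-arms it.
        private final AtomicBoolean active = new AtomicBoolean(true);

        GatedAdapter(HighWatermarkListener delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onHighWatermarkUpdated(String tp, long offset) {
            if (active.get()) {
                delegate.onHighWatermarkUpdated(tp, offset);
            }
        }

        @Override
        public void onBecomingFollower(String tp) { active.set(false); }

        @Override
        public void onFailed(String tp) { active.set(false); }

        @Override
        public void onDeleted(String tp) { active.set(false); }
    }

    // Delivers HWM 10, signals the follower transition, then delivers HWM 20.
    static List<Long> replay() {
        List<Long> delivered = new ArrayList<>();
        GatedAdapter adapter = new GatedAdapter((tp, offset) -> delivered.add(offset));
        adapter.onHighWatermarkUpdated("__consumer_offsets-0", 10L);
        adapter.onBecomingFollower("__consumer_offsets-0");
        adapter.onHighWatermarkUpdated("__consumer_offsets-0", 20L);
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // [10]
    }
}
```

The flag is one-way. Nothing sets it back to true, which matches the contract that a retired listener is not re-armed if leadership is regained. The `AtomicBoolean` follows the Scala adapter. Presumably transitions and high watermark callbacks need not run on the same thread, so the flag must be safely visible across threads.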
Gating on leadership rather than inspecting the offset is deliberate:
after truncation an offset can still have a snapshot in the registry
while holding the new leader's data, so no offset-based check can tell
whether a high watermark is safe to apply.
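The following hypothetical example shows why an offset-based check cannot work. The snapshot offsets, writer ids and the `offsetLooksSafe` guard are illustrative assumptions, not the snapshot registry's real API. The guard only sees offsets, so it accepts a high watermark whose underlying records now belong to the new leader.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration; this is not the snapshot registry's API.
class OffsetCheckSketch {

    // A naive, offset-only guard: a snapshot exists at the HWM and it is within what we wrote.
    static boolean offsetLooksSafe(Set<Long> snapshotOffsets, long lastWrittenOffset, long hwm) {
        return snapshotOffsets.contains(hwm) && hwm <= lastWrittenOffset;
    }

    // Returns {verdict of the offset-only guard, whether the HWM is really safe to apply}.
    static boolean[] evaluate() {
        final int self = 0;
        // The coordinator appended offsets 0..3 in two batches, with snapshots at 0, 2 and 4.
        Set<Long> snapshots = new TreeSet<>(List.of(0L, 2L, 4L));
        long lastWrittenOffset = 4L;
        // Writer of each local record after truncation and re-replication: offsets 0..1
        // are still this broker's, offsets 2..3 now hold the new leader's (broker 1) records.
        List<Integer> writerByOffset = List.of(0, 0, 1, 1);
        long hwm = 4L;

        boolean naiveVerdict = offsetLooksSafe(snapshots, lastWrittenOffset, hwm);
        boolean actuallySafe = writerByOffset.subList(0, (int) hwm).stream()
                .allMatch(writer -> writer == self);
        return new boolean[] {naiveVerdict, actuallySafe};
    }

    public static void main(String[] args) {
        boolean[] r = evaluate();
        // Prints: offset check says safe: true, actually safe: false
        System.out.println("offset check says safe: " + r[0] + ", actually safe: " + r[1]);
    }
}
```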
Reviewers: Sean Quah <[email protected]>
---
.../common/runtime/PartitionWriter.java | 29 +++++++++++-
.../group/CoordinatorPartitionWriter.scala | 28 +++++++++++-
.../group/CoordinatorPartitionWriterTest.scala | 51 ++++++++++++++++++++++
.../unit/kafka/server/ReplicaManagerTest.scala | 32 ++++++++++++++
4 files changed, 137 insertions(+), 3 deletions(-)
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
index cb8bec3f71c..3807a895c75 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/PartitionWriter.java
@@ -32,9 +32,29 @@ public interface PartitionWriter {
/**
* Listener allowing to listen to high watermark changes. This is meant
- * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}}.
+ * to be used in conjunction with {@link PartitionWriter#append(TopicPartition, VerificationGuard, MemoryRecords)}.
+ * <p>
+ * A registered listener observes a single leadership tenure of the partition. It is
+ * delivered high watermark updates only while this broker is the leader of the
+ * partition. Once the partition is no longer led by this broker (it transitions to
+ * follower, is deleted, or fails), the listener is permanently retired: no further
+ * updates are delivered to it, even if the broker later regains leadership. A new
+ * listener must be registered to observe a subsequent tenure.
+ * <p>
+ * Retiring the listener is required because, after a leadership change, the local log
+ * can be truncated and re-replicated from the new leader, so a high watermark observed
+ * afterwards may advance over records that this broker never wrote. This guarantees
+ * that every delivered update advances only over records that this broker wrote as
+ * leader.
*/
interface Listener {
+ /**
+ * Called when the high watermark of the partition advances. Only invoked while
+ * this broker is the leader of the partition (see {@link Listener}).
+ *
+ * @param tp The topic partition.
+ * @param offset The new high watermark.
+ */
void onHighWatermarkUpdated(
TopicPartition tp,
long offset
@@ -42,7 +62,12 @@ public interface PartitionWriter {
}
/**
- * Register a {{@link Listener}}.
+ * Register a {@link Listener}.
+ * <p>
+ * The listener observes only the current leadership tenure: as described on
+ * {@link Listener}, it stops receiving updates once the partition is no longer led by
+ * this broker and is not re-armed if leadership is regained. A new listener must be
+ * registered to observe a later tenure.
*
* @param tp The partition to register the listener to.
* @param listener The listener.
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 08b3c9aa498..e5700288ebc 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -28,20 +28,46 @@ import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.Map
/**
* ListenerAdapter adapts the PartitionListener interface to the
* PartitionWriter.Listener interface.
+ *
+ * This upholds the PartitionWriter.Listener contract that high watermark updates are
+ * delivered only while the partition is led by this broker. When the partition
+ * transitions to follower, is deleted or fails, its local log can be truncated and
+ * re-replicated from the new leader, so the high watermark would advance over records
+ * that this broker did not write; propagating those updates would corrupt the
+ * coordinator's committed state. The partition notifies these transitions before its
+ * fetcher is restarted (see ReplicaManager#applyDelta), i.e. before any such update can
+ * be produced, so gating on this flag is sufficient to stop them.
*/
private[group] class ListenerAdapter(
val listener: PartitionWriter.Listener
) extends PartitionListener {
+ private val active = new AtomicBoolean(true)
+
override def onHighWatermarkUpdated(
tp: TopicPartition,
offset: Long
): Unit = {
- listener.onHighWatermarkUpdated(tp, offset)
+ if (active.get()) {
+ listener.onHighWatermarkUpdated(tp, offset)
+ }
+ }
+
+ override def onBecomingFollower(tp: TopicPartition): Unit = {
+ active.set(false)
+ }
+
+ override def onFailed(tp: TopicPartition): Unit = {
+ active.set(false)
+ }
+
+ override def onDeleted(tp: TopicPartition): Unit = {
+ active.set(false)
}
override def equals(that: Any): Boolean = that match {
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index 67f5deeaaee..5e9ec45ff43 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -35,6 +35,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify, when}
import java.nio.charset.Charset
+import java.util
import java.util.Collections
import scala.collection.Map
import scala.jdk.CollectionConverters._
@@ -68,6 +69,56 @@ class CoordinatorPartitionWriterTest {
)
}
+ @Test
+ def testListenerAdapterPropagatesHighWatermarkUpdates(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val updates = new util.ArrayList[Long]()
+ val adapter = new ListenerAdapter(new PartitionWriter.Listener {
+ override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = updates.add(offset)
+ })
+
+ adapter.onHighWatermarkUpdated(tp, 10L)
+ adapter.onHighWatermarkUpdated(tp, 20L)
+
+ assertEquals(util.List.of(10L, 20L), updates)
+ }
+
+ @Test
+ def testListenerAdapterStopsPropagatingAfterBecomingFollower(): Unit = {
+ assertHighWatermarkPropagationStops(_.onBecomingFollower(_))
+ }
+
+ @Test
+ def testListenerAdapterStopsPropagatingAfterFailed(): Unit = {
+ assertHighWatermarkPropagationStops(_.onFailed(_))
+ }
+
+ @Test
+ def testListenerAdapterStopsPropagatingAfterDeleted(): Unit = {
+ assertHighWatermarkPropagationStops(_.onDeleted(_))
+ }
+
+ /**
+ * Verifies that no high watermark update is propagated to the wrapped listener
+ * once the given transition has been signalled. Such updates are not safe to
+ * apply because the partition is no longer led by this broker.
+ */
+ private def assertHighWatermarkPropagationStops(
+ transition: (ListenerAdapter, TopicPartition) => Unit
+ ): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val updates = new util.ArrayList[Long]()
+ val adapter = new ListenerAdapter(new PartitionWriter.Listener {
+ override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = updates.add(offset)
+ })
+
+ adapter.onHighWatermarkUpdated(tp, 10L)
+ transition(adapter, tp)
+ adapter.onHighWatermarkUpdated(tp, 20L)
+
+ assertEquals(util.List.of(10L), updates)
+ }
+
@Test
def testConfig(): Unit = {
val tp = new TopicPartition("foo", 0)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a09e1f6ebd2..6d2da47f3e8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -5643,6 +5643,38 @@ class ReplicaManagerTest {
}
}
+ @Test
+ def testPartitionListenerWhenPartitionBecomesFollower(): Unit = {
+ val localId = 0
+ val topicPartition = new TopicPartition("foo", 0)
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
+ try {
+ val listener = new MockPartitionListener
+ listener.verify()
+
+ // Broker 0 becomes leader of the partition.
+ val leaderTopicsDelta = topicsCreateDelta(localId, isStartIdLeader = true)
+ val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+ replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
+
+ // Register a listener.
+ assertTrue(replicaManager.maybeAddListener(topicPartition, listener))
+ listener.verify()
+
+ // Broker 0 transitions to follower of the partition. The listener is notified that
+ // the partition is becoming a follower. This happens before the follower starts
+ // fetching from the new leader, hence before any high watermark update reflecting
+ // the new leader's records.
+ val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, isStartIdLeader = false)
+ val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+ replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
+
+ listener.verify(expectedFollower = true)
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ }
+ }
+
private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partition:Int = 0, directoryIds: List[Uuid] = List.empty): TopicsDelta = {
val leader = if (isStartIdLeader) startId else startId + 1
val delta = new TopicsDelta(TopicsImage.EMPTY)