This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 3d79b3401a8 MINOR: Cherry pick important fixes for 4.2.1 (#21910)
3d79b3401a8 is described below
commit 3d79b3401a8e789f4588ae13c5f1eb3d4b963aaa
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue Mar 31 18:53:10 2026 +0530
MINOR: Cherry pick important fixes for 4.2.1 (#21910)
Cherry-picking the following commits for the 4.2.1 cut:
* https://github.com/apache/kafka/commit/3e9ae03b9ca7d1465218912b9067f464adee4c81
* https://github.com/apache/kafka/commit/6eebb863a0335615b0dd491c8875e79a062d4756
Reviewers: Andrew Schofield <[email protected]>
---------
Co-authored-by: Chirag Wadhwa <[email protected]>
Co-authored-by: majialong <[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 123 +++++++++++++++++++++
.../server/share/persister/PartitionFactory.java | 7 +-
.../coordinator/share/ShareCoordinatorShard.java | 2 +-
.../kafka/coordinator/share/ShareGroupOffset.java | 11 +-
.../share/ShareCoordinatorShardTest.java | 40 +++++++
5 files changed, 178 insertions(+), 5 deletions(-)
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index d8095838fa0..32b05351e98 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -21,8 +21,10 @@ import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
@@ -3443,6 +3445,97 @@ public class ShareConsumerTest {
}
}
+ @ClusterTest
+ public void testSharePartitionLagAfterAlterShareGroupOffsets() {
+ String groupId = "group1";
+ try (Producer<byte[], byte[]> producer = createProducer();
+ Admin adminClient = createAdminClient()) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"Message".getBytes());
+ // Producing 100 records to the topic partition.
+ for (int i = 0; i < 100; i++) {
+ producer.send(record);
+ }
+ producer.flush();
+
+ // Create a new share consumer. Since the share.auto.offset.reset
is not altered, it should be latest by default.
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(groupId,
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+ shareConsumer.subscribe(List.of(tp.topic()));
+ // Polling share consumer to make sure it joins the group and
subscribes to the topic.
+ waitedPoll(shareConsumer, 2500L, 0, true, groupId, List.of(new
TopicPartition(tp.topic(), 0)));
+ // Producing 5 additional records to the topic partition.
+ for (int i = 0; i < 5; i++) {
+ producer.send(record);
+ }
+ producer.flush();
+ // Polling share consumer to make sure the records are consumed.
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 5);
+ assertEquals(5, records.count());
+ // Accept the record first to move the offset forward and register
the state with persister.
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+ // After accepting, the lag should be 0 because the record is
consumed successfully.
+ verifySharePartitionLag(adminClient, groupId, tp, 0L);
+ // Closing the share consumer so that the offsets can be altered.
+ shareConsumer.close();
+ // Alter the start offset of the share partition to 40.
+ alterShareGroupOffsets(adminClient, groupId, tp, 40L);
+ // After altering, the share partition start offset should be 40.
+ verifySharePartitionStartOffset(adminClient, groupId, tp, 40L);
+ // Verify that the lag is now 65 since the start offset is altered
to 40 and there are total 105 records in the partition.
+ verifySharePartitionLag(adminClient, groupId, tp, 65L);
+ } catch (InterruptedException | ExecutionException e) {
+ fail("Test failed with exception: " + e.getMessage());
+ }
+ }
+
+ @ClusterTest
+ public void testSharePartitionLagAfterDeleteShareGroupOffsets() {
+ String groupId = "group1";
+ alterShareAutoOffsetReset(groupId, "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ Admin adminClient = createAdminClient()) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"Message".getBytes());
+ // Producing 5 records to the topic partition.
+ for (int i = 0; i < 5; i++) {
+ producer.send(record);
+ }
+ producer.flush();
+ // Create a new share consumer.
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(groupId,
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+ shareConsumer.subscribe(List.of(tp.topic()));
+ // Polling share consumer to make sure it joins the group and
consumes the produced records.
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 5);
+ assertEquals(5, records.count());
+ // Accept the records first to move the offset forward and
register the state with persister.
+ records.forEach(r -> shareConsumer.acknowledge(r,
AcknowledgeType.ACCEPT));
+ shareConsumer.commitSync();
+ // After accepting, the lag should be 0 because the record is
consumed successfully.
+ verifySharePartitionLag(adminClient, groupId, tp, 0L);
+ // Closing the share consumer so that the offsets can be deleted.
+ shareConsumer.close();
+ // Delete the share group offsets.
+ deleteShareGroupOffsets(adminClient, groupId, tp.topic());
+ // Verify that the share partition offsets are deleted.
+ verifySharePartitionOffsetsDeleted(adminClient, groupId, tp);
+ // Create a new share consumer.
+ ShareConsumer<byte[], byte[]> shareConsumer2 =
createShareConsumer(groupId,
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+ shareConsumer2.subscribe(List.of(tp.topic()));
+ // Since the offsets are deleted, the share consumer should
consume from the beginning (share.auto.offset.reset is earliest).
+ // Thus, the consumer should consume all 5 records again.
+ records = waitedPoll(shareConsumer2, 2500L, 5);
+ assertEquals(5, records.count());
+ // Accept the records first to move the offset forward and
register the state with persister.
+ records.forEach(r -> shareConsumer2.acknowledge(r,
AcknowledgeType.ACCEPT));
+ shareConsumer2.commitSync();
+ // After accepting, the lag should be 0 because the records are
consumed successfully.
+ verifySharePartitionLag(adminClient, groupId, tp, 0L);
+ // Closing the share consumer so that the offsets can be deleted.
+ shareConsumer2.close();
+ } catch (InterruptedException | ExecutionException e) {
+ fail("Test failed with exception: " + e.getMessage());
+ }
+ }
+
@ClusterTest
public void testFetchWithThrottledDelivery() {
alterShareAutoOffsetReset("group1", "earliest");
@@ -4127,6 +4220,14 @@ public class ShareConsumerTest {
return partitionResult;
}
+ private void verifySharePartitionStartOffset(Admin adminClient, String
groupId, TopicPartition tp, long expectedStartOffset) throws
InterruptedException {
+ TestUtils.waitForCondition(() -> {
+ SharePartitionOffsetInfo sharePartitionOffsetInfo =
sharePartitionOffsetInfo(adminClient, groupId, tp);
+ return sharePartitionOffsetInfo != null &&
+ sharePartitionOffsetInfo.startOffset() == expectedStartOffset;
+ }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to
retrieve share partition lag");
+ }
+
private void verifySharePartitionLag(Admin adminClient, String groupId,
TopicPartition tp, long expectedLag) throws InterruptedException {
TestUtils.waitForCondition(() -> {
SharePartitionOffsetInfo sharePartitionOffsetInfo =
sharePartitionOffsetInfo(adminClient, groupId, tp);
@@ -4136,6 +4237,28 @@ public class ShareConsumerTest {
}, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to
retrieve share partition lag");
}
+ private void verifySharePartitionOffsetsDeleted(Admin adminClient, String
groupId, TopicPartition tp) throws InterruptedException {
+ TestUtils.waitForCondition(
+ () -> sharePartitionOffsetInfo(adminClient, groupId, tp) == null,
+ DEFAULT_MAX_WAIT_MS,
+ DEFAULT_POLL_INTERVAL_MS,
+ () -> "Failed to retrieve share partition lag");
+ }
+
+ private void alterShareGroupOffsets(Admin adminClient, String groupId,
TopicPartition topicPartition, Long newOffset) throws InterruptedException,
ExecutionException {
+ adminClient.alterShareGroupOffsets(
+ groupId,
+ Map.of(topicPartition, newOffset),
+ new
AlterShareGroupOffsetsOptions().timeoutMs(30000)).partitionResult(topicPartition).get();
+ }
+
+ private void deleteShareGroupOffsets(Admin adminClient, String groupId,
String topic) throws InterruptedException, ExecutionException {
+ adminClient.deleteShareGroupOffsets(
+ groupId,
+ Set.of(topic),
+ new
DeleteShareGroupOffsetsOptions().timeoutMs(30000)).topicResult(topic).get();
+ }
+
private void alterShareRecordLockDurationMs(String groupId, int newValue) {
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.GROUP, groupId);
Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new
HashMap<>();
diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
index 215e95ff085..b36c483a53f 100644
--- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
+++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
@@ -42,7 +42,12 @@ public class PartitionFactory {
}
public static PartitionStateData newPartitionStateData(int partition, int
stateEpoch, long startOffset) {
- return new PartitionData(partition, stateEpoch, startOffset,
UNINITIALIZED_DELIVERY_COMPLETE_COUNT, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE,
DEFAULT_LEADER_EPOCH, null);
+ // If the start offset is uninitialized (when the share partition is
being initialized for the first time), the
+ // consumption hasn't started yet, and lag cannot be calculated. Thus,
deliveryCompleteCount is also set as -1.
+ // But, if start offset is a non-negative value (when the start offset
is altered), the lag can be calculated
+ // from that point onward. Hence, we set deliveryCompleteCount to 0 in
that case.
+ int deliveryCompleteCount = startOffset == UNINITIALIZED_START_OFFSET
? UNINITIALIZED_DELIVERY_COMPLETE_COUNT : 0;
+ return new PartitionData(partition, stateEpoch, startOffset,
deliveryCompleteCount, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE,
DEFAULT_LEADER_EPOCH, null);
}
public static PartitionErrorData newPartitionErrorData(int partition,
short errorCode, String errorMessage) {
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index aa098090aef..caf7cad1db5 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -279,7 +279,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
// This is an incremental snapshot,
// so we need to apply it to our current soft state.
shareStateMap.compute(mapKey, (k, v) -> v == null ? offsetRecord :
merge(v, value));
- snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 0 : v + 1);
+ snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 1 : v + 1);
}
private void maybeUpdateLeaderEpochMap(SharePartitionKey mapKey, int
leaderEpoch) {
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
index 9f1a2644257..ac17045733d 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
@@ -35,6 +35,7 @@ public class ShareGroupOffset {
public static final int NO_TIMESTAMP = 0;
public static final int UNINITIALIZED_EPOCH = 0;
public static final int UNINITIALIZED_DELIVERY_COMPLETE_COUNT = -1;
+ public static final int UNINITIALIZED_START_OFFSET = -1;
public static final int DEFAULT_EPOCH = 0;
private final int snapshotEpoch;
@@ -160,14 +161,18 @@ public class ShareGroupOffset {
}
public static ShareGroupOffset
fromRequest(InitializeShareGroupStateRequestData.PartitionData data, int
snapshotEpoch, long timestamp) {
- // This method is invoked during InitializeShareGroupStateRequest.
Since the deliveryCompleteCount is not yet
- // known at this stage, it is initialized to its default value.
+ // This method is invoked during InitializeShareGroupStateRequest. If
the start offset is uninitialized (when the
+ // share partition is being initialized for the first time), the
consumption hasn't started yet, and lag cannot
+ // be calculated. Thus, deliveryCompleteCount is also set as -1. But,
if start offset is a non-negative value (when
+ // the start offset is altered), the lag can be calculated from that
point onward. Hence, we set deliveryCompleteCount
+ // to 0 in that case.
+ int deliveryCompleteCount = data.startOffset() ==
UNINITIALIZED_START_OFFSET ? UNINITIALIZED_DELIVERY_COMPLETE_COUNT : 0;
return new ShareGroupOffset(
snapshotEpoch,
data.stateEpoch(),
UNINITIALIZED_EPOCH,
data.startOffset(),
- UNINITIALIZED_DELIVERY_COMPLETE_COUNT,
+ deliveryCompleteCount,
List.of(),
timestamp,
timestamp
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index aab870e4d8a..79b9bb9534c 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -70,6 +70,7 @@ import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -881,6 +882,45 @@ class ShareCoordinatorShardTest {
verify(shard.getMetricsShard(),
times(5)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
}
+ @Test
+ public void testSnapshotUpdateCountBoundary() {
+ shard = new ShareCoordinatorShardBuilder()
+
.setConfigOverrides(Map.of(ShareCoordinatorConfig.SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG,
"2"))
+ .build();
+
+ initSharePartition(shard, SHARE_PARTITION_KEY);
+
+ WriteShareGroupStateRequestData request = new
WriteShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartition(PARTITION)
+ .setStartOffset(0)
+ .setDeliveryCompleteCount(0)
+ .setStateEpoch(0)
+ .setLeaderEpoch(0)
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0)))))));
+
+ // Write 1: update count 0 < limit 2, should produce update record.
+ CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord>
result = shard.writeState(request);
+ assertInstanceOf(ShareUpdateKey.class, result.records().get(0).key());
+ shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+ // Write 2: update count 1 < limit 2, should produce update record.
+ result = shard.writeState(request);
+ assertInstanceOf(ShareUpdateKey.class, result.records().get(0).key());
+ shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+ // Write 3: update count 2 >= limit 2, should produce snapshot record.
+ result = shard.writeState(request);
+ assertInstanceOf(ShareSnapshotKey.class,
result.records().get(0).key());
+ }
+
@Test
public void testLastRedundantOffset() {
ShareCoordinatorOffsetsManager manager =
mock(ShareCoordinatorOffsetsManager.class);