This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 3d79b3401a8 MINOR: Cherry pick important fixes for 4.2.1 (#21910)
3d79b3401a8 is described below

commit 3d79b3401a8e789f4588ae13c5f1eb3d4b963aaa
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue Mar 31 18:53:10 2026 +0530

    MINOR: Cherry pick important fixes for 4.2.1 (#21910)
    
    Cherry picking
    *
    
    
https://github.com/apache/kafka/commit/3e9ae03b9ca7d1465218912b9067f464adee4c81
    *
    
    
https://github.com/apache/kafka/commit/6eebb863a0335615b0dd491c8875e79a062d4756
    For 4.2.1 cut
    
    Reviewers: Andrew Schofield <[email protected]>
    
    ---------
    
    Co-authored-by: Chirag Wadhwa <[email protected]>
    Co-authored-by: majialong <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  | 123 +++++++++++++++++++++
 .../server/share/persister/PartitionFactory.java   |   7 +-
 .../coordinator/share/ShareCoordinatorShard.java   |   2 +-
 .../kafka/coordinator/share/ShareGroupOffset.java  |  11 +-
 .../share/ShareCoordinatorShardTest.java           |  40 +++++++
 5 files changed, 178 insertions(+), 5 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index d8095838fa0..32b05351e98 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -21,8 +21,10 @@ import kafka.server.KafkaBroker;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
@@ -3443,6 +3445,97 @@ public class ShareConsumerTest {
         }
     }
 
+    @ClusterTest
+    public void testSharePartitionLagAfterAlterShareGroupOffsets() {
+        String groupId = "group1";
+        try (Producer<byte[], byte[]> producer = createProducer();
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"Message".getBytes());
+            // Producing 100 records to the topic partition.
+            for (int i = 0; i < 100; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+
+            // Create a new share consumer. Since the share.auto.offset.reset 
is not altered, it should be latest by default.
+            ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(groupId, 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure it joins the group and 
subscribes to the topic.
+            waitedPoll(shareConsumer, 2500L, 0, true, groupId, List.of(new 
TopicPartition(tp.topic(), 0)));
+            // Producing 5 additional records to the topic partition.
+            for (int i = 0; i < 5; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+            // Polling share consumer to make sure the records are consumed.
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the record first to move the offset forward and register 
the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is 
consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the share consumer so that the offsets can be altered.
+            shareConsumer.close();
+            // Alter the start offset of the share partition to 40.
+            alterShareGroupOffsets(adminClient, groupId, tp, 40L);
+            // After altering, the share partition start offset should be 40.
+            verifySharePartitionStartOffset(adminClient, groupId, tp, 40L);
+            // Verify that the lag is now 65 since the start offset is altered 
to 40 and there are total 105 records in the partition.
+            verifySharePartitionLag(adminClient, groupId, tp, 65L);
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Test failed with exception: " + e.getMessage());
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagAfterDeleteShareGroupOffsets() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"Message".getBytes());
+            // Producing 5 records to the topic partition.
+            for (int i = 0; i < 5; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+            // Create a new share consumer.
+            ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(groupId, 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure it joins the group and 
consumes the produced records.
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the records first to move the offset forward and 
register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, 
AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is 
consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the share consumer so that the offsets can be deleted.
+            shareConsumer.close();
+            // Delete the share group offsets.
+            deleteShareGroupOffsets(adminClient, groupId, tp.topic());
+            // Verify that the share partition offsets are deleted.
+            verifySharePartitionOffsetsDeleted(adminClient, groupId, tp);
+            // Create a new share consumer.
+            ShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer(groupId, 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer2.subscribe(List.of(tp.topic()));
+            // Since the offsets are deleted, the share consumer should 
consume from the beginning (share.auto.offset.reset is earliest).
+            // Thus, the consumer should consume all 5 records again.
+            records = waitedPoll(shareConsumer2, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the records first to move the offset forward and 
register the state with persister.
+            records.forEach(r -> shareConsumer2.acknowledge(r, 
AcknowledgeType.ACCEPT));
+            shareConsumer2.commitSync();
+            // After accepting, the lag should be 0 because the records are 
consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the share consumer so that the offsets can be deleted.
+            shareConsumer2.close();
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Test failed with exception: " + e.getMessage());
+        }
+    }
+
     @ClusterTest
     public void testFetchWithThrottledDelivery() {
         alterShareAutoOffsetReset("group1", "earliest");
@@ -4127,6 +4220,14 @@ public class ShareConsumerTest {
         return partitionResult;
     }
 
+    private void verifySharePartitionStartOffset(Admin adminClient, String 
groupId, TopicPartition tp, long expectedStartOffset) throws 
InterruptedException {
+        TestUtils.waitForCondition(() -> {
+            SharePartitionOffsetInfo sharePartitionOffsetInfo = 
sharePartitionOffsetInfo(adminClient, groupId, tp);
+            return sharePartitionOffsetInfo != null &&
+                sharePartitionOffsetInfo.startOffset() == expectedStartOffset;
+        }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to 
retrieve share partition lag");
+    }
+
     private void verifySharePartitionLag(Admin adminClient, String groupId, 
TopicPartition tp, long expectedLag) throws InterruptedException {
         TestUtils.waitForCondition(() -> {
             SharePartitionOffsetInfo sharePartitionOffsetInfo = 
sharePartitionOffsetInfo(adminClient, groupId, tp);
@@ -4136,6 +4237,28 @@ public class ShareConsumerTest {
         }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to 
retrieve share partition lag");
     }
 
+    private void verifySharePartitionOffsetsDeleted(Admin adminClient, String 
groupId, TopicPartition tp) throws InterruptedException {
+        TestUtils.waitForCondition(
+            () -> sharePartitionOffsetInfo(adminClient, groupId, tp) == null, 
+            DEFAULT_MAX_WAIT_MS, 
+            DEFAULT_POLL_INTERVAL_MS, 
+            () -> "Failed to retrieve share partition lag");
+    }
+
+    private void alterShareGroupOffsets(Admin adminClient, String groupId, 
TopicPartition topicPartition, Long newOffset) throws InterruptedException, 
ExecutionException {
+        adminClient.alterShareGroupOffsets(
+            groupId,
+            Map.of(topicPartition, newOffset),
+            new 
AlterShareGroupOffsetsOptions().timeoutMs(30000)).partitionResult(topicPartition).get();
+    }
+
+    private void deleteShareGroupOffsets(Admin adminClient, String groupId, 
String topic) throws InterruptedException, ExecutionException {
+        adminClient.deleteShareGroupOffsets(
+            groupId,
+            Set.of(topic),
+            new 
DeleteShareGroupOffsetsOptions().timeoutMs(30000)).topicResult(topic).get();
+    }
+
     private void alterShareRecordLockDurationMs(String groupId, int newValue) {
         ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, groupId);
         Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
index 215e95ff085..b36c483a53f 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
@@ -42,7 +42,12 @@ public class PartitionFactory {
     }
 
     public static PartitionStateData newPartitionStateData(int partition, int 
stateEpoch, long startOffset) {
-        return new PartitionData(partition, stateEpoch, startOffset, 
UNINITIALIZED_DELIVERY_COMPLETE_COUNT, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, 
DEFAULT_LEADER_EPOCH, null);
+        // If the start offset is uninitialized (when the share partition is 
being initialized for the first time), the 
+        // consumption hasn't started yet, and lag cannot be calculated. Thus, 
deliveryCompleteCount is also set as -1. 
+        // But, if start offset is a non-negative value (when the start offset 
is altered), the lag can be calculated 
+        // from that point onward. Hence, we set deliveryCompleteCount to 0 in 
that case.
+        int deliveryCompleteCount = startOffset == UNINITIALIZED_START_OFFSET 
? UNINITIALIZED_DELIVERY_COMPLETE_COUNT : 0;
+        return new PartitionData(partition, stateEpoch, startOffset, 
deliveryCompleteCount, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, 
DEFAULT_LEADER_EPOCH, null);
     }
 
     public static PartitionErrorData newPartitionErrorData(int partition, 
short errorCode, String errorMessage) {
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index aa098090aef..caf7cad1db5 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -279,7 +279,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         // This is an incremental snapshot,
         // so we need to apply it to our current soft state.
         shareStateMap.compute(mapKey, (k, v) -> v == null ? offsetRecord : 
merge(v, value));
-        snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 0 : v + 1);
+        snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 1 : v + 1);
     }
 
     private void maybeUpdateLeaderEpochMap(SharePartitionKey mapKey, int 
leaderEpoch) {
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
index 9f1a2644257..ac17045733d 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java
@@ -35,6 +35,7 @@ public class ShareGroupOffset {
     public static final int NO_TIMESTAMP = 0;
     public static final int UNINITIALIZED_EPOCH = 0;
     public static final int UNINITIALIZED_DELIVERY_COMPLETE_COUNT = -1;
+    public static final int UNINITIALIZED_START_OFFSET = -1;
     public static final int DEFAULT_EPOCH = 0;
 
     private final int snapshotEpoch;
@@ -160,14 +161,18 @@ public class ShareGroupOffset {
     }
 
     public static ShareGroupOffset 
fromRequest(InitializeShareGroupStateRequestData.PartitionData data, int 
snapshotEpoch, long timestamp) {
-        // This method is invoked during InitializeShareGroupStateRequest. 
Since the deliveryCompleteCount is not yet
-        // known at this stage, it is initialized to its default value.
+        // This method is invoked during InitializeShareGroupStateRequest. If 
the start offset is uninitialized (when the 
+        // share partition is being initialized for the first time), the 
consumption hasn't started yet, and lag cannot
+        // be calculated. Thus, deliveryCompleteCount is also set as -1. But, 
if start offset is a non-negative value (when 
+        // the start offset is altered), the lag can be calculated from that 
point onward. Hence, we set deliveryCompleteCount
+        // to 0 in that case.
+        int deliveryCompleteCount = data.startOffset() == 
UNINITIALIZED_START_OFFSET ? UNINITIALIZED_DELIVERY_COMPLETE_COUNT : 0;
         return new ShareGroupOffset(
             snapshotEpoch,
             data.stateEpoch(),
             UNINITIALIZED_EPOCH,
             data.startOffset(),
-            UNINITIALIZED_DELIVERY_COMPLETE_COUNT,
+            deliveryCompleteCount,
             List.of(),
             timestamp,
             timestamp
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index aab870e4d8a..79b9bb9534c 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -70,6 +70,7 @@ import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -881,6 +882,45 @@ class ShareCoordinatorShardTest {
         verify(shard.getMetricsShard(), 
times(5)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
     }
 
+    @Test
+    public void testSnapshotUpdateCountBoundary() {
+        shard = new ShareCoordinatorShardBuilder()
+            
.setConfigOverrides(Map.of(ShareCoordinatorConfig.SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG,
 "2"))
+            .build();
+
+        initSharePartition(shard, SHARE_PARTITION_KEY);
+
+        WriteShareGroupStateRequestData request = new 
WriteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new 
WriteShareGroupStateRequestData.WriteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new 
WriteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)
+                    .setStartOffset(0)
+                    .setDeliveryCompleteCount(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(0)
+                    .setStateBatches(List.of(new 
WriteShareGroupStateRequestData.StateBatch()
+                        .setFirstOffset(0)
+                        .setLastOffset(10)
+                        .setDeliveryCount((short) 1)
+                        .setDeliveryState((byte) 0)))))));
+
+        // Write 1: update count 0 < limit 2, should produce update record.
+        CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> 
result = shard.writeState(request);
+        assertInstanceOf(ShareUpdateKey.class, result.records().get(0).key());
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        // Write 2: update count 1 < limit 2, should produce update record.
+        result = shard.writeState(request);
+        assertInstanceOf(ShareUpdateKey.class, result.records().get(0).key());
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        // Write 3: update count 2 >= limit 2, should produce snapshot record.
+        result = shard.writeState(request);
+        assertInstanceOf(ShareSnapshotKey.class, 
result.records().get(0).key());
+    }
+
     @Test
     public void testLastRedundantOffset() {
         ShareCoordinatorOffsetsManager manager = 
mock(ShareCoordinatorOffsetsManager.class);

Reply via email to