This is an automated email from the ASF dual-hosted git repository.

apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d069b7cd166 MINOR: Cleanup DLQ record parameter. (#22480)
d069b7cd166 is described below

commit d069b7cd166913fa8c571c5e9101d2697d652eb0
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri Jun 5 17:22:40 2026 +0530

    MINOR: Cleanup DLQ record parameter. (#22480)
    
    * Remove the `preserveRecordData` component from the
    `ShareGroupDLQRecordParameter` record, as this setting will be
    retrieved from the share group dynamic config
    `errors.deadletterqueue.copy.record.enable`.
    
    Reviewers: Apoorv Mittal <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  2 +-
 .../share/dlq/ShareGroupDLQRecordParameter.java    | 16 +++++++--------
 .../share/dlq/ShareGroupDLQStateManagerTest.java   | 23 +++++++++++-----------
 3 files changed, 19 insertions(+), 22 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index c8b4decd0e1..4661752c73f 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -3351,7 +3351,7 @@ public class SharePartition {
         // Step 1: Enqueue to DLQ
         shareGroupDLQManager.enqueue(new ShareGroupDLQRecordParameter(
             groupId, topicIdPartition, firstOffset, lastOffset,
-            Optional.of(deliveryCount), Optional.ofNullable(dlqCause), false
+            Optional.of(deliveryCount), Optional.ofNullable(dlqCause)
         )).whenComplete((v1, dlqException) -> {
             if (dlqException != null) {
                 log.error("Failed to write to DLQ, proceeding to ARCHIVED 
regardless.", dlqException);
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
 
b/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
index 2e7c728542d..2e7383e4bbc 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
@@ -25,13 +25,12 @@ import java.util.Optional;
  * Record representing information needed from callers of {@link 
ShareGroupDLQManager#enqueue}. Inclusion
  * of first and last offset allows passing batch information as well.
  *
- * @param groupId            The share group id of the message being recorded.
- * @param topicIdPartition   The topic and partition information of the 
message.
- * @param firstOffset        The first offset of the records in the kafka 
topic partition.
- * @param lastOffset         The last offset of the records in the kafka topic 
partition.
- * @param deliveryCount      If known, the number of times the message was 
delivered to the share consumer.
- * @param cause              If known, throwable representing the reason for 
queueing the message.
- * @param preserveRecordData If true, store original record headers, key and 
value in the dlq record as well.
+ * @param groupId          The share group id of the message being recorded.
+ * @param topicIdPartition The topic and partition information of the message.
+ * @param firstOffset      The first offset of the records in the kafka topic 
partition.
+ * @param lastOffset       The last offset of the records in the kafka topic 
partition.
+ * @param deliveryCount    If known, the number of times the message was 
delivered to the share consumer.
+ * @param cause            If known, throwable representing the reason for 
queueing the message.
  */
 public record ShareGroupDLQRecordParameter(
     String groupId,
@@ -39,7 +38,6 @@ public record ShareGroupDLQRecordParameter(
     long firstOffset,
     long lastOffset,
     Optional<Short> deliveryCount,
-    Optional<Throwable> cause,
-    boolean preserveRecordData
+    Optional<Throwable> cause
 ) {
 }
diff --git 
a/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
 
b/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
index eac929140cd..6eb5b659250 100644
--- 
a/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java
@@ -168,8 +168,7 @@ class ShareGroupDLQStateManagerTest {
             0L,
             2L,
             Optional.of((short) 1),
-            Optional.of(new RuntimeException("simulated cause")),
-            false
+            Optional.of(new RuntimeException("simulated cause"))
         );
     }
 
@@ -972,12 +971,12 @@ class ShareGroupDLQStateManagerTest {
             GROUP_ID,
             new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
             0L, 0L,
-            Optional.empty(), Optional.empty(), false);
+            Optional.empty(), Optional.empty());
         ShareGroupDLQRecordParameter p1 = new ShareGroupDLQRecordParameter(
             GROUP_ID,
             new TopicIdPartition(SOURCE_TOPIC_ID, 1, "source-topic"),
             0L, 0L,
-            Optional.empty(), Optional.empty(), false);
+            Optional.empty(), Optional.empty());
 
         CompletableFuture<Void> r0 = stateManager.dlq(p0);
         CompletableFuture<Void> r1 = stateManager.dlq(p1);
@@ -1043,7 +1042,7 @@ class ShareGroupDLQStateManagerTest {
             GROUP_ID,
             new TopicIdPartition(SOURCE_TOPIC_ID, 0, null),
             0L, 0L,
-            Optional.empty(), Optional.empty(), false);
+            Optional.empty(), Optional.empty());
         assertNull(stateManager.dlq(p).get(10, TimeUnit.SECONDS));
 
         assertEquals(1, capturedProduces.size());
@@ -1123,17 +1122,17 @@ class ShareGroupDLQStateManagerTest {
             groupA,
             new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
             0L, 0L,
-            Optional.empty(), Optional.empty(), false);
+            Optional.empty(), Optional.empty());
         ShareGroupDLQRecordParameter pB = new ShareGroupDLQRecordParameter(
             groupB,
             new TopicIdPartition(SOURCE_TOPIC_ID, 1, "source-topic"),
             0L, 0L,
-            Optional.empty(), Optional.empty(), false);
+            Optional.empty(), Optional.empty());
         ShareGroupDLQRecordParameter pC = new ShareGroupDLQRecordParameter(
             groupC,
             new TopicIdPartition(SOURCE_TOPIC_ID, 2, "source-topic"),
             0L, 0L,
-            Optional.empty(), Optional.empty(), false);
+            Optional.empty(), Optional.empty());
 
         CompletableFuture<Void> rA = stateManager.dlq(pA);
         CompletableFuture<Void> rB = stateManager.dlq(pB);
@@ -1206,7 +1205,7 @@ class ShareGroupDLQStateManagerTest {
                 GROUP_ID,
                 new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
                 0L, 0L,
-                Optional.empty(), Optional.empty(), false));
+                Optional.empty(), Optional.empty()));
         }
 
         // Wait until the callback observes nodeRPCMap with more than 2 
handlers piled up.
@@ -1352,7 +1351,7 @@ class ShareGroupDLQStateManagerTest {
             new ShareGroupDLQRecordParameter(GROUP_ID,
                 new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
                 0L, 0L,
-                Optional.empty(), Optional.empty(), false),
+                Optional.empty(), Optional.empty()),
             goodFuture,
             ShareGroupDLQStateManager.REQUEST_BACKOFF_MS,
             ShareGroupDLQStateManager.REQUEST_BACKOFF_MAX_MS,
@@ -1367,7 +1366,7 @@ class ShareGroupDLQStateManagerTest {
             new ShareGroupDLQRecordParameter(GROUP_ID,
                 new TopicIdPartition(SOURCE_TOPIC_ID, 0, "source-topic"),
                 0L, 0L,
-                Optional.empty(), Optional.empty(), false),
+                Optional.empty(), Optional.empty()),
             brokenFuture,
             ShareGroupDLQStateManager.REQUEST_BACKOFF_MS,
             ShareGroupDLQStateManager.REQUEST_BACKOFF_MAX_MS,
@@ -1399,7 +1398,7 @@ class ShareGroupDLQStateManagerTest {
             groupId,
             new TopicIdPartition(SOURCE_TOPIC_ID, sourcePartition, 
"source-topic"),
             0L, 0L,
-            Optional.empty(), Optional.empty(), false);
+            Optional.empty(), Optional.empty());
         return manager.new ProduceRequestHandler(
             param,
             new CompletableFuture<>(),

Reply via email to