This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 696729f6d3d KAFKA-20116: Send client.rack via StreamsHeartbeatRequest 
(#21725)
696729f6d3d is described below

commit 696729f6d3d98de297955a8c445c3d54e68acfb4
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 16 10:11:57 2026 -0700

    KAFKA-20116: Send client.rack via StreamsHeartbeatRequest (#21725)
    
    This PR sets the `client.rack` (if configured) in the
    StreamsHeartbeatRequest `RackId` field.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../StreamsGroupHeartbeatRequestManager.java       |  1 +
 .../consumer/internals/StreamsRebalanceData.java   |  8 ++++
 .../consumer/internals/AsyncKafkaConsumerTest.java |  8 ++--
 .../consumer/internals/RequestManagersTest.java    |  2 +-
 .../StreamsGroupHeartbeatRequestManagerTest.java   | 52 ++++++++++++++++++++++
 .../internals/StreamsRebalanceDataTest.java        | 33 +++++++++++++-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  1 +
 .../kafka/api/IntegrationTestHarness.scala         |  1 +
 .../streams/processor/internals/StreamThread.java  | 11 +++++
 .../DefaultStreamsRebalanceListenerTest.java       | 10 +++--
 .../processor/internals/StreamThreadTest.java      |  6 +++
 11 files changed, 124 insertions(+), 9 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index c45e3da1b5e..6907e113f65 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -136,6 +136,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
                         .setPort(userEndpoint.port())
                     );
                 });
+                streamsRebalanceData.rackId().ifPresent(data::setRackId);
                 
data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream()
                     .map(entry -> new 
StreamsGroupHeartbeatRequestData.KeyValue()
                         .setKey(entry.getKey())
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index ac3c53c47d9..a08472b6752 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -329,6 +329,8 @@ public class StreamsRebalanceData {
 
     private final Optional<HostInfo> endpoint;
 
+    private final Optional<String> rackId;
+
     private final Map<String, String> clientTags;
 
     private final Map<String, Subtopology> subtopologies;
@@ -345,10 +347,12 @@ public class StreamsRebalanceData {
 
     public StreamsRebalanceData(final UUID processId,
                                 final Optional<HostInfo> endpoint,
+                                final Optional<String> rackId,
                                 final Map<String, Subtopology> subtopologies,
                                 final Map<String, String> clientTags) {
         this.processId = Objects.requireNonNull(processId, "Process ID cannot 
be null");
         this.endpoint = Objects.requireNonNull(endpoint, "Endpoint cannot be 
null");
+        this.rackId = Objects.requireNonNull(rackId, "Rack ID cannot be null");
         this.subtopologies = Map.copyOf(Objects.requireNonNull(subtopologies, 
"Subtopologies cannot be null"));
         this.clientTags = Map.copyOf(Objects.requireNonNull(clientTags, 
"Client tags cannot be null"));
     }
@@ -361,6 +365,10 @@ public class StreamsRebalanceData {
         return endpoint;
     }
 
+    public Optional<String> rackId() {
+        return rackId;
+    }
+
     public Map<String, String> clientTags() {
         return clientTags;
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 40cc0e8df83..1df45a8fe94 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -1411,7 +1411,7 @@ public class AsyncKafkaConsumerTest {
     public void testStreamRebalanceData() {
         final String groupId = "consumerGroupA";
         try (final MockedStatic<RequestManagers> requestManagers = 
mockStatic(RequestManagers.class)) {
-            StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());
+            StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(), 
Map.of(), Map.of());
             consumer = 
newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), 
streamsRebalanceData);
             final Optional<StreamsRebalanceData> groupMetadataUpdateListener = 
captureStreamRebalanceData(requestManagers);
             assertTrue(groupMetadataUpdateListener.isPresent());
@@ -2154,7 +2154,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void 
testCloseInvokesStreamsRebalanceListenerOnTasksRevokedWhenMemberEpochPositive() 
{
         final String groupId = "streamsGroup";
-        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());
+        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(), 
Map.of(), Map.of());
         
         try (final MockedStatic<RequestManagers> requestManagers = 
mockStatic(RequestManagers.class)) {
             consumer = 
newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), 
streamsRebalanceData);
@@ -2174,7 +2174,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void 
testCloseInvokesStreamsRebalanceListenerOnAllTasksLostWhenMemberEpochZeroOrNegative()
 {
         final String groupId = "streamsGroup";
-        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());
+        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(), 
Map.of(), Map.of());
         
         try (final MockedStatic<RequestManagers> requestManagers = 
mockStatic(RequestManagers.class)) {
             consumer = 
newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), 
streamsRebalanceData);
@@ -2194,7 +2194,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testCloseWrapsStreamsRebalanceListenerException() {
         final String groupId = "streamsGroup";
-        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());
+        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(), 
Map.of(), Map.of());
         
         try (final MockedStatic<RequestManagers> requestManagers = 
mockStatic(RequestManagers.class)) {
             consumer = 
newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), 
streamsRebalanceData);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
index 1b869160f08..5d188469d3f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
@@ -117,7 +117,7 @@ public class RequestManagersTest {
             new Metrics(),
             mock(OffsetCommitCallbackInvoker.class),
             listener,
-            Optional.of(new StreamsRebalanceData(UUID.randomUUID(), 
Optional.empty(), Map.of(), Map.of())),
+            Optional.of(new StreamsRebalanceData(UUID.randomUUID(), 
Optional.empty(), Optional.empty(), Map.of(), Map.of())),
             new PositionsValidator(logContext, time, subscriptions, metadata)
         ).get();
         assertTrue(requestManagers.streamsMembershipManager.isPresent());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 35392e99398..3faaf6e05e6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -93,6 +93,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
     private static final int MEMBER_EPOCH = 1;
     private static final String INSTANCE_ID = "instance-id";
     private static final UUID PROCESS_ID = UUID.randomUUID();
+    private static final String RACK_ID = "datacenter-1";
     private static final StreamsRebalanceData.HostInfo ENDPOINT = new 
StreamsRebalanceData.HostInfo("localhost", 8080);
     private static final String SOURCE_TOPIC_1 = "sourceTopic1";
     private static final String SOURCE_TOPIC_2 = "sourceTopic2";
@@ -161,6 +162,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
     private final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
         PROCESS_ID,
         Optional.of(ENDPOINT),
+        Optional.of(RACK_ID),
         SUBTOPOLOGIES,
         CLIENT_TAGS
     );
@@ -624,6 +626,56 @@ class StreamsGroupHeartbeatRequestManagerTest {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatRequestRackIdSentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new StreamsGroupHeartbeatRequestManager.HeartbeatState(
+                streamsRebalanceData,
+                membershipManager,
+                1234
+            );
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(RACK_ID, requestData1.rackId());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.rackId());
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideNonJoiningStates")
+    public void testBuildingHeartbeatRequestClientTagSentWhenJoining(final 
MemberState memberState) {
+        final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState =
+            new StreamsGroupHeartbeatRequestManager.HeartbeatState(
+                streamsRebalanceData,
+                membershipManager,
+                1234
+            );
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+        StreamsGroupHeartbeatRequestData requestData1 = 
heartbeatState.buildRequestData();
+
+        assertEquals(CLIENT_TAGS.entrySet().stream()
+            .map(entry -> {
+                StreamsGroupHeartbeatRequestData.KeyValue kv = new 
StreamsGroupHeartbeatRequestData.KeyValue();
+                kv.setKey(entry.getKey());
+                kv.setValue(entry.getValue());
+                return kv;
+            }).collect(Collectors.toList()), requestData1.clientTags());
+
+        when(membershipManager.state()).thenReturn(memberState);
+
+        StreamsGroupHeartbeatRequestData nonJoiningRequestData = 
heartbeatState.buildRequestData();
+
+        assertNull(nonJoiningRequestData.clientTags());
+    }
+
     @ParameterizedTest
     @MethodSource("provideNonJoiningStates")
     public void testBuildingHeartbeatRequestTopologySentWhenJoining(final 
MemberState memberState) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
index 237dd4f0993..d383b043e46 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
@@ -288,7 +288,7 @@ public class StreamsRebalanceDataTest {
     }
 
     @Test
-    public void 
streamsRebalanceDataShouldNotHaveModifiableSubtopologiesAndClientTags() {
+    public void 
streamsRebalanceDataShouldNotHaveModifiableSubtopologiesClientTags() {
         final UUID processId = UUID.randomUUID();
         final Optional<StreamsRebalanceData.HostInfo> endpoint = 
Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
         final Map<String, StreamsRebalanceData.Subtopology> subtopologies = 
new HashMap<>();
@@ -296,6 +296,7 @@ public class StreamsRebalanceDataTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
             processId,
             endpoint,
+            Optional.empty(),
             subtopologies,
             clientTags
         );
@@ -327,6 +328,7 @@ public class StreamsRebalanceDataTest {
             () -> new StreamsRebalanceData(
                 null,
                 endpoint,
+                Optional.empty(),
                 subtopologies,
                 clientTags
             )
@@ -345,6 +347,7 @@ public class StreamsRebalanceDataTest {
             () -> new StreamsRebalanceData(
                 processId,
                 null,
+                Optional.empty(),
                 subtopologies,
                 clientTags
             )
@@ -363,6 +366,7 @@ public class StreamsRebalanceDataTest {
             () -> new StreamsRebalanceData(
                 processId,
                 endpoint,
+                Optional.empty(),
                 null,
                 clientTags
             )
@@ -370,6 +374,26 @@ public class StreamsRebalanceDataTest {
         assertEquals("Subtopologies cannot be null", exception.getMessage());
     }
 
+    @Test
+    public void streamsRebalanceDataShouldNotAcceptNullRackId() {
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = 
Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = 
new HashMap<>();
+        final Map<String, String> clientTags = Map.of("clientTag1", 
"clientTagValue1");
+
+        final Exception exception = assertThrows(
+            NullPointerException.class,
+            () -> new StreamsRebalanceData(
+                processId,
+                endpoint,
+                null,
+                subtopologies,
+                clientTags
+            )
+        );
+        assertEquals("Rack ID cannot be null", exception.getMessage());
+    }
+
     @Test
     public void streamsRebalanceDataShouldNotAcceptNullClientTags() {
         final UUID processId = UUID.randomUUID();
@@ -381,6 +405,7 @@ public class StreamsRebalanceDataTest {
             () -> new StreamsRebalanceData(
                 processId,
                 endpoint,
+                Optional.empty(),
                 subtopologies,
                 null
             )
@@ -397,6 +422,7 @@ public class StreamsRebalanceDataTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
             processId,
             endpoint,
+            Optional.empty(),
             subtopologies,
             clientTags
         );
@@ -413,6 +439,7 @@ public class StreamsRebalanceDataTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
             processId,
             endpoint,
+            Optional.empty(),
             subtopologies,
             clientTags
         );
@@ -429,6 +456,7 @@ public class StreamsRebalanceDataTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
             processId,
             endpoint,
+            Optional.empty(),
             subtopologies,
             clientTags
         );
@@ -445,6 +473,7 @@ public class StreamsRebalanceDataTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
             processId,
             endpoint,
+            Optional.empty(),
             subtopologies,
             clientTags
         );
@@ -463,6 +492,7 @@ public class StreamsRebalanceDataTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
                 processId,
                 endpoint,
+                Optional.empty(),
                 subtopologies,
                 clientTags
         );
@@ -481,6 +511,7 @@ public class StreamsRebalanceDataTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
                 processId,
                 endpoint,
+                Optional.empty(),
                 subtopologies,
                 clientTags
         );
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 65deb347ebf..e2ac0e1433c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -3905,6 +3905,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     val consumer = createStreamsConsumer(streamsRebalanceData = new 
StreamsRebalanceData(
       UUID.randomUUID(),
       Optional.empty(),
+      Optional.empty(),
       util.Map.of(
         "subtopology-0", new StreamsRebalanceData.Subtopology(
           if (topicAsSourceTopic) util.Set.of(sourceTopic, topic) else 
util.Set.of(sourceTopic),
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 9271b96d563..d8a39f5f317 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -257,6 +257,7 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
     val streamsRebalanceData = new StreamsRebalanceData(
       UUID.randomUUID(),
       Optional.empty(),
+      Optional.empty(),
       util.Map.of(
         "subtopology-0", new StreamsRebalanceData.Subtopology(
           inputTopics.asJava,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a5002a17378..29724166e7a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation;
@@ -558,6 +559,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                     processId,
                     config,
                     
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
+                    parseRackId((String) 
config.originals().get(CommonClientConfigs.CLIENT_RACK_CONFIG)),
                     topologyMetadata
                 )
             );
@@ -671,9 +673,17 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         }
     }
 
+    private static Optional<String> parseRackId(final String rackId) {
+        if (rackId == null || rackId.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(rackId);
+    }
+
     private static StreamsRebalanceData initStreamsRebalanceData(final UUID 
processId,
                                                                  final 
StreamsConfig config,
                                                                  final 
Optional<StreamsRebalanceData.HostInfo> endpoint,
+                                                                 final 
Optional<String> rackId,
                                                                  final 
TopologyMetadata topologyMetadata) {
         final InternalTopologyBuilder internalTopologyBuilder = 
topologyMetadata.lookupBuilderForNamedTopology(null);
 
@@ -682,6 +692,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         return new StreamsRebalanceData(
             processId,
             endpoint,
+            rackId,
             subtopologies,
             config.getClientTags()
         );
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
index 81103b3a2ea..f94277fa3df 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
@@ -95,6 +95,7 @@ public class DefaultStreamsRebalanceListenerTest {
         createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(
             UUID.randomUUID(),
             Optional.empty(),
+            Optional.empty(),
             Map.of(
                 "1",
                 new StreamsRebalanceData.Subtopology(
@@ -130,7 +131,7 @@ public class DefaultStreamsRebalanceListenerTest {
         final Exception exception = new RuntimeException("sample exception");
         doThrow(exception).when(taskManager).handleRevocation(any());
 
-        createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
+        createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(), 
Map.of(), Map.of()));
 
         final Exception actualException = assertThrows(RuntimeException.class, 
() -> defaultStreamsRebalanceListener.onTasksRevoked(Set.of()));
 
@@ -254,6 +255,7 @@ public class DefaultStreamsRebalanceListenerTest {
         createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(
             UUID.randomUUID(),
             Optional.empty(),
+            Optional.empty(),
             Map.of(
                 "1",
                 new StreamsRebalanceData.Subtopology(
@@ -288,6 +290,7 @@ public class DefaultStreamsRebalanceListenerTest {
         createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(
             UUID.randomUUID(),
             Optional.empty(),
+            Optional.empty(),
             Map.of(
                 "1",
                 new StreamsRebalanceData.Subtopology(
@@ -328,7 +331,7 @@ public class DefaultStreamsRebalanceListenerTest {
             return null;
         }).when(taskManager).handleLostAll();
 
-        createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
+        createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(), 
Map.of(), Map.of()));
 
         defaultStreamsRebalanceListener.onAllTasksLost();
 
@@ -348,6 +351,7 @@ public class DefaultStreamsRebalanceListenerTest {
         createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(
             UUID.randomUUID(),
             Optional.empty(),
+            Optional.empty(),
             Map.of(
                 "1",
                 new StreamsRebalanceData.Subtopology(
@@ -378,7 +382,7 @@ public class DefaultStreamsRebalanceListenerTest {
             throw exception;
         }).when(taskManager).handleAssignment(any(), any());
 
-        createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
+        createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(), 
Map.of(), Map.of()));
 
         assertThrows(RuntimeException.class, () -> 
defaultStreamsRebalanceListener.onTasksAssigned(
             new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(), 
false)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 3d5c59f7f80..3ea49bc00ef 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -3576,6 +3576,7 @@ public class StreamThreadTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
             UUID.randomUUID(),
             Optional.empty(),
+            Optional.empty(),
             Map.of(),
             Map.of()
         );
@@ -3635,6 +3636,7 @@ public class StreamThreadTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
                 UUID.randomUUID(),
                 Optional.empty(),
+                Optional.empty(),
                 Map.of(),
                 Map.of()
         );
@@ -3704,6 +3706,7 @@ public class StreamThreadTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
                 UUID.randomUUID(),
                 Optional.empty(),
+                Optional.empty(),
                 Map.of(),
                 Map.of()
         );
@@ -3765,6 +3768,7 @@ public class StreamThreadTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
             UUID.randomUUID(),
             Optional.empty(),
+            Optional.empty(),
             Map.of(),
             Map.of()
         );
@@ -3824,6 +3828,7 @@ public class StreamThreadTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
                 UUID.randomUUID(),
                 Optional.empty(),
+                Optional.empty(),
                 Map.of(),
                 Map.of()
         );
@@ -3893,6 +3898,7 @@ public class StreamThreadTest {
         final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
                 UUID.randomUUID(),
                 Optional.empty(),
+                Optional.empty(),
                 Map.of(),
                 Map.of()
         );

Reply via email to