This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 696729f6d3d KAFKA-20116: Send client.rack via StreamsHeartbeatRequest
(#21725)
696729f6d3d is described below
commit 696729f6d3d98de297955a8c445c3d54e68acfb4
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 16 10:11:57 2026 -0700
KAFKA-20116: Send client.rack via StreamsHeartbeatRequest (#21725)
This PR set the `client.rack` (if configured) in the
StreamsHeartbeatRequest `RackId` field.
Reviewers: Lucas Brutschy <[email protected]>
---
.../StreamsGroupHeartbeatRequestManager.java | 1 +
.../consumer/internals/StreamsRebalanceData.java | 8 ++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 8 ++--
.../consumer/internals/RequestManagersTest.java | 2 +-
.../StreamsGroupHeartbeatRequestManagerTest.java | 52 ++++++++++++++++++++++
.../internals/StreamsRebalanceDataTest.java | 33 +++++++++++++-
.../kafka/api/AuthorizerIntegrationTest.scala | 1 +
.../kafka/api/IntegrationTestHarness.scala | 1 +
.../streams/processor/internals/StreamThread.java | 11 +++++
.../DefaultStreamsRebalanceListenerTest.java | 10 +++--
.../processor/internals/StreamThreadTest.java | 6 +++
11 files changed, 124 insertions(+), 9 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index c45e3da1b5e..6907e113f65 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -136,6 +136,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
.setPort(userEndpoint.port())
);
});
+ streamsRebalanceData.rackId().ifPresent(data::setRackId);
data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream()
.map(entry -> new
StreamsGroupHeartbeatRequestData.KeyValue()
.setKey(entry.getKey())
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index ac3c53c47d9..a08472b6752 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -329,6 +329,8 @@ public class StreamsRebalanceData {
private final Optional<HostInfo> endpoint;
+ private final Optional<String> rackId;
+
private final Map<String, String> clientTags;
private final Map<String, Subtopology> subtopologies;
@@ -345,10 +347,12 @@ public class StreamsRebalanceData {
public StreamsRebalanceData(final UUID processId,
final Optional<HostInfo> endpoint,
+ final Optional<String> rackId,
final Map<String, Subtopology> subtopologies,
final Map<String, String> clientTags) {
this.processId = Objects.requireNonNull(processId, "Process ID cannot
be null");
this.endpoint = Objects.requireNonNull(endpoint, "Endpoint cannot be
null");
+ this.rackId = Objects.requireNonNull(rackId, "Rack ID cannot be null");
this.subtopologies = Map.copyOf(Objects.requireNonNull(subtopologies,
"Subtopologies cannot be null"));
this.clientTags = Map.copyOf(Objects.requireNonNull(clientTags,
"Client tags cannot be null"));
}
@@ -361,6 +365,10 @@ public class StreamsRebalanceData {
return endpoint;
}
+ public Optional<String> rackId() {
+ return rackId;
+ }
+
public Map<String, String> clientTags() {
return clientTags;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 40cc0e8df83..1df45a8fe94 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -1411,7 +1411,7 @@ public class AsyncKafkaConsumerTest {
public void testStreamRebalanceData() {
final String groupId = "consumerGroupA";
try (final MockedStatic<RequestManagers> requestManagers =
mockStatic(RequestManagers.class)) {
- StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());
+ StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(),
Map.of(), Map.of());
consumer =
newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId),
streamsRebalanceData);
final Optional<StreamsRebalanceData> groupMetadataUpdateListener =
captureStreamRebalanceData(requestManagers);
assertTrue(groupMetadataUpdateListener.isPresent());
@@ -2154,7 +2154,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void
testCloseInvokesStreamsRebalanceListenerOnTasksRevokedWhenMemberEpochPositive()
{
final String groupId = "streamsGroup";
- final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());
+ final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(),
Map.of(), Map.of());
try (final MockedStatic<RequestManagers> requestManagers =
mockStatic(RequestManagers.class)) {
consumer =
newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId),
streamsRebalanceData);
@@ -2174,7 +2174,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void
testCloseInvokesStreamsRebalanceListenerOnAllTasksLostWhenMemberEpochZeroOrNegative()
{
final String groupId = "streamsGroup";
- final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());
+ final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(),
Map.of(), Map.of());
try (final MockedStatic<RequestManagers> requestManagers =
mockStatic(RequestManagers.class)) {
consumer =
newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId),
streamsRebalanceData);
@@ -2194,7 +2194,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testCloseWrapsStreamsRebalanceListenerException() {
final String groupId = "streamsGroup";
- final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());
+ final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(),
Map.of(), Map.of());
try (final MockedStatic<RequestManagers> requestManagers =
mockStatic(RequestManagers.class)) {
consumer =
newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId),
streamsRebalanceData);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
index 1b869160f08..5d188469d3f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
@@ -117,7 +117,7 @@ public class RequestManagersTest {
new Metrics(),
mock(OffsetCommitCallbackInvoker.class),
listener,
- Optional.of(new StreamsRebalanceData(UUID.randomUUID(),
Optional.empty(), Map.of(), Map.of())),
+ Optional.of(new StreamsRebalanceData(UUID.randomUUID(),
Optional.empty(), Optional.empty(), Map.of(), Map.of())),
new PositionsValidator(logContext, time, subscriptions, metadata)
).get();
assertTrue(requestManagers.streamsMembershipManager.isPresent());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 35392e99398..3faaf6e05e6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -93,6 +93,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
private static final int MEMBER_EPOCH = 1;
private static final String INSTANCE_ID = "instance-id";
private static final UUID PROCESS_ID = UUID.randomUUID();
+ private static final String RACK_ID = "datacenter-1";
private static final StreamsRebalanceData.HostInfo ENDPOINT = new
StreamsRebalanceData.HostInfo("localhost", 8080);
private static final String SOURCE_TOPIC_1 = "sourceTopic1";
private static final String SOURCE_TOPIC_2 = "sourceTopic2";
@@ -161,6 +162,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
private final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
PROCESS_ID,
Optional.of(ENDPOINT),
+ Optional.of(RACK_ID),
SUBTOPOLOGIES,
CLIENT_TAGS
);
@@ -624,6 +626,56 @@ class StreamsGroupHeartbeatRequestManagerTest {
}
}
+ @ParameterizedTest
+ @MethodSource("provideNonJoiningStates")
+ public void testBuildingHeartbeatRequestRackIdSentWhenJoining(final
MemberState memberState) {
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState =
+ new StreamsGroupHeartbeatRequestManager.HeartbeatState(
+ streamsRebalanceData,
+ membershipManager,
+ 1234
+ );
+ when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+ StreamsGroupHeartbeatRequestData requestData1 =
heartbeatState.buildRequestData();
+
+ assertEquals(RACK_ID, requestData1.rackId());
+
+ when(membershipManager.state()).thenReturn(memberState);
+
+ StreamsGroupHeartbeatRequestData nonJoiningRequestData =
heartbeatState.buildRequestData();
+
+ assertNull(nonJoiningRequestData.rackId());
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideNonJoiningStates")
+ public void testBuildingHeartbeatRequestClientTagSentWhenJoining(final
MemberState memberState) {
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState =
+ new StreamsGroupHeartbeatRequestManager.HeartbeatState(
+ streamsRebalanceData,
+ membershipManager,
+ 1234
+ );
+ when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+ StreamsGroupHeartbeatRequestData requestData1 =
heartbeatState.buildRequestData();
+
+ assertEquals(CLIENT_TAGS.entrySet().stream()
+ .map(entry -> {
+ StreamsGroupHeartbeatRequestData.KeyValue kv = new
StreamsGroupHeartbeatRequestData.KeyValue();
+ kv.setKey(entry.getKey());
+ kv.setValue(entry.getValue());
+ return kv;
+ }).collect(Collectors.toList()), requestData1.clientTags());
+
+ when(membershipManager.state()).thenReturn(memberState);
+
+ StreamsGroupHeartbeatRequestData nonJoiningRequestData =
heartbeatState.buildRequestData();
+
+ assertNull(nonJoiningRequestData.clientTags());
+ }
+
@ParameterizedTest
@MethodSource("provideNonJoiningStates")
public void testBuildingHeartbeatRequestTopologySentWhenJoining(final
MemberState memberState) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
index 237dd4f0993..d383b043e46 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
@@ -288,7 +288,7 @@ public class StreamsRebalanceDataTest {
}
@Test
- public void
streamsRebalanceDataShouldNotHaveModifiableSubtopologiesAndClientTags() {
+ public void
streamsRebalanceDataShouldNotHaveModifiableSubtopologiesClientTags() {
final UUID processId = UUID.randomUUID();
final Optional<StreamsRebalanceData.HostInfo> endpoint =
Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
final Map<String, StreamsRebalanceData.Subtopology> subtopologies =
new HashMap<>();
@@ -296,6 +296,7 @@ public class StreamsRebalanceDataTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
processId,
endpoint,
+ Optional.empty(),
subtopologies,
clientTags
);
@@ -327,6 +328,7 @@ public class StreamsRebalanceDataTest {
() -> new StreamsRebalanceData(
null,
endpoint,
+ Optional.empty(),
subtopologies,
clientTags
)
@@ -345,6 +347,7 @@ public class StreamsRebalanceDataTest {
() -> new StreamsRebalanceData(
processId,
null,
+ Optional.empty(),
subtopologies,
clientTags
)
@@ -363,6 +366,7 @@ public class StreamsRebalanceDataTest {
() -> new StreamsRebalanceData(
processId,
endpoint,
+ Optional.empty(),
null,
clientTags
)
@@ -370,6 +374,26 @@ public class StreamsRebalanceDataTest {
assertEquals("Subtopologies cannot be null", exception.getMessage());
}
+ @Test
+ public void streamsRebalanceDataShouldNotAcceptNullRackId() {
+ final UUID processId = UUID.randomUUID();
+ final Optional<StreamsRebalanceData.HostInfo> endpoint =
Optional.of(new StreamsRebalanceData.HostInfo("localhost", 9090));
+ final Map<String, StreamsRebalanceData.Subtopology> subtopologies =
new HashMap<>();
+ final Map<String, String> clientTags = Map.of("clientTag1",
"clientTagValue1");
+
+ final Exception exception = assertThrows(
+ NullPointerException.class,
+ () -> new StreamsRebalanceData(
+ processId,
+ endpoint,
+ null,
+ subtopologies,
+ clientTags
+ )
+ );
+ assertEquals("Rack ID cannot be null", exception.getMessage());
+ }
+
@Test
public void streamsRebalanceDataShouldNotAcceptNullClientTags() {
final UUID processId = UUID.randomUUID();
@@ -381,6 +405,7 @@ public class StreamsRebalanceDataTest {
() -> new StreamsRebalanceData(
processId,
endpoint,
+ Optional.empty(),
subtopologies,
null
)
@@ -397,6 +422,7 @@ public class StreamsRebalanceDataTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
processId,
endpoint,
+ Optional.empty(),
subtopologies,
clientTags
);
@@ -413,6 +439,7 @@ public class StreamsRebalanceDataTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
processId,
endpoint,
+ Optional.empty(),
subtopologies,
clientTags
);
@@ -429,6 +456,7 @@ public class StreamsRebalanceDataTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
processId,
endpoint,
+ Optional.empty(),
subtopologies,
clientTags
);
@@ -445,6 +473,7 @@ public class StreamsRebalanceDataTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
processId,
endpoint,
+ Optional.empty(),
subtopologies,
clientTags
);
@@ -463,6 +492,7 @@ public class StreamsRebalanceDataTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
processId,
endpoint,
+ Optional.empty(),
subtopologies,
clientTags
);
@@ -481,6 +511,7 @@ public class StreamsRebalanceDataTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
processId,
endpoint,
+ Optional.empty(),
subtopologies,
clientTags
);
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 65deb347ebf..e2ac0e1433c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -3905,6 +3905,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
val consumer = createStreamsConsumer(streamsRebalanceData = new
StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
+ Optional.empty(),
util.Map.of(
"subtopology-0", new StreamsRebalanceData.Subtopology(
if (topicAsSourceTopic) util.Set.of(sourceTopic, topic) else
util.Set.of(sourceTopic),
diff --git
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 9271b96d563..d8a39f5f317 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -257,6 +257,7 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
val streamsRebalanceData = new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
+ Optional.empty(),
util.Map.of(
"subtopology-0", new StreamsRebalanceData.Subtopology(
inputTopics.asJava,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a5002a17378..29724166e7a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation;
@@ -558,6 +559,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
processId,
config,
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
+ parseRackId((String)
config.originals().get(CommonClientConfigs.CLIENT_RACK_CONFIG)),
topologyMetadata
)
);
@@ -671,9 +673,17 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
}
+ private static Optional<String> parseRackId(final String rackId) {
+ if (rackId == null || rackId.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(rackId);
+ }
+
private static StreamsRebalanceData initStreamsRebalanceData(final UUID
processId,
final
StreamsConfig config,
final
Optional<StreamsRebalanceData.HostInfo> endpoint,
+ final
Optional<String> rackId,
final
TopologyMetadata topologyMetadata) {
final InternalTopologyBuilder internalTopologyBuilder =
topologyMetadata.lookupBuilderForNamedTopology(null);
@@ -682,6 +692,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
return new StreamsRebalanceData(
processId,
endpoint,
+ rackId,
subtopologies,
config.getClientTags()
);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
index 81103b3a2ea..f94277fa3df 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
@@ -95,6 +95,7 @@ public class DefaultStreamsRebalanceListenerTest {
createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
+ Optional.empty(),
Map.of(
"1",
new StreamsRebalanceData.Subtopology(
@@ -130,7 +131,7 @@ public class DefaultStreamsRebalanceListenerTest {
final Exception exception = new RuntimeException("sample exception");
doThrow(exception).when(taskManager).handleRevocation(any());
- createRebalanceListenerWithRebalanceData(new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
+ createRebalanceListenerWithRebalanceData(new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(),
Map.of(), Map.of()));
final Exception actualException = assertThrows(RuntimeException.class,
() -> defaultStreamsRebalanceListener.onTasksRevoked(Set.of()));
@@ -254,6 +255,7 @@ public class DefaultStreamsRebalanceListenerTest {
createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
+ Optional.empty(),
Map.of(
"1",
new StreamsRebalanceData.Subtopology(
@@ -288,6 +290,7 @@ public class DefaultStreamsRebalanceListenerTest {
createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
+ Optional.empty(),
Map.of(
"1",
new StreamsRebalanceData.Subtopology(
@@ -328,7 +331,7 @@ public class DefaultStreamsRebalanceListenerTest {
return null;
}).when(taskManager).handleLostAll();
- createRebalanceListenerWithRebalanceData(new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
+ createRebalanceListenerWithRebalanceData(new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(),
Map.of(), Map.of()));
defaultStreamsRebalanceListener.onAllTasksLost();
@@ -348,6 +351,7 @@ public class DefaultStreamsRebalanceListenerTest {
createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
+ Optional.empty(),
Map.of(
"1",
new StreamsRebalanceData.Subtopology(
@@ -378,7 +382,7 @@ public class DefaultStreamsRebalanceListenerTest {
throw exception;
}).when(taskManager).handleAssignment(any(), any());
- createRebalanceListenerWithRebalanceData(new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
+ createRebalanceListenerWithRebalanceData(new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Optional.empty(),
Map.of(), Map.of()));
assertThrows(RuntimeException.class, () ->
defaultStreamsRebalanceListener.onTasksAssigned(
new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(),
false)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 3d5c59f7f80..3ea49bc00ef 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -3576,6 +3576,7 @@ public class StreamThreadTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
+ Optional.empty(),
Map.of(),
Map.of()
);
@@ -3635,6 +3636,7 @@ public class StreamThreadTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
+ Optional.empty(),
Map.of(),
Map.of()
);
@@ -3704,6 +3706,7 @@ public class StreamThreadTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
+ Optional.empty(),
Map.of(),
Map.of()
);
@@ -3765,6 +3768,7 @@ public class StreamThreadTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
+ Optional.empty(),
Map.of(),
Map.of()
);
@@ -3824,6 +3828,7 @@ public class StreamThreadTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
+ Optional.empty(),
Map.of(),
Map.of()
);
@@ -3893,6 +3898,7 @@ public class StreamThreadTest {
final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
+ Optional.empty(),
Map.of(),
Map.of()
);