lucasbru commented on code in PR #19802:
URL: https://github.com/apache/kafka/pull/19802#discussion_r2121800290
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java:
##########
@@ -54,7 +54,7 @@ public void
testConstructorWithNullInternalTopicsToBeCreated() {
assertThrows(NullPointerException.class,
() -> new ConfiguredTopology(
0,
- Optional.of(new TreeMap<>()),
+ 0, Optional.of(new TreeMap<>()),
Review Comment:
```suggestion
0,
Optional.of(new TreeMap<>()),
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java:
##########
@@ -78,7 +78,7 @@ public void testConstructorWithInvalidTopologyEpoch() {
assertThrows(IllegalArgumentException.class,
() -> new ConfiguredTopology(
-1,
- Optional.of(new TreeMap<>()),
+ 0, Optional.of(new TreeMap<>()),
Review Comment:
```suggestion
0,
Optional.of(new TreeMap<>()),
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16707,13 +16732,18 @@ public void
testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() {
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)))
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
- .withPartitionMetadata(Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, 3)
- ))
+ .withMetadataHash(groupMetadataHash)
)
.build();
+ context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
Review Comment:
Same as above.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -17158,13 +17214,17 @@ public void testStreamsReconciliationProcess() {
TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
.withTargetAssignmentEpoch(10)
- .withPartitionMetadata(Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, 3)
- ))
+ .withMetadataHash(groupMetadataHash)
)
.build();
+ context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
Review Comment:
Same as above.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java:
##########
@@ -42,7 +42,7 @@ public void testConstructorWithNullSubtopologies() {
assertThrows(NullPointerException.class,
() -> new ConfiguredTopology(
0,
- null,
+ 0, null,
Review Comment:
```suggestion
0,
null,
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -4434,6 +4435,15 @@ public void testUpdateStreamsGroupSizeCounter() {
.build()))
.build();
+ for (int i = 1; i < 4; i++) {
Review Comment:
It's not so nice to call `InternalTopicManager` from within this test. Could
we turn this into explicit `.setConfiguredTopology` calls above? That would
seem more idiomatic.
EDIT: I see that you do that a couple of times below. I guess we can keep it
this way if we don't find an easy way to define those topologies statically. If
this will take too much time, we could define a little follow-up ticket to get
rid of the `InternalTopicManager` calls in here. Definitely get rid of the loop
here.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16526,12 +16539,18 @@ public void
testStreamsGroupMemberRequestingShutdownApplication() {
TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
- .withPartitionMetadata(Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6)
- ))
+ .withMetadataHash(groupMetadataHash)
)
.build();
+ context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
Review Comment:
Same here. If possible, let's make the ConfiguredTopology explicit here.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java:
##########
@@ -90,7 +90,7 @@ public void testNoExceptionButNoSubtopologies() {
final IllegalArgumentException ex =
assertThrows(IllegalArgumentException.class,
() -> new ConfiguredTopology(
1,
- Optional.empty(),
+ 0, Optional.empty(),
Review Comment:
```suggestion
0,
Optional.empty(),
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -17593,16 +17663,16 @@ public void testStreamsStreamsGroupStates() {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
streamsGroupMemberBuilderWithDefaults(memberId1)
.build()));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
11, 0));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
11, groupMetadataHash));
assertEquals(StreamsGroupState.NOT_READY,
context.streamsGroupState(groupId));
-
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
- Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, 3)
- )
- ));
+ context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
Review Comment:
Same as above.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16807,13 +16848,18 @@ public void
testStreamsUpdatingPartitionMetadataTriggersNewTargetAssignment() {
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)))
.withTargetAssignmentEpoch(10)
.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
- .withPartitionMetadata(Map.of(
- fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6),
- barTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId,
barTopicName, 3)
- ))
+ .withMetadataHash(oldGroupMetadataHash)
)
.build();
+ context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
Review Comment:
Same as above.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java:
##########
@@ -66,7 +66,7 @@ public void
testConstructorWithNullTopicConfigurationException() {
assertThrows(NullPointerException.class,
() -> new ConfiguredTopology(
0,
- Optional.empty(),
+ 0, Optional.empty(),
Review Comment:
```suggestion
0,
Optional.empty(),
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]