chia7712 commented on code in PR #19181:
URL: https://github.com/apache/kafka/pull/19181#discussion_r1989690501
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -119,9 +146,9 @@ private static
List<StreamsGroupHeartbeatRequestData.TaskIds> convertTaskIdColle
.map(entry -> {
StreamsGroupHeartbeatRequestData.TaskIds ids = new
StreamsGroupHeartbeatRequestData.TaskIds();
ids.setSubtopologyId(entry.getKey());
- ids.setPartitions(entry.getValue());
+
ids.setPartitions(entry.getValue().stream().sorted().collect(Collectors.toList()));
Review Comment:
Excuse me, is the sorting operation necessary in production, or is it
primarily used for testing purposes?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -82,29 +97,41 @@ public StreamsGroupHeartbeatRequestData buildRequestData() {
data.setMemberId(membershipManager.memberId());
data.setMemberEpoch(membershipManager.memberEpoch());
membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
- StreamsGroupHeartbeatRequestData.Topology topology = new
StreamsGroupHeartbeatRequestData.Topology();
-
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));
- topology.setEpoch(streamsRebalanceData.topologyEpoch());
- data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
- data.setTopology(topology);
- data.setProcessId(streamsRebalanceData.processId().toString());
- streamsRebalanceData.endpoint().ifPresent(userEndpoint -> {
- data.setUserEndpoint(new
StreamsGroupHeartbeatRequestData.Endpoint()
- .setHost(userEndpoint.host())
- .setPort(userEndpoint.port())
- );
- });
-
data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream()
- .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue()
- .setKey(entry.getKey())
- .setValue(entry.getValue())
- )
- .collect(Collectors.toList()));
+
+ boolean joining = membershipManager.state() == MemberState.JOINING;
+
+ if (joining) {
+ StreamsGroupHeartbeatRequestData.Topology topology = new
StreamsGroupHeartbeatRequestData.Topology();
+
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));
+ topology.setEpoch(streamsRebalanceData.topologyEpoch());
+ data.setTopology(topology);
+ data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
+ data.setProcessId(streamsRebalanceData.processId().toString());
Review Comment:
Out of curiosity, what is the rationale for using `String` instead of `Uuid`
as the data type for `processId`? By contrast, `SubscriptionInfoData` uses
`Uuid`.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -61,9 +61,23 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
static class HeartbeatState {
+ // Fields of StreamsGroupHeartbeatRequest sent in the most recent
request
+ static class LastSentFields {
+
+ private StreamsRebalanceData.Assignment assignment = null;
Review Comment:
Maybe we can initialize it to `Assignment.EMPTY` to avoid NPE in the future?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -61,9 +61,23 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
static class HeartbeatState {
+ // Fields of StreamsGroupHeartbeatRequest sent in the most recent
request
+ static class LastSentFields {
Review Comment:
Given that this struct contains only one field, perhaps we could directly
incorporate the `assignment` field into `HeartbeatState`?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -476,20 +492,25 @@ public void
testNotSendingLeaveHeartbeatIfPollTimerExpiredAndMemberIsLeaving() {
}
@Test
- public void testSendingFullHeartbeatRequest() {
+ public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() {
try (
final MockedConstruction<HeartbeatRequestState>
heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
(mock, context) -> {
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
- })
+ });
+ final MockedConstruction<Timer> pollTimerMockedConstruction =
mockConstruction(
+ Timer.class,
+ (mock, context) -> {
+ when(mock.isExpired()).thenReturn(true);
+ });
Review Comment:
The trailing `;` after the last resource in this try-with-resources statement is unnecessary.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -82,29 +97,41 @@ public StreamsGroupHeartbeatRequestData buildRequestData() {
data.setMemberId(membershipManager.memberId());
data.setMemberEpoch(membershipManager.memberEpoch());
membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
- StreamsGroupHeartbeatRequestData.Topology topology = new
StreamsGroupHeartbeatRequestData.Topology();
-
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));
- topology.setEpoch(streamsRebalanceData.topologyEpoch());
- data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
- data.setTopology(topology);
- data.setProcessId(streamsRebalanceData.processId().toString());
- streamsRebalanceData.endpoint().ifPresent(userEndpoint -> {
- data.setUserEndpoint(new
StreamsGroupHeartbeatRequestData.Endpoint()
- .setHost(userEndpoint.host())
- .setPort(userEndpoint.port())
- );
- });
-
data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream()
- .map(entry -> new StreamsGroupHeartbeatRequestData.KeyValue()
- .setKey(entry.getKey())
- .setValue(entry.getValue())
- )
- .collect(Collectors.toList()));
+
+ boolean joining = membershipManager.state() == MemberState.JOINING;
+
+ if (joining) {
+ StreamsGroupHeartbeatRequestData.Topology topology = new
StreamsGroupHeartbeatRequestData.Topology();
+
topology.setSubtopologies(getTopologyFromStreams(streamsRebalanceData.subtopologies()));
+ topology.setEpoch(streamsRebalanceData.topologyEpoch());
+ data.setTopology(topology);
+ data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
+ data.setProcessId(streamsRebalanceData.processId().toString());
+ streamsRebalanceData.endpoint().ifPresent(userEndpoint -> {
+ data.setUserEndpoint(new
StreamsGroupHeartbeatRequestData.Endpoint()
+ .setHost(userEndpoint.host())
+ .setPort(userEndpoint.port())
+ );
+ });
+
data.setClientTags(streamsRebalanceData.clientTags().entrySet().stream()
+ .map(entry -> new
StreamsGroupHeartbeatRequestData.KeyValue()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue())
+ )
+ .collect(Collectors.toList()));
+ data.setActiveTasks(convertTaskIdCollection(Set.of()));
Review Comment:
Excuse me, what is the rationale for initializing these fields as empty
lists, rather than leaving them as `null`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]