kirktrue commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1621334851
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -1290,4 +1290,4 @@ static class MemberInfo {
this.memberEpoch = Optional.empty();
}
}
-}
+}
Review Comment:
We should revert/fix this change as it's whitespace only.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -53,89 +39,111 @@
import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
import static
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
+import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
public class ConsumerNetworkThreadTest {
+ static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+ static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+ private final Time time;
+ private final ConsumerMetadata metadata;
+ private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
+ private final ApplicationEventProcessor applicationEventProcessor;
+ private final OffsetsRequestManager offsetsRequestManager;
+ private final HeartbeatRequestManager heartbeatRequestManager;
+ private final CoordinatorRequestManager coordinatorRequestManager;
+ private final ConsumerNetworkThread consumerNetworkThread;
+ private final MockClient client;
+ private final NetworkClientDelegate networkClientDelegate;
+ private final NetworkClientDelegate networkClient;
+ private final RequestManagers requestManagers;
+ private final CompletableEventReaper applicationEventReaper;
+
+ ConsumerNetworkThreadTest() {
+ LogContext logContext = new LogContext();
+ ConsumerConfig config = mock(ConsumerConfig.class);
+ this.time = new MockTime();
+ this.networkClientDelegate = mock(NetworkClientDelegate.class);
+ this.requestManagers = mock(RequestManagers.class);
+ this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+ this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+ this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+ this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ this.metadata = mock(ConsumerMetadata.class);
+ this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+ this.applicationEventReaper = mock(CompletableEventReaper.class);
+ this.client = new MockClient(time);
+
+ this.networkClient = new NetworkClientDelegate(
+ time,
+ config,
+ logContext,
+ client
+ );
- private ConsumerTestBuilder testBuilder;
- private Time time;
- private ConsumerMetadata metadata;
- private NetworkClientDelegate networkClient;
- private BlockingQueue<ApplicationEvent> applicationEventsQueue;
- private ApplicationEventProcessor applicationEventProcessor;
- private OffsetsRequestManager offsetsRequestManager;
- private CommitRequestManager commitRequestManager;
- private CoordinatorRequestManager coordinatorRequestManager;
- private ConsumerNetworkThread consumerNetworkThread;
- private final CompletableEventReaper applicationEventReaper =
mock(CompletableEventReaper.class);
- private MockClient client;
-
- @BeforeEach
- public void setup() {
- testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
- time = testBuilder.time;
- metadata = testBuilder.metadata;
- networkClient = testBuilder.networkClientDelegate;
- client = testBuilder.client;
- applicationEventsQueue = testBuilder.applicationEventQueue;
- applicationEventProcessor = testBuilder.applicationEventProcessor;
- commitRequestManager =
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
- offsetsRequestManager = testBuilder.offsetsRequestManager;
- coordinatorRequestManager =
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
- consumerNetworkThread = new ConsumerNetworkThread(
- testBuilder.logContext,
+ this.consumerNetworkThread = new ConsumerNetworkThread(
+ logContext,
time,
- testBuilder.applicationEventQueue,
+ applicationEventsQueue,
applicationEventReaper,
() -> applicationEventProcessor,
- () -> testBuilder.networkClientDelegate,
- () -> testBuilder.requestManagers
+ () -> networkClientDelegate,
+ () -> requestManagers
);
+ }
+
+ @BeforeEach
+ public void setup() {
consumerNetworkThread.initializeResources();
}
@AfterEach
public void tearDown() {
- if (testBuilder != null) {
- testBuilder.close();
- consumerNetworkThread.close(Duration.ZERO);
- }
+ if (consumerNetworkThread != null)
+ consumerNetworkThread.close();
+ }
+
+ @Test
+ public void testEnsureCloseStopsRunningThread() {
+ // consumerNetworkThread.running is set to true in the constructor
+ assertTrue(consumerNetworkThread.isRunning());
+
+ // close() should make consumerNetworkThread.running false by calling
closeInternal(Duration timeout)
+ consumerNetworkThread.close();
+ assertFalse(consumerNetworkThread.isRunning());
+ }
+
+ @ParameterizedTest
+ @ValueSource(longs = {1, 100, 1000, 4999, 5001})
+ public void testConsumerNetworkThreadWaitTimeComputations(long
exampleTime) {
+ List<Optional<? extends RequestManager>> requestManagersList = new
ArrayList<>();
+ requestManagersList.add(Optional.of(coordinatorRequestManager));
+ when(requestManagers.entries()).thenReturn(requestManagersList);
Review Comment:
Nit, but could this be:
```suggestion
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(coordinatorRequestManager)));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -149,20 +157,28 @@ public void testStartupAndTearDown() throws
InterruptedException {
"The consumer network thread did not stop within " +
DEFAULT_MAX_WAIT_MS + " ms");
}
+ @Test
+ void testRequestManagersArePolledOnce() {
+ consumerNetworkThread.runOnce();
+ requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm ->
verify(rm, times(1)).poll(anyLong())));
+ requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm ->
verify(rm, times(1)).maximumTimeToWait(anyLong())));
+ verify(networkClientDelegate).poll(anyLong(), anyLong());
+ }
+
@Test
public void testApplicationEvent() {
ApplicationEvent e = new PollEvent(100);
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
- verify(applicationEventProcessor, times(1)).process(e);
+ verify(applicationEventProcessor).process(e);
Review Comment:
I'm curious about this change. Was the previous version incorrect as written? 🤔
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -379,13 +403,4 @@ private MockClient.RequestMatcher
offsetCommitRequestMatcher(final Map<TopicPart
return true;
};
}
-
- private HashMap<TopicPartition, OffsetAndMetadata>
mockTopicPartitionOffset() {
- final TopicPartition t0 = new TopicPartition("t0", 2);
- final TopicPartition t1 = new TopicPartition("t0", 3);
- HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new
HashMap<>();
- topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L));
- topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L));
- return topicPartitionOffsets;
- }
-}
+}
Review Comment:
```suggestion
}
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -268,32 +281,31 @@ void testPollResultTimer() {
@Test
void testMaximumTimeToWait() {
+ List<Optional<? extends RequestManager>> list = new ArrayList<>();
+ list.add(Optional.of(heartbeatRequestManager));
// Initial value before runOnce has been called
assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS,
consumerNetworkThread.maximumTimeToWait());
+
+ when(requestManagers.entries()).thenReturn(list);
Review Comment:
Minor, but it seems like this line could be made more succinct, like:
```suggestion
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -314,7 +326,7 @@ void testEnsureEventsAreCompleted() {
coordinatorRequestManager.markCoordinatorUnknown("test",
time.milliseconds());
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
"group-id", node));
prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
- CompletableApplicationEvent<Void> event1 = spy(new
AsyncCommitEvent(Collections.emptyMap()));
+ CompletableApplicationEvent<Void> event1 =
mock(AsyncCommitEvent.class);
Review Comment:
👍
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -327,8 +339,11 @@ void testEnsureEventsAreCompleted() {
assertTrue(applicationEventsQueue.isEmpty());
}
+ // Look into this one
Review Comment:
Can you elaborate on this `// Look into this one` comment?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]