apoorvmittal10 commented on code in PR #18105:
URL: https://github.com/apache/kafka/pull/18105#discussion_r1884281619
##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -1069,100 +1054,47 @@ public void
testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe
alterShareAutoOffsetReset(groupId2, "earliest");
alterShareAutoOffsetReset(groupId3, "earliest");
- ExecutorService producerExecutorService =
Executors.newFixedThreadPool(producerCount);
- ExecutorService shareGroupExecutorService1 =
Executors.newFixedThreadPool(consumerCount);
- ExecutorService shareGroupExecutorService2 =
Executors.newFixedThreadPool(consumerCount);
- ExecutorService shareGroupExecutorService3 =
Executors.newFixedThreadPool(consumerCount);
-
- CountDownLatch startSignal = new CountDownLatch(producerCount);
-
- ConcurrentLinkedQueue<CompletableFuture<Integer>> producerFutures =
new ConcurrentLinkedQueue<>();
-
+ List<CompletableFuture<Integer>> producerFutures = new ArrayList<>();
for (int i = 0; i < producerCount; i++) {
- producerExecutorService.submit(() -> {
- CompletableFuture<Integer> future =
produceMessages(messagesPerProducer);
- producerFutures.add(future);
- startSignal.countDown();
- });
+ producerFutures.add(CompletableFuture.supplyAsync(() ->
produceMessages(messagesPerProducer)));
}
-
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures1 = new
ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures2 = new
ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<CompletableFuture<Integer>> futures3 = new
ConcurrentLinkedQueue<>();
-
// Wait for the producers to run
- try {
- boolean signalled = startSignal.await(15, TimeUnit.SECONDS);
- assertTrue(signalled);
- } catch (InterruptedException e) {
- fail("Exception awaiting start signal");
- }
+ assertDoesNotThrow(() ->
CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new))
+ .get(15, TimeUnit.SECONDS), "Exception awaiting
produceMessages");
+ int actualMessageSent =
producerFutures.stream().mapToInt(CompletableFuture::join).sum();
- int maxBytes = 100000;
+ List<CompletableFuture<Integer>> consumeMessagesFutures1 = new
ArrayList<>();
+ List<CompletableFuture<Integer>> consumeMessagesFutures2 = new
ArrayList<>();
+ List<CompletableFuture<Integer>> consumeMessagesFutures3 = new
ArrayList<>();
+ int maxBytes = 100000;
for (int i = 0; i < consumerCount; i++) {
Review Comment:
Should the variable be `consumerCount` => `consumerGroupCount `?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]