This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 5af953c17 test(java): add async-specific tests for
ConsumerGroupsTcpClient (#2890)
5af953c17 is described below
commit 5af953c17d4030beaf11a003d6a0d58aa9ab5be9
Author: Atharva Lade <[email protected]>
AuthorDate: Tue Mar 10 06:09:31 2026 -0500
test(java): add async-specific tests for ConsumerGroupsTcpClient (#2890)
Closes #2225
---
.../iggy/client/async/AsyncConsumerGroupsTest.java | 529 +++++++++++++++++++++
1 file changed, 529 insertions(+)
diff --git
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConsumerGroupsTest.java
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConsumerGroupsTest.java
new file mode 100644
index 000000000..843101297
--- /dev/null
+++
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConsumerGroupsTest.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async;
+
+import org.apache.iggy.client.BaseIntegrationTest;
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.consumergroup.ConsumerGroup;
+import org.apache.iggy.consumergroup.ConsumerGroupDetails;
+import org.apache.iggy.exception.IggyResourceNotFoundException;
+import org.apache.iggy.identifier.ConsumerId;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.topic.CompressionAlgorithm;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Dedicated async-specific tests for {@link ConsumerGroupsClient} via
+ * {@link org.apache.iggy.client.async.tcp.ConsumerGroupsTcpClient}.
+ *
+ * <p>Covers all CRUD operations, join/leave membership, error scenarios, and
+ * CompletableFuture-specific patterns (chaining, concurrency, exception
propagation).
+ */
+public class AsyncConsumerGroupsTest extends BaseIntegrationTest {
+
+ private static final Logger log =
LoggerFactory.getLogger(AsyncConsumerGroupsTest.class);
+
+ private static final String USERNAME = "iggy";
+ private static final String PASSWORD = "iggy";
+ private static final String TEST_STREAM = "async-cg-test-stream-" +
UUID.randomUUID();
+ private static final String TEST_TOPIC = "async-cg-test-topic";
+ private static final int TIMEOUT_SECONDS = 5;
+
+ private static final StreamId STREAM_ID = StreamId.of(TEST_STREAM);
+ private static final TopicId TOPIC_ID = TopicId.of(TEST_TOPIC);
+
+ private static AsyncIggyTcpClient client;
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ log.info("Setting up async consumer groups test");
+ client = new AsyncIggyTcpClient(serverHost(), serverTcpPort());
+
+ client.connect()
+ .thenCompose(v -> {
+ log.info("Connected to Iggy server");
+ return client.users().login(USERNAME, PASSWORD);
+ })
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ client.streams().createStream(TEST_STREAM).get(TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ client.topics()
+ .createTopic(
+ STREAM_ID,
+ 1L,
+ CompressionAlgorithm.None,
+ BigInteger.ZERO,
+ BigInteger.ZERO,
+ Optional.empty(),
+ TEST_TOPIC)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ log.info("Created stream '{}' and topic '{}'", TEST_STREAM,
TEST_TOPIC);
+ }
+
+ @AfterAll
+ public static void tearDown() throws Exception {
+ log.info("Cleaning up async consumer groups test resources");
+ try {
+ client.streams().deleteStream(STREAM_ID).get(TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ log.info("Deleted test stream: {}", TEST_STREAM);
+ } catch (RuntimeException e) {
+ log.debug("Stream cleanup failed (may not exist): {}",
e.getMessage());
+ }
+ if (client != null) {
+ client.close().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ log.info("Closed async client");
+ }
+ }
+
+ // ===== Happy path tests =====
+
+ @Test
+ void shouldCreateConsumerGroupAsync() throws Exception {
+ String groupName = "create-test-" + UUID.randomUUID();
+
+ ConsumerGroupDetails group = client.consumerGroups()
+ .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ assertThat(group).isNotNull();
+ assertThat(group.id()).isNotNull();
+ assertThat(group.name()).isEqualTo(groupName);
+ assertThat(group.membersCount()).isEqualTo(0);
+ assertThat(group.members()).isEmpty();
+ }
+
+ @Test
+ void shouldGetConsumerGroupByIdAsync() throws Exception {
+ String groupName = "get-by-id-test-" + UUID.randomUUID();
+
+ ConsumerGroupDetails created = client.consumerGroups()
+ .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ Optional<ConsumerGroupDetails> retrieved = client.consumerGroups()
+ .getConsumerGroup(STREAM_ID, TOPIC_ID,
ConsumerId.of(created.id()))
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ assertThat(retrieved).isPresent();
+ assertThat(retrieved.get().id()).isEqualTo(created.id());
+ assertThat(retrieved.get().name()).isEqualTo(groupName);
+ }
+
+ @Test
+ void shouldGetConsumerGroupByNameAsync() throws Exception {
+ String groupName = "get-by-name-test-" + UUID.randomUUID();
+
+ ConsumerGroupDetails created = client.consumerGroups()
+ .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ Optional<ConsumerGroupDetails> byId = client.consumerGroups()
+ .getConsumerGroup(STREAM_ID, TOPIC_ID,
ConsumerId.of(created.id()))
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ Optional<ConsumerGroupDetails> byName = client.consumerGroups()
+ .getConsumerGroup(STREAM_ID, TOPIC_ID,
ConsumerId.of(groupName))
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ assertThat(byName).isPresent();
+ assertThat(byName.get().name()).isEqualTo(groupName);
+ assertThat(byId).isEqualTo(byName);
+ }
+
+ @Test
+ void shouldListAllConsumerGroupsAsync() throws Exception {
+ String streamName = "list-all-stream-" + UUID.randomUUID();
+ String topicName = "list-all-topic";
+ StreamId streamId = StreamId.of(streamName);
+ TopicId topicId = TopicId.of(topicName);
+
+ try {
+ client.streams().createStream(streamName).get(TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ client.topics()
+ .createTopic(
+ streamId,
+ 1L,
+ CompressionAlgorithm.None,
+ BigInteger.ZERO,
+ BigInteger.ZERO,
+ Optional.empty(),
+ topicName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ client.consumerGroups()
+ .createConsumerGroup(streamId, topicId, "group-a")
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ client.consumerGroups()
+ .createConsumerGroup(streamId, topicId, "group-b")
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ client.consumerGroups()
+ .createConsumerGroup(streamId, topicId, "group-c")
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ List<ConsumerGroup> groups =
+ client.consumerGroups().getConsumerGroups(streamId,
topicId).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ assertThat(groups).hasSize(3);
+
assertThat(groups).map(ConsumerGroup::name).containsExactlyInAnyOrder("group-a",
"group-b", "group-c");
+ } finally {
+ client.streams().deleteStream(streamId).get(TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ void shouldDeleteConsumerGroupAsync() throws Exception {
+ String groupName = "delete-test-" + UUID.randomUUID();
+
+ ConsumerGroupDetails created = client.consumerGroups()
+ .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ client.consumerGroups()
+ .deleteConsumerGroup(STREAM_ID, TOPIC_ID,
ConsumerId.of(created.id()))
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ Optional<ConsumerGroupDetails> deleted = client.consumerGroups()
+ .getConsumerGroup(STREAM_ID, TOPIC_ID,
ConsumerId.of(created.id()))
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ assertThat(deleted).isEmpty();
+ }
+
+ @Test
+ void shouldDeleteConsumerGroupByNameAsync() throws Exception {
+ String groupName = "delete-by-name-test-" + UUID.randomUUID();
+
+ client.consumerGroups()
+ .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ client.consumerGroups()
+ .deleteConsumerGroup(STREAM_ID, TOPIC_ID,
ConsumerId.of(groupName))
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ Optional<ConsumerGroupDetails> deleted = client.consumerGroups()
+ .getConsumerGroup(STREAM_ID, TOPIC_ID,
ConsumerId.of(groupName))
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ assertThat(deleted).isEmpty();
+ }
+
+ // ===== Join/Leave tests =====
+
+ @Test
+ void shouldJoinConsumerGroupAsync() throws Exception {
+ String groupName = "join-test-" + UUID.randomUUID();
+
+ ConsumerGroupDetails created = client.consumerGroups()
+ .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ ConsumerId groupId = ConsumerId.of(created.id());
+
+ client.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID,
groupId).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ ConsumerGroupDetails group = client.consumerGroups()
+ .getConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .get();
+
+ assertThat(group.membersCount()).isEqualTo(1);
+ assertThat(group.members()).hasSize(1);
+ assertThat(group.members().get(0).partitionsCount()).isGreaterThan(0);
+
+ client.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID,
groupId).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ @Test
+ void shouldLeaveConsumerGroupAsync() throws Exception {
+ String groupName = "leave-test-" + UUID.randomUUID();
+
+ ConsumerGroupDetails created = client.consumerGroups()
+ .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ ConsumerId groupId = ConsumerId.of(created.id());
+
+ client.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID,
groupId).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ client.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID,
groupId).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ ConsumerGroupDetails group = client.consumerGroups()
+ .getConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .get();
+
+ assertThat(group.membersCount()).isEqualTo(0);
+ assertThat(group.members()).isEmpty();
+ }
+
+ @Test
+ void shouldJoinAndLeaveConsumerGroupSequentiallyAsync() throws Exception {
+ String groupName = "sequential-test-" + UUID.randomUUID();
+
+ ConsumerGroupDetails created = client.consumerGroups()
+ .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ ConsumerId groupId = ConsumerId.of(created.id());
+
+ ConsumerGroupDetails afterLeave = client.consumerGroups()
+ .joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
+ .thenCompose(v ->
client.consumerGroups().getConsumerGroup(STREAM_ID, TOPIC_ID, groupId))
+ .thenCompose(groupOpt -> {
+ assertThat(groupOpt).isPresent();
+ assertThat(groupOpt.get().membersCount()).isEqualTo(1);
+ return
client.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId);
+ })
+ .thenCompose(v ->
client.consumerGroups().getConsumerGroup(STREAM_ID, TOPIC_ID, groupId))
+ .thenApply(groupOpt -> {
+ assertThat(groupOpt).isPresent();
+ return groupOpt.get();
+ })
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ assertThat(afterLeave.membersCount()).isEqualTo(0);
+ }
+
+ @Test
+ void shouldHandleMultipleClientsJoiningGroupAsync() throws Exception {
+ String groupName = "multi-client-test-" + UUID.randomUUID();
+
+ ConsumerGroupDetails created = client.consumerGroups()
+ .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ ConsumerId groupId = ConsumerId.of(created.id());
+
+ AsyncIggyTcpClient secondClient = new AsyncIggyTcpClient(serverHost(),
serverTcpPort());
+ try {
+ secondClient
+ .connect()
+ .thenCompose(v -> secondClient.users().login(USERNAME,
PASSWORD))
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ client.consumerGroups()
+ .joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ secondClient
+ .consumerGroups()
+ .joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ ConsumerGroupDetails group = client.consumerGroups()
+ .getConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .get();
+
+ assertThat(group.membersCount()).isEqualTo(2);
+ assertThat(group.members()).hasSize(2);
+
+ client.consumerGroups()
+ .leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ secondClient
+ .consumerGroups()
+ .leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ ConsumerGroupDetails afterLeave = client.consumerGroups()
+ .getConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .get();
+
+ assertThat(afterLeave.membersCount()).isEqualTo(0);
+ } finally {
+ secondClient.close().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+ }
+
+ // ===== Error scenario tests =====
+
+ @Test
+ void shouldReturnEmptyForNonExistentGroup() throws Exception {
+ Optional<ConsumerGroupDetails> result = client.consumerGroups()
+ .getConsumerGroup(STREAM_ID, TOPIC_ID, ConsumerId.of(999_999L))
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ assertThat(result).isEmpty();
+ }
+
+ @Test
+ void shouldFailToDeleteNonExistentGroup() {
+ var future = client.consumerGroups().deleteConsumerGroup(STREAM_ID,
TOPIC_ID, ConsumerId.of(999_999L));
+
+ var exception = assertThrows(ExecutionException.class, () ->
future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+
assertThat(exception.getCause()).isInstanceOf(IggyResourceNotFoundException.class);
+ }
+
+ @Test
+ void shouldFailToJoinNonExistentGroup() {
+ var future = client.consumerGroups().joinConsumerGroup(STREAM_ID,
TOPIC_ID, ConsumerId.of(999_999L));
+
+ var exception = assertThrows(ExecutionException.class, () ->
future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+
assertThat(exception.getCause()).isInstanceOf(IggyResourceNotFoundException.class);
+ }
+
+ @Test
+ void shouldFailToLeaveGroupNotJoined() throws Exception {
+ String groupName = "leave-not-joined-test-" + UUID.randomUUID();
+
+ ConsumerGroupDetails created = client.consumerGroups()
+ .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ var future = client.consumerGroups().leaveConsumerGroup(STREAM_ID,
TOPIC_ID, ConsumerId.of(created.id()));
+
+ var exception = assertThrows(ExecutionException.class, () ->
future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+
assertThat(exception.getCause()).isInstanceOf(IggyResourceNotFoundException.class);
+ }
+
+ // ===== CompletableFuture-specific tests =====
+
+ @Test
+ void shouldChainCreateAndGetWithThenCompose() throws Exception {
+ String groupName = "chain-test-" + UUID.randomUUID();
+
+ Optional<ConsumerGroupDetails> result = client.consumerGroups()
+ .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+ .thenCompose(created ->
+ client.consumerGroups().getConsumerGroup(STREAM_ID,
TOPIC_ID, ConsumerId.of(created.id())))
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ assertThat(result).isPresent();
+ assertThat(result.get().name()).isEqualTo(groupName);
+ assertThat(result.get().membersCount()).isEqualTo(0);
+ }
+
+ @Test
+ void shouldHandleConcurrentGroupCreations() throws Exception {
+ String streamName = "concurrent-create-stream-" + UUID.randomUUID();
+ String topicName = "concurrent-create-topic";
+ StreamId streamId = StreamId.of(streamName);
+ TopicId topicId = TopicId.of(topicName);
+
+ try {
+ client.streams().createStream(streamName).get(TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ client.topics()
+ .createTopic(
+ streamId,
+ 1L,
+ CompressionAlgorithm.None,
+ BigInteger.ZERO,
+ BigInteger.ZERO,
+ Optional.empty(),
+ topicName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ List<CompletableFuture<ConsumerGroupDetails>> futures = new
ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+
futures.add(client.consumerGroups().createConsumerGroup(streamId, topicId,
"concurrent-group-" + i));
+ }
+
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+ .get(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
+
+ for (CompletableFuture<ConsumerGroupDetails> f : futures) {
+ assertThat(f.isDone()).isTrue();
+ assertThat(f.isCompletedExceptionally()).isFalse();
+ }
+
+ List<ConsumerGroup> groups =
+ client.consumerGroups().getConsumerGroups(streamId,
topicId).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ assertThat(groups).hasSize(5);
+ assertThat(groups)
+ .map(ConsumerGroup::name)
+ .containsExactlyInAnyOrder(
+ "concurrent-group-0",
+ "concurrent-group-1",
+ "concurrent-group-2",
+ "concurrent-group-3",
+ "concurrent-group-4");
+ } finally {
+ client.streams().deleteStream(streamId).get(TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ void shouldHandleConcurrentJoinAndLeaveOperations() throws Exception {
+ String groupName = "concurrent-join-leave-test-" + UUID.randomUUID();
+
+ ConsumerGroupDetails created = client.consumerGroups()
+ .createConsumerGroup(STREAM_ID, TOPIC_ID, groupName)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ ConsumerId groupId = ConsumerId.of(created.id());
+
+ AsyncIggyTcpClient secondClient = new AsyncIggyTcpClient(serverHost(),
serverTcpPort());
+ AsyncIggyTcpClient thirdClient = new AsyncIggyTcpClient(serverHost(),
serverTcpPort());
+ try {
+ secondClient
+ .connect()
+ .thenCompose(v -> secondClient.users().login(USERNAME,
PASSWORD))
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ thirdClient
+ .connect()
+ .thenCompose(v -> thirdClient.users().login(USERNAME,
PASSWORD))
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+ CompletableFuture.allOf(
+
client.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId),
+
secondClient.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId),
+
thirdClient.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, groupId))
+ .get(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
+
+ ConsumerGroupDetails afterJoin = client.consumerGroups()
+ .getConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .get();
+
+ assertThat(afterJoin.membersCount()).isEqualTo(3);
+
+ CompletableFuture.allOf(
+
client.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId),
+
secondClient.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId),
+
thirdClient.consumerGroups().leaveConsumerGroup(STREAM_ID, TOPIC_ID, groupId))
+ .get(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS);
+
+ ConsumerGroupDetails afterLeave = client.consumerGroups()
+ .getConsumerGroup(STREAM_ID, TOPIC_ID, groupId)
+ .get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .get();
+
+ assertThat(afterLeave.membersCount()).isEqualTo(0);
+ } finally {
+ secondClient.close().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ thirdClient.close().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+ }
+}