This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 57da00785 feat(java): add async API parity with blocking client (#2718)
57da00785 is described below
commit 57da00785c3070b4cdc8683b518a1ec6bca69499
Author: Atharva Lade <[email protected]>
AuthorDate: Thu Feb 26 02:28:42 2026 -0600
feat(java): add async API parity with blocking client (#2718)
Closes #2716
---
.../iggy/client/async/ConsumerGroupsClient.java | 92 ++++++++
.../iggy/client/async/ConsumerOffsetsClient.java | 90 ++++++++
.../apache/iggy/client/async/PartitionsClient.java | 75 ++++++
.../client/async/PersonalAccessTokensClient.java | 66 ++++++
.../org/apache/iggy/client/async/SystemClient.java | 69 ++++++
.../iggy/client/async/tcp/AsyncIggyTcpClient.java | 64 ++++++
.../client/async/tcp/ConsumerGroupsTcpClient.java | 91 ++++++++
.../client/async/tcp/ConsumerOffsetsTcpClient.java | 102 +++++++++
.../iggy/client/async/tcp/PartitionsTcpClient.java | 69 ++++++
.../async/tcp/PersonalAccessTokensTcpClient.java | 121 ++++++++++
.../iggy/client/async/tcp/SystemTcpClient.java | 128 +++++++++++
.../client/async/AsyncClientIntegrationTest.java | 252 +++++++++++++++++++++
.../async/tcp/AsyncIggyTcpClientBuilderTest.java | 56 +++++
13 files changed, 1275 insertions(+)
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/ConsumerGroupsClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/ConsumerGroupsClient.java
index 14d3fe68e..9a8af0108 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/ConsumerGroupsClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/ConsumerGroupsClient.java
@@ -19,10 +19,14 @@
package org.apache.iggy.client.async;
+import org.apache.iggy.consumergroup.ConsumerGroup;
+import org.apache.iggy.consumergroup.ConsumerGroupDetails;
import org.apache.iggy.identifier.ConsumerId;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.identifier.TopicId;
+import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
@@ -56,6 +60,94 @@ import java.util.concurrent.CompletableFuture;
*/
public interface ConsumerGroupsClient {
+ /**
+ * Gets a consumer group asynchronously.
+ *
+ * @param streamId The stream identifier (numeric ID)
+ * @param topicId The topic identifier (numeric ID)
+ * @param groupId The consumer group identifier (numeric ID)
+ * @return A CompletableFuture containing consumer group details if it
exists
+ */
+ default CompletableFuture<Optional<ConsumerGroupDetails>> getConsumerGroup(
+ Long streamId, Long topicId, Long groupId) {
+ return getConsumerGroup(StreamId.of(streamId), TopicId.of(topicId),
ConsumerId.of(groupId));
+ }
+
+ /**
+ * Gets a consumer group asynchronously.
+ *
+ * @param streamId The stream identifier
+ * @param topicId The topic identifier
+ * @param groupId The consumer group identifier
+ * @return A CompletableFuture containing consumer group details if it
exists
+ */
+ CompletableFuture<Optional<ConsumerGroupDetails>> getConsumerGroup(
+ StreamId streamId, TopicId topicId, ConsumerId groupId);
+
+ /**
+ * Gets all consumer groups for a topic asynchronously.
+ *
+ * @param streamId The stream identifier (numeric ID)
+ * @param topicId The topic identifier (numeric ID)
+ * @return A CompletableFuture containing list of consumer groups
+ */
+ default CompletableFuture<List<ConsumerGroup>> getConsumerGroups(Long
streamId, Long topicId) {
+ return getConsumerGroups(StreamId.of(streamId), TopicId.of(topicId));
+ }
+
+ /**
+ * Gets all consumer groups for a topic asynchronously.
+ *
+ * @param streamId The stream identifier
+ * @param topicId The topic identifier
+ * @return A CompletableFuture containing list of consumer groups
+ */
+ CompletableFuture<List<ConsumerGroup>> getConsumerGroups(StreamId
streamId, TopicId topicId);
+
+ /**
+ * Creates a consumer group asynchronously.
+ *
+ * @param streamId The stream identifier (numeric ID)
+ * @param topicId The topic identifier (numeric ID)
+ * @param name The name of the consumer group
+ * @return A CompletableFuture containing the created consumer group
details
+ */
+ default CompletableFuture<ConsumerGroupDetails> createConsumerGroup(Long
streamId, Long topicId, String name) {
+ return createConsumerGroup(StreamId.of(streamId), TopicId.of(topicId),
name);
+ }
+
+ /**
+ * Creates a consumer group asynchronously.
+ *
+ * @param streamId The stream identifier
+ * @param topicId The topic identifier
+ * @param name The name of the consumer group
+ * @return A CompletableFuture containing the created consumer group
details
+ */
+ CompletableFuture<ConsumerGroupDetails> createConsumerGroup(StreamId
streamId, TopicId topicId, String name);
+
+ /**
+ * Deletes a consumer group asynchronously.
+ *
+ * @param streamId The stream identifier (numeric ID)
+ * @param topicId The topic identifier (numeric ID)
+ * @param groupId The consumer group identifier (numeric ID)
+ * @return A CompletableFuture that completes when the operation is done
+ */
+ default CompletableFuture<Void> deleteConsumerGroup(Long streamId, Long
topicId, Long groupId) {
+ return deleteConsumerGroup(StreamId.of(streamId), TopicId.of(topicId),
ConsumerId.of(groupId));
+ }
+
+ /**
+ * Deletes a consumer group asynchronously.
+ *
+ * @param streamId The stream identifier
+ * @param topicId The topic identifier
+ * @param groupId The consumer group identifier
+ * @return A CompletableFuture that completes when the operation is done
+ */
+ CompletableFuture<Void> deleteConsumerGroup(StreamId streamId, TopicId
topicId, ConsumerId groupId);
+
/**
* Joins a consumer group asynchronously.
*
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/ConsumerOffsetsClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/ConsumerOffsetsClient.java
new file mode 100644
index 000000000..8035c0cb5
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/ConsumerOffsetsClient.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async;
+
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.consumeroffset.ConsumerOffsetInfo;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+
+import java.math.BigInteger;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async interface for consumer offset operations.
+ */
+public interface ConsumerOffsetsClient {
+
+ /**
+ * Stores a consumer offset asynchronously.
+ *
+ * @param streamId The stream identifier (numeric ID)
+ * @param topicId The topic identifier (numeric ID)
+ * @param partitionId The partition identifier (optional)
+ * @param consumerId The consumer identifier (numeric ID)
+ * @param offset The offset to store
+ * @return A CompletableFuture that completes when the operation is done
+ */
+ default CompletableFuture<Void> storeConsumerOffset(
+ Long streamId, Long topicId, Optional<Long> partitionId, Long
consumerId, BigInteger offset) {
+ return storeConsumerOffset(
+ StreamId.of(streamId), TopicId.of(topicId), partitionId,
Consumer.of(consumerId), offset);
+ }
+
+ /**
+ * Stores a consumer offset asynchronously.
+ *
+ * @param streamId The stream identifier
+ * @param topicId The topic identifier
+ * @param partitionId The partition identifier (optional)
+ * @param consumer The consumer
+ * @param offset The offset to store
+ * @return A CompletableFuture that completes when the operation is done
+ */
+ CompletableFuture<Void> storeConsumerOffset(
+ StreamId streamId, TopicId topicId, Optional<Long> partitionId,
Consumer consumer, BigInteger offset);
+
+ /**
+ * Gets a consumer offset asynchronously.
+ *
+ * @param streamId The stream identifier (numeric ID)
+ * @param topicId The topic identifier (numeric ID)
+ * @param partitionId The partition identifier (optional)
+ * @param consumerId The consumer identifier (numeric ID)
+ * @return A CompletableFuture containing the consumer offset info if it
exists
+ */
+ default CompletableFuture<Optional<ConsumerOffsetInfo>> getConsumerOffset(
+ Long streamId, Long topicId, Optional<Long> partitionId, Long
consumerId) {
+ return getConsumerOffset(StreamId.of(streamId), TopicId.of(topicId),
partitionId, Consumer.of(consumerId));
+ }
+
+ /**
+ * Gets a consumer offset asynchronously.
+ *
+ * @param streamId The stream identifier
+ * @param topicId The topic identifier
+ * @param partitionId The partition identifier (optional)
+ * @param consumer The consumer
+ * @return A CompletableFuture containing the consumer offset info if it
exists
+ */
+ CompletableFuture<Optional<ConsumerOffsetInfo>> getConsumerOffset(
+ StreamId streamId, TopicId topicId, Optional<Long> partitionId,
Consumer consumer);
+}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/PartitionsClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/PartitionsClient.java
new file mode 100644
index 000000000..d06260e2f
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/PartitionsClient.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async;
+
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async interface for partition operations.
+ */
+public interface PartitionsClient {
+
+ /**
+ * Creates partitions asynchronously.
+ *
+ * @param streamId The stream identifier (numeric ID)
+ * @param topicId The topic identifier (numeric ID)
+ * @param partitionsCount The number of partitions to create
+ * @return A CompletableFuture that completes when the operation is done
+ */
+ default CompletableFuture<Void> createPartitions(Long streamId, Long
topicId, Long partitionsCount) {
+ return createPartitions(StreamId.of(streamId), TopicId.of(topicId),
partitionsCount);
+ }
+
+ /**
+ * Creates partitions asynchronously.
+ *
+ * @param streamId The stream identifier
+ * @param topicId The topic identifier
+ * @param partitionsCount The number of partitions to create
+ * @return A CompletableFuture that completes when the operation is done
+ */
+ CompletableFuture<Void> createPartitions(StreamId streamId, TopicId
topicId, Long partitionsCount);
+
+ /**
+ * Deletes partitions asynchronously.
+ *
+ * @param streamId The stream identifier (numeric ID)
+ * @param topicId The topic identifier (numeric ID)
+ * @param partitionsCount The number of partitions to delete
+ * @return A CompletableFuture that completes when the operation is done
+ */
+ default CompletableFuture<Void> deletePartitions(Long streamId, Long
topicId, Long partitionsCount) {
+ return deletePartitions(StreamId.of(streamId), TopicId.of(topicId),
partitionsCount);
+ }
+
+ /**
+ * Deletes partitions asynchronously.
+ *
+ * @param streamId The stream identifier
+ * @param topicId The topic identifier
+ * @param partitionsCount The number of partitions to delete
+ * @return A CompletableFuture that completes when the operation is done
+ */
+ CompletableFuture<Void> deletePartitions(StreamId streamId, TopicId
topicId, Long partitionsCount);
+}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/PersonalAccessTokensClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/PersonalAccessTokensClient.java
new file mode 100644
index 000000000..d9dcd7757
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/PersonalAccessTokensClient.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async;
+
+import org.apache.iggy.personalaccesstoken.PersonalAccessTokenInfo;
+import org.apache.iggy.personalaccesstoken.RawPersonalAccessToken;
+import org.apache.iggy.user.IdentityInfo;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async interface for personal access token operations.
+ */
+public interface PersonalAccessTokensClient {
+
+ /**
+ * Creates a new personal access token asynchronously.
+ *
+ * @param name The name of the token
+ * @param expiry The expiration time in seconds (0 for no expiration)
+ * @return A CompletableFuture containing the created raw personal access
token
+ */
+ CompletableFuture<RawPersonalAccessToken> createPersonalAccessToken(String
name, BigInteger expiry);
+
+ /**
+ * Gets all personal access tokens asynchronously.
+ *
+ * @return A CompletableFuture containing list of personal access tokens
+ */
+ CompletableFuture<List<PersonalAccessTokenInfo>> getPersonalAccessTokens();
+
+ /**
+ * Deletes a personal access token asynchronously.
+ *
+ * @param name The name of the token to delete
+ * @return A CompletableFuture that completes when the operation is done
+ */
+ CompletableFuture<Void> deletePersonalAccessToken(String name);
+
+ /**
+ * Logs in using a personal access token asynchronously.
+ *
+ * @param token The personal access token
+ * @return A CompletableFuture containing identity information
+ */
+ CompletableFuture<IdentityInfo> loginWithPersonalAccessToken(String token);
+}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/SystemClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/SystemClient.java
new file mode 100644
index 000000000..f5587142d
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/SystemClient.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async;
+
+import org.apache.iggy.system.ClientInfo;
+import org.apache.iggy.system.ClientInfoDetails;
+import org.apache.iggy.system.Stats;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async interface for system operations.
+ */
+public interface SystemClient {
+
+ /**
+ * Gets server statistics asynchronously.
+ *
+ * @return A CompletableFuture containing server statistics
+ */
+ CompletableFuture<Stats> getStats();
+
+ /**
+ * Gets information about the current client asynchronously.
+ *
+ * @return A CompletableFuture containing current client details
+ */
+ CompletableFuture<ClientInfoDetails> getMe();
+
+ /**
+ * Gets information about a specific client asynchronously.
+ *
+ * @param clientId The ID of the client to retrieve
+ * @return A CompletableFuture containing client details
+ */
+ CompletableFuture<ClientInfoDetails> getClient(Long clientId);
+
+ /**
+ * Gets a list of all connected clients asynchronously.
+ *
+ * @return A CompletableFuture containing list of client information
+ */
+ CompletableFuture<List<ClientInfo>> getClients();
+
+ /**
+ * Pings the server asynchronously.
+ *
+ * @return A CompletableFuture that completes when ping succeeds
+ */
+ CompletableFuture<String> ping();
+}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
index 4729f0145..e76493d1d 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
@@ -20,8 +20,12 @@
package org.apache.iggy.client.async.tcp;
import org.apache.iggy.client.async.ConsumerGroupsClient;
+import org.apache.iggy.client.async.ConsumerOffsetsClient;
import org.apache.iggy.client.async.MessagesClient;
+import org.apache.iggy.client.async.PartitionsClient;
+import org.apache.iggy.client.async.PersonalAccessTokensClient;
import org.apache.iggy.client.async.StreamsClient;
+import org.apache.iggy.client.async.SystemClient;
import org.apache.iggy.client.async.TopicsClient;
import org.apache.iggy.client.async.UsersClient;
import org.apache.iggy.config.RetryPolicy;
@@ -99,9 +103,13 @@ public class AsyncIggyTcpClient {
private AsyncTcpConnection connection;
private MessagesClient messagesClient;
private ConsumerGroupsClient consumerGroupsClient;
+ private ConsumerOffsetsClient consumerOffsetsClient;
private StreamsClient streamsClient;
private TopicsClient topicsClient;
private UsersClient usersClient;
+ private SystemClient systemClient;
+ private PersonalAccessTokensClient personalAccessTokensClient;
+ private PartitionsClient partitionsClient;
/**
* Creates a new async TCP client with default settings.
@@ -162,9 +170,13 @@ public class AsyncIggyTcpClient {
return connection.connect().thenRun(() -> {
messagesClient = new MessagesTcpClient(connection);
consumerGroupsClient = new ConsumerGroupsTcpClient(connection);
+ consumerOffsetsClient = new ConsumerOffsetsTcpClient(connection);
streamsClient = new StreamsTcpClient(connection);
topicsClient = new TopicsTcpClient(connection);
usersClient = new UsersTcpClient(connection);
+ systemClient = new SystemTcpClient(connection);
+ personalAccessTokensClient = new
PersonalAccessTokensTcpClient(connection);
+ partitionsClient = new PartitionsTcpClient(connection);
});
}
@@ -256,6 +268,58 @@ public class AsyncIggyTcpClient {
return topicsClient;
}
+ /**
+ * Returns the async system client for server system operations.
+ *
+ * @return the {@link SystemClient} instance
+ * @throws IggyNotConnectedException if the client is not connected
+ */
+ public SystemClient system() {
+ if (systemClient == null) {
+ throw new IggyNotConnectedException();
+ }
+ return systemClient;
+ }
+
+ /**
+ * Returns the async personal access tokens client for token management.
+ *
+ * @return the {@link PersonalAccessTokensClient} instance
+ * @throws IggyNotConnectedException if the client is not connected
+ */
+ public PersonalAccessTokensClient personalAccessTokens() {
+ if (personalAccessTokensClient == null) {
+ throw new IggyNotConnectedException();
+ }
+ return personalAccessTokensClient;
+ }
+
+ /**
+ * Returns the async partitions client for partition management.
+ *
+ * @return the {@link PartitionsClient} instance
+ * @throws IggyNotConnectedException if the client is not connected
+ */
+ public PartitionsClient partitions() {
+ if (partitionsClient == null) {
+ throw new IggyNotConnectedException();
+ }
+ return partitionsClient;
+ }
+
+ /**
+ * Returns the async consumer offsets client for offset management.
+ *
+ * @return the {@link ConsumerOffsetsClient} instance
+ * @throws IggyNotConnectedException if the client is not connected
+ */
+ public ConsumerOffsetsClient consumerOffsets() {
+ if (consumerOffsetsClient == null) {
+ throw new IggyNotConnectedException();
+ }
+ return consumerOffsetsClient;
+ }
+
/**
* Closes the TCP connection and releases all Netty resources.
*
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java
index cd0da30c8..71133f155 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java
@@ -21,14 +21,20 @@ package org.apache.iggy.client.async.tcp;
import io.netty.buffer.Unpooled;
import org.apache.iggy.client.async.ConsumerGroupsClient;
+import org.apache.iggy.consumergroup.ConsumerGroup;
+import org.apache.iggy.consumergroup.ConsumerGroupDetails;
import org.apache.iggy.identifier.ConsumerId;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesDeserializer;
import org.apache.iggy.serde.BytesSerializer;
import org.apache.iggy.serde.CommandCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
@@ -43,6 +49,91 @@ public class ConsumerGroupsTcpClient implements
ConsumerGroupsClient {
this.connection = connection;
}
+ @Override
+ public CompletableFuture<Optional<ConsumerGroupDetails>> getConsumerGroup(
+ StreamId streamId, TopicId topicId, ConsumerId groupId) {
+ var payload = BytesSerializer.toBytes(streamId);
+ payload.writeBytes(BytesSerializer.toBytes(topicId));
+ payload.writeBytes(BytesSerializer.toBytes(groupId));
+
+ log.debug("Getting consumer group - Stream: {}, Topic: {}, Group: {}",
streamId, topicId, groupId);
+
+ return connection
+ .send(CommandCode.ConsumerGroup.GET.getValue(), payload)
+ .thenApply(response -> {
+ try {
+ if (response.isReadable()) {
+ return
Optional.of(BytesDeserializer.readConsumerGroupDetails(response));
+ } else {
+ return Optional.empty();
+ }
+ } finally {
+ response.release();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<List<ConsumerGroup>> getConsumerGroups(StreamId
streamId, TopicId topicId) {
+ var payload = BytesSerializer.toBytes(streamId);
+ payload.writeBytes(BytesSerializer.toBytes(topicId));
+
+ log.debug("Getting consumer groups - Stream: {}, Topic: {}", streamId,
topicId);
+
+ return connection
+ .send(CommandCode.ConsumerGroup.GET_ALL.getValue(), payload)
+ .thenApply(response -> {
+ try {
+ List<ConsumerGroup> groups = new ArrayList<>();
+ while (response.isReadable()) {
+
groups.add(BytesDeserializer.readConsumerGroup(response));
+ }
+ return groups;
+ } finally {
+ response.release();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<ConsumerGroupDetails> createConsumerGroup(
+ StreamId streamId, TopicId topicId, String name) {
+ var streamIdBytes = BytesSerializer.toBytes(streamId);
+ var topicIdBytes = BytesSerializer.toBytes(topicId);
+ var payload = Unpooled.buffer(1 + streamIdBytes.readableBytes() +
topicIdBytes.readableBytes() + name.length());
+
+ payload.writeBytes(streamIdBytes);
+ payload.writeBytes(topicIdBytes);
+ payload.writeBytes(BytesSerializer.toBytes(name));
+
+ log.debug("Creating consumer group - Stream: {}, Topic: {}, Name: {}",
streamId, topicId, name);
+
+ return connection
+ .send(CommandCode.ConsumerGroup.CREATE.getValue(), payload)
+ .thenApply(response -> {
+ try {
+ return
BytesDeserializer.readConsumerGroupDetails(response);
+ } finally {
+ response.release();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteConsumerGroup(StreamId streamId,
TopicId topicId, ConsumerId groupId) {
+ var payload = BytesSerializer.toBytes(streamId);
+ payload.writeBytes(BytesSerializer.toBytes(topicId));
+ payload.writeBytes(BytesSerializer.toBytes(groupId));
+
+ log.debug("Deleting consumer group - Stream: {}, Topic: {}, Group:
{}", streamId, topicId, groupId);
+
+ return connection
+ .send(CommandCode.ConsumerGroup.DELETE.getValue(), payload)
+ .thenAccept(response -> {
+ response.release();
+ });
+ }
+
@Override
public CompletableFuture<Void> joinConsumerGroup(StreamId streamId,
TopicId topicId, ConsumerId groupId) {
var payload = Unpooled.buffer();
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerOffsetsTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerOffsetsTcpClient.java
new file mode 100644
index 000000000..aa6c6e41c
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerOffsetsTcpClient.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import org.apache.iggy.client.async.ConsumerOffsetsClient;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.consumeroffset.ConsumerOffsetInfo;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async TCP implementation of consumer offsets client.
+ */
+public class ConsumerOffsetsTcpClient implements ConsumerOffsetsClient {
+ private static final Logger log =
LoggerFactory.getLogger(ConsumerOffsetsTcpClient.class);
+
+ private final AsyncTcpConnection connection;
+
+ public ConsumerOffsetsTcpClient(AsyncTcpConnection connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public CompletableFuture<Void> storeConsumerOffset(
+ StreamId streamId, TopicId topicId, Optional<Long> partitionId,
Consumer consumer, BigInteger offset) {
+ var payload = BytesSerializer.toBytes(consumer);
+ payload.writeBytes(BytesSerializer.toBytes(streamId));
+ payload.writeBytes(BytesSerializer.toBytes(topicId));
+ payload.writeBytes(BytesSerializer.toBytes(partitionId));
+ payload.writeBytes(BytesSerializer.toBytesAsU64(offset));
+
+ log.debug(
+ "Storing consumer offset - Stream: {}, Topic: {}, Partition:
{}, Consumer: {}, Offset: {}",
+ streamId,
+ topicId,
+ partitionId,
+ consumer,
+ offset);
+
+ return connection
+ .send(CommandCode.ConsumerOffset.STORE.getValue(), payload)
+ .thenAccept(response -> {
+ response.release();
+ });
+ }
+
+ @Override
+ public CompletableFuture<Optional<ConsumerOffsetInfo>> getConsumerOffset(
+ StreamId streamId, TopicId topicId, Optional<Long> partitionId,
Consumer consumer) {
+ var payload = BytesSerializer.toBytes(consumer);
+ payload.writeBytes(BytesSerializer.toBytes(streamId));
+ payload.writeBytes(BytesSerializer.toBytes(topicId));
+ payload.writeBytes(BytesSerializer.toBytes(partitionId));
+
+ log.debug(
+ "Getting consumer offset - Stream: {}, Topic: {}, Partition:
{}, Consumer: {}",
+ streamId,
+ topicId,
+ partitionId,
+ consumer);
+
+ return connection
+ .send(CommandCode.ConsumerOffset.GET.getValue(), payload)
+ .thenApply(response -> {
+ try {
+ if (response.isReadable()) {
+ return
Optional.of(BytesDeserializer.readConsumerOffsetInfo(response));
+ } else {
+ return Optional.empty();
+ }
+ } finally {
+ response.release();
+ }
+ });
+ }
+}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/PartitionsTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/PartitionsTcpClient.java
new file mode 100644
index 000000000..c142bf683
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/PartitionsTcpClient.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import org.apache.iggy.client.async.PartitionsClient;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async TCP implementation of partitions client.
+ */
+public class PartitionsTcpClient implements PartitionsClient {
+ private static final Logger log =
LoggerFactory.getLogger(PartitionsTcpClient.class);
+
+ private final AsyncTcpConnection connection;
+
+ public PartitionsTcpClient(AsyncTcpConnection connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public CompletableFuture<Void> createPartitions(StreamId streamId, TopicId
topicId, Long partitionsCount) {
+ var payload = BytesSerializer.toBytes(streamId);
+ payload.writeBytes(BytesSerializer.toBytes(topicId));
+ payload.writeIntLE(partitionsCount.intValue());
+
+ log.debug("Creating {} partitions for stream: {}, topic: {}",
partitionsCount, streamId, topicId);
+
+ return connection.send(CommandCode.Partition.CREATE.getValue(),
payload).thenAccept(response -> {
+ response.release();
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> deletePartitions(StreamId streamId, TopicId
topicId, Long partitionsCount) {
+ var payload = BytesSerializer.toBytes(streamId);
+ payload.writeBytes(BytesSerializer.toBytes(topicId));
+ payload.writeIntLE(partitionsCount.intValue());
+
+ log.debug("Deleting {} partitions for stream: {}, topic: {}",
partitionsCount, streamId, topicId);
+
+ return connection.send(CommandCode.Partition.DELETE.getValue(),
payload).thenAccept(response -> {
+ response.release();
+ });
+ }
+}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/PersonalAccessTokensTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/PersonalAccessTokensTcpClient.java
new file mode 100644
index 000000000..fe31a549d
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/PersonalAccessTokensTcpClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import io.netty.buffer.Unpooled;
+import org.apache.iggy.client.async.PersonalAccessTokensClient;
+import org.apache.iggy.personalaccesstoken.PersonalAccessTokenInfo;
+import org.apache.iggy.personalaccesstoken.RawPersonalAccessToken;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
+import org.apache.iggy.user.IdentityInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async TCP implementation of personal access tokens client.
+ */
+public class PersonalAccessTokensTcpClient implements
PersonalAccessTokensClient {
+ private static final Logger log =
LoggerFactory.getLogger(PersonalAccessTokensTcpClient.class);
+
+ private final AsyncTcpConnection connection;
+
+ public PersonalAccessTokensTcpClient(AsyncTcpConnection connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public CompletableFuture<RawPersonalAccessToken>
createPersonalAccessToken(String name, BigInteger expiry) {
+ var payload = Unpooled.buffer();
+ payload.writeBytes(BytesSerializer.toBytes(name));
+ payload.writeBytes(BytesSerializer.toBytesAsU64(expiry));
+
+ log.debug("Creating personal access token: {}", name);
+
+ return connection
+ .send(CommandCode.PersonalAccessToken.CREATE.getValue(),
payload)
+ .thenApply(response -> {
+ try {
+ return
BytesDeserializer.readRawPersonalAccessToken(response);
+ } finally {
+ response.release();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<List<PersonalAccessTokenInfo>>
getPersonalAccessTokens() {
+ var payload = Unpooled.EMPTY_BUFFER;
+
+ log.debug("Getting all personal access tokens");
+
+ return connection
+ .send(CommandCode.PersonalAccessToken.GET_ALL.getValue(),
payload)
+ .thenApply(response -> {
+ try {
+ List<PersonalAccessTokenInfo> tokens = new
ArrayList<>();
+ while (response.isReadable()) {
+
tokens.add(BytesDeserializer.readPersonalAccessTokenInfo(response));
+ }
+ return tokens;
+ } finally {
+ response.release();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> deletePersonalAccessToken(String name) {
+ var payload = BytesSerializer.toBytes(name);
+
+ log.debug("Deleting personal access token: {}", name);
+
+ return connection
+ .send(CommandCode.PersonalAccessToken.DELETE.getValue(),
payload)
+ .thenAccept(response -> {
+ response.release();
+ });
+ }
+
+ @Override
+ public CompletableFuture<IdentityInfo> loginWithPersonalAccessToken(String
token) {
+ var payload = BytesSerializer.toBytes(token);
+
+ log.debug("Logging in with personal access token");
+
+ return connection
+ .send(CommandCode.PersonalAccessToken.LOGIN.getValue(),
payload)
+ .thenApply(response -> {
+ try {
+ var userId = response.readUnsignedIntLE();
+ return new IdentityInfo(userId, Optional.empty());
+ } finally {
+ response.release();
+ }
+ });
+ }
+}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/SystemTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/SystemTcpClient.java
new file mode 100644
index 000000000..a8d144952
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/SystemTcpClient.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import io.netty.buffer.Unpooled;
+import org.apache.iggy.client.async.SystemClient;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
+import org.apache.iggy.system.ClientInfo;
+import org.apache.iggy.system.ClientInfoDetails;
+import org.apache.iggy.system.Stats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async TCP implementation of system client.
+ */
+public class SystemTcpClient implements SystemClient {
+ private static final Logger log =
LoggerFactory.getLogger(SystemTcpClient.class);
+
+ private final AsyncTcpConnection connection;
+
+ public SystemTcpClient(AsyncTcpConnection connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public CompletableFuture<Stats> getStats() {
+ var payload = Unpooled.EMPTY_BUFFER;
+
+ log.debug("Getting server statistics");
+
+ return connection.send(CommandCode.System.GET_STATS.getValue(),
payload).thenApply(response -> {
+ try {
+ return BytesDeserializer.readStats(response);
+ } finally {
+ response.release();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<ClientInfoDetails> getMe() {
+ var payload = Unpooled.EMPTY_BUFFER;
+
+ log.debug("Getting current client info");
+
+ return connection.send(CommandCode.System.GET_ME.getValue(),
payload).thenApply(response -> {
+ try {
+ return BytesDeserializer.readClientInfoDetails(response);
+ } finally {
+ response.release();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<ClientInfoDetails> getClient(Long clientId) {
+ var payload = Unpooled.buffer(4);
+ payload.writeIntLE(clientId.intValue());
+
+ log.debug("Getting client info for client ID: {}", clientId);
+
+ return connection
+ .send(CommandCode.System.GET_CLIENT.getValue(), payload)
+ .thenApply(response -> {
+ try {
+ return
BytesDeserializer.readClientInfoDetails(response);
+ } finally {
+ response.release();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<List<ClientInfo>> getClients() {
+ var payload = Unpooled.EMPTY_BUFFER;
+
+ log.debug("Getting all clients");
+
+ return connection
+ .send(CommandCode.System.GET_ALL_CLIENTS.getValue(), payload)
+ .thenApply(response -> {
+ try {
+ List<ClientInfo> clients = new ArrayList<>();
+ while (response.isReadable()) {
+
clients.add(BytesDeserializer.readClientInfo(response));
+ }
+ return clients;
+ } finally {
+ response.release();
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<String> ping() {
+ var payload = Unpooled.EMPTY_BUFFER;
+
+ log.debug("Pinging server");
+
+ return connection.send(CommandCode.System.PING.getValue(),
payload).thenApply(response -> {
+ response.release();
+ return "";
+ });
+ }
+}
diff --git
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
index f529cffc7..87b315266 100644
---
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
+++
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
@@ -22,6 +22,7 @@ package org.apache.iggy.client.async;
import org.apache.iggy.client.BaseIntegrationTest;
import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.ConsumerId;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.identifier.TopicId;
import org.apache.iggy.message.Message;
@@ -387,4 +388,255 @@ public class AsyncClientIntegrationTest extends
BaseIntegrationTest {
log.info("Successfully completed {} concurrent operations",
operations.size());
}
+
+ // ===== System client tests =====
+
+ @Test
+ @Order(11)
+ public void testGetStats() throws Exception {
+ log.info("Testing system getStats");
+
+ var stats = client.system().getStats().get(5, TimeUnit.SECONDS);
+
+ assertThat(stats).isNotNull();
+ log.info("Successfully retrieved server stats");
+ }
+
+ @Test
+ @Order(12)
+ public void testGetMe() throws Exception {
+ log.info("Testing system getMe");
+
+ var me = client.system().getMe().get(5, TimeUnit.SECONDS);
+
+ assertThat(me).isNotNull();
+ assertThat(me.clientId()).isGreaterThan(0);
+ log.info("Successfully retrieved current client info, clientId: {}",
me.clientId());
+ }
+
+ @Test
+ @Order(13)
+ public void testGetClients() throws Exception {
+ log.info("Testing system getClients");
+
+ var clients = client.system().getClients().get(5, TimeUnit.SECONDS);
+
+ assertThat(clients).isNotNull();
+ assertThat(clients).isNotEmpty();
+ log.info("Successfully retrieved {} connected clients",
clients.size());
+ }
+
+ @Test
+ @Order(14)
+ public void testGetClient() throws Exception {
+ log.info("Testing system getClient");
+
+ var me = client.system().getMe().get(5, TimeUnit.SECONDS);
+ var clientInfo = client.system().getClient(me.clientId()).get(5,
TimeUnit.SECONDS);
+
+ assertThat(clientInfo).isNotNull();
+ assertThat(clientInfo.clientId()).isEqualTo(me.clientId());
+ log.info("Successfully retrieved client info for clientId: {}",
clientInfo.clientId());
+ }
+
+ // ===== Consumer groups client tests =====
+
+ @Test
+ @Order(20)
+ public void testCreateAndGetConsumerGroup() throws Exception {
+ log.info("Testing consumer group create and get");
+
+ String groupName = "async-test-group";
+ var group = client.consumerGroups()
+ .createConsumerGroup(StreamId.of(TEST_STREAM),
TopicId.of(TEST_TOPIC), groupName)
+ .get(5, TimeUnit.SECONDS);
+
+ assertThat(group).isNotNull();
+ assertThat(group.name()).isEqualTo(groupName);
+ log.info("Successfully created consumer group: {}", group.name());
+
+ // Get by ID
+ var retrieved = client.consumerGroups()
+ .getConsumerGroup(StreamId.of(TEST_STREAM),
TopicId.of(TEST_TOPIC), ConsumerId.of(group.id()))
+ .get(5, TimeUnit.SECONDS);
+
+ assertThat(retrieved).isPresent();
+ assertThat(retrieved.get().id()).isEqualTo(group.id());
+ log.info("Successfully retrieved consumer group by ID: {}",
group.id());
+ }
+
+ @Test
+ @Order(21)
+ public void testGetConsumerGroups() throws Exception {
+ log.info("Testing consumer groups list");
+
+ var groups = client.consumerGroups()
+ .getConsumerGroups(StreamId.of(TEST_STREAM),
TopicId.of(TEST_TOPIC))
+ .get(5, TimeUnit.SECONDS);
+
+ assertThat(groups).isNotNull();
+ assertThat(groups).isNotEmpty();
+ log.info("Successfully retrieved {} consumer groups", groups.size());
+ }
+
+ @Test
+ @Order(22)
+ public void testDeleteConsumerGroup() throws Exception {
+ log.info("Testing consumer group deletion");
+
+ String groupName = "async-delete-group";
+ var group = client.consumerGroups()
+ .createConsumerGroup(StreamId.of(TEST_STREAM),
TopicId.of(TEST_TOPIC), groupName)
+ .get(5, TimeUnit.SECONDS);
+
+ client.consumerGroups()
+ .deleteConsumerGroup(StreamId.of(TEST_STREAM),
TopicId.of(TEST_TOPIC), ConsumerId.of(group.id()))
+ .get(5, TimeUnit.SECONDS);
+
+ var deleted = client.consumerGroups()
+ .getConsumerGroup(StreamId.of(TEST_STREAM),
TopicId.of(TEST_TOPIC), ConsumerId.of(group.id()))
+ .get(5, TimeUnit.SECONDS);
+
+ assertThat(deleted).isEmpty();
+ log.info("Successfully deleted consumer group: {}", groupName);
+ }
+
+ // ===== Consumer offsets client tests =====
+
+ @Test
+ @Order(30)
+ public void testStoreAndGetConsumerOffset() throws Exception {
+ log.info("Testing consumer offset store and get");
+
+ // Send message to partition 0 to ensure it is not empty so we can
store offset 0
+ // Note: storeConsumerOffset with empty partitionId defaults to
partition 0 on server
+ client.messages()
+ .sendMessages(
+ StreamId.of(TEST_STREAM),
+ TopicId.of(TEST_TOPIC),
+ Partitioning.partitionId(0L),
+ List.of(Message.of("test")))
+ .get(5, TimeUnit.SECONDS);
+
+ var consumer = new Consumer(Consumer.Kind.Consumer,
ConsumerId.of(5000L));
+ var offset = BigInteger.valueOf(0);
+
+ client.consumerOffsets()
+ .storeConsumerOffset(
+ StreamId.of(TEST_STREAM), TopicId.of(TEST_TOPIC),
Optional.empty(), consumer, offset)
+ .get(5, TimeUnit.SECONDS);
+
+ log.info("Successfully stored consumer offset: {}", offset);
+
+ var retrieved = client.consumerOffsets()
+ .getConsumerOffset(StreamId.of(TEST_STREAM),
TopicId.of(TEST_TOPIC), Optional.of(0L), consumer)
+ .get(5, TimeUnit.SECONDS);
+
+ assertThat(retrieved).isPresent();
+ log.info("Successfully retrieved consumer offset");
+ }
+
+ // ===== Partitions client tests =====
+
+ @Test
+ @Order(40)
+ public void testCreateAndDeletePartitions() throws Exception {
+ log.info("Testing partition create and delete");
+
+ // Get initial partition count
+ var topicBefore = client.topics()
+ .getTopic(StreamId.of(TEST_STREAM), TopicId.of(TEST_TOPIC))
+ .get(5, TimeUnit.SECONDS);
+ assertThat(topicBefore).isPresent();
+ long initialCount = topicBefore.get().partitionsCount();
+ log.info("Initial partition count: {}", initialCount);
+
+ // Create additional partitions
+ client.partitions()
+ .createPartitions(StreamId.of(TEST_STREAM),
TopicId.of(TEST_TOPIC), 3L)
+ .get(5, TimeUnit.SECONDS);
+
+ var topicAfterCreate = client.topics()
+ .getTopic(StreamId.of(TEST_STREAM), TopicId.of(TEST_TOPIC))
+ .get(5, TimeUnit.SECONDS);
+ assertThat(topicAfterCreate).isPresent();
+
assertThat(topicAfterCreate.get().partitionsCount()).isEqualTo(initialCount +
3);
+ log.info("Partition count after create: {}",
topicAfterCreate.get().partitionsCount());
+
+ // Delete the added partitions
+ client.partitions()
+ .deletePartitions(StreamId.of(TEST_STREAM),
TopicId.of(TEST_TOPIC), 3L)
+ .get(5, TimeUnit.SECONDS);
+
+ var topicAfterDelete = client.topics()
+ .getTopic(StreamId.of(TEST_STREAM), TopicId.of(TEST_TOPIC))
+ .get(5, TimeUnit.SECONDS);
+ assertThat(topicAfterDelete).isPresent();
+
assertThat(topicAfterDelete.get().partitionsCount()).isEqualTo(initialCount);
+ log.info("Partition count after delete: {}",
topicAfterDelete.get().partitionsCount());
+ }
+
+ // ===== Personal access tokens client tests =====
+
+ @Test
+ @Order(50)
+ public void testCreateAndDeletePersonalAccessToken() throws Exception {
+ log.info("Testing personal access token create and delete");
+
+ String tokenName = "async-test-token";
+ var token = client.personalAccessTokens()
+ .createPersonalAccessToken(tokenName,
BigInteger.valueOf(50_000))
+ .get(5, TimeUnit.SECONDS);
+
+ assertThat(token).isNotNull();
+ log.info("Successfully created personal access token: {}", tokenName);
+
+ // List tokens
+ var tokens =
client.personalAccessTokens().getPersonalAccessTokens().get(5,
TimeUnit.SECONDS);
+
+ assertThat(tokens).isNotNull();
+ assertThat(tokens).anyMatch(t -> t.name().equals(tokenName));
+ log.info("Found {} personal access tokens", tokens.size());
+
+ // Delete token
+
client.personalAccessTokens().deletePersonalAccessToken(tokenName).get(5,
TimeUnit.SECONDS);
+
+ var tokensAfterDelete =
+ client.personalAccessTokens().getPersonalAccessTokens().get(5,
TimeUnit.SECONDS);
+
+ assertThat(tokensAfterDelete).noneMatch(t ->
t.name().equals(tokenName));
+ log.info("Successfully deleted personal access token: {}", tokenName);
+ }
+
+ @Test
+ @Order(51)
+ public void testLoginWithPersonalAccessToken() throws Exception {
+ log.info("Testing login with personal access token");
+
+ String tokenName = "async-login-token";
+ var token = client.personalAccessTokens()
+ .createPersonalAccessToken(tokenName,
BigInteger.valueOf(50_000))
+ .get(5, TimeUnit.SECONDS);
+
+ assertThat(token).isNotNull();
+ assertThat(token.token()).isNotEmpty();
+ log.info("Created token for login test");
+
+ // Login with PAT using a separate client
+ var patClient = new AsyncIggyTcpClient(serverHost(), serverTcpPort());
+ try {
+ patClient.connect().get(5, TimeUnit.SECONDS);
+ var identity = patClient
+ .personalAccessTokens()
+ .loginWithPersonalAccessToken(token.token())
+ .get(5, TimeUnit.SECONDS);
+
+ assertThat(identity).isNotNull();
+ log.info("Successfully logged in with personal access token");
+ } finally {
+ patClient.close().get(5, TimeUnit.SECONDS);
+ // Clean up token
+
client.personalAccessTokens().deletePersonalAccessToken(tokenName).get(5,
TimeUnit.SECONDS);
+ }
+ }
}
diff --git
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilderTest.java
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilderTest.java
index aa27421dd..f3241c51f 100644
---
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilderTest.java
+++
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilderTest.java
@@ -73,6 +73,10 @@ class AsyncIggyTcpClientBuilderTest extends
BaseIntegrationTest {
assertNotNull(client.streams());
assertNotNull(client.topics());
assertNotNull(client.consumerGroups());
+ assertNotNull(client.system());
+ assertNotNull(client.personalAccessTokens());
+ assertNotNull(client.partitions());
+ assertNotNull(client.consumerOffsets());
}
@Test
@@ -161,6 +165,10 @@ class AsyncIggyTcpClientBuilderTest extends
BaseIntegrationTest {
assertNotNull(client.streams(), "Streams client should not be null");
assertNotNull(client.topics(), "Topics client should not be null");
assertNotNull(client.consumerGroups(), "Consumer groups client should
not be null");
+ assertNotNull(client.system(), "System client should not be null");
+ assertNotNull(client.personalAccessTokens(), "Personal access tokens
client should not be null");
+ assertNotNull(client.partitions(), "Partitions client should not be
null");
+ assertNotNull(client.consumerOffsets(), "Consumer offsets client
should not be null");
}
@Test
@@ -242,6 +250,46 @@ class AsyncIggyTcpClientBuilderTest extends
BaseIntegrationTest {
assertThrows(IggyNotConnectedException.class, () ->
client.consumerGroups());
}
+ @Test
+ void testThrowExceptionWhenAccessingSystemBeforeConnect() {
+ client = AsyncIggyTcpClient.builder()
+ .host(serverHost())
+ .port(serverTcpPort())
+ .build();
+
+ assertThrows(IggyNotConnectedException.class, () -> client.system());
+ }
+
+ @Test
+ void testThrowExceptionWhenAccessingPersonalAccessTokensBeforeConnect() {
+ client = AsyncIggyTcpClient.builder()
+ .host(serverHost())
+ .port(serverTcpPort())
+ .build();
+
+ assertThrows(IggyNotConnectedException.class, () ->
client.personalAccessTokens());
+ }
+
+ @Test
+ void testThrowExceptionWhenAccessingPartitionsBeforeConnect() {
+ client = AsyncIggyTcpClient.builder()
+ .host(serverHost())
+ .port(serverTcpPort())
+ .build();
+
+ assertThrows(IggyNotConnectedException.class, () ->
client.partitions());
+ }
+
+ @Test
+ void testThrowExceptionWhenAccessingConsumerOffsetsBeforeConnect() {
+ client = AsyncIggyTcpClient.builder()
+ .host(serverHost())
+ .port(serverTcpPort())
+ .build();
+
+ assertThrows(IggyNotConnectedException.class, () ->
client.consumerOffsets());
+ }
+
@Test
void testThrowExceptionWhenLoginWithoutCredentials() throws Exception {
client = AsyncIggyTcpClient.builder()
@@ -407,6 +455,10 @@ class AsyncIggyTcpClientBuilderTest extends
BaseIntegrationTest {
assertNotNull(client.streams());
assertNotNull(client.topics());
assertNotNull(client.consumerGroups());
+ assertNotNull(client.system());
+ assertNotNull(client.personalAccessTokens());
+ assertNotNull(client.partitions());
+ assertNotNull(client.consumerOffsets());
}
@Test
@@ -423,6 +475,10 @@ class AsyncIggyTcpClientBuilderTest extends
BaseIntegrationTest {
assertNotNull(client.streams());
assertNotNull(client.topics());
assertNotNull(client.consumerGroups());
+ assertNotNull(client.system());
+ assertNotNull(client.personalAccessTokens());
+ assertNotNull(client.partitions());
+ assertNotNull(client.consumerOffsets());
}
@Test