This is an automated email from the ASF dual-hosted git repository.
kirktrue pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 67850b0b68d KAFKA-20136: Add javadoc to public APIs in
org.apache.kafka.clients.consumer module (#21429)
67850b0b68d is described below
commit 67850b0b68d92d39c095cc91bf455d9445336e09
Author: zoo-code <[email protected]>
AuthorDate: Thu Apr 2 01:17:02 2026 +0900
KAFKA-20136: Add javadoc to public APIs in
org.apache.kafka.clients.consumer module (#21429)
### Summary
This PR adds missing Javadocs to public APIs within the
`org.apache.kafka.clients.consumer` package to improve documentation
coverage and consistency with project standards.
### Changes
- Added missing Javadocs to **public classes, methods, and fields** in
18 files.
- Focused on ensuring all public APIs are properly documented according
to Kafka guidelines.
### Test Plan
- Verified that the Javadoc formatting follows existing Kafka
conventions.
- No functional changes, documentation only.
---
.../kafka/clients/consumer/AcknowledgeType.java | 9 +-
.../kafka/clients/consumer/CloseOptions.java | 33 ++++--
.../clients/consumer/CommitFailedException.java | 15 ++-
.../kafka/clients/consumer/ConsumerConfig.java | 20 ++++
.../clients/consumer/ConsumerGroupMetadata.java | 20 ++++
.../consumer/ConsumerPartitionAssignor.java | 127 +++++++++++++++++++++
.../kafka/clients/consumer/ConsumerRecords.java | 24 +++-
.../kafka/clients/consumer/GroupProtocol.java | 11 ++
.../clients/consumer/InvalidOffsetException.java | 10 ++
.../clients/consumer/LogTruncationException.java | 13 +++
.../kafka/clients/consumer/MockConsumer.java | 71 ++++++++++++
.../kafka/clients/consumer/MockShareConsumer.java | 3 +
.../consumer/NoOffsetForPartitionException.java | 14 ++-
.../kafka/clients/consumer/OffsetAndMetadata.java | 9 +-
.../kafka/clients/consumer/OffsetAndTimestamp.java | 25 ++++
.../consumer/OffsetOutOfRangeException.java | 11 ++
.../consumer/RetriableCommitFailedException.java | 25 ++++
.../clients/consumer/SubscriptionPattern.java | 5 +
18 files changed, 427 insertions(+), 18 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
index 8f6cf41ba83..74ddee17c08 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
@@ -35,6 +35,7 @@ public enum AcknowledgeType {
/** The record is still being processed. Renew the acquisition lock so
processing can continue. */
RENEW((byte) 4);
+ /** The unique identifier for this acknowledge type. */
public final byte id;
AcknowledgeType(byte id) {
@@ -46,7 +47,13 @@ public enum AcknowledgeType {
return super.toString().toLowerCase(Locale.ROOT);
}
-
+ /**
+ * Returns the AcknowledgeType for the given identifier.
+ *
+ * @param id The identifier for the acknowledge type
+ * @return The corresponding AcknowledgeType
+ * @throws IllegalArgumentException If the ID is not recognized
+ */
public static AcknowledgeType forId(byte id) {
switch (id) {
case 1:
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
index 0aae57d39cf..70619f69748 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
@@ -22,6 +22,13 @@ import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
+/**
+ * Options for controlling the consumer close behavior.
+ * <p>
+ * This class allows customization of the close timeout and group membership
operation
+ * when a consumer is being shut down.
+ * </p>
+ */
public class CloseOptions {
/**
* Enum to specify the group membership operation upon leaving group.
@@ -62,8 +69,8 @@ public class CloseOptions {
/**
* Static method to create a {@code CloseOptions} with a custom timeout.
*
- * @param timeout the maximum time to wait for the consumer to close.
- * @return a new {@code CloseOptions} instance with the specified timeout.
+ * @param timeout The maximum time to wait for the consumer to close.
+ * @return A new {@code CloseOptions} instance with the specified timeout.
*/
public static CloseOptions timeout(final Duration timeout) {
return new CloseOptions().withTimeout(timeout);
@@ -72,9 +79,9 @@ public class CloseOptions {
/**
* Static method to create a {@code CloseOptions} with a specified group
membership operation.
*
- * @param operation the group membership operation to apply. Must be one
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
+ * @param operation The group membership operation to apply. Must be one
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
* or {@code DEFAULT}.
- * @return a new {@code CloseOptions} instance with the specified group
membership operation.
+ * @return A new {@code CloseOptions} instance with the specified group
membership operation.
*/
public static CloseOptions groupMembershipOperation(final
GroupMembershipOperation operation) {
return new CloseOptions().withGroupMembershipOperation(operation);
@@ -83,8 +90,8 @@ public class CloseOptions {
/**
* Fluent method to set the timeout for the close process.
*
- * @param timeout the maximum time to wait for the consumer to close. If
{@code null}, the default timeout will be used.
- * @return this {@code CloseOptions} instance.
+ * @param timeout The maximum time to wait for the consumer to close. If
{@code null}, the default timeout will be used.
+ * @return This {@code CloseOptions} instance.
*/
public CloseOptions withTimeout(final Duration timeout) {
this.timeout = Optional.ofNullable(timeout);
@@ -94,18 +101,28 @@ public class CloseOptions {
/**
* Fluent method to set the group membership operation upon shutdown.
*
- * @param operation the group membership operation to apply. Must be one
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
- * @return this {@code CloseOptions} instance.
+ * @param operation The group membership operation to apply. Must be one
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
+ * @return This {@code CloseOptions} instance.
*/
public CloseOptions withGroupMembershipOperation(final
GroupMembershipOperation operation) {
this.operation = Objects.requireNonNull(operation, "operation should
not be null");
return this;
}
+ /**
+ * Returns the group membership operation configured for this close.
+ *
+ * @return The group membership operation
+ */
public GroupMembershipOperation groupMembershipOperation() {
return operation;
}
+ /**
+ * Returns the timeout configured for this close.
+ *
+ * @return The timeout, or empty if using the default timeout
+ */
public Optional<Duration> timeout() {
return timeout;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
index 2040216cd02..720fc70301d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
@@ -20,18 +20,27 @@ import org.apache.kafka.common.KafkaException;
/**
* This exception is raised when an offset commit with {@link
KafkaConsumer#commitSync()} fails
- * with an unrecoverable error. This can happen when a group rebalance
completes before the commit
- * could be successfully applied. In this case, the commit cannot generally be
retried because some
- * of the partitions may have already been assigned to another member in the
group.
+ * with an unrecoverable error. This exception is generated on the client
side, typically when
+ * a group rebalance completes before the commit could be successfully
applied. In this case,
+ * the commit cannot generally be retried because some of the partitions may
have already been
+ * assigned to another member in the group.
*/
public class CommitFailedException extends KafkaException {
private static final long serialVersionUID = 1L;
+ /**
+ * Constructs a new CommitFailedException with the specified detail
message.
+ *
+ * @param message The error message
+ */
public CommitFailedException(final String message) {
super(message);
}
+ /**
+ * Constructs a new CommitFailedException with a default message
explaining the cause of the commit failure.
+ */
public CommitFailedException() {
super("Commit cannot be completed since the group has already " +
"rebalanced and assigned the partitions to another member.
This means that the time " +
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 7ad52c21226..d8608e3d79d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -788,10 +788,20 @@ public class ConsumerConfig extends AbstractConfig {
}
}
+ /**
+ * Constructs a new ConsumerConfig with the given properties.
+ *
+ * @param props The consumer configuration properties
+ */
public ConsumerConfig(Properties props) {
super(CONFIG, props);
}
+ /**
+ * Constructs a new ConsumerConfig from the given map of configuration properties.
+ *
+ * @param props The consumer configuration properties
+ */
public ConsumerConfig(Map<String, Object> props) {
super(CONFIG, props);
}
@@ -800,10 +810,20 @@ public class ConsumerConfig extends AbstractConfig {
super(CONFIG, props, doLog);
}
+ /**
+ * Returns the set of all configuration keys.
+ *
+ * @return The set of configuration keys
+ */
public static Set<String> configNames() {
return CONFIG.names();
}
+ /**
+ * Returns a new {@link ConfigDef} instance created from the consumer configuration definition.
+ *
+ * @return A new configuration definition instance
+ */
public static ConfigDef configDef() {
return new ConfigDef(CONFIG);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java
index cfbc69f6209..95f4ff23fdf 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java
@@ -56,18 +56,38 @@ public class ConsumerGroupMetadata {
Optional.empty());
}
+ /**
+ * Returns the consumer group ID.
+ *
+ * @return The group ID
+ */
public String groupId() {
return groupId;
}
+ /**
+ * Returns the generation ID of the consumer group.
+ *
+ * @return The generation ID
+ */
public int generationId() {
return generationId;
}
+ /**
+ * Returns the member ID of this consumer.
+ *
+ * @return The member ID
+ */
public String memberId() {
return memberId;
}
+ /**
+ * Returns the group instance ID if this is a static member.
+ *
+ * @return The group instance ID, or empty if this is a dynamic member
+ */
public Optional<String> groupInstanceId() {
return groupInstanceId;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index 45cb505c744..f7158a9fd8d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -102,6 +102,9 @@ public interface ConsumerPartitionAssignor {
*/
String name();
+ /**
+ * Represents a consumer's subscription information including topics, user
data, and owned partitions.
+ */
final class Subscription {
private final List<String> topics;
private final ByteBuffer userData;
@@ -110,6 +113,15 @@ public interface ConsumerPartitionAssignor {
private Optional<String> groupInstanceId;
private final Optional<Integer> generationId;
+ /**
+ * Constructs a subscription with full details.
+ *
+ * @param topics The list of topics to subscribe to
+ * @param userData Nullable user data to include in the subscription
+ * @param ownedPartitions The partitions currently owned by this
consumer
+ * @param generationId The generation ID of the consumer group
+ * @param rackId Optional rack ID for rack-aware assignment
+ */
public Subscription(List<String> topics, ByteBuffer userData,
List<TopicPartition> ownedPartitions, int generationId, Optional<String>
rackId) {
this.topics = topics;
this.userData = userData;
@@ -119,42 +131,95 @@ public interface ConsumerPartitionAssignor {
this.rackId = rackId;
}
+ /**
+ * Constructs a subscription without generation ID and rack ID.
+ *
+ * @param topics The list of topics to subscribe to
+ * @param userData Nullable user data to include in the subscription
+ * @param ownedPartitions The partitions currently owned by this
consumer
+ */
public Subscription(List<String> topics, ByteBuffer userData,
List<TopicPartition> ownedPartitions) {
this(topics, userData, ownedPartitions, DEFAULT_GENERATION,
Optional.empty());
}
+ /**
+ * Constructs a subscription without owned partitions.
+ *
+ * @param topics The list of topics to subscribe to
+ * @param userData Nullable user data to include in the subscription
+ */
public Subscription(List<String> topics, ByteBuffer userData) {
this(topics, userData, Collections.emptyList(),
DEFAULT_GENERATION, Optional.empty());
}
+ /**
+ * Constructs a basic subscription with only topics.
+ *
+ * @param topics The list of topics to subscribe to
+ */
public Subscription(List<String> topics) {
this(topics, null, Collections.emptyList(), DEFAULT_GENERATION,
Optional.empty());
}
+ /**
+ * Returns the list of topics subscribed to.
+ *
+ * @return The list of topics
+ */
public List<String> topics() {
return topics;
}
+ /**
+ * Returns the user data included in the subscription.
+ *
+ * @return The user data, or null if none was provided
+ */
public ByteBuffer userData() {
return userData;
}
+ /**
+ * Returns the partitions currently owned by this consumer.
+ *
+ * @return The list of owned partitions
+ */
public List<TopicPartition> ownedPartitions() {
return ownedPartitions;
}
+ /**
+ * Returns the rack ID for rack-aware assignment.
+ *
+ * @return The rack ID, or empty if not provided
+ */
public Optional<String> rackId() {
return rackId;
}
+ /**
+ * Sets the group instance ID for static membership.
+ *
+ * @param groupInstanceId The group instance ID
+ */
public void setGroupInstanceId(Optional<String> groupInstanceId) {
this.groupInstanceId = groupInstanceId;
}
+ /**
+ * Returns the group instance ID if this is a static member.
+ *
+ * @return The group instance ID, or empty if this is a dynamic member
+ */
public Optional<String> groupInstanceId() {
return groupInstanceId;
}
+ /**
+ * Returns the generation ID of the consumer group.
+ *
+ * @return The generation ID, or empty if not provided
+ */
public Optional<Integer> generationId() {
return generationId;
}
@@ -172,23 +237,47 @@ public interface ConsumerPartitionAssignor {
}
}
+ /**
+ * Represents the partition assignment for a consumer.
+ */
final class Assignment {
private final List<TopicPartition> partitions;
private final ByteBuffer userData;
+ /**
+ * Constructs an assignment with partitions and user data.
+ *
+ * @param partitions The list of partitions assigned to the consumer
+ * @param userData Nullable user data to include in the assignment
+ */
public Assignment(List<TopicPartition> partitions, ByteBuffer
userData) {
this.partitions = partitions;
this.userData = userData;
}
+ /**
+ * Constructs an assignment with only partitions.
+ *
+ * @param partitions The list of partitions assigned to the consumer
+ */
public Assignment(List<TopicPartition> partitions) {
this(partitions, null);
}
+ /**
+ * Returns the list of partitions assigned to the consumer.
+ *
+ * @return The list of partitions
+ */
public List<TopicPartition> partitions() {
return partitions;
}
+ /**
+ * Returns the user data included in the assignment.
+ *
+ * @return The user data, or null if none was provided
+ */
public ByteBuffer userData() {
return userData;
}
@@ -202,13 +291,26 @@ public interface ConsumerPartitionAssignor {
}
}
+ /**
+ * Represents the subscriptions of all members in a consumer group.
+ */
final class GroupSubscription {
private final Map<String, Subscription> subscriptions;
+ /**
+ * Constructs a group subscription with member subscriptions.
+ *
+ * @param subscriptions A map from member ID to their subscription
+ */
public GroupSubscription(Map<String, Subscription> subscriptions) {
this.subscriptions = subscriptions;
}
+ /**
+ * Returns the subscriptions of all members in the group.
+ *
+ * @return A map from member ID to their subscription
+ */
public Map<String, Subscription> groupSubscription() {
return subscriptions;
}
@@ -221,13 +323,26 @@ public interface ConsumerPartitionAssignor {
}
}
+ /**
+ * Represents the partition assignments for all members in a consumer
group.
+ */
final class GroupAssignment {
private final Map<String, Assignment> assignments;
+ /**
+ * Constructs a group assignment with member assignments.
+ *
+ * @param assignments A map from member ID to their partition
assignment
+ */
public GroupAssignment(Map<String, Assignment> assignments) {
this.assignments = assignments;
}
+ /**
+ * Returns the partition assignments for all members in the group.
+ *
+ * @return A map from member ID to their partition assignment
+ */
public Map<String, Assignment> groupAssignment() {
return assignments;
}
@@ -266,10 +381,22 @@ public interface ConsumerPartitionAssignor {
this.id = id;
}
+ /**
+ * Returns the unique identifier for this rebalance protocol.
+ *
+ * @return The protocol ID
+ */
public byte id() {
return id;
}
+ /**
+ * Returns the rebalance protocol for the given identifier.
+ *
+ * @param id The identifier for the rebalance protocol
+ * @return The corresponding rebalance protocol
+ * @throws IllegalArgumentException If the ID is not recognized
+ */
public static RebalanceProtocol forId(byte id) {
switch (id) {
case 0:
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 0cb3b8f9677..4f179492d2e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -45,6 +45,14 @@ public class ConsumerRecords<K, V> implements
Iterable<ConsumerRecord<K, V>> {
this(records, Map.of());
}
+ /**
+ * Constructs a new ConsumerRecords with the given records and next
offsets.
+ *
+ * @param records The records for each partition
+ * @param nextOffsets The next offset and metadata for each partition
whose position was advanced
+ * during the poll call. These represent the offsets
that the consumer will
+ * start reading from on the next poll.
+ */
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>>
records, final Map<TopicPartition, OffsetAndMetadata> nextOffsets) {
this.records = records;
this.nextOffsets = Map.copyOf(nextOffsets);
@@ -65,7 +73,7 @@ public class ConsumerRecords<K, V> implements
Iterable<ConsumerRecord<K, V>> {
/**
* Get the next offsets and metadata corresponding to all topic partitions
for which the position have been advanced in this poll call
- * @return the next offsets that the consumer will consume
+ * @return The next offsets that the consumer will consume
*/
public Map<TopicPartition, OffsetAndMetadata> nextOffsets() {
return nextOffsets;
@@ -87,7 +95,7 @@ public class ConsumerRecords<K, V> implements
Iterable<ConsumerRecord<K, V>> {
/**
* Get the partitions which have records contained in this record set.
- * @return the set of partitions with data in this record set (may be
empty if no data was returned)
+ * @return The set of partitions with data in this record set (may be
empty if no data was returned)
*/
public Set<TopicPartition> partitions() {
return Collections.unmodifiableSet(records.keySet());
@@ -135,10 +143,22 @@ public class ConsumerRecords<K, V> implements
Iterable<ConsumerRecord<K, V>> {
}
}
+ /**
+ * Returns whether this container has no records.
+ *
+ * @return True if there are no records, false otherwise
+ */
public boolean isEmpty() {
return records.isEmpty();
}
+ /**
+ * Returns an empty ConsumerRecords instance.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ * @return An empty ConsumerRecords
+ */
@SuppressWarnings("unchecked")
public static <K, V> ConsumerRecords<K, V> empty() {
return (ConsumerRecords<K, V>) EMPTY;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/GroupProtocol.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/GroupProtocol.java
index 9dd46748e3d..7afc8b89cb2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/GroupProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/GroupProtocol.java
@@ -18,6 +18,13 @@ package org.apache.kafka.clients.consumer;
import java.util.Locale;
+/**
+ * Enum representing the supported consumer group protocols.
+ * <ul>
+ * <li>{@link #CLASSIC} - The Classic consumer group protocol (pre
KIP-848)</li>
+ * <li>{@link #CONSUMER} - The Consumer rebalance protocol (KIP-848)</li>
+ * </ul>
+ */
public enum GroupProtocol {
/** Classic group protocol. */
CLASSIC("CLASSIC"),
@@ -36,6 +43,10 @@ public enum GroupProtocol {
/**
* Case-insensitive group protocol lookup by string name.
+ *
+ * @param name The name of the group protocol
+ * @return The corresponding GroupProtocol
+ * @throws IllegalArgumentException If the name does not match any protocol
*/
public static GroupProtocol of(final String name) {
return GroupProtocol.valueOf(name.toUpperCase(Locale.ROOT));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
index b23ca867359..b1e16892904 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
@@ -29,10 +29,20 @@ import java.util.Set;
*/
public abstract class InvalidOffsetException extends KafkaException {
+ /**
+ * Constructs a new InvalidOffsetException with the specified detail
message.
+ *
+ * @param message The detail message
+ */
public InvalidOffsetException(String message) {
super(message);
}
+ /**
+ * Returns the set of partitions for which the offset was invalid.
+ *
+ * @return The set of partitions with invalid offsets
+ */
public abstract Set<TopicPartition> partitions();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
index f13f61e156d..ead11628c78 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/LogTruncationException.java
@@ -33,11 +33,24 @@ public class LogTruncationException extends
OffsetOutOfRangeException {
private final Map<TopicPartition, OffsetAndMetadata> divergentOffsets;
+ /**
+ * Constructs a new LogTruncationException with the fetch offsets and
divergent offsets.
+ *
+ * @param fetchOffsets The fetch offsets that were out of range
+ * @param divergentOffsets The offsets where the consumer's log diverged
from the broker's log
+ */
public LogTruncationException(Map<TopicPartition, Long> fetchOffsets,
Map<TopicPartition, OffsetAndMetadata>
divergentOffsets) {
this("Truncated partitions detected with divergent offsets " +
divergentOffsets, fetchOffsets, divergentOffsets);
}
+ /**
+ * Constructs a new LogTruncationException with a custom message.
+ *
+ * @param message The detail message
+ * @param fetchOffsets The fetch offsets that were out of range
+ * @param divergentOffsets The offsets where the consumer's log diverged
from the broker's log
+ */
public LogTruncationException(String message,
Map<TopicPartition, Long> fetchOffsets,
Map<TopicPartition, OffsetAndMetadata>
divergentOffsets) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index f727eb68bea..92a3435800b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -319,6 +319,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
return new ConsumerRecords<>(results, nextOffsetAndMetadata);
}
+ /**
+ * Adds a record to be returned when {@link #poll(Duration)} is called.
+ *
+ * @param record the record to add
+ * @throws IllegalStateException if the partition is not assigned to the
consumer
+ */
public synchronized void addRecord(ConsumerRecord<K, V> record) {
ensureNotClosed();
TopicPartition tp = new TopicPartition(record.topic(),
record.partition());
@@ -341,10 +347,20 @@ public class MockConsumer<K, V> implements Consumer<K, V>
{
this.maxPollRecords = maxPollRecords;
}
+ /**
+ * Sets an exception to throw when {@link #poll(Duration)} is called.
+ *
+ * @param exception the exception to throw
+ */
public synchronized void setPollException(KafkaException exception) {
this.pollException = exception;
}
+ /**
+ * Sets an exception to throw when offset-related methods are called.
+ *
+ * @param exception the exception to throw
+ */
public synchronized void setOffsetsException(KafkaException exception) {
this.offsetsException = exception;
}
@@ -440,6 +456,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
subscriptions.requestOffsetReset(partitions,
AutoOffsetResetStrategy.EARLIEST);
}
+ /**
+ * Updates the beginning offsets for the specified partitions.
+ *
+ * @param newOffsets the beginning offsets to set
+ */
public synchronized void updateBeginningOffsets(Map<TopicPartition, Long>
newOffsets) {
beginningOffsets.putAll(newOffsets);
}
@@ -450,14 +471,27 @@ public class MockConsumer<K, V> implements Consumer<K, V>
{
subscriptions.requestOffsetReset(partitions,
AutoOffsetResetStrategy.LATEST);
}
+ /**
+ * Updates the end offsets for the specified partitions.
+ *
+ * @param newOffsets the end offsets to set
+ */
public synchronized void updateEndOffsets(final Map<TopicPartition, Long>
newOffsets) {
endOffsets.putAll(newOffsets);
}
+ /**
+ * Updates the duration-based reset offsets for the specified partitions.
+ *
+ * @param newOffsets the duration offsets to set
+ */
public synchronized void updateDurationOffsets(final Map<TopicPartition,
Long> newOffsets) {
durationResetOffsets.putAll(newOffsets);
}
+ /**
+ * Disables telemetry for this mock consumer.
+ */
public void disableTelemetry() {
telemetryDisabled = true;
}
@@ -469,6 +503,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
this.injectTimeoutExceptionCounter = injectTimeoutExceptionCounter;
}
+ /**
+ * Sets the client instance ID for this mock consumer.
+ *
+ * @param instanceId the client instance ID
+ */
public void setClientInstanceId(final Uuid instanceId) {
clientInstanceId = instanceId;
}
@@ -510,6 +549,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
return partitions;
}
+ /**
+ * Updates the partition information for the specified topic.
+ *
+ * @param topic the topic to update
+ * @param partitions the partition information
+ */
public synchronized void updatePartitions(String topic,
List<PartitionInfo> partitions) {
ensureNotClosed();
this.partitions.put(topic, partitions);
@@ -581,6 +626,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
this.closed = true;
}
+ /**
+ * Returns whether this consumer has been closed.
+ *
+ * @return true if closed, false otherwise
+ */
public synchronized boolean closed() {
return this.closed;
}
@@ -606,6 +656,9 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
}
+ /**
+ * Schedules a no-op task to be executed during a poll invocation.
+ */
public synchronized void scheduleNopPollTask() {
schedulePollTask(() -> { });
}
@@ -703,18 +756,36 @@ public class MockConsumer<K, V> implements Consumer<K, V>
{
shouldRebalance = true;
}
+ /**
+ * Returns whether a rebalance has been triggered via {@link
#enforceRebalance()}.
+ *
+ * @return true if rebalance is pending, false otherwise
+ */
public boolean shouldRebalance() {
return shouldRebalance;
}
+ /**
+ * Resets the rebalance flag set by {@link #enforceRebalance()}.
+ */
public void resetShouldRebalance() {
shouldRebalance = false;
}
+ /**
+ * Returns the timeout used in the last {@link #poll(Duration)} call.
+ *
+ * @return the last poll timeout, or null if poll has not been called
+ */
public Duration lastPollTimeout() {
return lastPollTimeout;
}
+ /**
+ * Returns the metrics registered via {@link
#registerMetricForSubscription(KafkaMetric)}.
+ *
+ * @return an unmodifiable list of added metrics
+ */
public List<KafkaMetric> addedMetrics() {
return Collections.unmodifiableList(addedMetrics);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
index 049732bb2ea..3167113caae 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
@@ -54,6 +54,9 @@ public class MockShareConsumer<K, V> implements
ShareConsumer<K, V> {
private boolean closed;
private Uuid clientInstanceId;
+ /**
+ * Constructs a new MockShareConsumer for testing.
+ */
public MockShareConsumer() {
this.subscriptions = new SubscriptionState(new LogContext(),
AutoOffsetResetStrategy.NONE);
this.records = new HashMap<>();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
index 93b6094fcfc..9ea7abefb71 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
@@ -32,19 +32,29 @@ public class NoOffsetForPartitionException extends
InvalidOffsetException {
private final Set<TopicPartition> partitions;
+ /**
+ * Constructs a new NoOffsetForPartitionException for a single partition.
+ *
+ * @param partition The partition for which no offset is defined
+ */
public NoOffsetForPartitionException(TopicPartition partition) {
super("Undefined offset with no reset policy for partition: " +
partition);
this.partitions = Collections.singleton(partition);
}
+ /**
+ * Constructs a new NoOffsetForPartitionException for multiple partitions.
+ *
+ * @param partitions The partitions for which no offset is defined
+ */
public NoOffsetForPartitionException(Collection<TopicPartition>
partitions) {
super("Undefined offset with no reset policy for partitions: " +
partitions);
this.partitions = Set.copyOf(partitions);
}
/**
- * returns all partitions for which no offsets are defined.
- * @return all partitions without offsets
+ * Returns all partitions for which no offsets are defined.
+ * @return All partitions without offsets
*/
public Set<TopicPartition> partitions() {
return partitions;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index f459dd5ba55..98f847df64c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -75,6 +75,11 @@ public class OffsetAndMetadata implements Serializable {
this(offset, "");
}
+ /**
+ * Returns the offset to be committed.
+ *
+ * @return The offset
+ */
public long offset() {
return offset;
}
@@ -82,7 +87,7 @@ public class OffsetAndMetadata implements Serializable {
/**
* Get the metadata of the previously consumed record.
*
- * @return the metadata or empty string if no metadata
+ * @return The metadata or empty string if no metadata
*/
public String metadata() {
return metadata;
@@ -93,7 +98,7 @@ public class OffsetAndMetadata implements Serializable {
* if there exists a leader epoch which is larger than this epoch and begins at an offset earlier than
* the committed offset.
*
- * @return the leader epoch or empty if not known
+ * @return The leader epoch or empty if not known
*/
public Optional<Integer> leaderEpoch() {
if (leaderEpoch == null || leaderEpoch < 0)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
index 40d993074a2..a62d69607da 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
@@ -27,10 +27,25 @@ public final class OffsetAndTimestamp {
private final long offset;
private final Optional<Integer> leaderEpoch;
+ /**
+ * Constructs a new OffsetAndTimestamp with the given offset and timestamp.
+ *
+ * @param offset The offset
+ * @param timestamp The timestamp
+ * @throws IllegalArgumentException If the offset or timestamp is negative
+ */
public OffsetAndTimestamp(long offset, long timestamp) {
this(offset, timestamp, Optional.empty());
}
+ /**
+ * Constructs a new OffsetAndTimestamp with the given offset, timestamp, and leader epoch.
+ *
+ * @param offset The offset
+ * @param timestamp The timestamp
+ * @param leaderEpoch The leader epoch
+ * @throws IllegalArgumentException If the offset or timestamp is negative
+ */
public OffsetAndTimestamp(long offset, long timestamp, Optional<Integer>
leaderEpoch) {
if (offset < 0)
throw new IllegalArgumentException("Invalid negative offset");
@@ -43,10 +58,20 @@ public final class OffsetAndTimestamp {
this.leaderEpoch = leaderEpoch;
}
+ /**
+ * Returns the timestamp.
+ *
+ * @return The timestamp
+ */
public long timestamp() {
return timestamp;
}
+ /**
+ * Returns the offset.
+ *
+ * @return The offset
+ */
public long offset() {
return offset;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
index c98e22fc825..c2d4a2d020d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
@@ -30,11 +30,22 @@ public class OffsetOutOfRangeException extends
InvalidOffsetException {
private static final long serialVersionUID = 1L;
private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
+ /**
+ * Constructs a new OffsetOutOfRangeException with the offsets that are out of range.
+ *
+ * @param offsetOutOfRangePartitions Map of partition to the out-of-range offset
+ */
public OffsetOutOfRangeException(Map<TopicPartition, Long>
offsetOutOfRangePartitions) {
this("Offsets out of range with no configured reset policy for
partitions: " +
offsetOutOfRangePartitions, offsetOutOfRangePartitions);
}
+ /**
+ * Constructs a new OffsetOutOfRangeException with a custom message.
+ *
+ * @param message The detail message
+ * @param offsetOutOfRangePartitions Map of partition to the out-of-range offset
+ */
public OffsetOutOfRangeException(String message, Map<TopicPartition, Long>
offsetOutOfRangePartitions) {
super(message);
this.offsetOutOfRangePartitions = offsetOutOfRangePartitions;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
index f44dce6187c..7729ca8abb8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
@@ -18,19 +18,44 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.errors.RetriableException;
+/**
+ * Exception thrown when an offset commit fails with a retriable error.
+ * This exception is generated on the client side upon receiving retriable error codes
+ * from the Group Coordinator in a commit response.
+ * <p>
+ * Unlike {@link CommitFailedException}, this exception indicates that the commit
+ * can be retried. The consumer should attempt to commit the offsets again.
+ * </p>
+ */
public class RetriableCommitFailedException extends RetriableException {
private static final long serialVersionUID = 1L;
+ /**
+ * Constructs a new RetriableCommitFailedException with the specified cause.
+ *
+ * @param t The cause of the exception
+ */
public RetriableCommitFailedException(Throwable t) {
super("Offset commit failed with a retriable exception. You should
retry committing " +
"the latest consumed offsets.", t);
}
+ /**
+ * Constructs a new RetriableCommitFailedException with the specified detail message.
+ *
+ * @param message The detail message
+ */
public RetriableCommitFailedException(String message) {
super(message);
}
+ /**
+ * Constructs a new RetriableCommitFailedException with the specified detail message and cause.
+ *
+ * @param message The detail message
+ * @param t The cause of the exception
+ */
public RetriableCommitFailedException(String message, Throwable t) {
super(message, t);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java
index d6e168b8da1..fbada960026 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java
@@ -30,6 +30,11 @@ public class SubscriptionPattern {
*/
private final String pattern;
+ /**
+ * Constructs a new subscription pattern.
+ *
+ * @param pattern The regular expression pattern compatible with RE2/J
+ */
public SubscriptionPattern(String pattern) {
this.pattern = pattern;
}