This is an automated email from the ASF dual-hosted git repository.
AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c58ce9cac39 MINOR: Add javadoc to share coord member fields and jobs.
(#22305)
c58ce9cac39 is described below
commit c58ce9cac39f09dd03dfdb3595ae66570a6f3a18
Author: Sushant Mahajan <[email protected]>
AuthorDate: Mon May 18 18:15:17 2026 +0530
MINOR: Add javadoc to share coord member fields and jobs. (#22305)
* Add comprehensive javadoc for member fields in
`ShareCoordinatorService`
* Provide javadoc based explanation for the periodic jobs.
Reviewers: Andrew Schofield <[email protected]>
---
.../coordinator/share/ShareCoordinatorService.java | 62 +++++++++++++++++++++-
1 file changed, 60 insertions(+), 2 deletions(-)
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 1a30a8a13a3..bc6632f0adb 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -83,16 +83,63 @@ import static
org.apache.kafka.coordinator.common.runtime.CoordinatorOperationEx
@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
public class ShareCoordinatorService implements ShareCoordinator {
+ /**
+ * Object for supplying share coordinator related configs.
+ */
private final ShareCoordinatorConfig config;
+
+ /**
+ * Logger instance.
+ */
private final Logger log;
- private final AtomicBoolean isActive = new AtomicBoolean(false); // for
controlling start and stop
+
+ /**
+ * Sentinel to indicate the state of the component. Used for guarding
start and stop activity.
+ */
+ private final AtomicBoolean isActive = new AtomicBoolean(false);
+
+ /**
+ * The main coordinator runtime reference used to queue callbacks to the
share coordinator shards.
+ */
private final CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord>
runtime;
+
+ /**
+ * Metrics object for recording share coordinator related metrics.
+ */
private final ShareCoordinatorMetrics shareCoordinatorMetrics;
- private volatile int numPartitions = -1; // Number of partitions for
__share_group_state. Provided when component is started.
+
+ /**
+ * Integer specifying the number of partitions in __share_group_state
topic. It helps create a hash which is
+ * used for the mapping between {@link SharePartitionKey} and the relevant
{@link ShareCoordinatorShard}.
+ * Updated by call from broker lifecycle manager.
+ */
+ private volatile int numPartitions = -1;
+
+ /**
+ * Time reference.
+ */
private final Time time;
+
+ /**
+ * Timer object to schedule tasks.
+ */
private final Timer timer;
+
+ /**
+ * Reference used to write records to the internal __share_group_state. To
be passed to the coordinator runtime
+ * and utilized in the periodic snapshot job.
+ */
private final PartitionWriter writer;
+
+ /**
+ * Cache used to optimize calls to clean up __share_group_state. Presence
of the same entry in the cache helps prevent
+ * issuing redundant record delete calls. Used in the redundant offset
prune job.
+ */
private final Map<TopicPartition, Long> lastPrunedOffsets;
+
+ /**
+ * Config based sentinel used to start or eventually stop defined periodic
job tasks.
+ */
private volatile boolean shouldRunPeriodicJob;
public static class Builder {
@@ -278,6 +325,11 @@ public class ShareCoordinatorService implements
ShareCoordinator {
setupSnapshotColdPartitions();
}
+ /**
+ * Sets up a timer based task which fetches the latest redundant offset
information from the coordinator
+ * runtime and, if new information is obtained, issues pruning calls via the
partition writer.
+ * On completion, a new task to be invoked after configured duration is
enqueued in the timer object.
+ */
// Visibility for tests
void setupRecordPruning() {
log.debug("Scheduling share-group state topic prune job.");
@@ -356,6 +408,12 @@ public class ShareCoordinatorService implements
ShareCoordinator {
return fut;
}
+ /**
+ * Sets up a timer task to create new share snapshots keyed on their
{@link SharePartitionKey},
+ * in case there hasn't been any activity on the corresponding share
partition for a while. This helps
+ * in cleanup of redundant records from the internal __share_group_state
topic.
+ * On completion, a new task to be invoked after configured duration is
enqueued in the timer object.
+ */
// Visibility for tests
void setupSnapshotColdPartitions() {
log.debug("Scheduling cold share-partition snapshotting.");