jolshan commented on code in PR #15237:
URL: https://github.com/apache/kafka/pull/15237#discussion_r1470235881
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -2056,4 +2059,70 @@ public void testCompleteTransactionWithUnexpectedPartition() {
assertFutureThrows(future, IllegalStateException.class);
}
+
+ @Test
+ public void testOnPartitionsDeleted() {
+ int partitionCount = 3;
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+
+ service.startup(() -> partitionCount);
+
+ when(runtime.partitions()).thenReturn(
+ IntStream
+ .range(0, partitionCount)
+ .mapToObj(i -> new TopicPartition("__consumer_offsets", i))
+ .collect(Collectors.toSet())
+ );
+
+ List<CompletableFuture<Void>> futures = IntStream
+ .range(0, partitionCount)
+ .mapToObj(__ -> new CompletableFuture<Void>())
+ .collect(Collectors.toList());
+
+ IntStream.range(0, partitionCount).forEach(i -> {
+ CompletableFuture<Void> future = futures.get(i);
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("on-partition-deleted"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenAnswer(__ -> future);
+ });
+
+ IntStream.range(0, partitionCount - 1).forEach(i -> {
+ futures.get(i).complete(null);
+ });
+
+ futures.get(partitionCount - 1).completeExceptionally(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception());
+
+ // The exception is logged and swallowed.
+ assertDoesNotThrow(() ->
Review Comment:
Should we add any other validations to confirm whether the partition was actually deleted?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -903,6 +906,45 @@ public boolean cleanupExpiredOffsets(String groupId, List<Record> records) {
return allOffsetsExpired.get() && !openTransactionsByGroup.containsKey(groupId);
}
+ /**
+ * Remove offsets of the partitions that have been deleted.
+ *
+ * @param topicPartitions The partitions that have been deleted.
+ * @return The list of tombstones (offset commit) to append.
+ */
+ public List<Record> onPartitionsDeleted(
+ List<TopicPartition> topicPartitions
+ ) {
+ List<Record> records = new ArrayList<>();
+
+ Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
+ topicPartitions.forEach(tp -> partitionsByTopic
+ .computeIfAbsent(tp.topic(), __ -> new ArrayList<>())
+ .add(tp.partition())
+ );
+
+ Consumer<Offsets> delete = offsets -> {
Review Comment:
This confused me for a bit because the Consumer here is not a Kafka consumer
but the java.util.function.Consumer interface.
These lines define a helper called delete that takes an Offsets container and
appends the tombstones it needs. The offsets name is also overloaded a few
times, so it is a little confusing.
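For future readers, the pattern in question can be sketched in isolation. This is a minimal illustration, not the PR's actual code: the String container and "tombstone-for-" prefix are stand-ins for the real Offsets type and record tombstones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// A java.util.function.Consumer (a plain lambda, unrelated to Kafka's
// consumer client) captured as a local "delete" helper and applied to
// several offsets containers in turn.
public class DeleteHelperSketch {
    public static void main(String[] args) {
        List<String> records = new ArrayList<>();

        // The lambda closes over `records` and appends a tombstone marker
        // for whichever offsets container it is handed.
        Consumer<String> delete = offsets -> records.add("tombstone-for-" + offsets);

        delete.accept("group-offsets");
        delete.accept("pending-transactional-offsets");

        // Prints: [tombstone-for-group-offsets, tombstone-for-pending-transactional-offsets]
        System.out.println(records);
    }
}
```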
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1001,8 +1002,26 @@ public void onTransactionCompleted(
public void onPartitionsDeleted(
List<TopicPartition> topicPartitions,
BufferSupplier bufferSupplier
- ) {
+ ) throws ExecutionException, InterruptedException {
Review Comment:
To clarify -- is onPartitionsDeleted only called when the topic behind the
partition is deleted (not reassigned, etc.)? Or are there other cases?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -3050,6 +3060,53 @@ public void testOffsetDeletionsSensor() {
verify(context.metrics).record(OFFSET_DELETIONS_SENSOR_NAME, 2);
}
+ @Test
+ public void testOnPartitionsDeleted() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+ // Commit offsets.
+ context.commitOffset("grp-0", "foo", 1, 100, 1, context.time.milliseconds());
+ context.commitOffset("grp-0", "foo", 2, 200, 1, context.time.milliseconds());
+ context.commitOffset("grp-0", "foo", 3, 300, 1, context.time.milliseconds());
+
+ context.commitOffset("grp-1", "bar", 1, 100, 1, context.time.milliseconds());
+ context.commitOffset("grp-1", "bar", 2, 200, 1, context.time.milliseconds());
+ context.commitOffset("grp-1", "bar", 3, 300, 1, context.time.milliseconds());
+
+ context.commitOffset(100L, "grp-2", "foo", 1, 100, 1, context.time.milliseconds());
+ context.commitOffset(100L, "grp-2", "foo", 2, 200, 1, context.time.milliseconds());
+ context.commitOffset(100L, "grp-2", "foo", 3, 300, 1, context.time.milliseconds());
+
+ // Delete partitions.
+ List<Record> records = context.deletePartitions(Arrays.asList(
+ new TopicPartition("foo", 1),
Review Comment:
Any particular reason why this is tested with 3 partitions per topic?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -589,6 +589,23 @@ private void scheduleGroupMetadataExpiration() {
);
}
+ /**
+ * Remove offsets of the partitions that have been deleted.
+ *
+ * @param topicPartitions The partitions that have been deleted.
+ * @return The list of tombstones (offset commit) to append.
Review Comment:
Why does the comment specify (offset commit)?
Is it to distinguish these from other kinds of tombstones?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -2056,4 +2059,70 @@ public void testCompleteTransactionWithUnexpectedPartition() {
assertFutureThrows(future, IllegalStateException.class);
}
+
+ @Test
+ public void testOnPartitionsDeleted() {
+ int partitionCount = 3;
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+
+ service.startup(() -> partitionCount);
+
+ when(runtime.partitions()).thenReturn(
+ IntStream
+ .range(0, partitionCount)
+ .mapToObj(i -> new TopicPartition("__consumer_offsets", i))
+ .collect(Collectors.toSet())
+ );
+
+ List<CompletableFuture<Void>> futures = IntStream
+ .range(0, partitionCount)
+ .mapToObj(__ -> new CompletableFuture<Void>())
+ .collect(Collectors.toList());
+
+ IntStream.range(0, partitionCount).forEach(i -> {
+ CompletableFuture<Void> future = futures.get(i);
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("on-partition-deleted"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenAnswer(__ -> future);
+ });
+
+ IntStream.range(0, partitionCount - 1).forEach(i -> {
Review Comment:
Is this trying to get all but the last partition to succeed and the last one
to fail with an error?
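For what it's worth, the control flow being asked about can be reproduced in isolation: complete all but the last future, fail the last one, and swallow the failure when joining. This is a sketch only; the IllegalStateException below is a stand-in for COORDINATOR_LOAD_IN_PROGRESS, and the allOf/exceptionally wiring is an assumption about how the service swallows the error, not the PR's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class MixedCompletionSketch {
    public static void main(String[] args) {
        int partitionCount = 3;
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            futures.add(new CompletableFuture<>());
        }

        // All but the last future succeed...
        for (int i = 0; i < partitionCount - 1; i++) {
            futures.get(i).complete(null);
        }
        // ...and the last one fails (stand-in exception type).
        futures.get(partitionCount - 1)
               .completeExceptionally(new IllegalStateException("load in progress"));

        // Join all futures but swallow any failure, matching the
        // "exception is logged and swallowed" comment in the test.
        CompletableFuture<Void> all = CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .exceptionally(ex -> null);

        all.join(); // does not throw
        System.out.println("completed without throwing");
    }
}
```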
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]