squah-confluent commented on code in PR #21396:
URL: https://github.com/apache/kafka/pull/21396#discussion_r2780365024
##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -710,6 +710,87 @@ public void
testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionE
assertThrows(NotCoordinatorException.class, () ->
runtime.contextOrThrow(TP));
}
+ @Test
+ public void
testScheduleUnloadingWithPendingBatchWhenPartitionWriterConfigThrows() {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = mock(MockPartitionWriter.class);
+ MockCoordinatorShardBuilderSupplier supplier =
mock(MockCoordinatorShardBuilderSupplier.class);
+ MockCoordinatorShardBuilder builder =
mock(MockCoordinatorShardBuilder.class);
+ MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(supplier)
+
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(OptionalInt.of(10))
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
+ when(builder.withTime(any())).thenReturn(builder);
+ when(builder.withTimer(any())).thenReturn(builder);
+ when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+ when(builder.withTopicPartition(any())).thenReturn(builder);
+ when(builder.withExecutor(any())).thenReturn(builder);
+ when(builder.build()).thenReturn(coordinator);
+ when(supplier.get()).thenReturn(builder);
+
+ // Configure the partition writer with a normal config initially.
+ LogConfig initialLogConfig = new LogConfig(
+ Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 *
1024)) // 1MB
+ );
+ when(writer.config(TP)).thenReturn(initialLogConfig);
+ when(writer.append(eq(TP), any(), any(), anyShort())).thenReturn(1L);
+
+ // Load the coordinator.
+ runtime.scheduleLoadOperation(TP, 10);
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+
+ // Schedule a write operation to create a pending batch.
+ CompletableFuture<String> write1 =
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(List.of("record1"), "response1")
+ );
+
+ // Verify that the write is not committed yet and a batch exists.
+ assertFalse(write1.isDone());
+ assertNotNull(ctx.currentBatch);
+
+ // Simulate the broker losing leadership: partitionWriter.config() now
throws NOT_LEADER_OR_FOLLOWER.
+ // This is the scenario described in KAFKA-20115.
+
when(writer.config(TP)).thenThrow(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+
+ // Schedule the unloading. This should trigger the bug where
freeCurrentBatch()
+ // tries to call partitionWriter.config(tp).maxMessageSize() and
throws an exception.
+ // Without the fix, this would prevent the coordinator from unloading
properly.
+ runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
+
+ // The unload should complete despite the NOT_LEADER_OR_FOLLOWER
exception
+ // when trying to access partition writer config during buffer cleanup.
+ assertEquals(CLOSED, ctx.state);
+
+ // Verify that onUnloaded is called.
+ verify(coordinator, times(1)).onUnloaded();
+
+ // Verify that the listener is deregistered.
+ verify(writer, times(1)).deregisterListener(
+ eq(TP),
+ any(PartitionWriter.Listener.class)
+ );
+
+ // Getting the coordinator context fails because it no longer exists.
+ assertThrows(NotCoordinatorException.class, () ->
runtime.contextOrThrow(TP));
+ }
+
+
Review Comment:
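nit: we have too many newlines here — there are two consecutive blank lines after the end of the new test method; the suggestion below removes the extra one.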
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]