dajac commented on code in PR #21396:
URL: https://github.com/apache/kafka/pull/21396#discussion_r2777667400


##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -710,6 +710,87 @@ public void 
testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionE
         assertThrows(NotCoordinatorException.class, () -> 
runtime.contextOrThrow(TP));
     }
 
+    @Test
+    public void 
testScheduleUnloadingWithPendingBatchWhenPartitionWriterConfigThrows() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+                new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                        .withTime(timer.time())
+                        .withTimer(timer)
+                        .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                        .withLoader(new MockCoordinatorLoader())
+                        .withEventProcessor(new DirectEventProcessor())
+                        .withPartitionWriter(writer)
+                        .withCoordinatorShardBuilderSupplier(supplier)
+                        
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                        .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                        .withSerializer(new StringSerializer())
+                        .withAppendLingerMs(OptionalInt.of(10))
+                        .withExecutorService(mock(ExecutorService.class))
+                        .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.withExecutor(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+
+        // Configure the partition writer with a normal config initially.
+        LogConfig initialLogConfig = new LogConfig(
+                Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 
String.valueOf(1024 * 1024) // 1MB
+                ));
+        when(writer.config(TP)).thenReturn(initialLogConfig);
+        when(writer.append(eq(TP), any(), any(), anyShort())).thenReturn(1L);
+
+        // Load the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+
+        // Schedule a write operation to create a pending batch.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+                state -> new CoordinatorResult<>(List.of("record1"), 
"response1")

Review Comment:
   nit: indentation too.



##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -710,6 +710,87 @@ public void 
testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionE
         assertThrows(NotCoordinatorException.class, () -> 
runtime.contextOrThrow(TP));
     }
 
+    @Test
+    public void 
testScheduleUnloadingWithPendingBatchWhenPartitionWriterConfigThrows() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+                new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                        .withTime(timer.time())
+                        .withTimer(timer)
+                        .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                        .withLoader(new MockCoordinatorLoader())
+                        .withEventProcessor(new DirectEventProcessor())
+                        .withPartitionWriter(writer)
+                        .withCoordinatorShardBuilderSupplier(supplier)
+                        
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                        .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                        .withSerializer(new StringSerializer())
+                        .withAppendLingerMs(OptionalInt.of(10))
+                        .withExecutorService(mock(ExecutorService.class))
+                        .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.withExecutor(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+
+        // Configure the partition writer with a normal config initially.
+        LogConfig initialLogConfig = new LogConfig(
+                Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 
String.valueOf(1024 * 1024) // 1MB
+                ));
+        when(writer.config(TP)).thenReturn(initialLogConfig);
+        when(writer.append(eq(TP), any(), any(), anyShort())).thenReturn(1L);
+
+        // Load the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+
+        // Schedule a write operation to create a pending batch.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+                state -> new CoordinatorResult<>(List.of("record1"), 
"response1")
+        );
+
+        // Verify that the write is not committed yet and a batch exists.
+        assertFalse(write1.isDone());
+        assertNotNull(ctx.currentBatch);
+
+        // Simulate the broker losing leadership: partitionWriter.config() now 
throws NOT_LEADER_OR_FOLLOWER.
+        // This is the scenario described in KAFKA-20115.
+        
when(writer.config(TP)).thenThrow(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+
+        // Schedule the unloading. This should trigger the bug where 
freeCurrentBatch()
+        // tries to call partitionWriter.config(tp).maxMessageSize() and 
throws an exception.
+        // Without the fix, this would prevent the coordinator from unloading 
properly.
+        runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
+
+        // The unload should complete despite the NOT_LEADER_OR_FOLLOWER 
exception
+        // when trying to access partition writer config during buffer cleanup.
+        assertEquals(CLOSED, ctx.state);
+
+        // Verify that onUnloaded is called.
+        verify(coordinator, times(1)).onUnloaded();
+
+        // Verify that the listener is deregistered.
+        verify(writer, times(1)).deregisterListener(
+                eq(TP),
+                any(PartitionWriter.Listener.class)

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to