jolshan commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1635284684
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3664,6 +3664,87 @@ public void
testScheduleTransactionalWriteOperationWithBatching() throws Executi
assertNull(complete1.get(5, TimeUnit.SECONDS));
}
+ @Test
+ public void testStateMachineIsReloadedWhenOutOfSync() {
+ MockTimer timer = new MockTimer();
+ MockCoordinatorLoader loader = spy(new MockCoordinatorLoader());
+ MockPartitionWriter writer = new MockPartitionWriter() {
+ @Override
+ public long append(
+ TopicPartition tp,
+ VerificationGuard verificationGuard,
+ MemoryRecords batch
+ ) {
+ // Add 1 to the returned offsets.
+ return super.append(tp, verificationGuard, batch) + 1;
+ }
+ };
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(loader)
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L),
ctx.coordinator.snapshotRegistry().epochsList());
+ assertNull(ctx.currentBatch);
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create records with a quarter of the max batch size each. Keep in
mind that
+ // each batch has a header so it is not possible to have those four
records
+ // in one single batch.
+ List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1.
+ CompletableFuture<String> write1 =
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(0, 1),
"response1"));
+
+ // Write #2.
+ CompletableFuture<String> write2 =
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(1, 2),
"response2"));
+
+ // Write #3.
+ CompletableFuture<String> write3 =
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(2, 3),
"response3"));
+
+ // Write #4. This write cannot make it in the current batch. So the
current batch
+ // is flushed. It will fail. So we expect all writes to fail.
+ CompletableFuture<String> write4 =
runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(3, 4),
"response4"));
+
+ // Verify the futures.
+ assertFutureThrows(write1, NotCoordinatorException.class);
Review Comment:
just for my understanding -- the failure happens on this first batch, which
triggers the reloading, and the second one fails because of the first
failure.
I think that's basically what the comment above says, but I wanted to close
the loop (confirming that the reloading happens on the first write, and the
second one just fails as a consequence of the first failure).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]