dajac commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1662970452
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3839,6 +3842,209 @@ public void close() {}
assertEquals("response1", write.get(5, TimeUnit.SECONDS));
}
+ @Test
+ public void testScheduleNonAtomicWriteOperation() throws
ExecutionException, InterruptedException, TimeoutException {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L),
ctx.coordinator.snapshotRegistry().epochsList());
+ assertNull(ctx.currentBatch);
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create records with a quarter of the max batch size each. Keep in
mind that
+ // each batch has a header so it is not possible to have those four
records
+ // in one single batch.
+ List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Let's try to write all the records atomically (the default) to
ensure
+ // that it fails.
+ CompletableFuture<String> write1 =
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records, "write#1")
+ );
+
+ assertFutureThrows(write1, RecordTooLargeException.class);
+
+ // Let's try to write the same records non-atomically.
+ CompletableFuture<String> write2 =
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records, "write#2", null, true,
false)
+ );
+
+ // The write is pending.
+ assertFalse(write2.isDone());
+
+ // Verify the state.
+ assertNotNull(ctx.currentBatch);
+ // The last written offset is 3L because one batch was written to the
log with
+ // the first three records. The 4th one is pending.
+ assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 3L),
ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Arrays.asList(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+ new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+ new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Collections.singletonList(
+ records(timer.time().milliseconds(), records.subList(0, 3))
+ ), writer.entries(TP));
+
+ // Commit up to 3L.
+ writer.commit(TP, 3L);
+
+ // The write is still pending.
+ assertFalse(write2.isDone());
+
+ // Advance past the linger time to flush the pending batch.
+ timer.advanceClock(11);
+
+ // Verify the state.
+ assertNull(ctx.currentBatch);
+ assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(3L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(3L, 4L),
ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Arrays.asList(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+ new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+ new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Arrays.asList(
+ records(timer.time().milliseconds() - 11, records.subList(0, 3)),
+ records(timer.time().milliseconds() - 11, records.subList(3, 4))
+ ), writer.entries(TP));
+
+ // Commit up to 4L.
+ writer.commit(TP, 4L);
+
+ // Verify that the write is completed.
+ assertTrue(write2.isDone());
+ assertEquals("write#2", write2.get(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testScheduleNonAtomicWriteOperationWithRecordTooLarge() throws
InterruptedException {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L),
ctx.coordinator.snapshotRegistry().epochsList());
+ assertNull(ctx.currentBatch);
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create three records with a quarter of the max batch size each. Keep in
mind that
+ // each batch has a header so a fourth record of that size would not fit
+ // in the same batch.
+ List<String> records = Stream.of('1', '2', '3').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Create another record larger than the max batch size.
+ char[] payload = new char[maxBatchSize];
+ Arrays.fill(payload, '4');
+ String record = new String(payload);
+
+ // Let's write the first three records.
+ CompletableFuture<String> write1 =
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records, "write#1", null, true,
false)
+ );
+
+ // Verify the state.
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L),
ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Arrays.asList(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+ new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Collections.emptyList(), writer.entries(TP));
+
+ // Let's write the 4th record which is too large. This will flush the
current
+ // pending batch, allocate a new batch, and put the record into it.
+ // Note that the batch will fail only when the batch is written
because the
+ // MemoryBatchBuilder always accepts one record.
+ CompletableFuture<String> write2 =
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+ state -> new
CoordinatorResult<>(Collections.singletonList(record), "write#2", null, true,
false)
+ );
+
+ // Advance past the linger time to flush the pending batch.
+ timer.advanceClock(11);
+
+ // The write should have failed...
+ assertFutureThrows(write2, RecordTooLargeException.class);
+
+ // ... but write#1 should be left intact.
+ assertFalse(write1.isDone());
+
+ // Verify the state.
+ //assertNull(ctx.currentBatch);
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]