jeffkbkim commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1664527321


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3839,6 +3842,294 @@ public void close() {}
         assertEquals("response1", write.get(5, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testScheduleNonAtomicWriteOperation() throws 
ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in 
mind that
+        // each batch has a header so it is not possible to have those four 
records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Let's try to write all the records atomically (the default) to 
ensure
+        // that it fails.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "write#1")
+        );
+
+        assertFutureThrows(write1, RecordTooLargeException.class);
+
+        // Let's try to write the same records non-atomically.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "write#2", null, true, 
false)
+        );
+
+        // The write is pending.
+        assertFalse(write2.isDone());
+
+        // Verify the state.
+        assertNotNull(ctx.currentBatch);
+        // The last written offset is 3L because one batch was written to the 
log with
+        // the first three records. The 4th one is pending.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.singletonList(
+            records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Commit up to 3L.
+        writer.commit(TP, 3L);
+
+        // The write is still pending.
+        assertFalse(write2.isDone());
+
+        // Advance past the linger time to flush the pending batch.
+        timer.advanceClock(11);
+
+        // Verify the state.
+        assertNull(ctx.currentBatch);
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(3L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds() - 11, records.subList(0, 3)),
+            records(timer.time().milliseconds() - 11, records.subList(3, 4))
+        ), writer.entries(TP));
+
+        // Commit up to 4L.
+        writer.commit(TP, 4L);
+
+        // Verify that the write is completed.
+        assertTrue(write2.isDone());
+        assertEquals("write#2", write2.get(5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testScheduleNonAtomicWriteOperationWithRecordTooLarge() throws 
InterruptedException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in 
mind that
+        // each batch has a header so it is not possible to have those four 
records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Create another record larger than the max batch size.
+        char[] payload = new char[maxBatchSize];
+        Arrays.fill(payload, '4');
+        String record = new String(payload);
+
+        // Let's write the first three records.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "write#1", null, true, 
false)
+        );
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Let's write the 4th record which is too large. This will flush the 
current
+        // pending batch, allocate a new batch, and put the record into it.
+        // Note that the batch will fail only when the batch is written 
because the
+        // MemoryBatchBuilder always accept one record.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new 
CoordinatorResult<>(Collections.singletonList(record), "write#2", null, true, 
false)
+        );
+
+        // Advance past the linger time to flush the pending batch.
+        timer.advanceClock(11);
+
+        // The write should have failed...
+        assertFutureThrows(write2, RecordTooLargeException.class);
+
+        // ... but write#1 should be left intact.
+        assertFalse(write1.isDone());
+
+        // Verify the state.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.singletonList(
+            records(timer.time().milliseconds() - 11, records.subList(0, 3))
+        ), writer.entries(TP));
+    }
+
+    @Test
+    public void testScheduleNonAtomicWriteOperationWhenWriteFails() {
+        MockTimer timer = new MockTimer();
+        // The partition writer only accept no writes.

Review Comment:
   nit: writer does not accept any writes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to