Yunyung commented on code in PR #20040:
URL: https://github.com/apache/kafka/pull/20040#discussion_r2207399148
##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -2937,7 +2939,210 @@ public void testAppendRecordBatchSize() {
assertFalse(write1.isCompletedExceptionally());
int batchSize = writer.entries(TP).get(0).sizeInBytes();
- assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
+ assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize < maxBatchSize);
+ }
+
+ @Test
+ public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
+ MockTimer timer = new MockTimer();
+ InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+ @Override
+ public LogConfig config(TopicPartition tp) {
+ return new LogConfig(Map.of(
+ TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+ ));
+ }
+ };
+ StringSerializer serializer = new StringSerializer();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(mockWriter)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(serializer)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+ // Generate a record larger than the maxBatchSize.
+ List<String> largeRecords = List.of("A".repeat(100 * 1024 * 1024));
+
+ // Write #1.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(largeRecords, "response1", null, true, false)
+ );
+
+ // Verify that the write has not completed exceptionally.
+ // This will catch any exceptions thrown including RecordTooLargeException.
+ assertFalse(write1.isCompletedExceptionally());
+
+ // Verify that the next buffer retrieved from the bufferSupplier is the initial small one, not the large buffer.
+ assertEquals(INITIAL_BUFFER_SIZE, ctx.bufferSupplier.get(1).capacity());
+ }
+
+ @Test
+ public void testCoordinatorRetainExpandedBufferLessOrEqualToMaxMessageSize() {
+ MockTimer timer = new MockTimer();
+ InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+ @Override
+ public LogConfig config(TopicPartition tp) {
+ return new LogConfig(Map.of(
+ TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024 * 1024) // 1GB
+ ));
+ }
+ };
+ StringSerializer serializer = new StringSerializer();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(mockWriter)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(serializer)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+ // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
+ List<String> records = new ArrayList<>();
+ for (int i = 0; i < 1000000; i++) {
+ records.add("record-" + i);
+ }
+
+ // Write #1.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(records, "response1")
+ );
+
+ // Verify that the write has not completed exceptionally.
+ // This will catch any exceptions thrown including RecordTooLargeException.
+ assertFalse(write1.isCompletedExceptionally());
+
+ int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+ int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+ assertTrue(INITIAL_BUFFER_SIZE < batchSize && batchSize <= maxBatchSize);
+
+ // Verify that the next buffer retrieved from the bufferSupplier is the expanded buffer.
+ assertTrue(ctx.bufferSupplier.get(1).capacity() > INITIAL_BUFFER_SIZE);
+ }
+
+ @Test
+ public void testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() {
+ MockTimer timer = new MockTimer();
+ var mockWriter = new InMemoryPartitionWriter(false) {
+ private LogConfig config = new LogConfig(Map.of(
+ TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+ ));
+
+ @Override
+ public LogConfig config(TopicPartition tp) {
+ return config;
+ }
+
+ public void updateConfig(LogConfig newConfig) {
+ this.config = newConfig;
+ }
+ };
+ StringSerializer serializer = new StringSerializer();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(mockWriter)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(serializer)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+ List<String> records = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ records.add("record-" + i);
+ }
+
+ // Write #1.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(records, "response1")
+ );
+
+ // Verify that the write has not completed exceptionally.
+ // This will catch any exceptions thrown including RecordTooLargeException.
+ assertFalse(write1.isCompletedExceptionally());
+
+ int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+ int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+ assertTrue(batchSize < INITIAL_BUFFER_SIZE && INITIAL_BUFFER_SIZE <= maxBatchSize);
Review Comment:
nit:
```suggestion
    assertTrue(batchSize <= INITIAL_BUFFER_SIZE && INITIAL_BUFFER_SIZE <= maxBatchSize);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]