chia7712 commented on code in PR #20040:
URL: https://github.com/apache/kafka/pull/20040#discussion_r2179954736
##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -2937,9 +2939,208 @@ public void testAppendRecordBatchSize() {
assertFalse(write1.isCompletedExceptionally());
int batchSize = writer.entries(TP).get(0).sizeInBytes();
- assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
+ assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize <
maxBatchSize);
}
+ @Test
+ public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
+ MockTimer timer = new MockTimer();
+ InMemoryPartitionWriter mockWriter = new
InMemoryPartitionWriter(false) {
+ @Override
+ public LogConfig config(TopicPartition tp) {
+ return new LogConfig(Map.of(
+ TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024
* 1024) // 1MB
+ ));
+ }
+ };
+ StringSerializer serializer = new StringSerializer();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(mockWriter)
+ .withCoordinatorShardBuilderSupplier(new
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(serializer)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(0L),
ctx.coordinator.snapshotRegistry().epochsList());
+
+ // Generate a record larger than the maxBatchSize.
+ List<String> largeRecords = List.of("A".repeat(100 * 1024 * 1024));
+
+ // Write #1.
+ CompletableFuture<String> write1 =
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(largeRecords, "response1", null,
true, false)
+ );
+
+ // Verify that the write has not completed exceptionally.
+ // This will catch any exceptions thrown including
RecordTooLargeException.
+ assertFalse(write1.isCompletedExceptionally());
+
+ // Verify that the next buffer retrieved from the bufferSupplier is
the initial small one, not the large buffer.
+ assertEquals(INITIAL_BUFFER_SIZE,
ctx.bufferSupplier.get(1).capacity());
+ }
+
+ @Test
+ public void
testCoordinatorRetainExpandedBufferLessOrEqualToMaxMessageSize() {
+ MockTimer timer = new MockTimer();
+ InMemoryPartitionWriter mockWriter = new
InMemoryPartitionWriter(false) {
+ @Override
+ public LogConfig config(TopicPartition tp) {
+ return new LogConfig(Map.of(
+ TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024
* 1024 * 1024) // 1GB
+ ));
+ }
+ };
+ StringSerializer serializer = new StringSerializer();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(mockWriter)
+ .withCoordinatorShardBuilderSupplier(new
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(serializer)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(0L),
ctx.coordinator.snapshotRegistry().epochsList());
+
+ // Generate enough records to create a batch that has
INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
+ List<String> records = new ArrayList<>();
+ for (int i = 0; i < 1000000; i++) {
+ records.add("record-" + i);
+ }
+
+ // Write #1.
+ CompletableFuture<String> write1 =
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(records, "response1")
+ );
+
+ // Verify that the write has not completed exceptionally.
+ // This will catch any exceptions thrown including
RecordTooLargeException.
+ assertFalse(write1.isCompletedExceptionally());
+
+ // Verify that the next buffer retrieved from the bufferSupplier is
the expanded buffer.
+ assertTrue(ctx.bufferSupplier.get(1).capacity() > INITIAL_BUFFER_SIZE);
+ }
+
+ @Test
+ public void
testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() {
+ MockTimer timer = new MockTimer();
+ var mockWriter = new InMemoryPartitionWriter(false) {
+ private LogConfig config;
+
+ {
+ config = new LogConfig(Map.of(
+ TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024
* 1024) // 1MB
+ ));
+ }
+
+ @Override
+ public LogConfig config(TopicPartition tp) {
+ return config;
+ }
+
+ public void updateConfig(LogConfig newConfig) {
+ this.config = newConfig;
+ }
+ };
+ StringSerializer serializer = new StringSerializer();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(mockWriter)
+ .withCoordinatorShardBuilderSupplier(new
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(serializer)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(0L),
ctx.coordinator.snapshotRegistry().epochsList());
+
+ List<String> records = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ records.add("record-" + i);
+ }
+
+ // Write #1.
+ CompletableFuture<String> write1 =
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(records, "response1")
+ );
+
+ // Verify that the write has not completed exceptionally.
+ // This will catch any exceptions thrown including
RecordTooLargeException.
+ assertFalse(write1.isCompletedExceptionally());
+
+ ByteBuffer cachedBuffer = ctx.bufferSupplier.get(1);
+ assertEquals(INITIAL_BUFFER_SIZE, cachedBuffer.capacity());
+ ctx.bufferSupplier.release(cachedBuffer);
+
+ // Reduce max message size below initial buffer size.
+ mockWriter.updateConfig(new LogConfig(
+ Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG,
String.valueOf(INITIAL_BUFFER_SIZE - 66))));
+
+ // Write #2.
+ CompletableFuture<String> write2 =
runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
+ state -> new CoordinatorResult<>(records, "response2")
+ );
+ assertFalse(write2.isCompletedExceptionally());
+
+ // Verify that there is no cached buffer.
+ assertEquals(1, ctx.bufferSupplier.get(1).capacity());
+
+ // Write #3.
Review Comment:
Could you please add unit test to ensure the maximum value of capacity of
buffer is equal to `maxMessageSize`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]