chia7712 commented on code in PR #16277:
URL: https://github.com/apache/kafka/pull/16277#discussion_r1635740247
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -103,13 +103,13 @@ public class RecordAccumulatorTest {
private final Map<Integer, Node> nodes = Stream.of(node1,
node2).collect(Collectors.toMap(Node::id, Function.identity()));
private MetadataSnapshot metadataCache = new MetadataSnapshot(null,
- nodes,
- partMetadatas,
- Collections.emptySet(),
- Collections.emptySet(),
- Collections.emptySet(),
- null,
- Collections.emptyMap());
+ nodes,
+ partMetadatas,
Review Comment:
please revert unrelated changes
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -121,12 +121,14 @@ public class RecordAccumulatorTest {
private final Metrics metrics = new Metrics(time);
private final long maxBlockTimeMs = 1000;
private final LogContext logContext = new LogContext();
+ private AtomicInteger mockRandom = null;
@BeforeEach void setup() {}
@AfterEach
public void teardown() {
this.metrics.close();
+ mockRandom = null;
Review Comment:
this is redundant since the field is instance-level — a fresh test
instance (and thus a fresh `mockRandom`) is created for each test case
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -146,17 +148,17 @@ public void testDrainBatches() throws Exception {
PartitionInfo part3 = MetadataResponse.toPartitionInfo(partMetadata3,
nodes);
PartitionInfo part4 = MetadataResponse.toPartitionInfo(partMetadata4,
nodes);
Cluster cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1, part2, part3, part4),
- Collections.emptySet(), Collections.emptySet());
+ Collections.emptySet(), Collections.emptySet());
metadataCache = new MetadataSnapshot(null,
- nodes,
- partMetadatas,
- Collections.emptySet(),
- Collections.emptySet(),
- Collections.emptySet(),
- null,
- Collections.emptyMap(),
- cluster);
+ nodes,
+ partMetadatas,
+ Collections.emptySet(),
+ Collections.emptySet(),
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1737,30 +1735,48 @@ private RecordAccumulator
createTestRecordAccumulator(int deliveryTimeoutMs, int
* Return a test RecordAccumulator instance
*/
private RecordAccumulator createTestRecordAccumulator(
- TransactionManager txnManager,
- int deliveryTimeoutMs,
- int batchSize,
- long totalSize,
- Compression compression,
- int lingerMs
+ TransactionManager txnManager,
+ int deliveryTimeoutMs,
+ int batchSize,
+ long totalSize,
+ Compression compression,
+ int lingerMs
) {
long retryBackoffMs = 100L;
long retryBackoffMaxMs = 1000L;
String metricGrpName = "producer-metrics";
return new RecordAccumulator(
- logContext,
- batchSize,
- compression,
- lingerMs,
- retryBackoffMs,
- retryBackoffMaxMs,
- deliveryTimeoutMs,
- metrics,
- metricGrpName,
- time,
- new ApiVersions(),
- txnManager,
- new BufferPool(totalSize, batchSize, metrics, time,
metricGrpName));
+ logContext,
+ batchSize,
+ compression,
+ lingerMs,
+ retryBackoffMs,
+ retryBackoffMaxMs,
+ deliveryTimeoutMs,
+ metrics,
+ metricGrpName,
+ time,
+ new ApiVersions(),
+ txnManager,
+ new BufferPool(totalSize, batchSize, metrics, time,
metricGrpName)) {
+ @Override
+ protected BuiltInPartitioner createBuiltInPartitioner(LogContext
logContext, String topic,
+ int
stickyBatchSize) {
+ return new MockRandomBuiltInPartitioner(logContext, topic,
stickyBatchSize);
+ }
+ };
+ }
+
+ private class MockRandomBuiltInPartitioner extends BuiltInPartitioner {
Review Comment:
it can be a static class, and it would be great to unify the naming ->
`SequentialPartitioner`
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -146,17 +148,17 @@ public void testDrainBatches() throws Exception {
PartitionInfo part3 = MetadataResponse.toPartitionInfo(partMetadata3,
nodes);
PartitionInfo part4 = MetadataResponse.toPartitionInfo(partMetadata4,
nodes);
Cluster cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1, part2, part3, part4),
- Collections.emptySet(), Collections.emptySet());
+ Collections.emptySet(), Collections.emptySet());
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java:
##########
@@ -203,4 +189,19 @@ void testStickyBatchSizeMoreThatZero() {
assertThrows(IllegalArgumentException.class, () -> new
BuiltInPartitioner(logContext, TOPIC_A, 0));
assertDoesNotThrow(() -> new BuiltInPartitioner(logContext, TOPIC_A,
1));
}
+
+
+ static class SequentialPartitioner extends BuiltInPartitioner {
Review Comment:
could you please make it private?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1033,9 +1033,17 @@ public Deque<ProducerBatch> getDeque(TopicPartition tp) {
* Get the deque for the given topic-partition, creating it if necessary.
*/
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
- TopicInfo topicInfo = topicInfoMap.computeIfAbsent(tp.topic(), k ->
new TopicInfo(logContext, k, batchSize));
+ TopicInfo topicInfo = topicInfoMap.computeIfAbsent(tp.topic(),
+ k -> new TopicInfo(createBuiltInPartitioner(logContext, k,
batchSize)));
return topicInfo.batches.computeIfAbsent(tp.partition(), k -> new
ArrayDeque<>());
}
+
+ /**
+ * Subclass can custom {@link BuiltInPartitioner}
+ **/
+ protected BuiltInPartitioner createBuiltInPartitioner(LogContext
logContext, String topic, int stickyBatchSize) {
Review Comment:
Could you change it to package-private and add a `visible for testing`
comment?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -113,6 +110,10 @@ private int nextPartition(Cluster cluster) {
return partition;
}
+ protected int randomPartition() {
Review Comment:
Could you change it to package-private and add `visible for testing`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]