chia7712 commented on code in PR #16277:
URL: https://github.com/apache/kafka/pull/16277#discussion_r1635142024
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1763,4 +1747,65 @@ private RecordAccumulator createTestRecordAccumulator(
txnManager,
new BufferPool(totalSize, batchSize, metrics, time,
metricGrpName));
}
+
+ private class SequentialRecordAccumulator extends RecordAccumulator {
Review Comment:
Could we move `AtomicInteger mockRandom` to class level and have
`createTestRecordAccumulator` return a `RecordAccumulator` that overrides
`createBuiltInPartitioner`? For example:
**RecordAccumulatorTest**
```java
private AtomicInteger random = null;

@AfterEach
public void teardown() {
    this.metrics.close();
}

private RecordAccumulator createTestRecordAccumulator(
    TransactionManager txnManager,
    int deliveryTimeoutMs,
    int batchSize,
    long totalSize,
    Compression compression,
    int lingerMs
) {
    long retryBackoffMs = 100L;
    long retryBackoffMaxMs = 1000L;
    String metricGrpName = "producer-metrics";

    return new RecordAccumulator(
        logContext,
        batchSize,
        compression,
        lingerMs,
        retryBackoffMs,
        retryBackoffMaxMs,
        deliveryTimeoutMs,
        metrics,
        metricGrpName,
        time,
        new ApiVersions(),
        txnManager,
        new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)) {
        @Override
        BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
            return new BuiltInPartitioner(logContext, topic, stickyBatchSize) {
                @Override
                int random() {
                    return random == null ? super.random() : random.getAndIncrement();
                }
            };
        }
    };
}
```
With that change, `testUniformBuiltInPartitioner` can initialize `random` at
the beginning:
```java
random = new AtomicInteger();
```
`testAdaptiveBuiltInPartitioner` can create a `RecordAccumulator` that
overrides `createBuiltInPartitioner` directly:
```java
RecordAccumulator accum = new RecordAccumulator(logContext, batchSize, Compression.NONE, 0, 0L, 0L,
    3200, config, metrics, "producer-metrics", time, new ApiVersions(), null,
    new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics")) {
    @Override
    BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
        return new BuiltInPartitioner(logContext, topic, stickyBatchSize) {
            @Override
            int random() {
                return random == null ? super.random() : random.getAndIncrement();
            }
        };
    }
};
```
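For reference, the same pattern reduced to a self-contained sketch that runs outside Kafka. `SketchPartitioner`, `SketchAccumulator`, and `SequentialRandomSketch` are hypothetical stand-ins, not Kafka classes; only the shape (an overridable factory method plus a nullable `AtomicInteger` hook) is meant to mirror the suggestion above.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for BuiltInPartitioner: picks a partition from an
// overridable random source.
class SketchPartitioner {
    private final int numPartitions;

    SketchPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Package-private hook that a test subclass can override.
    int random() {
        return ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
    }

    int nextPartition() {
        return random() % numPartitions;
    }
}

// Hypothetical stand-in for RecordAccumulator: builds partitioners through
// an overridable factory method.
class SketchAccumulator {
    SketchPartitioner createPartitioner(int numPartitions) {
        return new SketchPartitioner(numPartitions);
    }
}

// Hypothetical stand-in for the test class.
class SequentialRandomSketch {
    // null means "use the real random source"; a test assigns an
    // AtomicInteger to get the sequence 0, 1, 2, ...
    AtomicInteger random = null;

    SketchAccumulator createTestAccumulator() {
        return new SketchAccumulator() {
            @Override
            SketchPartitioner createPartitioner(int numPartitions) {
                return new SketchPartitioner(numPartitions) {
                    @Override
                    int random() {
                        // Here "random" is the outer test-class field.
                        return random == null ? super.random() : random.getAndIncrement();
                    }
                };
            }
        };
    }
}
```

In this sketch, a test that sets `random = new AtomicInteger()` and asks for three partitions sees `nextPartition()` return 0, 1, 2, 0 in order, while a test that leaves `random` as `null` keeps the real random behavior.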