chia7712 commented on code in PR #16277:
URL: https://github.com/apache/kafka/pull/16277#discussion_r1635142024


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1763,4 +1747,65 @@ private RecordAccumulator createTestRecordAccumulator(
             txnManager,
             new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
     }
+
+    private class SequentialRecordAccumulator extends RecordAccumulator {

Review Comment:
   Could we move `AtomicInteger mockRandom` to class level and have `createTestRecordAccumulator` return a `RecordAccumulator` that overrides `createBuiltInPartitioner`? For example:
   **RecordAccumulatorTest**
   ```java
       private AtomicInteger random = null;
   
       @AfterEach
       public void teardown() {
           this.metrics.close();
       }
   
       private RecordAccumulator createTestRecordAccumulator(
           TransactionManager txnManager,
           int deliveryTimeoutMs,
           int batchSize,
           long totalSize,
           Compression compression,
           int lingerMs
       ) {
           long retryBackoffMs = 100L;
           long retryBackoffMaxMs = 1000L;
           String metricGrpName = "producer-metrics";
   
           return new RecordAccumulator(
               logContext,
               batchSize,
               compression,
               lingerMs,
               retryBackoffMs,
               retryBackoffMaxMs,
               deliveryTimeoutMs,
               metrics,
               metricGrpName,
               time,
               new ApiVersions(),
               txnManager,
               new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)) {
               @Override
               BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
                   return new BuiltInPartitioner(logContext, topic, stickyBatchSize) {
                       @Override
                       int random() {
                           return random == null ? super.random() : random.getAndIncrement();
                       }
                   };
               }
           };
       }
   ```
   
   With that change, `testUniformBuiltInPartitioner` can initialize `random` at the beginning of the test:
   ```java
   random = new AtomicInteger();
   ```
   
   `testAdaptiveBuiltInPartitioner` can directly create a `RecordAccumulator` that overrides `createBuiltInPartitioner`:
   ```java
           RecordAccumulator accum = new RecordAccumulator(logContext, batchSize, Compression.NONE, 0, 0L, 0L,
               3200, config, metrics, "producer-metrics", time, new ApiVersions(), null,
               new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics")) {
               @Override
               BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
                   return new BuiltInPartitioner(logContext, topic, stickyBatchSize) {
                       @Override
                       int random() {
                           return random == null ? super.random() : random.getAndIncrement();
                       }
                   };
               }
           };
   ```
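
For reference, the pattern suggested above (an anonymous subclass overriding a factory method so that a test can swap real randomness for a counter) can be sketched in isolation. This is only an illustration: `Partitioner`, `Accumulator`, `SequentialAccumulatorSketch`, `assign`, and `nextPartition` are hypothetical stand-ins invented for this sketch, not Kafka classes or methods, and the real `BuiltInPartitioner` logic is more involved.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for BuiltInPartitioner: derives a partition from random().
class Partitioner {
    private final int numPartitions;

    Partitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Package-private hook that a test subclass can override.
    int random() {
        return ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
    }

    int nextPartition() {
        return random() % numPartitions;
    }
}

// Hypothetical stand-in for RecordAccumulator: builds its partitioner
// through an overridable factory method.
class Accumulator {
    Partitioner createPartitioner(int numPartitions) {
        return new Partitioner(numPartitions);
    }

    int[] assign(int numPartitions, int count) {
        Partitioner partitioner = createPartitioner(numPartitions);
        int[] partitions = new int[count];
        for (int i = 0; i < count; i++) {
            partitions[i] = partitioner.nextPartition();
        }
        return partitions;
    }
}

// Hypothetical test helper mirroring the class-level field plus factory override.
class SequentialAccumulatorSketch {
    // null means "use real randomness"; a test sets it to get 0, 1, 2, ...
    static AtomicInteger mockRandom = null;

    static Accumulator createTestAccumulator() {
        return new Accumulator() {
            @Override
            Partitioner createPartitioner(int numPartitions) {
                return new Partitioner(numPartitions) {
                    @Override
                    int random() {
                        return mockRandom == null ? super.random() : mockRandom.getAndIncrement();
                    }
                };
            }
        };
    }
}
```

With `mockRandom` set to a fresh `AtomicInteger`, the sketch hands out partitions in round-robin order; leaving it `null` keeps the default random behaviour, so tests that do not care about ordering need no extra subclass.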


