junrao commented on PR #20254:
URL: https://github.com/apache/kafka/pull/20254#issuecomment-3152755901
@mjsax : Hmm, the splitting code has some logic to chain the response
futures. When we append a record to a new batch after a split, we create a new
future for the record and chain it to the old future.
```
private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value,
                                  Header[] headers, Thunk thunk) {
    ...
    FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture,
                                                           this.recordCount,
                                                           timestamp,
                                                           key == null ? -1 : key.remaining(),
                                                           value == null ? -1 : value.remaining(),
                                                           Time.SYSTEM);
    // Chain the future to the original thunk.
    thunk.future.chain(future);
```
`FutureRecordMetadata.chain()` then adds the new future as a dependency.
```
void chain(FutureRecordMetadata futureRecordMetadata) {
    if (nextRecordMetadata == null)
        nextRecordMetadata = futureRecordMetadata;
    else
        nextRecordMetadata.chain(futureRecordMetadata);
}
```
`FutureRecordMetadata.get()` will then wait until all chained response
futures complete.
```
public RecordMetadata get() throws InterruptedException, ExecutionException {
    this.result.await();
    if (nextRecordMetadata != null)
        return nextRecordMetadata.get();
    return valueOrError();
}
```
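To make the chaining semantics concrete, here is a minimal toy model (class and method names are ours, not Kafka's) of the pattern above: `chain()` appends a new future at the tail of a singly linked list, and waiting on the original future only succeeds once every chained future has also completed.

```java
import java.util.concurrent.CountDownLatch;

// Toy stand-in for FutureRecordMetadata, for illustration only.
class ToyFuture {
    ToyFuture next;                          // plays the role of nextRecordMetadata
    final CountDownLatch done = new CountDownLatch(1);

    // Same shape as FutureRecordMetadata.chain(): walk to the tail, then link.
    void chain(ToyFuture f) {
        if (next == null) next = f;
        else next.chain(f);
    }

    // Same shape as get(): wait for this result, then delegate down the chain.
    void get() throws InterruptedException {
        done.await();
        if (next != null) next.get();
    }

    // Non-blocking probe used only for the demo: true only if the entire
    // chain has completed.
    boolean isChainDone() {
        return done.getCount() == 0 && (next == null || next.isChainDone());
    }
}
```

With this model, completing only the original future leaves the chain incomplete; the caller stays blocked until the futures added by the split complete as well, which is why `flush()` on the original batch should not return early.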
So, a pending `flush()` waiting for an original batch is not supposed to
complete until all split batches have completed. Did you actually observe that
`flush()` returns early?
@lianetm : Currently, `KafkaProducer.doSend()` only rejects a record if its
size is > `max.request.size`. It's ok for a record to be larger than
`batch.size`; we just create a larger new batch to accommodate it. So, during
a split, it's reasonable to follow the same approach and allow the first
record in a new batch.
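A minimal sketch of that sizing rule (the class and method names here are ours, not Kafka's): `batch.size` is only a target, so a new batch is allocated at least large enough for its first record, while `max.request.size` remains the hard limit.

```java
// Hypothetical helper illustrating the sizing rule described above.
class BatchSizing {
    // Returns the allocation size for a new batch whose first record has the
    // given size. Only max.request.size is a hard rejection; batch.size just
    // sets the minimum allocation.
    static int newBatchSize(int recordSize, int batchSize, int maxRequestSize) {
        if (recordSize > maxRequestSize)
            throw new IllegalArgumentException("record exceeds max.request.size");
        return Math.max(batchSize, recordSize);  // grow the batch if needed
    }
}
```

So a record between `batch.size` and `max.request.size` simply gets a batch of its own size rather than being rejected, which is the behavior the split path mirrors.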
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.