[
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007664#comment-17007664
]
Jonathan Santilli commented on KAFKA-9312:
------------------------------------------
Checking a little further: yes, it finishes, but if the batch was
split, the _future_ gets chained:
{code:java}
@Override
public RecordMetadata get() throws InterruptedException, ExecutionException {
    this.result.await(); // If it finishes here
    if (nextRecordMetadata != null)
        return nextRecordMetadata.get();
    return valueOrError();
}
...
/**
 * This method is used when we have to split a large batch in smaller ones. A
 * chained metadata will allow the future that has already returned to the
 * users to wait on the newly created split batches even after the old big
 * batch has been deemed as done.
 */
void chain(FutureRecordMetadata futureRecordMetadata) {
    if (nextRecordMetadata == null)
        nextRecordMetadata = futureRecordMetadata;
    else
        nextRecordMetadata.chain(futureRecordMetadata);
}
{code}
And ProducerBatch#tryAppendForSplit calls thunk.future.chain(future);
So, I think it is OK; I will create a test case to verify it.
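The chaining pattern above can be sketched with a minimal stand-in class (names like ChainedFuture and complete() are hypothetical; the real FutureRecordMetadata carries RecordMetadata and error state, not a String):

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for FutureRecordMetadata: get() blocks on its own
// latch, then delegates to any future chained after a batch split.
class ChainedFuture {
    private final CountDownLatch result = new CountDownLatch(1);
    private volatile ChainedFuture next; // nextRecordMetadata analogue
    private volatile String value;

    void complete(String v) { value = v; result.countDown(); }

    // Mirrors FutureRecordMetadata.chain(): append to the end of the chain.
    void chain(ChainedFuture f) {
        if (next == null) next = f; else next.chain(f);
    }

    String get() throws InterruptedException {
        result.await();        // own batch done (possibly "done" by splitting)
        if (next != null)
            return next.get(); // wait on the split batch's result instead
        return value;
    }
}

public class ChainDemo {
    public static void main(String[] args) throws InterruptedException {
        ChainedFuture original = new ChainedFuture();
        ChainedFuture split = new ChainedFuture();
        original.chain(split);                  // batch was split; chain on
        original.complete("aborted-by-split");  // old big batch deemed done
        split.complete("sent");                 // split batch actually sent
        // get() on the original future returns the split batch's result.
        System.out.println(original.get());     // prints "sent"
    }
}
```

So a caller holding the pre-split future still ends up waiting on the split batch, which is why the per-record future path is fine even though the flush path is not.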
> KafkaProducer flush behavior does not guarantee completed sends under record
> batch splitting
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
> Reporter: Lucas Bradstreet
> Assignee: Jonathan Santilli
> Priority: Major
>
> The KafkaProducer flush call guarantees that all records sent at the time of
> the flush call will either be sent successfully or result in an error.
> The KafkaProducer will split record batches upon receiving a
> MESSAGE_TOO_LARGE error from the broker. However, the flush behavior relies
> on the accumulator checking the incomplete sends that exist at the time of
> the flush call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
>     try {
>         for (ProducerBatch batch : this.incomplete.copyAll())
>             batch.produceFuture.await();
>     } finally {
>         this.flushesInProgress.decrementAndGet();
>     }
> }{code}
> When large record batches are split, the batch's produceFuture is
> completed, and new batches are added to the incomplete list of record
> batches. This breaks the flush guarantee: awaitFlushCompletion will finish
> without awaiting the new split batches, since any pre-split batches being
> awaited on above will already have been completed.
> This is demonstrated in a test case that can be found at
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem has likely been present since record batch splitting was added
> in KAFKA-3995 (KIP-126, 0.11).
--
This message was sent by Atlassian Jira
(v8.3.4#803005)