[
https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851101#comment-17851101
]
Edoardo Comar edited comment on KAFKA-16570 at 5/31/24 2:12 PM:
----------------------------------------------------------------
[~jolshan] I agree I too think that
FenceProducersHandler.handleError
should handle the CONCURRENT_TRANSACTIONS as a success and maybe log.debug or
log.info it
I used a simple test like
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();
producer.commitTransaction();
admin.fenceProducers(Collections.singleton(txId)).all().get();
producer.beginTransaction();
producer.send(record).get(); //throws ProducerFenced
{code}
while
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();
admin.fenceProducers(Collections.singleton(txId)).all().get(); //throws
ConcurrentTransactionsException
producer.commitTransaction(); //2
{code}
however if the ConcurrentTransactionsException is swallowed,
then //2 throws ProducerFencedException as expected
for the ... record, another
producer.send().get()
before commit would instead throw an InvalidProducerEpochException
was (Author: ecomar):
[~jolshan] I agree I too think that
FenceProducersHandler.handleError
should handle the CONCURRENT_TRANSACTIONS as a success and maybe log.debug or
log.info it
I used a simple test like
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();
producer.commitTransaction();
admin.fenceProducers(Collections.singleton(txId)).all().get();
producer.beginTransaction();
producer.send(record).get(); //throws ProducerFenced
{code}
while
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();
admin.fenceProducers(Collections.singleton(txId)).all().get(); //throws
ConcurrentTransactionsException
producer.commitTransaction(); //2
{code}
however if the ConcurrentTransactionsException is swallowed,
then //2 throws ProducerFencedException as expected
> FenceProducers API returns "unexpected error" when successful
> -------------------------------------------------------------
>
> Key: KAFKA-16570
> URL: https://issues.apache.org/jira/browse/KAFKA-16570
> Project: Kafka
> Issue Type: Bug
> Reporter: Justine Olshan
> Assignee: Justine Olshan
> Priority: Major
>
> When we want to fence a producer using the admin client, we send an
> InitProducerId request.
> There is logic in that API to fence (and abort) any ongoing transactions and
> that is what the API relies on to fence the producer. However, this handling
> also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because
> we want to actually get a new producer ID and want to retry until the the ID
> is supplied or we time out.
> [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
>
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
>
> In the case of fence producer, we don't retry and instead we have no handling
> for concurrent transactions and log a message about an unexpected error.
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
>
> This is not unexpected though and the operation was successful. We should
> just swallow this error and treat this as a successful run of the command.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)