>
> 1. Could you add the issue: KAFKA-18905
>> <https://issues.apache.org/jira/browse/KAFKA-18905> into the
>> motivation section? I think this is the issue we want to address, right?
>
> 1. KAFKA-18905 or KAFKA-9199 are about leader changes cause
> OUT_OF_ORDER_SEQUENCE error. This KIP is to remove
> NUM_BATCHES_TO_RETAIN limitation. I think they’re not related.


I agree with this. KAFKA-18905 is orthogonal to the KIP.

However, my takeaway from KAFKA-18905 is that NUM_BATCHES_TO_RETAIN should
not be a cap on the number of in-flight produce requests, but instead a cap
on how far ahead an idempotent producer can speculatively send produce
requests after an unacked produce. I believe we've been thinking about it
incorrectly for a while. This doesn't affect the KIP as far as I can tell.

Thanks,
Sean


On Fri, Apr 10, 2026 at 10:33 PM Andrew Schofield <[email protected]>
wrote:

> Hi PoAn,
> Thanks. Makes sense to me.
>
> Best regards,
> Andrew
>
> On 2026/03/20 05:42:08 PoAn Yang wrote:
> > Hi Andrew,
> >
> > Thanks for the review and suggestions. I have updated the KIP
> accordingly. Here is a summary of the changes:
> >
> > 1. Term change
> > max.idempotence.batches.to.retain -> producer.state.batches.to.retain
> > MaxIdempotenceBatchesToRetain -> ProducerStateBatchesToRetain
> >
> > 2. Topic-level configuration
> > The configuration has been changed from a broker-level config to a
> topic-level config with a server default.
> > The ProducerStateBatchesToRetain field in ProduceResponse is now placed
> inside TopicProduceResponse rather than as a top-level tagged field, since
> different partitions on the same broker can have different values.
> >
> > 3. Two-level in-flight check
> > To support per-topic deduplication window sizes, the producer now
> enforces two independent in-flight checks:
> > Per-partition check (new): the number of in-flight batches to a specific
> partition must not exceed that partition's discovered
> ProducerStateBatchesToRetain limit.
> > Per-connection check (existing): the total number of in-flight requests
> to a broker node must not exceed max.in.flight.requests.per.connection.
> >
> > Kind regards,
> > PoAn
> >
> > > On Mar 9, 2026, at 8:43 PM, Andrew Schofield <[email protected]>
> wrote:
> > >
> > > Hi PoAn,
> > > Thanks for your response. I'm going to try again :)
> > >
> > > AS2: ProducerStateEntry#NUM_BATCHES_TO_RETAIN is already a
> partition-level concept. The new max.idempotence.batches.to.retain acts
> per-partition as far as I understand. The fact that we have
> connection-level and partition-level concepts colliding is weird to me.
> > >
> > > If I created 20 partitions and set
> max.in.flight.requests.per.connection to 100, and also changed the producer
> to limit its in-flight requests at the partition level, couldn't I get the
> best of both worlds? I could use the usual scaling mechanism of adding
> partitions to get higher throughput and I could continue to use 5 requests
> per partition.
> > >
> > > I don't mind adding a config to set the number of batches to retain,
> but I think that's only half the problem.
> > >
> > > Thanks,
> > > Andrew
> > >
> > > On 2026/03/08 06:33:35 PoAn Yang wrote:
> > >> Hi Andrew,
> > >>
> > >> Thanks for the review and sorry for the late reply. I took some time
> to think through how a partition-level approach could be implemented, what
> benefits it might bring, and to run additional experiments in low-latency
> environments.
> > >>
> > >> AS1 & AS2: Since both comments converge on the idea of introducing
> partition-level configuration, I'll address them together.
> > >>
> > >> 1. Partition-level configuration does not satisfy all use cases.
> While a partition-level in-flight limit offers finer-grained control per
> partition, it doesn't cover the case where a user wants to bound the total
> number of in-flight requests on a single broker connection. These are two
> distinct concerns: per-partition flow control vs. per-connection
> back-pressure. A partition-level configuration alone cannot replace the
> connection-level limit that max.in.flight.requests.per.connection currently
> provides.
> > >>
> > >> 2. Having two levels of in-flight limits increases both user-facing
> and implementation complexity.
> > >>
> > >> 3. A broker-level configuration benefits both high-latency and
> low-latency environments. I've added localhost benchmark results to the
> KIP. Even in a low-latency environment, setting
> max.in.flight.requests.per.connection=1 causes each request to queue at the
> producer level, resulting in significantly higher latency compared to
> allowing more in-flight requests.
> > >>
> > >> Thank you,
> > >> PoAn
> > >>
> > >>> On Mar 2, 2026, at 7:11 AM, Andrew Schofield <[email protected]>
> wrote:
> > >>>
> > >>> Hi PoAn,
> > >>> Thanks for your KIP. This is seems like a good area to improve, not
> just for the high-latency connections between clients and brokers that you
> mentioned, but also because diskless is introducing topics which have high
> write latency too.
> > >>>
> > >>> AS1: In general, I'm nervous of having to set broker configurations
> based on knowledge of the client latency. If you have an asymmetrical
> configuration with a mixture of high and low latency clients, you end up
> having to configure for the worst case. I'd prefer the client to behave
> differently in the event that it is experiencing high latency, and also to
> be responsive to the difference in latency for specific topics which have
> higher latency, rather than to change the broker configuration for all
> clients. wdyt?
> > >>>
> > >>> AS2: If I understand the code correctly (and that's not guaranteed),
> ProducerStateEntry.NUM_BATCHES_TO_RETAIN (5) is the number of batches per
> producer ID per topic-partition that the broker can retain. The Java
> producer client uses max.in-flight.batches.per.connection (also 5) to limit
> how many requests it is prepared to have in flight, but this is at the
> level of the entire connection. Would an alternative change be to switch
> the producer's limit from a connection-level limit to a partition-level
> limit matching the broker implementation? You could get a lot of in-flight
> requests by using more partitions. The key is the amount of data in flight,
> not really the number of batches. I may have misunderstood how this area
> works, but it doesn't seem optimal.
> > >>>
> > >>> Thanks,
> > >>> Andrew
> > >>>
> > >>> On 2026/02/28 12:21:10 PoAn Yang wrote:
> > >>>> Hi Luke,
> > >>>>
> > >>>> Thanks for the review.
> > >>>>
> > >>>> 2 & 4. I add more background to Broker Configuration and
> > >>>> Dynamic Capacity Discovery paragraphs. In the initial state,
> > >>>> the producer can only send at most min(5,
> > >>>> max.in.flight.requests.per.connection) requests, so it doesn’t
> > >>>> break old brokers capacity.
> > >>>>
> > >>>> Thank you,
> > >>>> PoAn
> > >>>>
> > >>>>> On Feb 27, 2026, at 4:27 PM, Luke Chen <[email protected]> wrote:
> > >>>>>
> > >>>>> Hi PoAn,
> > >>>>>
> > >>>>>> 1. KAFKA-18905 or KAFKA-9199 are about leader changes cause
> > >>>>> OUT_OF_ORDER_SEQUENCE error. This KIP is to remove
> > >>>>> NUM_BATCHES_TO_RETAIN limitation. I think they’re not related.
> > >>>>>
> > >>>>> OK, I see.
> > >>>>>
> > >>>>>> Yes, if max.in.flight.requests.per.connection is larger than
> > >>>>> NUM_BATCHES_TO_RETAIN, the batches cannot be retained.
> > >>>>> That is why we have initial state to make sure the producer sends
> > >>>>> in flight requests less or equal to NUM_BATCHES_TO_RETAIN.
> > >>>>> Only if it finds a broker can retain more batches, it adjusts its
> > >>>>> limitation.
> > >>>>>
> > >>>>> So, currently, when idempotent/transactional producer is enabled,
> we will
> > >>>>> throw exception if the max.in.flight.requests.per.connection > 5.
> > >>>>> When we allow users to configure the NUM_BATCHES_TO_RETAIN, the
> validation
> > >>>>> will not be applied before sending the produce request.
> > >>>>> And that's why we need the produce response to tell the producer
> what the
> > >>>>> setting in the broker side is.
> > >>>>> Could you make it more clear about this in the KIP?
> > >>>>>
> > >>>>> Also, if the max.in.flight.requests.per.connection is set to 100,
> > >>>>> and NUM_BATCHES_TO_RETAIN is 5, then it means it's a little late
> when the
> > >>>>> first producer response is received if we already allow producers
> to send
> > >>>>> 100 requests in flight. If we want to adopt this solution, maybe
> we need to
> > >>>>> let the producer begins from max.in.flight.requests.per.connection
> = 1 and
> > >>>>> then adjust it to the expected value after the first producer
> response is
> > >>>>> received. Does that make sense?
> > >>>>>
> > >>>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However,
> > >>>>> if a broker works with old producers, it may waste memory. Old
> > >>>>> producers can't send more in flight requests cause of
> ConfigException.
> > >>>>> How about we still use 5 in 4.x and adjust to a larger value in
> 5.0?
> > >>>>>
> > >>>>> Sounds good to me.
> > >>>>>
> > >>>>> Thank you,
> > >>>>> Luke
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Thu, Feb 26, 2026 at 9:22 PM PoAn Yang <[email protected]>
> wrote:
> > >>>>>
> > >>>>>> Hi Luke,
> > >>>>>>
> > >>>>>> Thanks for the review and suggestions.
> > >>>>>>
> > >>>>>> 1. KAFKA-18905 or KAFKA-9199 are about leader changes cause
> > >>>>>> OUT_OF_ORDER_SEQUENCE error. This KIP is to remove
> > >>>>>> NUM_BATCHES_TO_RETAIN limitation. I think they’re not related.
> > >>>>>>
> > >>>>>> 2. Agree, transactional producers are based on idempotent
> producers.
> > >>>>>> Updated it.
> > >>>>>>
> > >>>>>> 3.
> > >>>>>>> So, I'd like to know why we have to adjust the
> > >>>>>>> `max.in.flight.requests.per.connection` value in the producer
> side?
> > >>>>>>
> > >>>>>>
> > >>>>>> User doesn’t need to update max.in.flight.requests.per.connection
> in
> > >>>>>> this case. The producer will automatically adjust internal
> limitation of
> > >>>>>> in flight requests.
> > >>>>>>
> > >>>>>>> Using the example above, after this KIP,
> > >>>>>>> the `max.in.flight.requests.per.connection=10` cannot be retained
> > >>>>>>> unless NUM_BATCHES_TO_RETAIN is set to 10, right?
> > >>>>>>
> > >>>>>>
> > >>>>>> Yes, if max.in.flight.requests.per.connection is larger than
> > >>>>>> NUM_BATCHES_TO_RETAIN, the batches cannot be retained.
> > >>>>>> That is why we have initial state to make sure the producer sends
> > >>>>>> in flight requests less or equal to NUM_BATCHES_TO_RETAIN.
> > >>>>>> Only if it finds a broker can retain more batches, it adjusts its
> > >>>>>> limitation.
> > >>>>>>
> > >>>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However,
> > >>>>>> if a broker works with old producers, it may waste memory. Old
> > >>>>>> producers can't send more in flight requests cause of
> ConfigException.
> > >>>>>> How about we still use 5 in 4.x and adjust to a larger value in
> 5.0?
> > >>>>>>
> > >>>>>> Thank you,
> > >>>>>> PoAn
> > >>>>>>
> > >>>>>>> On Feb 25, 2026, at 9:07 PM, Luke Chen <[email protected]>
> wrote:
> > >>>>>>>
> > >>>>>>> Hi PoAn,
> > >>>>>>>
> > >>>>>>> Thanks for the KIP!
> > >>>>>>> I agree the number of batches to retain should be configurable
> to improve
> > >>>>>>> the throughput.
> > >>>>>>>
> > >>>>>>> Comments:
> > >>>>>>> 1. Could you add the issue: KAFKA-18905
> > >>>>>>> <https://issues.apache.org/jira/browse/KAFKA-18905> into the
> > >>>>>>> motivation section? I think this is the issue we want to
> address, right?
> > >>>>>>>
> > >>>>>>> 2. > Introduce a new config on the broker, as the broker must
> know how
> > >>>>>> much
> > >>>>>>> memory to allocate. Operators can set a limitation on the broker
> side to
> > >>>>>>> prevent malicious producers. This configuration only takes
> effect for
> > >>>>>>> idempotent producers.
> > >>>>>>> I think not only the idempotent producers, but also the
> > >>>>>>> transactional producers, as long as they have the PID.
> > >>>>>>>
> > >>>>>>> 3. About the producer response update, I'm wondering if it is
> necessary?
> > >>>>>>> Currently, when producer with
> `max.in.flight.requests.per.connection=10`
> > >>>>>>> and NUM_BATCHES_TO_RETAIN=5, we won't adjust the producer config
> to 5.
> > >>>>>>> Of course it is possible to the duplication cannot be detected,
> but that
> > >>>>>>> might be user's choice to improve the throughput (though it
> might be
> > >>>>>> rare).
> > >>>>>>> So, I'd like to know why we have to adjust the
> > >>>>>>> `max.in.flight.requests.per.connection` value in the producer
> side?
> > >>>>>>> Using the example above, after this KIP,
> > >>>>>>> the `max.in.flight.requests.per.connection=10` cannot be retained
> > >>>>>>> unless NUM_BATCHES_TO_RETAIN is set to 10, right?
> > >>>>>>>
> > >>>>>>> 4. The default value of `max.idempotence.batches.to.retain`
> > >>>>>>> In the performance test you showed, it obviously shows
> > >>>>>>> larger `max.idempotence.batches.to.retain` will get better
> throughput.
> > >>>>>>> Also, the memory usage is small, do we have any reason we keep
> the
> > >>>>>> default
> > >>>>>>> value for 5?
> > >>>>>>>
> > >>>>>>> Thank you,
> > >>>>>>> Luke
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Sun, Feb 22, 2026 at 9:48 PM PoAn Yang <[email protected]>
> wrote:
> > >>>>>>>
> > >>>>>>>> Hi all,
> > >>>>>>>>
> > >>>>>>>> I would like to start a discussion thread on KIP-1269. In this
> KIP, we
> > >>>>>> aim
> > >>>>>>>> to remove limitation of maximal number of batches to retain for
> a
> > >>>>>>>> idempotent producer. In our test, it can improve throughput and
> reduce
> > >>>>>>>> latency.
> > >>>>>>>>
> > >>>>>>>> https://cwiki.apache.org/confluence/x/loI8G
> > >>>>>>>>
> > >>>>>>>> Please take a look and feel free to share any thoughts.
> > >>>>>>>>
> > >>>>>>>> Thanks.
> > >>>>>>>> PoAn
> > >>>>>>
> > >>>>>>
> > >>>>
> > >>>>
> > >>
> > >>
> >
> >
>

Reply via email to