> > 1. Could you add the issue: KAFKA-18905 <https://issues.apache.org/jira/browse/KAFKA-18905> into the motivation section? I think this is the issue we want to address, right?
>
> 1. KAFKA-18905 or KAFKA-9199 are about leader changes cause OUT_OF_ORDER_SEQUENCE error. This KIP is to remove NUM_BATCHES_TO_RETAIN limitation. I think they’re not related.

I agree with this. KAFKA-18905 is orthogonal to the KIP. However, my takeaway from KAFKA-18905 is that NUM_BATCHES_TO_RETAIN should not be a cap on the number of in-flight produce requests, but instead a cap on how far ahead an idempotent producer can speculatively send produce requests after an unacked produce. I believe we've been thinking about it incorrectly for a while. This doesn't affect the KIP as far as I can tell.

Thanks,
Sean

On Fri, Apr 10, 2026 at 10:33 PM Andrew Schofield <[email protected]> wrote:

> Hi PoAn,
> Thanks. Makes sense to me.
>
> Best regards,
> Andrew
>
> On 2026/03/20 05:42:08 PoAn Yang wrote:
> > Hi Andrew,
> >
> > Thanks for the review and suggestions. I have updated the KIP accordingly. Here is a summary of the changes:
> >
> > 1. Term change
> > max.idempotence.batches.to.retain -> producer.state.batches.to.retain
> > MaxIdempotenceBatchesToRetain -> ProducerStateBatchesToRetain
> >
> > 2. Topic-level configuration
> > The configuration has been changed from a broker-level config to a topic-level config with a server default.
> > The ProducerStateBatchesToRetain field in ProduceResponse is now placed inside TopicProduceResponse rather than as a top-level tagged field, since different partitions on the same broker can have different values.
> >
> > 3. Two-level in-flight check
> > To support per-topic deduplication window sizes, the producer now enforces two independent in-flight checks:
> > Per-partition check (new): the number of in-flight batches to a specific partition must not exceed that partition's discovered ProducerStateBatchesToRetain limit.
> > Per-connection check (existing): the total number of in-flight requests to a broker node must not exceed max.in.flight.requests.per.connection.
> >
> > Kind regards,
> > PoAn
> >
> > > On Mar 9, 2026, at 8:43 PM, Andrew Schofield <[email protected]> wrote:
> > >
> > > Hi PoAn,
> > > Thanks for your response.
> > > I'm going to try again :)
> > >
> > > AS2: ProducerStateEntry#NUM_BATCHES_TO_RETAIN is already a partition-level concept. The new max.idempotence.batches.to.retain acts per-partition as far as I understand. The fact that we have connection-level and partition-level concepts colliding is weird to me.
> > >
> > > If I created 20 partitions and set max.in.flight.requests.per.connection to 100, and also changed the producer to limit its in-flight requests at the partition level, couldn't I get the best of both worlds? I could use the usual scaling mechanism of adding partitions to get higher throughput and I could continue to use 5 requests per partition.
> > >
> > > I don't mind adding a config to set the number of batches to retain, but I think that's only half the problem.
> > >
> > > Thanks,
> > > Andrew
> > >
> > > On 2026/03/08 06:33:35 PoAn Yang wrote:
> > >> Hi Andrew,
> > >>
> > >> Thanks for the review and sorry for the late reply. I took some time to think through how a partition-level approach could be implemented, what benefits it might bring, and to run additional experiments in low-latency environments.
> > >>
> > >> AS1 & AS2: Since both comments converge on the idea of introducing partition-level configuration, I'll address them together.
> > >>
> > >> 1. Partition-level configuration does not satisfy all use cases. While a partition-level in-flight limit offers finer-grained control per partition, it doesn't cover the case where a user wants to bound the total number of in-flight requests on a single broker connection. These are two distinct concerns: per-partition flow control vs. per-connection back-pressure. A partition-level configuration alone cannot replace the connection-level limit that max.in.flight.requests.per.connection currently provides.
> > >>
> > >> 2. Having two levels of in-flight limits increases both user-facing and implementation complexity.
> > >>
> > >> 3. A broker-level configuration benefits both high-latency and low-latency environments. I've added localhost benchmark results to the KIP. Even in a low-latency environment, setting max.in.flight.requests.per.connection=1 causes each request to queue at the producer level, resulting in significantly higher latency compared to allowing more in-flight requests.
> > >>
> > >> Thank you,
> > >> PoAn
> > >>
> > >>> On Mar 2, 2026, at 7:11 AM, Andrew Schofield <[email protected]> wrote:
> > >>>
> > >>> Hi PoAn,
> > >>> Thanks for your KIP. This is seems like a good area to improve, not just for the high-latency connections between clients and brokers that you mentioned, but also because diskless is introducing topics which have high write latency too.
> > >>>
> > >>> AS1: In general, I'm nervous of having to set broker configurations based on knowledge of the client latency. If you have an asymmetrical configuration with a mixture of high and low latency clients, you end up having to configure for the worst case. I'd prefer the client to behave differently in the event that it is experiencing high latency, and also to be responsive to the difference in latency for specific topics which have higher latency, rather than to change the broker configuration for all clients. wdyt?
> > >>>
> > >>> AS2: If I understand the code correctly (and that's not guaranteed), ProducerStateEntry.NUM_BATCHES_TO_RETAIN (5) is the number of batches per producer ID per topic-partition that the broker can retain. The Java producer client uses max.in-flight.batches.per.connection (also 5) to limit how many requests it is prepared to have in flight, but this is at the level of the entire connection. Would an alternative change be to switch the producer's limit from a connection-level limit to a partition-level limit matching the broker implementation? You could get a lot of in-flight requests by using more partitions.
> > >>> The key is the amount of data in flight, not really the number of batches. I may have misunderstood how this area works, but it doesn't seem optimal.
> > >>>
> > >>> Thanks,
> > >>> Andrew
> > >>>
> > >>> On 2026/02/28 12:21:10 PoAn Yang wrote:
> > >>>> Hi Luke,
> > >>>>
> > >>>> Thanks for the review.
> > >>>>
> > >>>> 2 & 4. I add more background to Broker Configuration and Dynamic Capacity Discovery paragraphs. In the initial state, the producer can only send at most min(5, max.in.flight.requests.per.connection) requests, so it doesn’t break old brokers capacity.
> > >>>>
> > >>>> Thank you,
> > >>>> PoAn
> > >>>>
> > >>>>> On Feb 27, 2026, at 4:27 PM, Luke Chen <[email protected]> wrote:
> > >>>>>
> > >>>>> Hi PoAn,
> > >>>>>
> > >>>>>> 1. KAFKA-18905 or KAFKA-9199 are about leader changes cause OUT_OF_ORDER_SEQUENCE error. This KIP is to remove NUM_BATCHES_TO_RETAIN limitation. I think they’re not related.
> > >>>>>
> > >>>>> OK, I see.
> > >>>>>
> > >>>>>> Yes, if max.in.flight.requests.per.connection is larger than NUM_BATCHES_TO_RETAIN, the batches cannot be retained. That is why we have initial state to make sure the producer sends in flight requests less or equal to NUM_BATCHES_TO_RETAIN. Only if it finds a broker can retain more batches, it adjusts its limitation.
> > >>>>>
> > >>>>> So, currently, when idempotent/transactional producer is enabled, we will throw exception if the max.in.flight.requests.per.connection > 5. When we allow users to configure the NUM_BATCHES_TO_RETAIN, the validation will not be applied before sending the produce request. And that's why we need the produce response to tell the producer what the setting in the broker side is. Could you make it more clear about this in the KIP?
> > >>>>>
> > >>>>> Also, if the max.in.flight.requests.per.connection is set to 100, and NUM_BATCHES_TO_RETAIN is 5, then it means it's a little late when the first producer response is received if we already allow producers to send 100 requests in flight. If we want to adopt this solution, maybe we need to let the producer begins from max.in.flight.requests.per.connection = 1 and then adjust it to the expected value after the first producer response is received. Does that make sense?
> > >>>>>
> > >>>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However, if a broker works with old producers, it may waste memory. Old producers can't send more in flight requests cause of ConfigException. How about we still use 5 in 4.x and adjust to a larger value in 5.0?
> > >>>>>
> > >>>>> Sounds good to me.
> > >>>>>
> > >>>>> Thank you,
> > >>>>> Luke
> > >>>>>
> > >>>>> On Thu, Feb 26, 2026 at 9:22 PM PoAn Yang <[email protected]> wrote:
> > >>>>>
> > >>>>>> Hi Luke,
> > >>>>>>
> > >>>>>> Thanks for the review and suggestions.
> > >>>>>>
> > >>>>>> 1. KAFKA-18905 or KAFKA-9199 are about leader changes cause OUT_OF_ORDER_SEQUENCE error. This KIP is to remove NUM_BATCHES_TO_RETAIN limitation. I think they’re not related.
> > >>>>>>
> > >>>>>> 2. Agree, transactional producers are based on idempotent producers. Updated it.
> > >>>>>>
> > >>>>>> 3.
> > >>>>>>> So, I'd like to know why we have to adjust the `max.in.flight.requests.per.connection` value in the producer side?
> > >>>>>>
> > >>>>>> User doesn’t need to update max.in.flight.requests.per.connection in this case. The producer will automatically adjust internal limitation of in flight requests.
> > >>>>>>
> > >>>>>>> Using the example above, after this KIP, the `max.in.flight.requests.per.connection=10` cannot be retained unless NUM_BATCHES_TO_RETAIN is set to 10, right?
> > >>>>>>
> > >>>>>> Yes, if max.in.flight.requests.per.connection is larger than NUM_BATCHES_TO_RETAIN, the batches cannot be retained. That is why we have initial state to make sure the producer sends in flight requests less or equal to NUM_BATCHES_TO_RETAIN. Only if it finds a broker can retain more batches, it adjusts its limitation.
> > >>>>>>
> > >>>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However, if a broker works with old producers, it may waste memory. Old producers can't send more in flight requests cause of ConfigException. How about we still use 5 in 4.x and adjust to a larger value in 5.0?
> > >>>>>>
> > >>>>>> Thank you,
> > >>>>>> PoAn
> > >>>>>>
> > >>>>>>> On Feb 25, 2026, at 9:07 PM, Luke Chen <[email protected]> wrote:
> > >>>>>>>
> > >>>>>>> Hi PoAn,
> > >>>>>>>
> > >>>>>>> Thanks for the KIP!
> > >>>>>>> I agree the number of batches to retain should be configurable to improve the throughput.
> > >>>>>>>
> > >>>>>>> Comments:
> > >>>>>>> 1. Could you add the issue: KAFKA-18905 <https://issues.apache.org/jira/browse/KAFKA-18905> into the motivation section? I think this is the issue we want to address, right?
> > >>>>>>>
> > >>>>>>> 2. > Introduce a new config on the broker, as the broker must know how much memory to allocate. Operators can set a limitation on the broker side to prevent malicious producers. This configuration only takes effect for idempotent producers.
> > >>>>>>> I think not only the idempotent producers, but also the transactional producers, as long as they have the PID.
> > >>>>>>>
> > >>>>>>> 3. About the producer response update, I'm wondering if it is necessary?
> > >>>>>>> Currently, when producer with `max.in.flight.requests.per.connection=10` and NUM_BATCHES_TO_RETAIN=5, we won't adjust the producer config to 5.
> > >>>>>>> Of course it is possible to the duplication cannot be detected, but that might be user's choice to improve the throughput (though it might be rare).
> > >>>>>>> So, I'd like to know why we have to adjust the `max.in.flight.requests.per.connection` value in the producer side?
> > >>>>>>> Using the example above, after this KIP, the `max.in.flight.requests.per.connection=10` cannot be retained unless NUM_BATCHES_TO_RETAIN is set to 10, right?
> > >>>>>>>
> > >>>>>>> 4. The default value of `max.idempotence.batches.to.retain`
> > >>>>>>> In the performance test you showed, it obviously shows larger `max.idempotence.batches.to.retain` will get better throughput.
> > >>>>>>> Also, the memory usage is small, do we have any reason we keep the default value for 5?
> > >>>>>>>
> > >>>>>>> Thank you,
> > >>>>>>> Luke
> > >>>>>>>
> > >>>>>>> On Sun, Feb 22, 2026 at 9:48 PM PoAn Yang <[email protected]> wrote:
> > >>>>>>>
> > >>>>>>>> Hi all,
> > >>>>>>>>
> > >>>>>>>> I would like to start a discussion thread on KIP-1269. In this KIP, we aim to remove limitation of maximal number of batches to retain for a idempotent producer. In our test, it can improve throughput and reduce latency.
> > >>>>>>>>
> > >>>>>>>> https://cwiki.apache.org/confluence/x/loI8G
> > >>>>>>>>
> > >>>>>>>> Please take a look and feel free to share any thoughts.
> > >>>>>>>>
> > >>>>>>>> Thanks.
> > >>>>>>>> PoAn
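
Note on the two-level in-flight check summarized in PoAn's 2026/03/20 message above: the sketch below is only an illustration of that description, not code from the KIP or from the Kafka producer. It assumes each request carries one batch for one partition, and the class name TwoLevelInFlightLimiter, its methods, and the string partition keys are all hypothetical. Before any produce response has been seen for a partition it falls back to 5, the current NUM_BATCHES_TO_RETAIN, which roughly corresponds to the min(5, max.in.flight.requests.per.connection) initial state mentioned in the thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a two-level in-flight check; not the actual producer code. */
final class TwoLevelInFlightLimiter {
    // Value of ProducerStateEntry.NUM_BATCHES_TO_RETAIN as quoted in the thread.
    static final int DEFAULT_BATCHES_TO_RETAIN = 5;

    // Stands in for max.in.flight.requests.per.connection.
    private final int maxInFlightPerConnection;
    private int inFlightOnConnection = 0;
    private final Map<String, Integer> inFlightByPartition = new HashMap<>();
    // Limits learned from responses (ProducerStateBatchesToRetain in the KIP summary).
    private final Map<String, Integer> discoveredLimitByPartition = new HashMap<>();

    TwoLevelInFlightLimiter(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    /** Discovered limit for the partition, or the conservative default before discovery. */
    int partitionLimit(String partition) {
        return discoveredLimitByPartition.getOrDefault(partition, DEFAULT_BATCHES_TO_RETAIN);
    }

    /** Both checks must pass: per-connection (existing) and per-partition (new). */
    boolean canSend(String partition) {
        boolean connectionOk = inFlightOnConnection < maxInFlightPerConnection;
        boolean partitionOk =
            inFlightByPartition.getOrDefault(partition, 0) < partitionLimit(partition);
        return connectionOk && partitionOk;
    }

    /** Records one in-flight batch (assumed here to be one request) for the partition. */
    void onSend(String partition) {
        if (!canSend(partition)) {
            throw new IllegalStateException("in-flight limit reached for " + partition);
        }
        inFlightOnConnection++;
        inFlightByPartition.merge(partition, 1, Integer::sum);
    }

    /** Releases one in-flight batch and records the limit reported in the response. */
    void onResponse(String partition, int producerStateBatchesToRetain) {
        int current = inFlightByPartition.getOrDefault(partition, 0);
        if (current == 0) {
            throw new IllegalStateException("no in-flight batch for " + partition);
        }
        inFlightByPartition.put(partition, current - 1);
        inFlightOnConnection--;
        discoveredLimitByPartition.put(partition, producerStateBatchesToRetain);
    }
}
```

With a connection limit of 100 and 20 partitions, as in Andrew's example, a limiter like this would allow up to 5 in-flight batches per partition until a larger value is discovered, while the connection-level limit still bounds the total at 100.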
