Hi PoAn,

Thanks. Makes sense to me.

Best regards,
Andrew
On 2026/03/20 05:42:08 PoAn Yang wrote:
> Hi Andrew,
>
> Thanks for the review and suggestions. I have updated the KIP accordingly. Here is a summary of the changes:
>
> 1. Term change
> max.idempotence.batches.to.retain -> producer.state.batches.to.retain
> MaxIdempotenceBatchesToRetain -> ProducerStateBatchesToRetain
>
> 2. Topic-level configuration
> The configuration has been changed from a broker-level config to a topic-level config with a server default. The ProducerStateBatchesToRetain field in ProduceResponse is now placed inside TopicProduceResponse rather than as a top-level tagged field, since different partitions on the same broker can have different values.
>
> 3. Two-level in-flight check
> To support per-topic deduplication window sizes, the producer now enforces two independent in-flight checks:
> Per-partition check (new): the number of in-flight batches to a specific partition must not exceed that partition's discovered ProducerStateBatchesToRetain limit.
> Per-connection check (existing): the total number of in-flight requests to a broker node must not exceed max.in.flight.requests.per.connection.
>
> Kind regards,
> PoAn
>
> > On Mar 9, 2026, at 8:43 PM, Andrew Schofield <[email protected]> wrote:
> >
> > Hi PoAn,
> > Thanks for your response. I'm going to try again :)
> >
> > AS2: ProducerStateEntry#NUM_BATCHES_TO_RETAIN is already a partition-level concept. The new max.idempotence.batches.to.retain acts per-partition as far as I understand. The fact that we have connection-level and partition-level concepts colliding is weird to me.
> >
> > If I created 20 partitions and set max.in.flight.requests.per.connection to 100, and also changed the producer to limit its in-flight requests at the partition level, couldn't I get the best of both worlds? I could use the usual scaling mechanism of adding partitions to get higher throughput and I could continue to use 5 requests per partition.
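[The two-level in-flight check summarized above could be sketched roughly like this. This is a minimal sketch, not the actual client code: the class and method names (InFlightTracker, maySend, etc.) are hypothetical, and the pre-discovery fallback of 5 reflects the historical NUM_BATCHES_TO_RETAIN default.]

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two-level admission check: a batch may be sent
// only if BOTH the per-partition limit (discovered from ProduceResponse) and
// the per-connection limit (max.in.flight.requests.per.connection) allow it.
class InFlightTracker {
    private final int maxInFlightPerConnection;                             // existing connection-level limit
    private final Map<String, Integer> partitionLimit = new HashMap<>();    // discovered per-partition limits
    private final Map<String, Integer> inFlightPerPartition = new HashMap<>();
    private int inFlightOnConnection = 0;

    InFlightTracker(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // Before discovery, fall back to the historical default of 5 batches.
    int limitFor(String topicPartition) {
        return partitionLimit.getOrDefault(topicPartition, 5);
    }

    boolean maySend(String topicPartition) {
        boolean partitionOk = inFlightPerPartition.getOrDefault(topicPartition, 0) < limitFor(topicPartition);
        boolean connectionOk = inFlightOnConnection < maxInFlightPerConnection;
        return partitionOk && connectionOk;
    }

    void onSend(String topicPartition) {
        inFlightPerPartition.merge(topicPartition, 1, Integer::sum);
        inFlightOnConnection++;
    }

    // A response frees one slot and carries the partition's discovered limit.
    void onResponse(String topicPartition, int batchesToRetain) {
        inFlightPerPartition.merge(topicPartition, -1, Integer::sum);
        inFlightOnConnection--;
        partitionLimit.put(topicPartition, batchesToRetain);
    }
}
```

[With max.in.flight.requests.per.connection=100, a single partition is still capped at 5 in-flight batches until a response advertises a larger window, while the connection-level cap bounds the total across all partitions on that broker.]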
> >
> > I don't mind adding a config to set the number of batches to retain, but I think that's only half the problem.
> >
> > Thanks,
> > Andrew
> >
> > On 2026/03/08 06:33:35 PoAn Yang wrote:
> >> Hi Andrew,
> >>
> >> Thanks for the review and sorry for the late reply. I took some time to think through how a partition-level approach could be implemented, what benefits it might bring, and to run additional experiments in low-latency environments.
> >>
> >> AS1 & AS2: Since both comments converge on the idea of introducing partition-level configuration, I'll address them together.
> >>
> >> 1. Partition-level configuration does not satisfy all use cases. While a partition-level in-flight limit offers finer-grained control per partition, it doesn't cover the case where a user wants to bound the total number of in-flight requests on a single broker connection. These are two distinct concerns: per-partition flow control vs. per-connection back-pressure. A partition-level configuration alone cannot replace the connection-level limit that max.in.flight.requests.per.connection currently provides.
> >>
> >> 2. Having two levels of in-flight limits increases both user-facing and implementation complexity.
> >>
> >> 3. A broker-level configuration benefits both high-latency and low-latency environments. I've added localhost benchmark results to the KIP. Even in a low-latency environment, setting max.in.flight.requests.per.connection=1 causes each request to queue at the producer level, resulting in significantly higher latency compared to allowing more in-flight requests.
> >>
> >> Thank you,
> >> PoAn
> >>
> >>> On Mar 2, 2026, at 7:11 AM, Andrew Schofield <[email protected]> wrote:
> >>>
> >>> Hi PoAn,
> >>> Thanks for your KIP.
> >>> This seems like a good area to improve, not just for the high-latency connections between clients and brokers that you mentioned, but also because diskless is introducing topics which have high write latency too.
> >>>
> >>> AS1: In general, I'm nervous of having to set broker configurations based on knowledge of the client latency. If you have an asymmetrical configuration with a mixture of high and low latency clients, you end up having to configure for the worst case. I'd prefer the client to behave differently in the event that it is experiencing high latency, and also to be responsive to the difference in latency for specific topics which have higher latency, rather than to change the broker configuration for all clients. wdyt?
> >>>
> >>> AS2: If I understand the code correctly (and that's not guaranteed), ProducerStateEntry.NUM_BATCHES_TO_RETAIN (5) is the number of batches per producer ID per topic-partition that the broker can retain. The Java producer client uses max.in.flight.requests.per.connection (also 5) to limit how many requests it is prepared to have in flight, but this is at the level of the entire connection. Would an alternative change be to switch the producer's limit from a connection-level limit to a partition-level limit matching the broker implementation? You could get a lot of in-flight requests by using more partitions. The key is the amount of data in flight, not really the number of batches. I may have misunderstood how this area works, but it doesn't seem optimal.
> >>>
> >>> Thanks,
> >>> Andrew
> >>>
> >>> On 2026/02/28 12:21:10 PoAn Yang wrote:
> >>>> Hi Luke,
> >>>>
> >>>> Thanks for the review.
> >>>>
> >>>> 2 & 4. I added more background to the Broker Configuration and Dynamic Capacity Discovery paragraphs.
> >>>> In the initial state, the producer can only send at most min(5, max.in.flight.requests.per.connection) requests, so it doesn't exceed the capacity of old brokers.
> >>>>
> >>>> Thank you,
> >>>> PoAn
> >>>>
> >>>>> On Feb 27, 2026, at 4:27 PM, Luke Chen <[email protected]> wrote:
> >>>>>
> >>>>> Hi PoAn,
> >>>>>
> >>>>>> 1. KAFKA-18905 or KAFKA-9199 are about leader changes causing OUT_OF_ORDER_SEQUENCE errors. This KIP is to remove the NUM_BATCHES_TO_RETAIN limitation. I think they're not related.
> >>>>>
> >>>>> OK, I see.
> >>>>>
> >>>>>> Yes, if max.in.flight.requests.per.connection is larger than NUM_BATCHES_TO_RETAIN, the batches cannot be retained. That is why we have the initial state to make sure the producer sends a number of in-flight requests less than or equal to NUM_BATCHES_TO_RETAIN. Only if it finds a broker can retain more batches does it adjust its limit.
> >>>>>
> >>>>> So, currently, when an idempotent/transactional producer is enabled, we will throw an exception if max.in.flight.requests.per.connection > 5. When we allow users to configure NUM_BATCHES_TO_RETAIN, that validation will not be applied before sending the produce request. And that's why we need the produce response to tell the producer what the setting on the broker side is. Could you make this clearer in the KIP?
> >>>>>
> >>>>> Also, if max.in.flight.requests.per.connection is set to 100 and NUM_BATCHES_TO_RETAIN is 5, then it's a little late when the first produce response is received if we already allow producers to send 100 requests in flight. If we want to adopt this solution, maybe we need to let the producer begin with max.in.flight.requests.per.connection = 1 and then adjust it to the expected value after the first produce response is received.
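[The initial-state behaviour under discussion here could be sketched as follows. Hypothetical names, not the actual client code; it assumes the first ProduceResponse carries the broker's retention capacity, as the KIP proposes.]

```java
// Sketch of dynamic capacity discovery: the producer's effective in-flight
// limit starts at the conservative min(5, max.in.flight.requests.per.connection),
// so a broker that only retains 5 batches is never overrun, and is raised only
// once a ProduceResponse advertises a larger deduplication window.
class CapacityDiscovery {
    static final int LEGACY_NUM_BATCHES_TO_RETAIN = 5;

    private final int configuredMaxInFlight;
    private int effectiveLimit;

    CapacityDiscovery(int configuredMaxInFlight) {
        this.configuredMaxInFlight = configuredMaxInFlight;
        // Initial state: never assume the broker can retain more than 5 batches.
        this.effectiveLimit = Math.min(LEGACY_NUM_BATCHES_TO_RETAIN, configuredMaxInFlight);
    }

    // Called when a ProduceResponse reports the broker-side retention capacity.
    void onCapacityDiscovered(int batchesToRetain) {
        // Grow toward the user's configured ceiling, never beyond it.
        effectiveLimit = Math.min(batchesToRetain, configuredMaxInFlight);
    }

    int effectiveLimit() {
        return effectiveLimit;
    }
}
```

[Starting from min(5, configured) rather than 1 avoids the first-request bottleneck Luke describes while still staying within what an old broker can deduplicate.]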
> >>>>> Does that make sense?
> >>>>>
> >>>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However, if a broker works with old producers, it may waste memory. Old producers can't send more in-flight requests because of a ConfigException. How about we still use 5 in 4.x and adjust to a larger value in 5.0?
> >>>>>
> >>>>> Sounds good to me.
> >>>>>
> >>>>> Thank you,
> >>>>> Luke
> >>>>>
> >>>>> On Thu, Feb 26, 2026 at 9:22 PM PoAn Yang <[email protected]> wrote:
> >>>>>
> >>>>>> Hi Luke,
> >>>>>>
> >>>>>> Thanks for the review and suggestions.
> >>>>>>
> >>>>>> 1. KAFKA-18905 or KAFKA-9199 are about leader changes causing OUT_OF_ORDER_SEQUENCE errors. This KIP is to remove the NUM_BATCHES_TO_RETAIN limitation. I think they're not related.
> >>>>>>
> >>>>>> 2. Agree, transactional producers are based on idempotent producers. Updated it.
> >>>>>>
> >>>>>> 3.
> >>>>>>> So, I'd like to know why we have to adjust the `max.in.flight.requests.per.connection` value on the producer side?
> >>>>>>
> >>>>>> The user doesn't need to update max.in.flight.requests.per.connection in this case. The producer will automatically adjust its internal limit on in-flight requests.
> >>>>>>
> >>>>>>> Using the example above, after this KIP, the `max.in.flight.requests.per.connection=10` cannot be retained unless NUM_BATCHES_TO_RETAIN is set to 10, right?
> >>>>>>
> >>>>>> Yes, if max.in.flight.requests.per.connection is larger than NUM_BATCHES_TO_RETAIN, the batches cannot be retained. That is why we have the initial state to make sure the producer sends a number of in-flight requests less than or equal to NUM_BATCHES_TO_RETAIN. Only if it finds a broker can retain more batches does it adjust its limit.
> >>>>>>
> >>>>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN.
> >>>>>> However, if a broker works with old producers, it may waste memory. Old producers can't send more in-flight requests because of a ConfigException. How about we still use 5 in 4.x and adjust to a larger value in 5.0?
> >>>>>>
> >>>>>> Thank you,
> >>>>>> PoAn
> >>>>>>
> >>>>>>> On Feb 25, 2026, at 9:07 PM, Luke Chen <[email protected]> wrote:
> >>>>>>>
> >>>>>>> Hi PoAn,
> >>>>>>>
> >>>>>>> Thanks for the KIP! I agree the number of batches to retain should be configurable to improve the throughput.
> >>>>>>>
> >>>>>>> Comments:
> >>>>>>> 1. Could you add the issue KAFKA-18905 <https://issues.apache.org/jira/browse/KAFKA-18905> to the motivation section? I think this is the issue we want to address, right?
> >>>>>>>
> >>>>>>> 2.
> >>>>>>>> Introduce a new config on the broker, as the broker must know how much memory to allocate. Operators can set a limitation on the broker side to prevent malicious producers. This configuration only takes effect for idempotent producers.
> >>>>>>> I think not only the idempotent producers, but also the transactional producers, as long as they have the PID.
> >>>>>>>
> >>>>>>> 3. About the produce response update, I'm wondering if it is necessary? Currently, with a producer using `max.in.flight.requests.per.connection=10` and NUM_BATCHES_TO_RETAIN=5, we won't adjust the producer config to 5. Of course it is possible that the duplication cannot be detected, but that might be the user's choice to improve the throughput (though it might be rare). So, I'd like to know why we have to adjust the `max.in.flight.requests.per.connection` value on the producer side?
> >>>>>>> Using the example above, after this KIP, the `max.in.flight.requests.per.connection=10` cannot be retained unless NUM_BATCHES_TO_RETAIN is set to 10, right?
> >>>>>>>
> >>>>>>> 4. The default value of `max.idempotence.batches.to.retain`
> >>>>>>> The performance test you showed makes it clear that a larger `max.idempotence.batches.to.retain` gives better throughput. Also, the memory usage is small. Do we have any reason to keep the default value at 5?
> >>>>>>>
> >>>>>>> Thank you,
> >>>>>>> Luke
> >>>>>>>
> >>>>>>> On Sun, Feb 22, 2026 at 9:48 PM PoAn Yang <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I would like to start a discussion thread on KIP-1269. In this KIP, we aim to remove the limitation on the maximal number of batches to retain for an idempotent producer. In our tests, it can improve throughput and reduce latency.
> >>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/x/loI8G
> >>>>>>>>
> >>>>>>>> Please take a look and feel free to share any thoughts.
> >>>>>>>>
> >>>>>>>> Thanks.
> >>>>>>>> PoAn
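[On the memory question raised in point 4, a back-of-envelope estimate of the broker-side cost of retaining more batch metadata. All figures are illustrative assumptions, not measurements from the KIP's benchmarks; the per-entry size in particular is a guess at the footprint of one cached batch metadata record.]

```java
public class ProducerStateMemoryEstimate {
    public static void main(String[] args) {
        // Illustrative assumptions, not measured values.
        long producerIds = 1_000;            // active PIDs tracked by the broker
        long partitionsPerProducer = 50;     // partitions each producer writes to
        long batchesToRetain = 50;           // a proposed larger retention window
        long bytesPerBatchEntry = 64;        // assumed size of one cached batch metadata entry

        long totalBytes = producerIds * partitionsPerProducer * batchesToRetain * bytesPerBatchEntry;
        System.out.println(totalBytes);      // prints 160000000, i.e. ~160 MB
    }
}
```

[Even a tenfold increase of the window from 5 lands in the low hundreds of megabytes for a fairly large deployment under these assumptions, which is consistent with the "memory usage is small" observation above.]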
