Thank you for the clarification, Andrew. 

I’d initially updated the KIP along those lines after your suggestion. I 
then got a little ahead of myself and floated the idea of skipping a 
setListener method and creating an interceptor instead, like we do for the 
consumer. The KIP never reflected that idea; it was just me floating it.

I think the KIP currently reflects all your suggestions; please take a look 
when you can. It includes the explicit mark for removal of the subscribe 
methods and the new setConsumerRebalanceListener method on Consumer. I’ve 
renamed the cut-down interface to ConsumerRebalanceAdapter - it’s not really 
just a view because we do allow write-like ops like commit/seek.
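
For anyone skimming the thread, here is a rough sketch of how the 
default-method delegation in the interface quoted below is meant to behave. 
It is not the KIP text. TopicPartition is a stand-in so the snippet compiles 
without Kafka on the classpath, ConsumerRebalanceAdapter carries a single 
hypothetical seek method (the real method set is whatever the KIP defines), 
and LegacyListener, SeekingListener and RebalanceSketch are made-up names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for org.apache.kafka.common.TopicPartition (assumption: keeps the sketch Kafka-free).
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public String toString() { return topic + "-" + partition; }
}

// Hypothetical cut-down consumer handle; only seek is shown here.
interface ConsumerRebalanceAdapter {
    void seek(TopicPartition partition, long offset);
}

// Same shape as the interface quoted below: the new overloads default to the old callbacks.
interface ConsumerRebalanceListener {
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    default void onPartitionsAssigned(Collection<TopicPartition> partitions,
                                      ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsAssigned(partitions);
    }

    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    default void onPartitionsRevoked(Collection<TopicPartition> partitions,
                                     ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<TopicPartition> partitions,
                                  ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsRevoked(partitions, consumerAdapter);
    }
}

// Existing-style listener: implements only the two abstract methods.
class LegacyListener implements ConsumerRebalanceListener {
    final List<String> calls = new ArrayList<>();
    @Override public void onPartitionsAssigned(Collection<TopicPartition> p) { calls.add("assigned " + p); }
    @Override public void onPartitionsRevoked(Collection<TopicPartition> p) { calls.add("revoked " + p); }
}

// New-style listener: overrides the adapter-aware overload and rewinds on assignment.
class SeekingListener extends LegacyListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> p, ConsumerRebalanceAdapter consumerAdapter) {
        for (TopicPartition tp : p) {
            consumerAdapter.seek(tp, 0L);
        }
    }
}

class RebalanceSketch {
    public static void main(String[] args) {
        List<String> seeks = new ArrayList<>();
        // Recording adapter: notes each seek instead of talking to a broker.
        ConsumerRebalanceAdapter adapter = (tp, offset) -> seeks.add(tp + "@" + offset);
        List<TopicPartition> partitions = List.of(new TopicPartition("orders", 0));

        LegacyListener legacy = new LegacyListener();
        legacy.onPartitionsAssigned(partitions, adapter); // falls through to the one-arg callback
        legacy.onPartitionsLost(partitions, adapter);     // falls through to onPartitionsRevoked
        System.out.println(legacy.calls);

        new SeekingListener().onPartitionsAssigned(partitions, adapter);
        System.out.println(seeks);
    }
}
```

The point of the sketch is that a listener written against today's interface 
keeps working unchanged when the client starts calling the two-argument 
overloads, while a new listener can opt in by overriding only the overloads 
it cares about.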

Best,
Aditya

> On Apr 13, 2026, at 10:12, Andrew Schofield <[email protected]> wrote:
> 
> Hi Aditya,
> I think I might have confused you. I didn't write clearly enough so 
> apologies. I'll try again.
> 
> Today, we have:
> 
> public interface ConsumerRebalanceListener {
> 
>    void onPartitionsAssigned(Collection<TopicPartition> partitions);
> 
>    void onPartitionsRevoked(Collection<TopicPartition> partitions);
> 
>    default void onPartitionsLost(Collection<TopicPartition> partitions) {
>        onPartitionsRevoked(partitions);
>    }
> }
> 
> I think what you end up with is:
> 
> public interface ConsumerRebalanceListener {
> 
>    void onPartitionsAssigned(Collection<TopicPartition> partitions);
> 
>    default void onPartitionsAssigned(Collection<TopicPartition> partitions,
>        ConsumerRebalanceAdapter consumerAdapter) {
>      onPartitionsAssigned(partitions);
>    }
> 
>    void onPartitionsRevoked(Collection<TopicPartition> partitions);
> 
>    default void onPartitionsRevoked(Collection<TopicPartition> partitions,
>        ConsumerRebalanceAdapter consumerAdapter) {
>      onPartitionsRevoked(partitions);
>    }
> 
>    default void onPartitionsLost(Collection<TopicPartition> partitions) {
>      onPartitionsRevoked(partitions);
>    }
> 
>    default void onPartitionsLost(Collection<TopicPartition> partitions,
>        ConsumerRebalanceAdapter consumerAdapter) {
>      onPartitionsRevoked(partitions, consumerAdapter);
>    }
> }
> 
> ConsumerRebalanceView is also fine for the cut-down interface to the 
> Consumer. I do think it would be best to start the name with 
> ConsumerRebalance... .
> 
> And then the changes to Consumer are:
> 
> * Add setConsumerRebalanceListener(ConsumerRebalanceListener listener)
> * Deprecate all of the subscribe variants which take a 
> ConsumerRebalanceListener parameter
> 
> 
> I do take the point about having a list of rebalance listeners like the 
> interceptor classes, but I wouldn't wrap that into the same KIP personally. 
> People tend to implement these things in lambdas.
> 
> Thanks,
> Andrew
> 
>> On 2026/04/08 05:30:51 Aditya Kousik wrote:
>> Hello Andrew,
>> 
>> I dwelt on this a bit more. I think supporting both listeners until AK5.0 
>> may not be as irksome as I initially feared. We already do this for 
>> classic/async consumers with ConsumerDelegate.
>> 
>> I was curious why subscribe() alone took a client-constructed object like 
>> ConsumerRebalanceListener whereas all other hooks were instantiated by the 
>> client code via Configurable. We can support both but amply call out via 
>> documentation and a log info line about which one will be activated at 
>> runtime. This can be opt-in at launch and eventually made the de facto 
>> pattern in the next major release.
>> 
>> This would also tie in nicely with subscribe only supporting topic 
>> collections + regex and move the callback within the client code like other 
>> interceptors already.
>> 
>> I also wanted to call out that this could be an opportunity to support a 
>> list of ConsumerRebalanceInterceptor like the others. Currently in my code, 
>> I wrap the outer one as a CompositeRebalanceListener with 
>> List<ConsumerRebalanceListener> invoked serially. We already do this as I 
>> mentioned earlier with producer/consumerInterceptors handling exceptions 
>> within each call in a loop by logging a warn.
>> 
>> So rebalance.interceptors with a LIST of fully qualified class names 
>> instantiated within the constructor is my current favourite approach. We 
>> support exactly one API for all rebalances and indicate at runtime which 
>> one is active.
>> 
>> Lmk your thoughts on this.
>> 
>> Thanks,
>> Aditya Kousik
>> 
>>>> On Apr 5, 2026, at 12:37, Aditya Kousik <[email protected]> wrote:
>>> 
>>> Hi Andrew,
>>> 
>>> I see what you’re saying.
>>> With AS1,2 the flow becomes clearer for the subscribe interaction: we only 
>>> change the subscription state for topics and leave rebalance events to a 
>>> separate mechanism uncoupled from the subscribe() call.
>>> 
>>> To keep in line with other kafka client classes, can we follow the same 
>>> convention of using ConsumerConfig to handle this? A new 
>>> `ConsumerRebalanceInterceptor` with the same signature I’d proposed. 
>>> Instantiated with Utils#newConfiguredInstance and make the class 
>>> Configurable. Naming and instantiating makes it closer to existing 
>>> interceptor classes.
>>> 
>>> My only worry is that as long as ConsumerRebalanceListener exists, this can 
>>> be a source of confusion for which interface to use for rebalance events. 
>>> Unless we deprecate it, we bear the burden of invoking both, even if we 
>>> state that only oneOf(ConsumerRebalanceListener, 
>>> ConsumerRebalanceInterceptor) will be invoked during rebalances.
>>> 
>>> Would love to hear your thoughts on this.
>>> 
>>> -Aditya
>>> 
>>>> On Apr 5, 2026, at 09:39, Andrew Schofield <[email protected]> wrote:
>>>> 
>>>> Hi Aditya,
>>>> I agree that using the existing ConsumerRebalanceListener gives a lower 
>>>> adoption burden.
>>>> 
>>>> AS1: To be more concrete with what I mean here, we could:
>>>> * Deprecate Consumer.subscribe(Collection<String>, 
>>>> ConsumerRebalanceListener) for removal in AK 5.0
>>>> * Introduce 
>>>> Consumer.setConsumerRebalanceListener(ConsumerRebalanceListener)
>>>> 
>>>> AS2: Given that we already have an interface called 
>>>> ConsumerRebalanceListener, I suggest that ConsumerRebalanceXXX would be a 
>>>> better naming choice for naming your new interface in terms of consistency.
>>>> 
>>>> Thanks,
>>>> Andrew
>>>> 
>>>>>> On 2026/04/04 23:48:28 Aditya Kousik wrote:
>>>>> Hi Andrew, thank you for the quick feedback. It turned out to be pivotal.
>>>>> 
>>>>> One of the rejected alternatives was to add default methods to 
>>>>> ConsumerRebalanceListener.
>>>>> I was ambivalent about this approach, hoping that a new method and 
>>>>> new interface would create the least friction.
>>>>> 
>>>>> You’re right about the state change w.r.t subscribe() variants. With the 
>>>>> Classic consumer, we directly update SubscriptionType with 
>>>>> setSubscriptionType and similarly a more complex setup for the async 
>>>>> consumer. So your setRebalanceHandler suggestion seems to follow existing 
>>>>> patterns.
>>>>> 
>>>>> However, looking at the places I’d need to pipe RebalanceHandler through, 
>>>>> it’s going to add a burden to the plumbing and subscription state.
>>>>> 
>>>>> I’m falling squarely in favour of extending the existing 
>>>>> ConsumerRebalanceListener with new default methods. This also allows 
>>>>> existing frameworks like Spring and SmallRye to hook directly into the 
>>>>> new method with minimal change.
>>>>> 
>>>>> I’ve renamed/updated the KIP to reflect this. (I can see why people use 
>>>>> shareable URLs for the confluence docs)
>>>>> 
>>>>> https://cwiki.apache.org/confluence/x/9ZU8G
>>>>> 
>>>>> -Aditya
>>>>> 
>>>>> 
>>>>>>> On Apr 2, 2026, at 05:50, Andrew Schofield <[email protected]> 
>>>>>>> wrote:
>>>>>> 
>>>>>> Hi Aditya,
>>>>>> Thanks for the KIP. I've only taken a quick look so far, but here's an 
>>>>>> initial comment.
>>>>>> 
>>>>>> AS1: One of the mistakes in the Kafka consumer API today is that the 
>>>>>> `subscribe(Collection<String>, ConsumerRebalanceListener)` does two 
>>>>>> jobs. First, it replaces the rebalance listener (when you might assume 
>>>>>> that the listener applies only to rebalance changes resulting from this 
>>>>>> call to subscribe). Second, it changes the subscription. If the second 
>>>>>> of these throws an exception, the first will still occur. It's a bit of 
>>>>>> a mess. I suggest you have a 
>>>>>> `Consumer.setRebalanceHandler(RebalanceHandler)` method and do not add a 
>>>>>> new override for `Consumer.subscribe`.
>>>>>> 
>>>>>> Thanks,
>>>>>> Andrew
>>>>>> 
>>>>>> On 2026/04/01 15:16:36 Aditya Kousik wrote:
>>>>>>> Hi all,
>>>>>>> 
>>>>>>> I'd like to start a discussion on KIP-1306: RebalanceHandler: 
>>>>>>> Consumer-Aware Rebalance Callback.
>>>>>>> 
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1306%3A+RebalanceHandler%3A+Consumer-Aware+Rebalance+Callback
>>>>>>> 
>>>>>>> Spring Kafka, SmallRye, and Micronaut all pass the consumer into 
>>>>>>> rebalance callbacks; the client doesn't. The standard workaround of 
>>>>>>> constructor-injecting a full Consumer reference allows dangerous 
>>>>>>> operations like poll() and close() inside a callback. This KIP proposes 
>>>>>>> RebalanceHandler, with a RebalanceConsumerView that exposes only safe 
>>>>>>> operations, making misuse a compile error.
>>>>>>> 
>>>>>>> Looking forward to your feedback.
>>>>>>> 
>>>>>>> Thanks
>>>>>>> 
>>>>> 
>>>>> 
>> 
