[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836736#comment-17836736 ]
A. Sophie Blee-Goldman commented on KAFKA-16514:
------------------------------------------------
I haven't gone back and re-read the KIP, but IIRC the reason for adding these
CloseOptions was specific to solving an issue with static membership, hence why
it only takes effect there.
That said – I completely agree that there's no reason why this should only work
with static membership, and the decision to not leave the group for
non-static-membership is one of those biases that Kafka Streams has in assuming
persistent state stores.
I would fully support changing the behavior to work with non-static-members
rather than just updating the javadocs to explain this. [~mjsax] would this
need a KIP? Or can we just consider this a "bug" (especially since the javadocs
make no mention that it's intended to only work on static members) and since we
don't need any API changes, simply make the change without a KIP?
Either way – [~sal.sorrentino] would you be interested in picking this up?
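For reference, the behavior discussed above can be modeled as a small decision function. This is only a sketch based on the description in this thread, not the actual Kafka Streams code: LeaveGroupModel, leavesGroupToday and leavesGroupProposed are hypothetical names, and the handling of the "internal.leave.group.on.close" override is an assumption drawn from the workaround quoted in the issue description below.

```java
public class LeaveGroupModel {

    /**
     * Behavior as described in this thread (an assumption, not the real implementation):
     * the leaveGroup option is honored only for static members, while a dynamic
     * (non-static) member leaves on close only if the internal override is set.
     */
    public static boolean leavesGroupToday(boolean staticMember,
                                           boolean leaveGroupOption,
                                           boolean internalOverride) {
        if (staticMember) {
            return leaveGroupOption;
        }
        return internalOverride;
    }

    /**
     * Proposed behavior: the leaveGroup option is honored regardless of membership
     * type; anything that already causes a member to leave today keeps doing so.
     */
    public static boolean leavesGroupProposed(boolean staticMember,
                                              boolean leaveGroupOption,
                                              boolean internalOverride) {
        return leaveGroupOption
            || leavesGroupToday(staticMember, leaveGroupOption, internalOverride);
    }

    public static void main(String[] args) {
        // The reporter's case: dynamic member, leaveGroup(true), no internal override.
        System.out.println("today:    " + leavesGroupToday(false, true, false));
        System.out.println("proposed: " + leavesGroupProposed(false, true, false));
    }
}
```

In this model the two functions differ only for a non-static member that passes leaveGroup(true) without the internal override, which is exactly the case reported in this ticket.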
> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup
> flag.
> -----------------------------------------------------------------------------------
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Sal Sorrentino
> Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a Streams application and leave the associated
> consumer group, the supplied `leaveGroup` option seems to have no effect.
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave
> the group, immediately triggering a consumer group rebalance. In practice,
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior: it assumes that any
> associated StateStores are persisted to disk, so in the case of a
> rolling restart/deployment, the rebalance delay may be preferable. However,
> in our application we are using in-memory state stores and standby replicas.
> There is no benefit in delaying the rebalance in this setup and we are in
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamsConfig
> setting to enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
>     StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>     "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
>     StreamsConfig.EXACTLY_ONCE_V2);{code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)