lucasbru commented on code in PR #19697:
URL: https://github.com/apache/kafka/pull/19697#discussion_r2086150911
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -550,12 +551,16 @@ private static MainConsumerSetup setupMainConsumer(final TopologyMetadata topolo
);
final ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
final ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
+
return new MainConsumerSetup(
- new AsyncKafkaConsumer<>(
- new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(consumerConfigs, keyDeserializer, valueDeserializer)),
- keyDeserializer,
- valueDeserializer,
- streamsRebalanceData
+ maybeWrapConsumer(
Review Comment:
I think this approach will break here:
https://github.com/apache/kafka/blame/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1108
With the latest changes above, we need to make the extended subscribe method
available to streams in the interface returned by the wrapper.
Could we introduce a little private interface `StreamsConsumer` that
contains all extension methods for streams and let `AsyncKafkaConsumer`
implement it, and have the wrapper return a `StreamsConsumer`?
This could serve as a basis for a public interface later on in KIP-1088.
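
For illustration, here is a minimal, self-contained sketch of the shape this could take. Only `StreamsConsumer` and `maybeWrapConsumer` are names from this thread. Everything else (`BasicConsumer`, `RebalanceListenerStub`, `FakeAsyncConsumer`, `CountingWrapper`, and the exact signature of the extended `subscribe`) is a hypothetical stand-in, not the real Kafka API. The point is only that when the wrapper returns the extended interface, the caller can still reach the Streams-specific overload.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

public class StreamsConsumerSketch {

    // Hypothetical, heavily reduced stand-in for the general consumer API.
    interface BasicConsumer {
        void subscribe(Collection<String> topics);
    }

    // Hypothetical listener type for the Streams-specific subscribe overload.
    interface RebalanceListenerStub {
        void onAssigned(Collection<String> topics);
    }

    // The small internal interface proposed above: it collects the
    // extension methods that only Streams needs.
    interface StreamsConsumer extends BasicConsumer {
        void subscribe(Collection<String> topics, RebalanceListenerStub listener);
    }

    // Stand-in for AsyncKafkaConsumer implementing the extended interface.
    static final class FakeAsyncConsumer implements StreamsConsumer {
        final List<String> subscribed = new ArrayList<>();

        @Override
        public void subscribe(Collection<String> topics) {
            subscribed.addAll(topics);
        }

        @Override
        public void subscribe(Collection<String> topics, RebalanceListenerStub listener) {
            subscribed.addAll(topics);
            // Simplification: pretend the assignment happens immediately.
            listener.onAssigned(topics);
        }
    }

    // Hypothetical wrapper that delegates every call and counts subscriptions.
    static final class CountingWrapper implements StreamsConsumer {
        private final StreamsConsumer delegate;
        int subscribeCalls = 0;

        CountingWrapper(StreamsConsumer delegate) {
            this.delegate = delegate;
        }

        @Override
        public void subscribe(Collection<String> topics) {
            subscribeCalls++;
            delegate.subscribe(topics);
        }

        @Override
        public void subscribe(Collection<String> topics, RebalanceListenerStub listener) {
            subscribeCalls++;
            delegate.subscribe(topics, listener);
        }
    }

    // Stand-in for maybeWrapConsumer. Because it returns StreamsConsumer rather
    // than the general consumer type, callers keep access to the extended subscribe.
    static StreamsConsumer maybeWrapConsumer(StreamsConsumer delegate,
                                             Function<StreamsConsumer, StreamsConsumer> wrapper) {
        return wrapper == null ? delegate : wrapper.apply(delegate);
    }

    public static void main(String[] args) {
        FakeAsyncConsumer consumer = new FakeAsyncConsumer();
        StreamsConsumer wrapped = maybeWrapConsumer(consumer, CountingWrapper::new);
        List<String> assigned = new ArrayList<>();
        // Compiles only because the wrapper's return type exposes the overload.
        wrapped.subscribe(List.of("input-topic"), assigned::addAll);
        System.out.println("subscribed=" + consumer.subscribed + " assigned=" + assigned);
    }
}
```

If `maybeWrapConsumer` returned the general consumer type instead, the two-argument `subscribe` call would not compile without a cast, which is the breakage I am worried about at the call site linked above.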