rodesai commented on code in PR #17892:
URL: https://github.com/apache/kafka/pull/17892#discussion_r1855993900
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2245,4 +2272,22 @@ public boolean hasNamedTopology() {
public synchronized Map<String, StoreFactory> stateStores() {
return stateFactories;
}
+
+ public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(
+ final String name,
+ final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier
+ ) {
+ return ProcessorWrapper.asWrappedFixedKey(
+ processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier)
Review Comment:
> Ah, yes, it's correct but I see how it's confusing -- basically the
> non-static #wrapFixedKeyProcessorSupplier is what does the actual wrapping via
> the configured ProcessorWrapper instance, whereas the static
> ProcessorWrapper#asWrapped method is what converts a regular ProcessorSupplier
> into the WrappedProcessorSupplier subclass that we use as a marker interface to
> distinguish between processors that are and aren't wrapped yet.

I'm not following this. `wrapFixedKeyProcessorSupplier` is already
expected to return an instance that implements `WrappedProcessorSupplier`, so
why does `InternalTopologyBuilder` also need to enclose it in a
`WrappedProcessorSupplierImpl`? Do we explicitly check for the
`WrappedProcessorSupplierImpl` class elsewhere?
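
To make the question concrete, here is a rough sketch of the marker-interface pattern as I understand it from the description above. Every type here (`WrappedSupplier`, `Wrapper`, `WrappedSupplierImpl`, `NoOpWrapper`, `isWrapped`) is a simplified, hypothetical stand-in built on `java.util.function.Supplier`, not the actual Kafka Streams classes, and it assumes the marker check is an `instanceof` against the interface rather than the impl class.

```java
import java.util.function.Supplier;

public class MarkerWrapSketch {

    // Hypothetical marker interface: no extra methods, it only tags a supplier as wrapped.
    interface WrappedSupplier<T> extends Supplier<T> { }

    // Hypothetical stand-in for the configured, user-pluggable wrapper.
    interface Wrapper {
        <T> WrappedSupplier<T> wrap(String name, Supplier<T> supplier);
    }

    // Hypothetical stand-in for the enclosing "Impl" class: delegates and carries the marker.
    static final class WrappedSupplierImpl<T> implements WrappedSupplier<T> {
        private final Supplier<T> delegate;

        WrappedSupplierImpl(final Supplier<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public T get() {
            return delegate.get();
        }
    }

    // Static conversion helper: encloses any supplier in the marker implementation.
    static <T> WrappedSupplier<T> asWrapped(final Supplier<T> supplier) {
        return new WrappedSupplierImpl<>(supplier);
    }

    // A no-op wrapper, assumed here to return the marker type itself.
    static final class NoOpWrapper implements Wrapper {
        @Override
        public <T> WrappedSupplier<T> wrap(final String name, final Supplier<T> supplier) {
            return asWrapped(supplier);
        }
    }

    // The marker check that tells wrapped suppliers from unwrapped ones (assumed to test the interface).
    static boolean isWrapped(final Supplier<?> supplier) {
        return supplier instanceof WrappedSupplier;
    }

    public static void main(final String[] args) {
        final Supplier<String> plain = () -> "processor";
        final Wrapper wrapper = new NoOpWrapper();

        final WrappedSupplier<String> fromWrapper = wrapper.wrap("node-1", plain);
        final WrappedSupplier<String> enclosedAgain = asWrapped(fromWrapper);

        System.out.println(isWrapped(plain));          // false
        System.out.println(isWrapped(fromWrapper));    // true
        System.out.println(isWrapped(enclosedAgain));  // true
        System.out.println(enclosedAgain.get());       // processor
    }
}
```

In this simplified model the result of `wrap` already passes the marker check, so enclosing it a second time changes nothing observable. That is why I'm asking whether something elsewhere depends on the concrete impl class.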