ableegoldman commented on code in PR #17892:
URL: https://github.com/apache/kafka/pull/17892#discussion_r1853099939
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java:
##########
@@ -78,18 +80,30 @@ public FixedKeyProcessorSupplier<KIn, VIn, VOut>
fixedKeyProcessorSupplier() {
public void addProcessorTo(final InternalTopologyBuilder topologyBuilder,
final String[] parentNodeNames) {
if (processorSupplier != null) {
- topologyBuilder.addProcessor(processorName, processorSupplier,
parentNodeNames);
- if (processorSupplier.stores() != null) {
- for (final StoreBuilder<?> storeBuilder :
processorSupplier.stores()) {
+ ApiUtils.checkSupplier(processorSupplier);
+
+ final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =
Review Comment:
Ultimately it might make sense to move the extraction of the stores from a
ProcessorSupplier into a single InternalTopologyBuilder method (eg
#addStatefulProcessor) that calls both #addProcessor and #addStateStore, and do
the wrapping there. This will also help future-proof new DSL operators from
being implemented incorrectly and missing the wrapper step.
But I think it makes sense to wait until we've completed all the followup
PRs for the remaining DSL operators, in case there are any weird edge cases we
haven't thought of/seen yet. Then we can do one final PR to clean up and
future-proof the wrapping process
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]