ableegoldman commented on code in PR #17892:
URL: https://github.com/apache/kafka/pull/17892#discussion_r1851710543
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java:
##########
@@ -78,18 +80,30 @@ public FixedKeyProcessorSupplier<KIn, VIn, VOut> fixedKeyProcessorSupplier() {
     public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, final String[] parentNodeNames) {
         if (processorSupplier != null) {
-            topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames);
-            if (processorSupplier.stores() != null) {
-                for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) {
+            ApiUtils.checkSupplier(processorSupplier);
+
+            final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =
Review Comment:
This is how the DSL implementation will work: processors are wrapped when the
DSL is built into a Topology, specifically in this method, where they are
added to the InternalTopologyBuilder.
Note that this PR is only a partial implementation: not all DSL operators go
through this class, and some write their processors to the
InternalTopologyBuilder directly. Follow-up PRs will convert those operators
and enforce that any new ones added in the future go through this method and
are wrapped.
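
For illustration only, here is a minimal standalone sketch of the wrap-on-add
pattern described above. None of these types are the real Kafka Streams
classes: `Processor`, `TopologyBuilder`, `Parameters`, and the `wrapper`
argument are hypothetical stand-ins, and the wrapper simply tags the output so
the effect is visible. The second node is added to the builder directly, so it
bypasses the wrapper, which is the gap the follow-up PRs are meant to close.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

public class WrappingSketch {

    /** Hypothetical stand-in for a processor; not the Kafka Streams interface. */
    interface Processor {
        String process(String value);
    }

    /** Hypothetical stand-in for InternalTopologyBuilder: it only records what was added. */
    static final class TopologyBuilder {
        final List<String> names = new ArrayList<>();
        final List<Supplier<Processor>> suppliers = new ArrayList<>();

        void addProcessor(final String name, final Supplier<Processor> supplier) {
            names.add(name);
            suppliers.add(supplier);
        }
    }

    /** Hypothetical stand-in for ProcessorParameters: the one place where wrapping happens. */
    static final class Parameters {
        private final String name;
        private final Supplier<Processor> supplier;

        Parameters(final String name, final Supplier<Processor> supplier) {
            this.name = name;
            this.supplier = supplier;
        }

        void addProcessorTo(final TopologyBuilder builder,
                            final UnaryOperator<Supplier<Processor>> wrapper) {
            // Wrap the supplier before it is handed to the builder.
            final Supplier<Processor> wrapped = wrapper.apply(supplier);
            builder.addProcessor(name, wrapped);
        }
    }

    /** Builds a two-node topology and returns what each node produces for the input "a". */
    static List<String> run() {
        // Assumed wrapper for this sketch: decorates every processor the supplier creates.
        final UnaryOperator<Supplier<Processor>> wrapper = inner -> () -> {
            final Processor delegate = inner.get();
            return value -> "wrapped(" + delegate.process(value) + ")";
        };

        final TopologyBuilder builder = new TopologyBuilder();

        // Goes through Parameters, so the processor is wrapped.
        new Parameters("via-parameters", () -> v -> v.toUpperCase())
            .addProcessorTo(builder, wrapper);

        // Written to the builder directly, so the wrapper never sees it.
        builder.addProcessor("direct", () -> v -> v.toUpperCase());

        final List<String> results = new ArrayList<>();
        for (final Supplier<Processor> supplier : builder.suppliers) {
            results.add(supplier.get().process("a"));
        }
        return results;
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}
```

In this sketch only the node added through `Parameters.addProcessorTo` picks up
the wrapper, which is why routing every operator through a single method is
what makes the wrapping enforceable.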