[ https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nikki Thean updated KAFKA-7055:
-------------------------------
    Description: 
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. Here is an example where you forward using the name of the downstream node 
rather than a child index 
([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).
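
To make the lookup rule concrete, here is a minimal Java model of it. It assumes, as described above, that forwarding by name searches only the current node's children. It is a sketch, not Kafka code: `ForwardByNameSketch`, `Node`, `ForwardException` and the message text are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of forwarding by child name.
// This is not Kafka's ProcessorContextImpl; it only mirrors the lookup rule above.
public class ForwardByNameSketch {

    static final class Node {
        final String name;
        // A node appears here only if it named this node as a parent when added.
        final Map<String, Node> children = new LinkedHashMap<>();

        Node(String name) {
            this.name = name;
        }
    }

    // Stand-in for StreamsException; Kafka's real exception message may differ.
    static final class ForwardException extends RuntimeException {
        ForwardException(String message) {
            super(message);
        }
    }

    // Resolves the target of a forward from 'current' to 'childName'.
    // Only the current node's children are searched, never the whole topology.
    static Node forward(Node current, String childName) {
        Node child = current.children.get(childName);
        if (child == null) {
            throw new ForwardException("Unknown child node: " + childName);
        }
        return child;
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        Node connected = new Node("connected");
        source.children.put(connected.name, connected);
        // "orphan" was added with an empty parent list, so no node lists it as a child.

        System.out.println(forward(source, "connected").name); // prints "connected"
        try {
            forward(source, "orphan");
        } catch (ForwardException e) {
            System.out.println("failed: " + e.getMessage()); // prints "failed: Unknown child node: orphan"
        }
    }
}
```

In this model a node added with no parents exists but is unreachable: no upstream node has it in its children map, so every forward to it fails.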

However, I've been able to add processors and sinks to the topology without 
including parent names, i.e. with an empty varargs parameter, using this method: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.
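
The proposed check could be as small as the following sketch. It uses a hypothetical, stripped-down builder (`ParentCheckSketch` and `InvalidTopologyException` are made-up names, and the real `Topology` methods take more arguments such as topics and processor suppliers); it only illustrates rejecting an empty parent list when a processor or sink is added.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical builder illustrating the proposed check; not Kafka's Topology class.
// Real Topology methods also take topics, serdes, and ProcessorSuppliers, omitted here.
public class ParentCheckSketch {

    // Stand-in for whichever exception type Kafka would choose to throw.
    static final class InvalidTopologyException extends RuntimeException {
        InvalidTopologyException(String message) {
            super(message);
        }
    }

    // Node name -> parent names it was registered with.
    private final Map<String, String[]> nodes = new LinkedHashMap<>();

    public ParentCheckSketch addSource(String name) {
        nodes.put(name, new String[0]); // sources legitimately have no parents
        return this;
    }

    public ParentCheckSketch addProcessor(String name, String... parentNames) {
        requireAtLeastOneParent("Processor", name, parentNames);
        nodes.put(name, parentNames.clone());
        return this;
    }

    public ParentCheckSketch addSink(String name, String... parentNames) {
        requireAtLeastOneParent("Sink", name, parentNames);
        nodes.put(name, parentNames.clone());
        return this;
    }

    // Fails fast when the varargs array is null or empty, instead of silently
    // creating a node that no upstream node can ever forward to.
    private static void requireAtLeastOneParent(String kind, String name, String[] parentNames) {
        if (parentNames == null || parentNames.length == 0) {
            throw new InvalidTopologyException(
                kind + " " + name + " must have at least one parent");
        }
    }

    public static void main(String[] args) {
        ParentCheckSketch topology = new ParentCheckSketch()
            .addSource("source")
            .addProcessor("processor", "source")
            .addSink("sink", "processor");
        try {
            topology.addSink("orphan-sink"); // empty varargs: rejected
        } catch (InvalidTopologyException e) {
            System.out.println(e.getMessage()); // prints "Sink orphan-sink must have at least one parent"
        }
    }
}
```

Failing inside `addProcessor`/`addSink` surfaces the mistake while the topology is being built, rather than as a `StreamsException` on the first forwarded record.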

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like this one for when a user attempts to access a state 
store that is not connected to the processor: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]
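
A more descriptive message could name the current node, the requested child, and the children that are actually connected, and point to the fix. The following is a hypothetical sketch of such a message builder; `ForwardErrorMessageSketch`, `unconnectedChildMessage` and the wording are illustrative assumptions in the spirit of the state-store message linked above, not Kafka's actual text.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that builds a more descriptive message for a forward to an
// unconnected node. The wording is illustrative only, not Kafka's actual text.
public class ForwardErrorMessageSketch {

    static String unconnectedChildMessage(String currentNode, String childName,
                                          List<String> connectedChildren) {
        return "Processor " + currentNode + " cannot forward to node " + childName
            + " because " + childName + " is not connected as a child of " + currentNode
            + ". Connected children: " + connectedChildren
            + ". When building the Topology, pass " + currentNode
            + " as a parent name when adding " + childName
            + " via addProcessor() or addSink().";
    }

    public static void main(String[] args) {
        System.out.println(unconnectedChildMessage(
            "my-processor", "my-sink", Arrays.asList("other-sink")));
    }
}
```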

  was:
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. Here is an example where you forward using the name of the downstream node 
rather than a child index 
(https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117).

However, I've been able to add processors and sinks to the topology without 
including parent names, i.e. with an empty varargs parameter, using this method: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423.

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when 
users try to forward messages to a node that is not 
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
 more descriptive, like this one for when a user attempts to access a state 
store that is not connected to the processor: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]


> Kafka Streams Processor API allows you to add sinks and processors without parent
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-7055
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7055
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Nikki Thean
>            Assignee: Nikki Thean
>            Priority: Minor
>              Labels: easyfix



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
