mjsax commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1542738987
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-                throw new MissingSourceTopicException("Missing source topics.");
Review Comment:
What Bruno meant was that we need to add `log.error(...)` to log the error
message before throwing the exception. It seems you have not added this yet?
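To make the request concrete, here is a minimal, self-contained sketch of the log-then-throw pattern. It is not the code from this PR: the class name, the `failOnMissingSourceTopics` helper, the message wording, and the nested exception type are all hypothetical stand-ins, and `java.util.logging` (`severe`) is used in place of the `log.error(...)` call so the sketch runs without extra dependencies.

```java
import java.util.List;
import java.util.logging.Logger;

public class MissingSourceTopicLogging {

    // Hypothetical stand-in for Kafka's MissingSourceTopicException,
    // declared here only so the sketch compiles on its own.
    static class MissingSourceTopicException extends RuntimeException {
        MissingSourceTopicException(final String message) {
            super(message);
        }
    }

    // Assumption: java.util.logging replaces the logger used in the real class.
    private static final Logger LOG =
        Logger.getLogger(MissingSourceTopicLogging.class.getName());

    // Logs the missing topic names at error level, then throws with the same message.
    static void failOnMissingSourceTopics(final List<String> missingTopics) {
        if (missingTopics.isEmpty()) {
            return;
        }
        final String message = "Missing source topics: " + String.join(", ", missingTopics);
        LOG.severe(message); // corresponds to log.error(message) in the review comment
        throw new MissingSourceTopicException(message);
    }
}
```

Building the message once and passing it to both the logger and the exception keeps the two in sync, so the topic names show up in the log even if the exception is swallowed or rewrapped further up.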
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-                throw new MissingSourceTopicException("Missing source topics.");
Review Comment:
Adding to a previous comment from Bruno, I am wondering if we should also
change the error log in `StreamsRebalanceListener` to point out that the
missing source topic names might be logged on a different instance?
The `StreamsRebalanceListener` is executed on every instance, but
`StreamsPartitionAssignor` only on the group leader.