ganesh-sadanala commented on PR #16162:
URL: https://github.com/apache/kafka/pull/16162#issuecomment-2143705807
I have completed the implementation using the SlidingWindow approach with
x=30 seconds for testing. Here are the changes:
https://github.com/apache/kafka/pull/16162
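For readers unfamiliar with the idea, a sliding-window ratio over a 30-second window could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in the PR: the class `SlidingWindowRatio`, its methods, and the choice to track punctuation time against total time are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper for illustration only; not the class used in the PR.
final class SlidingWindowRatio {

    // One recorded sample: when it was taken and how much time it covers.
    private static final class Sample {
        final long timestampMs;
        final long punctuateMs;
        final long totalMs;

        Sample(final long timestampMs, final long punctuateMs, final long totalMs) {
            this.timestampMs = timestampMs;
            this.punctuateMs = punctuateMs;
            this.totalMs = totalMs;
        }
    }

    private final long windowMs;
    private final Deque<Sample> samples = new ArrayDeque<>();
    private long punctuateSum;
    private long totalSum;

    SlidingWindowRatio(final long windowMs) {
        this.windowMs = windowMs;
    }

    // Record one loop iteration: time spent punctuating and total iteration time.
    void record(final long nowMs, final long punctuateMs, final long totalMs) {
        samples.addLast(new Sample(nowMs, punctuateMs, totalMs));
        punctuateSum += punctuateMs;
        totalSum += totalMs;
        evict(nowMs);
    }

    // Ratio of punctuation time to total time over the samples still in the window.
    double ratio(final long nowMs) {
        evict(nowMs);
        return totalSum == 0 ? 0.0 : (double) punctuateSum / totalSum;
    }

    // Drop samples whose timestamp is at least windowMs older than nowMs.
    private void evict(final long nowMs) {
        while (!samples.isEmpty() && samples.peekFirst().timestampMs <= nowMs - windowMs) {
            final Sample old = samples.removeFirst();
            punctuateSum -= old.punctuateMs;
            totalSum -= old.totalMs;
        }
    }

    public static void main(final String[] args) {
        final SlidingWindowRatio window = new SlidingWindowRatio(30_000L); // x = 30 seconds
        window.record(0L, 10L, 100L);
        window.record(10_000L, 30L, 100L);
        System.out.println(window.ratio(10_000L));
    }
}
```

Note that in this sketch the ratio is zero whenever no punctuation time is recorded inside the window, which matches the symptom described below.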
I followed the steps below to test the changes, but I still see
punctuate-ratio as zero for all instances of the example demo class.
1. Start ZooKeeper.
2. Start the Kafka broker.
3. Create input and output topics with 3 partitions (so that active tasks are
distributed across multiple instances of the WordCountProcessorDemo streams
class):
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-plaintext-input --partitions 3 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-wordcount-output --partitions 3 --replication-factor 1
4. Run the 3 instances of Kafka Streams Demo Application in different
terminals/processors:
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo
5. Produce and consume data
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning
6. Open JConsole and watch the metrics.
I can see that the metrics are being calculated. When I run the debugger,
however, tasks.activeTasks() in the code below returns an empty list. Because
of that, the punctuated count stays at zero, and so does the punctuate ratio.
TaskExecutor.java
```
int punctuate() {
    int punctuated = 0;
    for (final Task task : tasks.activeTasks()) {
        try {
            if (executionMetadata.canPunctuateTask(task)) {
                if (task.maybePunctuateStreamTime()) {
                    punctuated++;
                }
                if (task.maybePunctuateSystemTime()) {
                    punctuated++;
                }
            }
        } catch (final TaskMigratedException e) {
            log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " +
                "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
            throw e;
        } catch (final StreamsException e) {
            log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
            e.setTaskId(task.id());
            throw e;
        } catch (final KafkaException e) {
            log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
            throw new StreamsException(e, task.id());
        }
    }
    return punctuated;
}
```
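To make the reasoning concrete, here is a stripped-down model of the counting loop. It is only a sketch: `SketchTask` and `PunctuateCountSketch` are hypothetical stand-ins, not Kafka Streams classes, and the exception handling and the `canPunctuateTask` check are omitted. It shows that an empty task list can only ever return zero.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of the loop above; not Kafka Streams code.
final class PunctuateCountSketch {

    // Stand-in for the two punctuation hooks a task exposes.
    interface SketchTask {
        boolean maybePunctuateStreamTime();
        boolean maybePunctuateSystemTime();
    }

    // Counts how many punctuations fired across the given tasks.
    static int punctuate(final List<SketchTask> activeTasks) {
        int punctuated = 0;
        for (final SketchTask task : activeTasks) {
            if (task.maybePunctuateStreamTime()) {
                punctuated++;
            }
            if (task.maybePunctuateSystemTime()) {
                punctuated++;
            }
        }
        return punctuated;
    }

    public static void main(final String[] args) {
        // With no active tasks the loop body never runs, so the count is zero.
        System.out.println(punctuate(Collections.<SketchTask>emptyList()));
    }
}
```

So the first thing to establish is why the thread sees no active tasks, rather than whether the ratio calculation itself is wrong.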
Is there a way to make the active tasks list non-empty, so that I can test
the changes and write some unit tests?
Is this behaviour normal in a local environment?