[https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851160#comment-17851160]
Ganesh Sadanala edited comment on KAFKA-16811 at 5/31/24 4:47 PM:
------------------------------------------------------------------
[~sebviale] [~rohanpd] I have completed the implementation using the
SlidingWindow approach with x=30 seconds for testing. Here are the changes:
[https://github.com/ganesh-sadanala/kafka/commit/20873613b4f94cea5de32382e7e30cde25ef4c99]
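For readers following along, here is a minimal sketch of what a time-based sliding window for the ratio could look like. It is *not* the code in the commit above. The class {{SlidingWindowRatio}} and its methods are hypothetical, and the sketch assumes the caller records each poll-loop iteration's total duration and the part of it spent punctuating, with non-decreasing timestamps.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch, not the code in the linked commit: reports the fraction of
// wall-clock time spent punctuating during the last windowMs milliseconds.
// Assumes callers pass non-decreasing timestamps.
final class SlidingWindowRatio {

    // One poll-loop iteration: when it ended, how long it took, how much of it was punctuation.
    private static final class Sample {
        final long endMs;
        final long totalMs;
        final long punctuateMs;

        Sample(final long endMs, final long totalMs, final long punctuateMs) {
            this.endMs = endMs;
            this.totalMs = totalMs;
            this.punctuateMs = punctuateMs;
        }
    }

    private final long windowMs;
    private final Deque<Sample> samples = new ArrayDeque<>();
    private long totalSum = 0;      // running sum of totalMs over samples still in the window
    private long punctuateSum = 0;  // running sum of punctuateMs over samples still in the window

    SlidingWindowRatio(final long windowMs) {
        this.windowMs = windowMs;
    }

    // Record one poll-loop iteration that ended at nowMs.
    void record(final long nowMs, final long totalMs, final long punctuateMs) {
        samples.addLast(new Sample(nowMs, totalMs, punctuateMs));
        totalSum += totalMs;
        punctuateSum += punctuateMs;
        evict(nowMs);
    }

    // Time-weighted punctuate ratio over (nowMs - windowMs, nowMs]; 0.0 if the window is empty.
    double ratio(final long nowMs) {
        evict(nowMs);
        return totalSum == 0 ? 0.0 : (double) punctuateSum / totalSum;
    }

    // Drop iterations that ended at or before the start of the window.
    private void evict(final long nowMs) {
        while (!samples.isEmpty() && samples.peekFirst().endMs <= nowMs - windowMs) {
            final Sample old = samples.removeFirst();
            totalSum -= old.totalMs;
            punctuateSum -= old.punctuateMs;
        }
    }
}
{code}

With a 30-second window, recording three 100 ms iterations at t = 1 s, 2 s and 3 s (only the second one punctuating for 50 ms) gives a ratio of 50/300 at t = 3 s, 0.25 at t = 31.5 s once the first iteration has expired, and 0.0 at t = 32.5 s. Summing durations, rather than averaging per-loop ratios, keeps the result time-weighted, and the running sums make each call amortized O(1).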
I followed these steps to test the changes, but the punctuate-ratio metric is
still zero for all instances of the example demo class:
1. Start ZooKeeper and the Kafka broker.
2. Create the input and output topics with 3 partitions each, so that active
tasks are distributed across multiple instances of the WordCountProcessorDemo
streams application:
{code:bash}
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic streams-plaintext-input --partitions 3 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic streams-wordcount-output --partitions 3 --replication-factor 1
{code}
3. Run 3 instances of the Kafka Streams demo application, each in its own
terminal:
{code:bash}
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo
{code}
4. Produce and consume data (in separate terminals):
{code:bash}
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic streams-plaintext-input
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic streams-wordcount-output --from-beginning
{code}
5. Open JConsole and watch the metrics.
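The reason for 3 partitions: with a single input topic, each partition of a sub-topology becomes one task (IDs such as {{0_0}}, {{0_1}}, {{0_2}}), and the tasks are spread over the instances that share the application ID. The sketch below models this with a plain round-robin. That is a simplification, not Kafka Streams' real task assignor, and {{TaskSpread}} is a hypothetical name.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model, NOT Kafka Streams' real assignor. Assumes one sub-topology (id 0)
// with a single input topic, so each partition p becomes task "0_p", and tasks are
// dealt to instances round-robin.
final class TaskSpread {

    static Map<String, List<String>> assign(final int partitions, final List<String> instances) {
        final Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (final String instance : instances) {
            assignment.put(instance, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            final String taskId = "0_" + p;  // <subtopologyId>_<partition>
            final String owner = instances.get(p % instances.size());
            assignment.get(owner).add(taskId);
        }
        return assignment;
    }
}
{code}

With 3 partitions and 3 instances, each instance gets exactly one task in this model. Any instance beyond the third would get an empty list.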
All the metrics are being calculated, but when I run the debugger,
{{tasks.activeTasks()}} is an empty list in the following method of
TaskExecutor.java:
{code:java}
int punctuate() {
    int punctuated = 0;

    for (final Task task : tasks.activeTasks()) {
        try {
            if (executionMetadata.canPunctuateTask(task)) {
                if (task.maybePunctuateStreamTime()) {
                    punctuated++;
                }
                if (task.maybePunctuateSystemTime()) {
                    punctuated++;
                }
            }
        } catch (final TaskMigratedException e) {
            log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " +
                    "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
            throw e;
        } catch (final StreamsException e) {
            log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
            e.setTaskId(task.id());
            throw e;
        } catch (final KafkaException e) {
            log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
            throw new StreamsException(e, task.id());
        }
    }

    return punctuated;
}
{code}
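A self-contained model of the counting logic in {{punctuate()}} makes the dependency explicit: with an empty active-task list it returns 0 no matter what. {{PunctuateModel}} and {{StubTask}} are hypothetical stand-ins, not Kafka classes. {{canPunctuateTask}} is assumed to return true, and the exception handling is left out.

{code:java}
import java.util.List;

// Hypothetical model of the loop above; StubTask stands in for Kafka Streams' internal Task.
final class PunctuateModel {

    interface StubTask {
        boolean maybePunctuateStreamTime();
        boolean maybePunctuateSystemTime();
    }

    // Mirrors the loop: each task can contribute up to two punctuations per call.
    static int punctuate(final List<? extends StubTask> activeTasks) {
        int punctuated = 0;
        for (final StubTask task : activeTasks) {
            if (task.maybePunctuateStreamTime()) {
                punctuated++;
            }
            if (task.maybePunctuateSystemTime()) {
                punctuated++;
            }
        }
        return punctuated;
    }

    // A stub whose answers are fixed at construction time.
    static StubTask fixed(final boolean streamTime, final boolean systemTime) {
        return new StubTask() {
            @Override
            public boolean maybePunctuateStreamTime() {
                return streamTime;
            }

            @Override
            public boolean maybePunctuateSystemTime() {
                return systemTime;
            }
        };
    }
}
{code}

An empty list yields 0. Two stubs answering (true, true) and (false, true) yield 3. A real unit test against {{TaskExecutor}} would instead have to supply real or mocked tasks.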
Is there a way to make the active task list non-empty so that I can test the
changes and write some unit tests?
Is this behaviour normal in a local environment?
> Punctuate Ratio metric almost impossible to track
> -------------------------------------------------
>
> Key: KAFKA-16811
> URL: https://issues.apache.org/jira/browse/KAFKA-16811
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Sebastien Viale
> Assignee: Ganesh Sadanala
> Priority: Minor
>
> The punctuate ratio metric is returned after the last record of the poll
> loop, and it is recomputed in every poll loop.
> After a punctuation, the value is close to 1, but the metric is unlikely to
> be sampled at exactly that moment, so its value is almost always 0.
> A solution could be to apply a kind of "sliding window" to it and report the
> value for the last x seconds.
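A toy simulation of the sampling problem described above, with made-up numbers: every poll loop takes 10 ms, every 100th loop spends 9 ms of it punctuating (a last-loop ratio of 0.9), and a reporter reads the last-loop value once every 37 loops. {{SamplingSimulation}} is hypothetical and does not use Kafka's metrics classes.

{code:java}
// Toy model of the problem above; all numbers are illustrative, not measured.
final class SamplingSimulation {

    static final int LOOPS = 10_000;        // poll-loop iterations simulated
    static final long LOOP_MS = 10;         // duration of every iteration
    static final long PUNCTUATE_MS = 9;     // punctuation time in a punctuating iteration
    static final int PUNCTUATE_EVERY = 100; // every 100th iteration punctuates

    static boolean punctuates(final int loop) {
        return loop % PUNCTUATE_EVERY == PUNCTUATE_EVERY - 1;
    }

    // Per-iteration ratio, i.e. what a "last poll loop only" metric would expose.
    static double lastLoopRatio(final int loop) {
        return punctuates(loop) ? (double) PUNCTUATE_MS / LOOP_MS : 0.0;
    }

    // Number of samples a reporter takes if it reads the metric every `every` iterations.
    static int totalSamples(final int every) {
        return LOOPS / every;
    }

    // How many of those samples happen to land on a punctuating iteration.
    static int nonZeroSamples(final int every) {
        int nonZero = 0;
        for (int loop = every - 1; loop < LOOPS; loop += every) {
            if (lastLoopRatio(loop) > 0.0) {
                nonZero++;
            }
        }
        return nonZero;
    }

    // Time-weighted ratio over the last `windowLoops` iterations, as a sliding window would report it.
    static double windowedRatio(final int windowLoops) {
        long punctuateMs = 0;
        for (int loop = LOOPS - windowLoops; loop < LOOPS; loop++) {
            if (punctuates(loop)) {
                punctuateMs += PUNCTUATE_MS;
            }
        }
        return (double) punctuateMs / (windowLoops * LOOP_MS);
    }
}
{code}

With these numbers only 2 of the 270 samples are non-zero, while the time-weighted ratio over the last 3,000 loops (30 seconds at 10 ms per loop) is a steady 0.009.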