Repository: incubator-edgent Updated Branches: refs/heads/develop 05faa3307 -> e4c3d3df9
[Edgent-441] adjust kafka test timeouts Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/f0d7fe0c Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/f0d7fe0c Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/f0d7fe0c Branch: refs/heads/develop Commit: f0d7fe0cc368fa1b9307819d54122e18b4242cb7 Parents: 05faa33 Author: Dale LaBossiere <dlab...@us.ibm.com> Authored: Sun Dec 10 17:51:03 2017 -0500 Committer: Dale LaBossiere <dlab...@us.ibm.com> Committed: Sun Dec 10 17:51:03 2017 -0500 ---------------------------------------------------------------------- .../connectors/kafka/runtime/KafkaConsumerConnector.java | 6 ++++-- .../edgent/test/connectors/kafka/KafkaStreamsTestManual.java | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/f0d7fe0c/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java ---------------------------------------------------------------------- diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java index f5f879f..3e280f4 100644 --- a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java +++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java @@ -157,6 +157,7 @@ public class KafkaConsumerConnector extends KafkaConnector { } synchronized void start(KafkaSubscriber<?> subscriber) { + trace.info("{} starting consumer", id()); Map<String,Integer> topicCountMap = new HashMap<>(); int threadsPerTopic = 1; int totThreadCnt = 0; @@ -176,9 +177,10 @@ public class KafkaConsumerConnector extends KafkaConnector { String topic = entry.getKey(); int threadNum = 0; for (KafkaStream<byte[],byte[]> stream : entry.getValue()) { + final int fThreadNum = threadNum++; executor.submit(() -> { try { - trace.info("{} started consumer thread {} for topic:{}", id(), threadNum, topic); + trace.info("{} started consumer thread {} for topic:{}", id(), fThreadNum, topic); ConsumerIterator<byte[],byte[]> it = stream.iterator(); while (it.hasNext()) { subscriber.accept(it.next()); @@ -193,7 +195,7 @@ public class KafkaConsumerConnector extends KafkaConnector { trace.error("{} consumer for topic:{}. got exception", id(), topic, t); } finally { - trace.info("{} consumer thread {} for topic:{} exiting.", id(), threadNum, topic); + trace.info("{} consumer thread {} for topic:{} exiting.", id(), fThreadNum, topic); } }); } http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/f0d7fe0c/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java ---------------------------------------------------------------------- diff --git a/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java index 74e5b63..11ec7dd 100644 --- a/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java +++ b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java @@ -80,8 +80,8 @@ import org.junit.Test; * }</pre> */ public class KafkaStreamsTestManual extends ConnectorTestBase { - private static final int PUB_DELAY_MSEC = 4*1000; - private static final int SEC_TIMEOUT = 10; + private static final int PUB_DELAY_MSEC = 15*1000; // have seen 12sec 1st test's consumer startup delay + private static final int SEC_TIMEOUT = 20; private final String BASE_GROUP_ID = "kafkaStreamsTestGroupId"; private final String uniq = simpleTS(); private final String msg1 = "Hello";