Repository: incubator-edgent
Updated Branches:
  refs/heads/develop 05faa3307 -> e4c3d3df9


[Edgent-441] adjust kafka test timeouts

Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/f0d7fe0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/f0d7fe0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/f0d7fe0c

Branch: refs/heads/develop
Commit: f0d7fe0cc368fa1b9307819d54122e18b4242cb7
Parents: 05faa33
Author: Dale LaBossiere <dlab...@us.ibm.com>
Authored: Sun Dec 10 17:51:03 2017 -0500
Committer: Dale LaBossiere <dlab...@us.ibm.com>
Committed: Sun Dec 10 17:51:03 2017 -0500

----------------------------------------------------------------------
 .../connectors/kafka/runtime/KafkaConsumerConnector.java       | 6 ++++--
 .../edgent/test/connectors/kafka/KafkaStreamsTestManual.java   | 4 ++--
 2 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/f0d7fe0c/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
index f5f879f..3e280f4 100644
--- a/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
+++ b/connectors/kafka/src/main/java/org/apache/edgent/connectors/kafka/runtime/KafkaConsumerConnector.java
@@ -157,6 +157,7 @@ public class KafkaConsumerConnector extends KafkaConnector {
     }
     
     synchronized void start(KafkaSubscriber<?> subscriber) {
+        trace.info("{} starting consumer", id());
         Map<String,Integer> topicCountMap = new HashMap<>();
         int threadsPerTopic = 1;
         int totThreadCnt = 0;
@@ -176,9 +177,10 @@ public class KafkaConsumerConnector extends KafkaConnector {
             String topic = entry.getKey();
             int threadNum = 0;
             for (KafkaStream<byte[],byte[]> stream : entry.getValue()) {
+                final int fThreadNum = threadNum++; 
                 executor.submit(() -> {
                     try {
-                        trace.info("{} started consumer thread {} for topic:{}", id(), threadNum, topic);
+                        trace.info("{} started consumer thread {} for topic:{}", id(), fThreadNum, topic);
                         ConsumerIterator<byte[],byte[]> it = stream.iterator();
                         while (it.hasNext()) {
                             subscriber.accept(it.next());
@@ -193,7 +195,7 @@ public class KafkaConsumerConnector extends KafkaConnector {
                             trace.error("{} consumer for topic:{}. got exception", id(), topic, t);
                     }
                     finally {
-                        trace.info("{} consumer thread {} for topic:{} exiting.", id(), threadNum, topic);
+                        trace.info("{} consumer thread {} for topic:{} exiting.", id(), fThreadNum, topic);
                     }
                 });
             }

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/f0d7fe0c/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
index 74e5b63..11ec7dd 100644
--- a/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
+++ b/connectors/kafka/src/test/java/org/apache/edgent/test/connectors/kafka/KafkaStreamsTestManual.java
@@ -80,8 +80,8 @@ import org.junit.Test;
  * }</pre>
  */
 public class KafkaStreamsTestManual extends ConnectorTestBase {
-    private static final int PUB_DELAY_MSEC = 4*1000;
-    private static final int SEC_TIMEOUT = 10;
+    private static final int PUB_DELAY_MSEC = 15*1000; // have seen 12sec 1st test's consumer startup delay
+    private static final int SEC_TIMEOUT = 20;
     private final String BASE_GROUP_ID = "kafkaStreamsTestGroupId";
     private final String uniq = simpleTS();
     private final String msg1 = "Hello";

Reply via email to