Author: bvahdat
Date: Thu Mar 8 10:11:06 2012
New Revision: 1298337
URL: http://svn.apache.org/viewvc?rev=1298337&view=rev
Log: CAMEL-5058: Polished the provided unit-test.
Modified:
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java?rev=1298337&r1=1298336&r2=1298337&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultInflightRepositoryConcurrentTest.java Thu Mar 8 10:11:06 2012
@@ -27,26 +27,26 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.spi.InflightRepository;
 /**
  * @version
  */
 public class DefaultInflightRepositoryConcurrentTest extends ContextTestSupport {
-    public static final int THREAD_COUNT = 20;
-    public static final int TOTAL_ENDPOINTS = 10000;
-    public static final int LOOP_COUNT = 100000;
+    private static final int THREAD_COUNT = 20;
+    private static final int TOTAL_ENDPOINTS = 10000;
+    private static final int LOOP_COUNT = 100000;
-    private static boolean failure;
+    // the failed flag should be marked as volatile so that we have got guaranteed visibility inside the main thread
+    private static volatile boolean failed;
     private static CamelContext context = new DefaultCamelContext();
     public void testThreaded() throws Exception {
-        long started = System.currentTimeMillis();
-
         DefaultInflightRepository toTest = new DefaultInflightRepository();
-        Endpoint[] eps = new Endpoint[TOTAL_ENDPOINTS];
+        Endpoint[] endpoints = new Endpoint[TOTAL_ENDPOINTS];
-        for (int i = 0; i < eps.length; i++) {
+        for (int i = 0; i < endpoints.length; i++) {
             // create TOTAL_ENDPOINTS endpoints
             Endpoint endpoint = new DefaultEndpoint() {
                 final String uri = "def:" + System.nanoTime();
@@ -67,50 +67,54 @@ public class DefaultInflightRepositoryCo
                     return null;
                 }
             };
-            eps[i] = endpoint;
+            endpoints[i] = endpoint;
         }
-        AtomicInteger locker = new AtomicInteger(0);
+        AtomicInteger locker = new AtomicInteger();
-        Thread[] ts = new Thread[THREAD_COUNT];
-        for (int i = 0; i < ts.length; i++) {
-            TypicalConsumer consumer = new TypicalConsumer();
-            consumer.eps = eps;
-            consumer.repo = toTest;
-            consumer.locker = locker;
-            ts[i] = new Thread(consumer);
+        Thread[] threads = new Thread[THREAD_COUNT];
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new Thread(new TypicalConsumer(endpoints, toTest, locker));
         }
-        for (int i = 0; i < ts.length; i++) {
-            ts[i].start();
+        for (int i = 0; i < threads.length; i++) {
+            threads[i].start();
         }
+        Thread.sleep(1000);
+
         while (locker.get() > 0) {
             synchronized (locker) {
                 locker.wait();
             }
         }
-        if (failure) {
-            throw new Exception("Failed to properly track endpoints");
+        if (failed) {
+            throw new AssertionError("Failed to properly track endpoints");
         }
-        for (Endpoint ep : eps) {
-            Assert.assertTrue("Size MUST be 0", 0 == toTest.size(ep));
+        for (Endpoint endpoint : endpoints) {
+            Assert.assertEquals("Size MUST be 0", 0, toTest.size(endpoint));
         }
         if (toTest.size() > 0) {
-            throw new Exception("Test either incomplete or tracking failed");
+            throw new AssertionError("Test either incomplete or tracking failed");
         }
-        Assert.assertTrue("Must not have any references left", 0 == toTest.endpointSize());
+        Assert.assertEquals("Must not have any references left", 0, toTest.endpointSize());
     }
     private static class TypicalConsumer implements Runnable {
-        Endpoint[] eps;
-        DefaultInflightRepository repo;
-        Random rand = new Random(System.nanoTime());
+        Endpoint[] endpoints;
+        InflightRepository repo;
         AtomicInteger locker;
+        Random rand = new Random(System.nanoTime());
+
+        TypicalConsumer(Endpoint[] endpoints, InflightRepository repo, AtomicInteger locker) {
+            this.endpoints = endpoints;
+            this.repo = repo;
+            this.locker = locker;
+        }
         public void run() {
             synchronized (locker) {
@@ -118,18 +122,19 @@ public class DefaultInflightRepositoryCo
             }
             try {
                 for (int i = 0; i < LOOP_COUNT; i++) {
-                    Endpoint ep = eps[Math.abs(rand.nextInt() % eps.length)];
-                    ep.setCamelContext(context);
-                    Exchange ex = new DefaultExchange(ep);
-                    repo.add(ex);
-                    int size = repo.size(ep);
+                    Endpoint endpoint = endpoints[Math.abs(rand.nextInt() % endpoints.length)];
+                    endpoint.setCamelContext(context);
+                    Exchange exchange = new DefaultExchange(endpoint);
+                    repo.add(exchange);
+                    int size = repo.size(endpoint);
                     if (size <= 0) {
-                        failure = true;
+                        failed = true;
                     }
-                    repo.remove(ex);
+                    repo.remove(exchange);
                 }
-            } catch (Exception e) {
-                failure = true;
+            // just to make it sure do catch any possible Throwable
+            } catch (Throwable t) {
+                failed = true;
             }
             synchronized (locker) {