This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 40ff1c1e986 CAMEL-20252: remove misleading tests 40ff1c1e986 is described below commit 40ff1c1e9864b5502b656dce7322e2f3ae7d8da7 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Mon Dec 18 17:47:25 2023 +0100 CAMEL-20252: remove misleading tests --- .../disruptor/SedaDisruptorCompareTest.java | 437 --------------------- .../bean/CamelSimpleExpressionPerfTestRunner.java | 77 ---- 2 files changed, 514 deletions(-) diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/SedaDisruptorCompareTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/SedaDisruptorCompareTest.java deleted file mode 100644 index 872b034a3b3..00000000000 --- a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/SedaDisruptorCompareTest.java +++ /dev/null @@ -1,437 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.disruptor; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.HdrHistogram.Histogram; -import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.Processor; -import org.apache.camel.Produce; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.seda.SedaEndpoint; -import org.apache.camel.test.junit5.CamelTestSupport; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; - -/** - * This class does not perform any functional test, but instead makes a comparison between the performance of the - * Disruptor and SEDA component in several use cases. - * <p/> - * As memory management may have great impact on the results, it is adviced to run this test with a large, fixed heap - * (e.g. run with -Xmx1024m -Xms1024m JVM parameters) - */ -@Disabled -public class SedaDisruptorCompareTest extends CamelTestSupport { - // Use '0' for default value, '1'+ for specific value to be used by both SEDA and DISRUPTOR. - private static final int SIZE_PARAMETER_VALUE = 1024; - private static final int SPEED_TEST_EXCHANGE_COUNT = 80000; - private static final long[] LATENCY_HISTOGRAM_BOUNDS = new long[] { 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000 }; - private static final long[] DISRUPTOR_SIZE_HISTOGRAM_BOUNDS = generateLinearHistogramBounds( - SIZE_PARAMETER_VALUE == 0 ? 1024 : SIZE_PARAMETER_VALUE, 8); - private static final long[] SEDA_SIZE_HISTOGRAM_BOUNDS = generateLinearHistogramBounds( - SIZE_PARAMETER_VALUE == 0 ? SPEED_TEST_EXCHANGE_COUNT : SIZE_PARAMETER_VALUE, 10); - - private static Collection<Object[]> tPARAMETERS; - - @Produce - protected ProducerTemplate producerTemplate; - - private final Queue<Integer> endpointSizeQueue = new ConcurrentLinkedQueue<>(); - - @BeforeAll - public static void legend() { - tPARAMETERS = parameters(); - System.out.println("-----------------------"); - System.out.println("- Tests output legend -"); - System.out.println("-----------------------"); - System.out.println( - "P: Number of concurrent Producer(s) sharing the load for publishing exchanges to the disruptor."); - System.out.println( - "C: Number of Consumer(s) receiving a copy of each exchange from the disruptor (pub/sub)."); - System.out.println( - "CCT: Number of ConcurrentConsumerThreads sharing the load for consuming exchanges from the disruptor."); - System.out.println( - "SIZE: Maximum number of elements a SEDA or disruptor endpoint can have in memory before blocking the Producer thread(s)."); - System.out.println(" 0 means default value, so unbounded for SEDA and 1024 for disruptor."); - System.out.println("Each test is creating " + SPEED_TEST_EXCHANGE_COUNT + " exchanges."); - System.out.println(); - } - - private static long[] generateLinearHistogramBounds(final int maxValue, final int nbSlots) { - final long slotSize = maxValue / nbSlots; - final long[] bounds = new long[nbSlots]; - for (int i = 0; i < nbSlots; i++) { - bounds[i] = slotSize * (i + 1); - } - return bounds; - } - - private static int singleProducer() { - return 1; - } - - private static int multipleProducers() { - return 4; - } - - private static ExchangeAwaiter[] singleConsumer() { - return new ExchangeAwaiter[] { new ExchangeAwaiter(SPEED_TEST_EXCHANGE_COUNT) }; - } - - private static ExchangeAwaiter[] multipleConsumers() { - ExchangeAwaiter[] exchangeAwaiters = new ExchangeAwaiter[4]; - for (int i = 0; i < exchangeAwaiters.length; ++i) { - exchangeAwaiters[i] = new ExchangeAwaiter(SPEED_TEST_EXCHANGE_COUNT); - } - return exchangeAwaiters; - } - - private static int singleConcurrentConsumerThread() { - return 1; - } - - private static int multipleConcurrentConsumerThreads() { - return 2; - } - - public static Collection<Object[]> getTestParameters() { - return tPARAMETERS; - } - - public static Collection<Object[]> parameters() { - final List<Object[]> parameters = new ArrayList<>(); - - // This parameter set can be compared to the next and shows the impact of a 'long' endpoint name - // It defines all parameters to the same values as the default, so the result should be the same as - // 'seda:speedtest'. This shows that disruptor has a slight disadvantage as its name is longer than 'seda' :) - // The reason why this test takes so long is because Camel has a SLF4J call in ProducerCache: - // log.debug(">>>> {} {}", endpoint, exchange); - // and the DefaultEndpoint.toString() method will use a Matcher to sanitize the URI. There should be a guard - // before the debug() call to only evaluate the args when required: if(log.isDebugEnabled())... - if (SIZE_PARAMETER_VALUE == 0) { - parameters - .add(new Object[] { - "SEDA LONG {P=1, C=1, CCT=1, SIZE=0}", - "seda:speedtest?concurrentConsumers=1&waitForTaskToComplete=IfReplyExpected&timeout=30000&multipleConsumers=false&limitConcurrentConsumers=true&blockWhenFull=false", - singleProducer(), singleConsumer(), singleConcurrentConsumerThread(), - SEDA_SIZE_HISTOGRAM_BOUNDS }); - } else { - parameters - .add(new Object[] { - "SEDA LONG {P=1, C=1, CCT=1, SIZE=" + SIZE_PARAMETER_VALUE + "}", - "seda:speedtest?concurrentConsumers=1&waitForTaskToComplete=IfReplyExpected&timeout=30000&multipleConsumers=false&limitConcurrentConsumers=true&blockWhenFull=true&size=" - + SIZE_PARAMETER_VALUE, - singleProducer(), singleConsumer(), - singleConcurrentConsumerThread(), SEDA_SIZE_HISTOGRAM_BOUNDS }); - } - addParameterPair(parameters, singleProducer(), singleConsumer(), singleConcurrentConsumerThread()); - addParameterPair(parameters, singleProducer(), singleConsumer(), multipleConcurrentConsumerThreads()); - addParameterPair(parameters, singleProducer(), multipleConsumers(), singleConcurrentConsumerThread()); - addParameterPair(parameters, singleProducer(), multipleConsumers(), multipleConcurrentConsumerThreads()); - addParameterPair(parameters, multipleProducers(), singleConsumer(), singleConcurrentConsumerThread()); - addParameterPair(parameters, multipleProducers(), singleConsumer(), multipleConcurrentConsumerThreads()); - addParameterPair(parameters, multipleProducers(), multipleConsumers(), singleConcurrentConsumerThread()); - addParameterPair(parameters, multipleProducers(), multipleConsumers(), multipleConcurrentConsumerThreads()); - - // Make endpointUris unique - int i = 0; - for (Object[] params : parameters) { - String endpointUri = (String) params[1]; - String uniqueEndpointUri = endpointUri.replaceFirst("([a-z]+):([^?]+)\\?(.*)", "$1:$2-" + i + "?$3"); - params[1] = uniqueEndpointUri; - i++; - } - - return parameters; - } - - private static void addParameterPair( - final List<Object[]> parameters, final int producers, - final ExchangeAwaiter[] consumers, final int parallelConsumerThreads) { - final String multipleConsumerOption = consumers.length > 1 ? "multipleConsumers=true" : ""; - final String concurrentConsumerOptions - = parallelConsumerThreads > 1 ? "concurrentConsumers=" + parallelConsumerThreads : ""; - final String sizeOption = SIZE_PARAMETER_VALUE > 0 ? "size=" + SIZE_PARAMETER_VALUE : ""; - final String sizeOptionSeda = SIZE_PARAMETER_VALUE > 0 ? "&blockWhenFull=true" : ""; - - String options = ""; - if (!multipleConsumerOption.isEmpty()) { - if (!options.isEmpty()) { - options += "&"; - } - options += multipleConsumerOption; - } - if (!concurrentConsumerOptions.isEmpty()) { - if (!options.isEmpty()) { - options += "&"; - } - options += concurrentConsumerOptions; - } - if (!sizeOption.isEmpty()) { - if (!options.isEmpty()) { - options += "&"; - } - options += sizeOption; - } - - if (!options.isEmpty()) { - options = "?" + options; - } - - final String sedaOptions = sizeOptionSeda.isEmpty() ? options : options + sizeOptionSeda; - // Using { ... } because there is a bug in JUnit 4.11 and Eclipse: https://bugs.eclipse.org/bugs/show_bug.cgi?id=102512 - final String testDescription = " { P=" + producers + ", C=" + consumers + ", CCT=" - + parallelConsumerThreads + ", SIZE=" + SIZE_PARAMETER_VALUE + " }"; - parameters.add(new Object[] { - "SEDA" + testDescription, "seda:speedtest" + sedaOptions, producers, - consumers, parallelConsumerThreads, SEDA_SIZE_HISTOGRAM_BOUNDS }); - parameters.add(new Object[] { - "Disruptor" + testDescription, "disruptor:speedtest" + options, producers, - consumers, parallelConsumerThreads, DISRUPTOR_SIZE_HISTOGRAM_BOUNDS }); - } - - @ParameterizedTest - @MethodSource("getTestParameters") - void speedTestDisruptor( - final String componentName, final String endpointUri, final int amountProducers, - final ExchangeAwaiter[] exchangeAwaiters, - final int concurrentConsumerThreads, final long[] sizeHistogramBounds) - throws InterruptedException { - - System.out.println("Warming up for test of: " + componentName); - - performTest(componentName, endpointUri, true, exchangeAwaiters, amountProducers, sizeHistogramBounds); - System.out.println("Starting real test of: " + componentName); - - forceGC(); - Thread.sleep(1000); - - performTest(componentName, endpointUri, false, exchangeAwaiters, amountProducers, sizeHistogramBounds); - } - - private void forceGC() { - // unfortunately there is no nice API that forces the Garbage collector to run, but it may consider our request - // more seriously if we ask it twice :) - System.gc(); - System.gc(); - } - - private void resetExchangeAwaiters(ExchangeAwaiter[] exchangeAwaiters) { - for (final ExchangeAwaiter exchangeAwaiter : exchangeAwaiters) { - exchangeAwaiter.reset(); - } - } - - private void awaitExchangeAwaiters(String componentName, ExchangeAwaiter[] exchangeAwaiters) throws InterruptedException { - for (final ExchangeAwaiter exchangeAwaiter : exchangeAwaiters) { - while (!exchangeAwaiter.awaitMessagesReceived(10, TimeUnit.SECONDS)) { - System.err.println( - "Processing takes longer then expected: " + componentName + " " + exchangeAwaiter - .getStatus()); - } - } - } - - private void outputExchangeAwaitersResult(String componentName, final long start, ExchangeAwaiter[] exchangeAwaiters) { - for (final ExchangeAwaiter exchangeAwaiter : exchangeAwaiters) { - final long stop = exchangeAwaiter.getCountDownReachedTime(); - final Histogram histogram = exchangeAwaiter.getLatencyHistogram(); - - System.out.printf("%-45s time spent = %5d ms.%n", componentName, stop - start); - histogram.outputPercentileDistribution(System.out, 1, 1000.0); - } - } - - private void performTest( - String componentName, String endpointUri, final boolean warmup, ExchangeAwaiter[] exchangeAwaiters, - int amountProducers, long[] sizeHistogramBounds) - throws InterruptedException { - resetExchangeAwaiters(exchangeAwaiters); - - final ProducerThread[] producerThread = new ProducerThread[amountProducers]; - for (int i = 0; i < producerThread.length; ++i) { - producerThread[i] = new ProducerThread(SPEED_TEST_EXCHANGE_COUNT / amountProducers, endpointUri); - } - - ExecutorService monitoring = null; - if (!warmup) { - monitoring = installSizeMonitoring(context.getEndpoint(endpointUri)); - } - final long start = System.currentTimeMillis(); - - for (ProducerThread element : producerThread) { - element.start(); - } - - awaitExchangeAwaiters(componentName, exchangeAwaiters); - - if (!warmup) { - outputExchangeAwaitersResult(componentName, start, exchangeAwaiters); - uninstallSizeMonitoring(monitoring, sizeHistogramBounds); - } - } - - private ExecutorService installSizeMonitoring(final Endpoint endpoint) { - final ScheduledExecutorService service = context.getExecutorServiceManager() - .newScheduledThreadPool(this, "SizeMonitoringThread", 1); - endpointSizeQueue.clear(); - final Runnable monitoring = new Runnable() { - @Override - public void run() { - if (endpoint instanceof SedaEndpoint) { - final SedaEndpoint sedaEndpoint = (SedaEndpoint) endpoint; - endpointSizeQueue.offer(sedaEndpoint.getCurrentQueueSize()); - } else if (endpoint instanceof DisruptorEndpoint) { - final DisruptorEndpoint disruptorEndpoint = (DisruptorEndpoint) endpoint; - - long remainingCapacity = 0; - try { - remainingCapacity = disruptorEndpoint.getRemainingCapacity(); - } catch (DisruptorNotStartedException e) { - //ignore - } - endpointSizeQueue.offer((int) (disruptorEndpoint.getBufferSize() - remainingCapacity)); - } - } - }; - service.scheduleAtFixedRate(monitoring, 0, 100, TimeUnit.MILLISECONDS); - return service; - } - - private void uninstallSizeMonitoring(final ExecutorService monitoring, long[] sizeHistogramBounds) { - if (monitoring != null) { - monitoring.shutdownNow(); - } - final Histogram histogram = new Histogram(sizeHistogramBounds[sizeHistogramBounds.length - 1], 4); - for (final int observation : endpointSizeQueue) { - histogram.recordValue(observation); - } - System.out.printf("%82s %s%n", "Endpoint size (# exchanges pending):", histogram.toString()); - } - - @Override - protected RouteBuilder createRouteBuilder() { - return new RouteBuilder() { - @Override - public void configure() { - for (Object[] parameters : tPARAMETERS) { - ExchangeAwaiter[] exchangeAwaiters = (ExchangeAwaiter[]) parameters[3]; - String endpointUri = (String) parameters[1]; - for (final ExchangeAwaiter exchangeAwaiter : exchangeAwaiters) { - from(endpointUri).process(exchangeAwaiter); - } - } - } - }; - } - - private static final class ExchangeAwaiter implements Processor { - - private CountDownLatch latch; - private final int count; - private long countDownReachedTime; - - private Queue<Long> latencyQueue = new ConcurrentLinkedQueue<>(); - - ExchangeAwaiter(final int count) { - this.count = count; - } - - public void reset() { - latencyQueue = new ConcurrentLinkedQueue<>(); - latch = new CountDownLatch(count); - countDownReachedTime = 0; - } - - public boolean awaitMessagesReceived(final long timeout, final TimeUnit unit) throws InterruptedException { - return latch.await(timeout, unit); - } - - public String getStatus() { - final StringBuilder sb = new StringBuilder(100); - sb.append("processed "); - sb.append(count - latch.getCount()); - sb.append('/'); - sb.append(count); - sb.append(" messages"); - - return sb.toString(); - } - - @Override - public void process(final Exchange exchange) { - final long sentTimeNs = exchange.getIn().getBody(Long.class); - latencyQueue.offer(Long.valueOf(System.nanoTime() - sentTimeNs)); - - countDownReachedTime = System.currentTimeMillis(); - latch.countDown(); - } - - public long getCountDownReachedTime() { - // Make sure we wait until all exchanges have been processed. Otherwise the time value doesn't make sense. - try { - latch.await(); - } catch (InterruptedException e) { - countDownReachedTime = 0; - } - return countDownReachedTime; - } - - public Histogram getLatencyHistogram() { - final Histogram histogram = new Histogram(LATENCY_HISTOGRAM_BOUNDS[LATENCY_HISTOGRAM_BOUNDS.length - 1], 4); - for (final Long latencyValue : latencyQueue) { - histogram.recordValue(latencyValue / 1000000); - } - return histogram; - } - } - - private final class ProducerThread extends Thread { - - private final int totalMessageCount; - private int producedMessageCount; - private String endpointUri; - - ProducerThread(final int totalMessageCount, String endpointUri) { - super("TestDataProducerThread"); - this.totalMessageCount = totalMessageCount; - this.endpointUri = endpointUri; - } - - @Override - public void run() { - final Endpoint endpoint = context().getEndpoint(endpointUri); - while (producedMessageCount++ < totalMessageCount) { - producerTemplate.sendBody(endpoint, ExchangePattern.InOnly, System.nanoTime()); - } - } - } -} diff --git a/core/camel-core/src/test/java/org/apache/camel/component/bean/CamelSimpleExpressionPerfTestRunner.java b/core/camel-core/src/test/java/org/apache/camel/component/bean/CamelSimpleExpressionPerfTestRunner.java deleted file mode 100644 index 34002e89a05..00000000000 --- a/core/camel-core/src/test/java/org/apache/camel/component/bean/CamelSimpleExpressionPerfTestRunner.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.bean; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.camel.CamelContext; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.impl.DefaultCamelContext; - -public final class CamelSimpleExpressionPerfTestRunner { - private static final int MESSAGE_LOOP_COUNT = 1000; - private static final int TEST_EXECUTION_COUNT = 5; - - private CamelSimpleExpressionPerfTestRunner() { - // Utils class - } - - public static void main(String[] args) throws Exception { - long bodyOnly = executePerformanceTest("${body}"); - long bodyProperty = executePerformanceTest("${body[p]}"); - long bodyPropertyWithCache = executePerformanceTest("${body[p]}"); - - System.out.printf("${body}: %dms%n", bodyOnly); - System.out.printf("${body[p]} : %dms%n", bodyProperty); - System.out.printf("${body[p]} with cache : %dms%n", bodyPropertyWithCache); - } - - private static long executePerformanceTest(final String simpleExpression) throws Exception { - CamelContext ctx = new DefaultCamelContext(); - - ctx.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:start").loop(MESSAGE_LOOP_COUNT).setHeader("test").simple(simpleExpression).to("mock:plop"); - } - }); - - ctx.start(); - - Map<String, String> body = new HashMap<>(); - body.put("p", "q"); - - ProducerTemplate template = ctx.createProducerTemplate(); - // Initial one, it's a dry start, we don't care about this one. - template.sendBody("direct:start", body); - - // Measure the duration of the executions in nanoseconds - long totalNsDuration = 0; - for (int i = 0; i < TEST_EXECUTION_COUNT; i++) { - long tick = System.nanoTime(); - template.sendBody("direct:start", body); - totalNsDuration += System.nanoTime() - tick; - } - - ctx.stop(); - - // Return the average duration in milliseconds - return totalNsDuration / TEST_EXECUTION_COUNT / 1000 / 1000; - } -}