Author: ningjiang Date: Mon Jan 10 05:24:23 2011 New Revision: 1057086 URL: http://svn.apache.org/viewvc?rev=1057086&view=rev Log: CAMEL-3514 allow sampling throttler to sample based on message frequency
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/SamplingDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/samplingThrottler.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1057086&r1=1057085&r2=1057086&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Mon Jan 10 05:24:23 2011 @@ -1538,6 +1538,23 @@ public abstract class ProcessorDefinitio } /** + * <a href="http://camel.apache.org/sampling.html">Sampling Throttler</a> + * Creates a sampling throttler allowing you to extract a sample of exchanges + * from the traffic through a route. It is configured with a sampling message frequency + * during which only a single exchange is allowed to pass through. + * All other exchanges will be stopped. + * + * @param messageFrequency this is the sample message frequency, only one exchange is + * allowed through for this many messages received + * @return the builder + */ + public SamplingDefinition sample(long messageFrequency) { + SamplingDefinition answer = new SamplingDefinition(messageFrequency); + addOutput(answer); + return answer; + } + + /** * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a> * Creates a splitter allowing you split a message into a number of pieces and process them individually. * <p> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SamplingDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SamplingDefinition.java?rev=1057086&r1=1057085&r2=1057086&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SamplingDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SamplingDefinition.java Mon Jan 10 05:24:23 2011 @@ -43,6 +43,9 @@ public class SamplingDefinition extends private Long samplePeriod; @XmlAttribute() + private Long messageFrequency; + + @XmlAttribute() @XmlJavaTypeAdapter(TimeUnitAdapter.class) private TimeUnit units; @@ -54,9 +57,17 @@ public class SamplingDefinition extends this.units = units; } + public SamplingDefinition(long messageFrequency) { + this.messageFrequency = messageFrequency; + } + @Override public String toString() { - return "Sample[1 Exchange per " + getSamplePeriod() + " " + getUnits().toString().toLowerCase() + " -> " + getOutputs() + "]"; + if (messageFrequency != null) { + return "Sample[1 Exchange per " + getMessageFrequency() + " messages received -> " + getOutputs() + "]"; + } else { + return "Sample[1 Exchange per " + getSamplePeriod() + " " + getUnits().toString().toLowerCase() + " -> " + getOutputs() + "]"; + } } @Override @@ -66,23 +77,43 @@ public class SamplingDefinition extends @Override public String getLabel() { - return "sample[1 Exchange per " + getSamplePeriod() + " " + getUnits().toString().toLowerCase() + "]"; + if (messageFrequency != null) { + return "sample[1 Exchange per " + getMessageFrequency() + " messages received]"; + } else { + return "sample[1 Exchange per " + getSamplePeriod() + " " + getUnits().toString().toLowerCase() + "]"; + } } @Override public Processor createProcessor(RouteContext routeContext) throws Exception { Processor childProcessor = this.createChildProcessor(routeContext, true); - // should default be 1 sample period - long time = getSamplePeriod() != null ? getSamplePeriod() : 1L; - // should default be in seconds - TimeUnit tu = getUnits() != null ? getUnits() : TimeUnit.SECONDS; - return new SamplingThrottler(childProcessor, time, tu); + + if (messageFrequency != null) { + return new SamplingThrottler(childProcessor, messageFrequency); + } else { + // should default be 1 sample period + long time = getSamplePeriod() != null ? getSamplePeriod() : 1L; + // should default be in seconds + TimeUnit tu = getUnits() != null ? getUnits() : TimeUnit.SECONDS; + return new SamplingThrottler(childProcessor, time, tu); + } } // Fluent API // ------------------------------------------------------------------------- /** + * Sets the sample message count which only a single {...@link org.apache.camel.Exchange} will pass through after this many received. + * + * @param messageFrequency + * @return the builder + */ + public SamplingDefinition sampleMessageFrequency(long messageFrequency) { + setMessageFrequency(messageFrequency); + return this; + } + + /** * Sets the sample period during which only a single {...@link org.apache.camel.Exchange} will pass through. * * @param samplePeriod the period @@ -115,6 +146,14 @@ public class SamplingDefinition extends this.samplePeriod = samplePeriod; } + public Long getMessageFrequency() { + return messageFrequency; + } + + public void setMessageFrequency(Long messageFrequency) { + this.messageFrequency = messageFrequency; + } + public void setUnits(String units) { this.units = TimeUnit.valueOf(units); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java?rev=1057086&r1=1057085&r2=1057086&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java Mon Jan 10 05:24:23 2011 @@ -40,6 +40,8 @@ import org.apache.commons.logging.LogFac public class SamplingThrottler extends DelegateAsyncProcessor { protected final transient Log log = LogFactory.getLog(getClass()); + private long messageFrequency; + private long currentMessageCount; private long samplePeriod; private long periodInNanos; private TimeUnit units; @@ -48,6 +50,15 @@ public class SamplingThrottler extends D private Object calculationLock = new Object(); private SampleStats sampled = new SampleStats(); + public SamplingThrottler(Processor processor, long messageFrequency) { + super(processor); + + if (messageFrequency <= 0) { + throw new IllegalArgumentException("A positive value is required for the sampling message frequency"); + } + this.messageFrequency = messageFrequency; + } + public SamplingThrottler(Processor processor, long samplePeriod, TimeUnit units) { super(processor); @@ -64,11 +75,19 @@ public class SamplingThrottler extends D @Override public String toString() { - return "SamplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase() + " -> " + getProcessor() + "]"; + if (messageFrequency > 0) { + return "SamplingThrottler[1 exchange per: " + messageFrequency + " messages received -> " + getProcessor() + "]"; + } else { + return "SamplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase() + " -> " + getProcessor() + "]"; + } } public String getTraceLabel() { - return "samplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase() + "]"; + if (messageFrequency > 0) { + return "samplingThrottler[1 exchange per: " + messageFrequency + " messages received]"; + } else { + return "samplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase() + "]"; + } } @Override @@ -76,16 +95,24 @@ public class SamplingThrottler extends D boolean doSend = false; synchronized (calculationLock) { - long now = System.nanoTime(); - if (now >= timeOfLastExchange + periodInNanos) { - doSend = true; - if (log.isTraceEnabled()) { - log.trace(sampled.sample()); + + if (messageFrequency > 0) { + currentMessageCount++; + if (currentMessageCount % messageFrequency == 0) { + doSend = true; } - timeOfLastExchange = now; } else { - if (log.isTraceEnabled()) { - log.trace(sampled.drop()); + long now = System.nanoTime(); + if (now >= timeOfLastExchange + periodInNanos) { + doSend = true; + if (log.isTraceEnabled()) { + log.trace(sampled.sample()); + } + timeOfLastExchange = now; + } else { + if (log.isTraceEnabled()) { + log.trace(sampled.drop()); + } } } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java?rev=1057086&r1=1057085&r2=1057086&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java Mon Jan 10 05:24:23 2011 @@ -87,6 +87,32 @@ public class SamplingThrottlerTest exten mock.assertIsSatisfied(); } + public void testSamplingUsingmessageFrequency() throws Exception { + long totalMessages = 100; + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(10); + mock.setResultWaitTime(100); + + for (int i = 0; i < totalMessages; i++) { + template.sendBody("direct:sample-messageFrequency", "<message>" + i + "</message>"); + } + + mock.assertIsSatisfied(); + } + + public void testSamplingUsingmessageFrequencyViaDSL() throws Exception { + long totalMessages = 50; + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(10); + mock.setResultWaitTime(100); + + for (int i = 0; i < totalMessages; i++) { + template.sendBody("direct:sample-messageFrequency-via-dsl", "<message>" + i + "</message>"); + } + + mock.assertIsSatisfied(); + } + private void sendExchangesThroughDroppingThrottler(List<Exchange> sentExchanges, int messages) throws Exception { ProducerTemplate myTemplate = context.createProducerTemplate(); @@ -130,6 +156,15 @@ public class SamplingThrottlerTest exten from("direct:sample-configured-via-dsl") .sample().samplePeriod(1).timeUnits(TimeUnit.SECONDS) .to("mock:result"); + + from("direct:sample-messageFrequency") + .sample(10) + .to("mock:result"); + + from("direct:sample-messageFrequency-via-dsl") + .sample().sampleMessageFrequency(5) + .to("mock:result"); + // END SNIPPET: e1 } }; Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/samplingThrottler.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/samplingThrottler.xml?rev=1057086&r1=1057085&r2=1057086&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/samplingThrottler.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/samplingThrottler.xml Mon Jan 10 05:24:23 2011 @@ -21,7 +21,6 @@ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> - <camelContext xmlns="http://camel.apache.org/schema/spring"> <!-- START SNIPPET: e1 --> <route> @@ -29,6 +28,18 @@ <sample samplePeriod="1" units="seconds"> <to uri="mock:result"/> </sample> + </route> + <route> + <from uri="direct:sample-messageFrequency"/> + <sample messageFrequency="10"> + <to uri="mock:result"/> + </sample> + </route> + <route> + <from uri="direct:sample-messageFrequency-via-dsl"/> + <sample messageFrequency="10"> + <to uri="mock:result"/> + </sample> </route> <!-- END SNIPPET: e1 -->