Implemented metrics:counter and metrics:meter endpoints.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/98560c8e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/98560c8e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/98560c8e Branch: refs/heads/master Commit: 98560c8e44fc15aee75a2ccc1c35aa555d74677c Parents: 375f13e Author: Lauri Kimmel <lauri.kim...@gmx.com> Authored: Sun May 11 04:10:54 2014 +1000 Committer: Lauri Kimmel <lauri.kim...@gmx.com> Committed: Sun May 11 04:10:54 2014 +1000 ---------------------------------------------------------------------- pom.xml | 7 +- .../java/org/apache/camel/MetricsComponent.java | 18 ---- .../java/org/apache/camel/MetricsConsumer.java | 39 ------- .../java/org/apache/camel/MetricsEndpoint.java | 35 ------ .../java/org/apache/camel/MetricsProducer.java | 24 ----- .../camel/metrics/AbstractMetricsEndpoint.java | 37 +++++++ .../apache/camel/metrics/MetricsComponent.java | 107 +++++++++++++++++++ .../org/apache/camel/metrics/MetricsType.java | 37 +++++++ .../camel/metrics/counter/CounterEndpoint.java | 50 +++++++++ .../camel/metrics/counter/CounterProducer.java | 32 ++++++ .../camel/metrics/meter/MeterEndpoint.java | 39 +++++++ .../camel/metrics/meter/MeterProducer.java | 30 ++++++ .../services/org/apache/camel/component/metrics | 2 +- .../org/apache/camel/MetricsComponentTest.java | 28 ----- .../camel/metrics/MetricsComponentTest.java | 42 ++++++++ 15 files changed, 381 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ae5ac4a..2407292 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ <version>2.13.0</version> <name>Camel Metrics Component</name> - <url>http://www.myorganization.org</url> + <url>http://camel.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> @@ -22,6 +22,11 @@ <artifactId>camel-core</artifactId> <version>2.13.0</version> </dependency> + <dependency> + <groupId>com.codahale.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>3.0.1</version> + </dependency> <!-- logging --> <dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/main/java/org/apache/camel/MetricsComponent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/camel/MetricsComponent.java b/src/main/java/org/apache/camel/MetricsComponent.java deleted file mode 100644 index 2690fdb..0000000 --- a/src/main/java/org/apache/camel/MetricsComponent.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.apache.camel; - -import java.util.Map; - -import org.apache.camel.Endpoint; -import org.apache.camel.impl.DefaultComponent; - -/** - * Represents the component that manages {@link MetricsEndpoint}. - */ -public class MetricsComponent extends DefaultComponent { - - protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - Endpoint endpoint = new MetricsEndpoint(uri, this); - setProperties(endpoint, parameters); - return endpoint; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/main/java/org/apache/camel/MetricsConsumer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/camel/MetricsConsumer.java b/src/main/java/org/apache/camel/MetricsConsumer.java deleted file mode 100644 index 26199ea..0000000 --- a/src/main/java/org/apache/camel/MetricsConsumer.java +++ /dev/null @@ -1,39 +0,0 @@ -package org.apache.camel; - -import java.util.Date; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.impl.ScheduledPollConsumer; - -/** - * The Metrics consumer. - */ -public class MetricsConsumer extends ScheduledPollConsumer { - private final MetricsEndpoint endpoint; - - public MetricsConsumer(MetricsEndpoint endpoint, Processor processor) { - super(endpoint, processor); - this.endpoint = endpoint; - } - - @Override - protected int poll() throws Exception { - Exchange exchange = endpoint.createExchange(); - - // create a message body - Date now = new Date(); - exchange.getIn().setBody("Hello World! The time is " + now); - - try { - // send message to next processor in the route - getProcessor().process(exchange); - return 1; // number of messages polled - } finally { - // log exception if an exception occurred and was not handled - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/main/java/org/apache/camel/MetricsEndpoint.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/camel/MetricsEndpoint.java b/src/main/java/org/apache/camel/MetricsEndpoint.java deleted file mode 100644 index 4327a03..0000000 --- a/src/main/java/org/apache/camel/MetricsEndpoint.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.apache.camel; - -import org.apache.camel.Consumer; -import org.apache.camel.Processor; -import org.apache.camel.Producer; -import org.apache.camel.impl.DefaultEndpoint; - -/** - * Represents a Metrics endpoint. - */ -public class MetricsEndpoint extends DefaultEndpoint { - - public MetricsEndpoint() { - } - - public MetricsEndpoint(String uri, MetricsComponent component) { - super(uri, component); - } - - public MetricsEndpoint(String endpointUri) { - super(endpointUri); - } - - public Producer createProducer() throws Exception { - return new MetricsProducer(this); - } - - public Consumer createConsumer(Processor processor) throws Exception { - return new MetricsConsumer(this, processor); - } - - public boolean isSingleton() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/main/java/org/apache/camel/MetricsProducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/camel/MetricsProducer.java b/src/main/java/org/apache/camel/MetricsProducer.java deleted file mode 100644 index 4d6a1cd..0000000 --- a/src/main/java/org/apache/camel/MetricsProducer.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.apache.camel; - -import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultProducer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The Metrics producer. - */ -public class MetricsProducer extends DefaultProducer { - private static final Logger LOG = LoggerFactory.getLogger(MetricsProducer.class); - private MetricsEndpoint endpoint; - - public MetricsProducer(MetricsEndpoint endpoint) { - super(endpoint); - this.endpoint = endpoint; - } - - public void process(Exchange exchange) throws Exception { - System.out.println(exchange.getIn().getBody()); - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/main/java/org/apache/camel/metrics/AbstractMetricsEndpoint.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/camel/metrics/AbstractMetricsEndpoint.java b/src/main/java/org/apache/camel/metrics/AbstractMetricsEndpoint.java new file mode 100644 index 0000000..d8c8af7 --- /dev/null +++ b/src/main/java/org/apache/camel/metrics/AbstractMetricsEndpoint.java @@ -0,0 +1,37 @@ +package org.apache.camel.metrics; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.impl.DefaultEndpoint; + +import com.codahale.metrics.MetricRegistry; + +public abstract class AbstractMetricsEndpoint extends DefaultEndpoint { + + protected final MetricRegistry registry; + protected final String metricsName; + + public AbstractMetricsEndpoint(MetricRegistry registry, String metricsName) { + this.registry = registry; + this.metricsName = metricsName; + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + throw new RuntimeCamelException("Cannot consume from " + getClass().getSimpleName() + ": " + getEndpointUri()); + } + + @Override + public boolean isSingleton() { + return false; + } + + public MetricRegistry getRegistry() { + return registry; + } + + public String getMetricsName() { + return metricsName; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/main/java/org/apache/camel/metrics/MetricsComponent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/camel/metrics/MetricsComponent.java b/src/main/java/org/apache/camel/metrics/MetricsComponent.java new file mode 100644 index 0000000..14e93cb --- /dev/null +++ b/src/main/java/org/apache/camel/metrics/MetricsComponent.java @@ -0,0 +1,107 @@ +package org.apache.camel.metrics; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.impl.DefaultComponent; +import org.apache.camel.metrics.counter.CounterEndpoint; +import org.apache.camel.metrics.meter.MeterEndpoint; +import org.apache.camel.spi.Registry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Slf4jReporter; + +/** + * Represents the component that manages {@link MetricsEndpoint}. + */ +public class MetricsComponent extends DefaultComponent { + + public static final String NAME_METRIC_REGISTRY = "metricRegistry"; + public static final MetricsType DEFAULT_METRICS_TYPE = MetricsType.METER; + public static final long DEFAULT_REPORTING_INTERVAL_SECONDS = 60L; + + private static final Logger LOG = LoggerFactory.getLogger(MetricsComponent.class); + + private MetricRegistry metricRegistry; + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + Registry camelRegistry = getCamelContext().getRegistry(); + getOrCreateMetricRegistry(camelRegistry, NAME_METRIC_REGISTRY); + String metricsName = getMetricsName(remaining); + MetricsType metricsType = getMetricsType(remaining); + LOG.info("Metrics type: {}; name: {}", metricsType, metricsName); + Endpoint endpoint = createNewEndpoint(metricRegistry, metricsType, metricsName); + setProperties(endpoint, parameters); + return endpoint; + } + + String getMetricsName(String remaining) { + int index = remaining.indexOf(":"); + return index < 0 ? remaining : remaining.substring(index + 1); + } + + Endpoint createNewEndpoint(MetricRegistry registry, MetricsType type, String metricsName) { + Endpoint endpoint; + switch (type) { + case COUNTER: + endpoint = new CounterEndpoint(registry, metricsName); + break; + case METER: + endpoint = new MeterEndpoint(registry, metricsName); + break; + default: + throw new RuntimeCamelException("Metrics type \"" + type.toString() + "\" not supported"); + } + return endpoint; + } + + MetricsType getMetricsType(String remaining) { + int index = remaining.indexOf(":"); + String name = null; + MetricsType type; + if (index < 0) { + type = DEFAULT_METRICS_TYPE; + } + else { + name = remaining.substring(0, index); + type = MetricsType.getByName(name); + } + if (type == null) { + throw new RuntimeCamelException("Unknow metrics type \"" + name + "\""); + } + return type; + } + + MetricRegistry getOrCreateMetricRegistry(Registry camelRegistry, String registryName) { + if (metricRegistry == null) { + LOG.debug("Looking up MetricRegistry from Camel Registry for name \"{}\"", registryName); + metricRegistry = getMetricRegistryFromCamelRegistry(camelRegistry, registryName); + } + if (metricRegistry == null) { + LOG.debug("MetricRegistry not found from Camel Registry for name \"{}\"", registryName); + LOG.info("Creating new default MetricRegistry"); + metricRegistry = createMetricRegistry(); + } + return metricRegistry; + } + + MetricRegistry getMetricRegistryFromCamelRegistry(Registry camelRegistry, String registryName) { + return camelRegistry.lookupByNameAndType(registryName, MetricRegistry.class); + } + + MetricRegistry createMetricRegistry() { + MetricRegistry registry = new MetricRegistry(); + final Slf4jReporter reporter = Slf4jReporter.forRegistry(registry) + .outputTo(LOG) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(); + reporter.start(DEFAULT_REPORTING_INTERVAL_SECONDS, TimeUnit.SECONDS); + return registry; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/main/java/org/apache/camel/metrics/MetricsType.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/camel/metrics/MetricsType.java b/src/main/java/org/apache/camel/metrics/MetricsType.java new file mode 100644 index 0000000..f602647 --- /dev/null +++ b/src/main/java/org/apache/camel/metrics/MetricsType.java @@ -0,0 +1,37 @@ +package org.apache.camel.metrics; + +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; + +public enum MetricsType { + + GAUGE("gauge"), + COUNTER("counter"), + HISTOGRAM("histogram"), + METER("meter"), + TIMER("timer"), ; + + private static final Map<String, MetricsType> map = new HashMap<String, MetricsType>(); + + private final String name; + + static { + for (MetricsType type : EnumSet.allOf(MetricsType.class)) { + map.put(type.name, type); + } + } + + private MetricsType(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + + public static MetricsType getByName(String name) { + return map.get(name); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/main/java/org/apache/camel/metrics/counter/CounterEndpoint.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/camel/metrics/counter/CounterEndpoint.java b/src/main/java/org/apache/camel/metrics/counter/CounterEndpoint.java new file mode 100644 index 0000000..8d0b34e --- /dev/null +++ b/src/main/java/org/apache/camel/metrics/counter/CounterEndpoint.java @@ -0,0 +1,50 @@ +package org.apache.camel.metrics.counter; + +import org.apache.camel.Producer; +import org.apache.camel.metrics.AbstractMetricsEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; + +import com.codahale.metrics.MetricRegistry; + +@UriEndpoint(scheme = "metrics:counter") +public class CounterEndpoint extends AbstractMetricsEndpoint { + + public static final String ENDPOINT_URI = "metrics:counter"; + + @UriParam + private Long increment; + + @UriParam + private Long decrement; + + public CounterEndpoint(MetricRegistry registry, String metricsName) { + super(registry, metricsName); + } + + @Override + public Producer createProducer() throws Exception { + return new CounterProducer(this); + } + + public Long getIncrement() { + return increment; + } + + public void setIncrement(Long increment) { + this.increment = increment; + } + + public Long getDecrement() { + return decrement; + } + + public void setDecrement(Long decrement) { + this.decrement = decrement; + } + + @Override + protected String createEndpointUri() { + return ENDPOINT_URI; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/main/java/org/apache/camel/metrics/counter/CounterProducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/camel/metrics/counter/CounterProducer.java b/src/main/java/org/apache/camel/metrics/counter/CounterProducer.java new file mode 100644 index 0000000..19c470f --- /dev/null +++ b/src/main/java/org/apache/camel/metrics/counter/CounterProducer.java @@ -0,0 +1,32 @@ +package org.apache.camel.metrics.counter; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; + +import com.codahale.metrics.Counter; + +public class CounterProducer extends DefaultProducer { + + public CounterProducer(Endpoint endpoint) { + super(endpoint); + } + + @Override + public void process(Exchange exchange) throws Exception { + CounterEndpoint endpoint = (CounterEndpoint) getEndpoint(); + String metricName = endpoint.getMetricsName(); + Counter counter = endpoint.getRegistry().counter(metricName); + Long increment = endpoint.getIncrement(); + Long decrement = endpoint.getDecrement(); + if (increment != null) { + counter.inc(increment); + } + else if (decrement != null) { + counter.dec(decrement); + } + else { + counter.inc(); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/main/java/org/apache/camel/metrics/meter/MeterEndpoint.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/camel/metrics/meter/MeterEndpoint.java b/src/main/java/org/apache/camel/metrics/meter/MeterEndpoint.java new file mode 100644 index 0000000..d0b609e --- /dev/null +++ b/src/main/java/org/apache/camel/metrics/meter/MeterEndpoint.java @@ -0,0 +1,39 @@ +package org.apache.camel.metrics.meter; + +import org.apache.camel.Producer; +import org.apache.camel.metrics.AbstractMetricsEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; + +import com.codahale.metrics.MetricRegistry; + +@UriEndpoint(scheme = "metrics:meter") +public class MeterEndpoint extends AbstractMetricsEndpoint { + + public static final String ENDPOINT_URI = "metrics:meter"; + + @UriParam + private Long mark; + + public MeterEndpoint(MetricRegistry registry, String metricsName) { + super(registry, metricsName); + } + + @Override + public Producer createProducer() throws Exception { + return new MeterProducer(this); + } + + public Long getMark() { + return mark; + } + + public void setMark(Long mark) { + this.mark = mark; + } + + @Override + protected String createEndpointUri() { + return ENDPOINT_URI; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/main/java/org/apache/camel/metrics/meter/MeterProducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/camel/metrics/meter/MeterProducer.java b/src/main/java/org/apache/camel/metrics/meter/MeterProducer.java new file mode 100644 index 0000000..6cb7896 --- /dev/null +++ b/src/main/java/org/apache/camel/metrics/meter/MeterProducer.java @@ -0,0 +1,30 @@ +package org.apache.camel.metrics.meter; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; + +public class MeterProducer extends DefaultProducer { + + public MeterProducer(Endpoint endpoint) { + super(endpoint); + } + + @Override + public void process(Exchange exchange) throws Exception { + MeterEndpoint endpoint = (MeterEndpoint) getEndpoint(); + String metricsName = endpoint.getMetricsName(); + MetricRegistry registry = endpoint.getRegistry(); + Meter meter = registry.meter(metricsName); + Long mark = endpoint.getMark(); + if (mark == null) { + meter.mark(); + } + else { + meter.mark(mark); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/main/resources/META-INF/services/org/apache/camel/component/metrics ---------------------------------------------------------------------- diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/metrics b/src/main/resources/META-INF/services/org/apache/camel/component/metrics index e7239ec..debbe31 100644 --- a/src/main/resources/META-INF/services/org/apache/camel/component/metrics +++ b/src/main/resources/META-INF/services/org/apache/camel/component/metrics @@ -1 +1 @@ -class=org.apache.camel.MetricsComponent +class=org.apache.camel.metrics.MetricsComponent http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/test/java/org/apache/camel/MetricsComponentTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/camel/MetricsComponentTest.java b/src/test/java/org/apache/camel/MetricsComponentTest.java deleted file mode 100644 index 2fb5926..0000000 --- a/src/test/java/org/apache/camel/MetricsComponentTest.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.apache.camel; - -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.junit4.CamelTestSupport; -import org.junit.Test; - -public class MetricsComponentTest extends CamelTestSupport { - - @Test - public void testMetrics() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMinimumMessageCount(1); - - assertMockEndpointsSatisfied(); - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - public void configure() { - from("metrics://foo") - .to("metrics://bar") - .to("mock:result"); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/98560c8e/src/test/java/org/apache/camel/metrics/MetricsComponentTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/camel/metrics/MetricsComponentTest.java b/src/test/java/org/apache/camel/metrics/MetricsComponentTest.java new file mode 100644 index 0000000..8eb6b42 --- /dev/null +++ b/src/test/java/org/apache/camel/metrics/MetricsComponentTest.java @@ -0,0 +1,42 @@ +package org.apache.camel.metrics; + +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class MetricsComponentTest extends CamelTestSupport { + + @Produce(uri = "direct:start") + protected ProducerTemplate template; + + @Test + public void testMetrics() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMinimumMessageCount(1); + template.sendBody(new Object()); + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + // .to("metrics") + // .to("metrics:") + .to("metrics:A") + .to("metrics:counter://B") + .to("metrics:counter:C?increment=19291") + .to("metrics:counter:C?decrement=19292") + .to("metrics:counter:C") + .to("metrics:meter:D") + .to("metrics:meter:D?mark=90001") + .to("mock:result"); + } + }; + } +}