Repository: camel Updated Branches: refs/heads/master 38ba3c0bc -> 92da4bf49
CAMEL-10658: Camel-InfluxDB: Support BatchPoints Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92da4bf4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92da4bf4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92da4bf4 Branch: refs/heads/master Commit: 92da4bf499956fc5b278cea5750db5ade9a80600 Parents: 38ba3c0 Author: Andrea Cosentino <anco...@gmail.com> Authored: Wed Dec 28 11:53:43 2016 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Wed Dec 28 11:53:58 2016 +0100 ---------------------------------------------------------------------- .../src/main/docs/influxdb-component.adoc | 3 +- .../component/influxdb/InfluxDbEndpoint.java | 13 +++ .../component/influxdb/InfluxDbProducer.java | 29 +++++-- .../influxdb/InfluxDbProducerBatchTest.java | 90 ++++++++++++++++++++ 4 files changed, 126 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/92da4bf4/components/camel-influxdb/src/main/docs/influxdb-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-influxdb/src/main/docs/influxdb-component.adoc b/components/camel-influxdb/src/main/docs/influxdb-component.adoc index d82c81d..3235d8e 100644 --- a/components/camel-influxdb/src/main/docs/influxdb-component.adoc +++ b/components/camel-influxdb/src/main/docs/influxdb-component.adoc @@ -54,13 +54,14 @@ The InfluxDB component has no options. // endpoint options: START -The InfluxDB component supports 4 endpoint options which are listed below: +The InfluxDB component supports 5 endpoint options which are listed below: {% raw %} [width="100%",cols="2,1,1m,1m,5",options="header"] |======================================================================= | Name | Group | Default | Java Type | Description | connectionBean | producer | | String | *Required* Connection to the influx database of class InfluxDB.class +| batch | producer | false | boolean | Define if this operation is a batch operation or not | databaseName | producer | | String | The name of the database where the time series will be stored | retentionPolicy | producer | default | String | The string that defines the retention policy to the data created by the endpoint | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). http://git-wip-us.apache.org/repos/asf/camel/blob/92da4bf4/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java index 1300469..c449ef3 100644 --- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java +++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java @@ -45,6 +45,8 @@ public class InfluxDbEndpoint extends DefaultEndpoint { private String databaseName; @UriParam(defaultValue = "default") private String retentionPolicy = "default"; + @UriParam(defaultValue = "false") + private boolean batch; public InfluxDbEndpoint(String uri, InfluxDbComponent influxDbComponent, InfluxDB dbConn) { super(uri, influxDbComponent); @@ -117,4 +119,15 @@ public class InfluxDbEndpoint extends DefaultEndpoint { public void setConnectionBean(String connectionBean) { this.connectionBean = connectionBean; } + + public boolean isBatch() { + return batch; + } + + /** + * Define if this operation is a batch operation or not + */ + public void setBatch(boolean batch) { + this.batch = batch; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/92da4bf4/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java index 92b4fc3..4074931 100644 --- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java +++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java @@ -19,6 +19,7 @@ package org.apache.camel.component.influxdb; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,18 +60,30 @@ public class InfluxDbProducer extends DefaultProducer { String dataBaseName = calculateDatabaseName(exchange); String retentionPolicy = calculateRetentionPolicy(exchange); - Point p = exchange.getIn().getMandatoryBody(Point.class); + if (!endpoint.isBatch()) { + Point p = exchange.getIn().getMandatoryBody(Point.class); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Writing point {}", p.lineProtocol()); + } - - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Writing point {}", p.lineProtocol()); + connection.write(dataBaseName, retentionPolicy, p); + } catch (Exception ex) { + exchange.setException(new CamelInfluxDbException(ex)); } + } else { + BatchPoints batchPoints = exchange.getIn().getMandatoryBody(BatchPoints.class); + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Writing BatchPoints {}", batchPoints.lineProtocol()); + } - connection.write(dataBaseName, retentionPolicy, p); - } catch (Exception ex) { - exchange.setException(new CamelInfluxDbException(ex)); + connection.write(batchPoints); + } catch (Exception ex) { + exchange.setException(new CamelInfluxDbException(ex)); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/92da4bf4/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerBatchTest.java ---------------------------------------------------------------------- diff --git a/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerBatchTest.java b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerBatchTest.java new file mode 100644 index 0000000..ea791c1 --- /dev/null +++ b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerBatchTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.influxdb; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.junit.Before; +import org.junit.Test; + +public class InfluxDbProducerBatchTest extends AbstractInfluxDbTest { + + @EndpointInject(uri = "mock:test") + MockEndpoint successEndpoint; + + @EndpointInject(uri = "mock:error") + MockEndpoint errorEndpoint; + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + + errorHandler(deadLetterChannel("mock:error").redeliveryDelay(0).maximumRedeliveries(0)); + + //test route + from("direct:test") + .to("influxdb:influxDbBean?batch=true") + .to("mock:test"); + } + }; + } + + @Before + public void resetEndpoints() { + errorEndpoint.reset(); + successEndpoint.reset(); + } + + @Test + public void writeBatchPoints() throws InterruptedException { + + errorEndpoint.expectedMessageCount(0); + successEndpoint.expectedMessageCount(1); + + BatchPoints batchPoints = createBatchPoints(); + sendBody("direct:test", batchPoints); + + errorEndpoint.assertIsSatisfied(); + successEndpoint.assertIsSatisfied(); + + } + + private BatchPoints createBatchPoints() { + BatchPoints batchPoints = BatchPoints.database("myTestTimeSeries").build(); + Point point1 = Point.measurement("cpu") + .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) + .addField("idle", 90L) + .addField("user", 9L) + .addField("system", 1L) + .build(); + Point point2 = Point.measurement("disk") + .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) + .addField("used", 8L) + .addField("free", 1L) + .build(); + batchPoints.point(point1); + batchPoints.point(point2); + return batchPoints; + } + +}