Repository: camel
Updated Branches:
  refs/heads/master 38ba3c0bc -> 92da4bf49


CAMEL-10658: Camel-InfluxDB: Support BatchPoints


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92da4bf4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92da4bf4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92da4bf4

Branch: refs/heads/master
Commit: 92da4bf499956fc5b278cea5750db5ade9a80600
Parents: 38ba3c0
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Wed Dec 28 11:53:43 2016 +0100
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Wed Dec 28 11:53:58 2016 +0100

----------------------------------------------------------------------
 .../src/main/docs/influxdb-component.adoc       |  3 +-
 .../component/influxdb/InfluxDbEndpoint.java    | 13 +++
 .../component/influxdb/InfluxDbProducer.java    | 29 +++++--
 .../influxdb/InfluxDbProducerBatchTest.java     | 90 ++++++++++++++++++++
 4 files changed, 126 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/92da4bf4/components/camel-influxdb/src/main/docs/influxdb-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/docs/influxdb-component.adoc b/components/camel-influxdb/src/main/docs/influxdb-component.adoc
index d82c81d..3235d8e 100644
--- a/components/camel-influxdb/src/main/docs/influxdb-component.adoc
+++ b/components/camel-influxdb/src/main/docs/influxdb-component.adoc
@@ -54,13 +54,14 @@ The InfluxDB component has no options.
 
 
 // endpoint options: START
-The InfluxDB component supports 4 endpoint options which are listed below:
+The InfluxDB component supports 5 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
 |=======================================================================
 | Name | Group | Default | Java Type | Description
 | connectionBean | producer |  | String | *Required* Connection to the influx database of class InfluxDB.class
+| batch | producer | false | boolean | Define if this operation is a batch operation or not
 | databaseName | producer |  | String | The name of the database where the time series will be stored
 | retentionPolicy | producer | default | String | The string that defines the retention policy to the data created by the endpoint
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).

http://git-wip-us.apache.org/repos/asf/camel/blob/92da4bf4/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
index 1300469..c449ef3 100644
--- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
@@ -45,6 +45,8 @@ public class InfluxDbEndpoint extends DefaultEndpoint {
     private String databaseName;
     @UriParam(defaultValue = "default")
     private String retentionPolicy = "default";
+    @UriParam(defaultValue = "false")
+    private boolean batch;
 
     public InfluxDbEndpoint(String uri, InfluxDbComponent influxDbComponent, InfluxDB dbConn) {
         super(uri, influxDbComponent);
@@ -117,4 +119,15 @@ public class InfluxDbEndpoint extends DefaultEndpoint {
     public void setConnectionBean(String connectionBean) {
         this.connectionBean = connectionBean;
     }
+
+    public boolean isBatch() {
+        return batch;
+    }
+
+    /**
+     * Define if this operation is a batch operation or not
+     */
+    public void setBatch(boolean batch) {
+        this.batch = batch;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/92da4bf4/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
index 92b4fc3..4074931 100644
--- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.influxdb;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
 import org.influxdb.InfluxDB;
+import org.influxdb.dto.BatchPoints;
 import org.influxdb.dto.Point;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,18 +60,30 @@ public class InfluxDbProducer extends DefaultProducer {
 
         String dataBaseName = calculateDatabaseName(exchange);
         String retentionPolicy = calculateRetentionPolicy(exchange);
-        Point p = exchange.getIn().getMandatoryBody(Point.class);
+        if (!endpoint.isBatch()) {
+            Point p = exchange.getIn().getMandatoryBody(Point.class);
 
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Writing point {}", p.lineProtocol());
+                }
 
-
-        try {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Writing point {}", p.lineProtocol());
+                connection.write(dataBaseName, retentionPolicy, p);
+            } catch (Exception ex) {
+                exchange.setException(new CamelInfluxDbException(ex));
             }
+        } else {
+            BatchPoints batchPoints = exchange.getIn().getMandatoryBody(BatchPoints.class);
+
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Writing BatchPoints {}", batchPoints.lineProtocol());
+                }
 
-            connection.write(dataBaseName, retentionPolicy, p);
-        } catch (Exception ex) {
-            exchange.setException(new CamelInfluxDbException(ex));
+                connection.write(batchPoints);
+            } catch (Exception ex) {
+                exchange.setException(new CamelInfluxDbException(ex));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/92da4bf4/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerBatchTest.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerBatchTest.java b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerBatchTest.java
new file mode 100644
index 0000000..ea791c1
--- /dev/null
+++ b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerBatchTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.influxdb;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InfluxDbProducerBatchTest extends AbstractInfluxDbTest {
+
+    @EndpointInject(uri = "mock:test")
+    MockEndpoint successEndpoint;
+
+    @EndpointInject(uri = "mock:error")
+    MockEndpoint errorEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+
+                errorHandler(deadLetterChannel("mock:error").redeliveryDelay(0).maximumRedeliveries(0));
+
+                //test route
+                from("direct:test")
+                        .to("influxdb:influxDbBean?batch=true")
+                        .to("mock:test");
+            }
+        };
+    }
+
+    @Before
+    public void resetEndpoints() {
+        errorEndpoint.reset();
+        successEndpoint.reset();
+    }
+
+    @Test
+    public void writeBatchPoints() throws InterruptedException {
+
+        errorEndpoint.expectedMessageCount(0);
+        successEndpoint.expectedMessageCount(1);
+
+        BatchPoints batchPoints = createBatchPoints();
+        sendBody("direct:test", batchPoints);
+
+        errorEndpoint.assertIsSatisfied();
+        successEndpoint.assertIsSatisfied();
+
+    }
+
+    private BatchPoints createBatchPoints() {
+        BatchPoints batchPoints = BatchPoints.database("myTestTimeSeries").build();
+        Point point1 = Point.measurement("cpu")
+                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
+                .addField("idle", 90L)
+                .addField("user", 9L)
+                .addField("system", 1L)
+                .build();
+        Point point2 = Point.measurement("disk")
+                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
+                .addField("used", 8L)
+                .addField("free", 1L)
+                .build();
+        batchPoints.point(point1);
+        batchPoints.point(point2);
+        return batchPoints;
+    }
+
+}

Reply via email to