Add producer support for raw input data
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b170152d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b170152d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b170152d Branch: refs/heads/master Commit: b170152df80611118a7aad628747be15ea7fe07c Parents: 18e89e7 Author: Preben Asmussen <preben.asmus...@gmail.com> Authored: Sat Jul 25 14:26:30 2015 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Sat Jul 25 15:34:27 2015 +0200 ---------------------------------------------------------------------- .../component/splunk/SplunkConfiguration.java | 18 ++++- .../camel/component/splunk/SplunkProducer.java | 7 +- .../component/splunk/event/SplunkEvent.java | 6 +- .../component/splunk/support/DataWriter.java | 4 +- .../splunk/support/SplunkDataWriter.java | 14 ++-- .../splunk/support/SubmitDataWriter.java | 7 +- .../camel/component/splunk/ProducerTest.java | 12 +++- .../camel/component/splunk/RawProducerTest.java | 76 ++++++++++++++++++++ .../SplunkComponentConfigurationTest.java | 14 ++-- 9 files changed, 134 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java index 94132e8..087bf33 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java @@ -17,6 +17,7 @@ package org.apache.camel.component.splunk; import com.splunk.Service; + import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; @@ -57,6 +58,8 @@ public class SplunkConfiguration { private String source; @UriParam(label = "producer") private int tcpReceiverPort; + @UriParam(label = "producer", defaultValue = "false") + private boolean raw; @UriParam(label = "consumer") private int count; @@ -147,6 +150,17 @@ public class SplunkConfiguration { this.tcpReceiverPort = tcpReceiverPort; } + public boolean isRaw() { + return raw; + } + + /** + * Should the payload be inserted raw + */ + public void setRaw(boolean raw) { + this.raw = raw; + } + public String getSourceType() { return sourceType; } @@ -260,7 +274,7 @@ public class SplunkConfiguration { public boolean isStreaming() { return streaming != null ? streaming : false; } - + /** * Sets streaming mode. * <p> @@ -269,7 +283,7 @@ public class SplunkConfiguration { public void setStreaming(boolean streaming) { this.streaming = streaming; } - + public int getConnectionTimeout() { return connectionTimeout; } http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java index a98d036..139f68c 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkProducer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.splunk; import com.splunk.Args; + import org.apache.camel.Exchange; import org.apache.camel.component.splunk.event.SplunkEvent; import org.apache.camel.component.splunk.support.DataWriter; @@ -46,7 +47,11 @@ public class SplunkProducer extends DefaultProducer { if (!dataWriter.isConnected()) { dataWriter.start(); } - dataWriter.write(exchange.getIn().getMandatoryBody(SplunkEvent.class)); + if (endpoint.getConfiguration().isRaw()) { + dataWriter.write(exchange.getIn().getMandatoryBody(String.class)); + } else { + dataWriter.write(exchange.getIn().getMandatoryBody(SplunkEvent.class)); + } } catch (Exception e) { if (endpoint.reset(e)) { dataWriter.stop(); http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java index 1c19c66..10daad3 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/event/SplunkEvent.java @@ -159,6 +159,10 @@ public class SplunkEvent implements Serializable { */ public static final String COMMON_VENDOR = "vendor"; + /** + * Event break delimiter + */ + public static final String LINEBREAK = "\n"; // ---------------------------------- // Update // ---------------------------------- @@ -202,8 +206,6 @@ public class SplunkEvent implements Serializable { private static final String THROWABLE_MESSAGE = "throwable_message"; private static final String THROWABLE_STACKTRACE_ELEMENTS = "stacktrace_elements"; - private static final String LINEBREAK = "\n"; - private static final long serialVersionUID = 1L; /** http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java ---------------------------------------------------------------------- diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java index 44fd851..fb83b59 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/DataWriter.java @@ -20,7 +20,9 @@ import org.apache.camel.component.splunk.event.SplunkEvent; public interface DataWriter { void write(SplunkEvent data) throws Exception; - + + void write(String data) throws Exception; + void stop(); void start(); http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java ---------------------------------------------------------------------- diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java index d0ca8b0..ee1a349 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataWriter.java @@ -46,14 +46,18 @@ public abstract class SplunkDataWriter implements DataWriter { protected abstract Socket createSocket(Service service) throws IOException; public void write(SplunkEvent event) throws Exception { - LOG.debug("writing event to splunk:" + event); - doWrite(event); + doWrite(event.toString()); + } + + public void write(String event) throws Exception { + doWrite(event + SplunkEvent.LINEBREAK); } - protected void doWrite(SplunkEvent event) throws IOException { + protected void doWrite(String event) throws IOException { + LOG.debug("writing event to splunk:" + event); OutputStream ostream = socket.getOutputStream(); - Writer writer = new OutputStreamWriter(ostream, "UTF8"); - writer.write(event.toString()); + Writer writer = new OutputStreamWriter(ostream, "UTF-8"); + writer.write(event); writer.flush(); } http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java ---------------------------------------------------------------------- diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java index c39336b..43c62b0 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SubmitDataWriter.java @@ -24,7 +24,6 @@ import com.splunk.Index; import com.splunk.Receiver; import com.splunk.Service; import org.apache.camel.component.splunk.SplunkEndpoint; -import org.apache.camel.component.splunk.event.SplunkEvent; public class SubmitDataWriter extends SplunkDataWriter { private String index; @@ -34,13 +33,13 @@ public class SubmitDataWriter extends SplunkDataWriter { } @Override - protected void doWrite(SplunkEvent event) throws IOException { + protected void doWrite(String event) throws IOException { Index index = getIndex(); if (index != null) { - index.submit(args, event.toString()); + index.submit(args, event); } else { Receiver receiver = endpoint.getService().getReceiver(); - receiver.submit(args, event.toString()); + receiver.submit(args, event); } } http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java index dbbb02c..e372598 100644 --- a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java +++ b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/ProducerTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.splunk; +import org.apache.camel.CamelExecutionException; import org.apache.camel.EndpointInject; import org.apache.camel.Producer; import org.apache.camel.builder.RouteBuilder; @@ -48,7 +49,7 @@ public class ProducerTest extends SplunkMockTestSupport { template.sendBody("direct:stream", splunkEvent); assertMockEndpointsSatisfied(); Producer streamProducer = streamEndpoint.createProducer(); - assertIsInstanceOf(StreamDataWriter.class, ((SplunkProducer) streamProducer).getDataWriter()); + assertIsInstanceOf(StreamDataWriter.class, ((SplunkProducer)streamProducer).getDataWriter()); } @Test @@ -62,7 +63,7 @@ public class ProducerTest extends SplunkMockTestSupport { template.sendBody("direct:submit", splunkEvent); assertMockEndpointsSatisfied(); Producer submitProducer = submitEndpoint.createProducer(); - assertIsInstanceOf(SubmitDataWriter.class, ((SplunkProducer) submitProducer).getDataWriter()); + assertIsInstanceOf(SubmitDataWriter.class, ((SplunkProducer)submitProducer).getDataWriter()); } @Test @@ -76,7 +77,12 @@ public class ProducerTest extends SplunkMockTestSupport { template.sendBody("direct:tcp", splunkEvent); assertMockEndpointsSatisfied(); Producer tcpProducer = tcpEndpoint.createProducer(); - assertIsInstanceOf(TcpDataWriter.class, ((SplunkProducer) tcpProducer).getDataWriter()); + assertIsInstanceOf(TcpDataWriter.class, ((SplunkProducer)tcpProducer).getDataWriter()); + } + + @Test(expected = CamelExecutionException.class) + public void testBodyWithoutRawOption() throws Exception { + template.sendBody("direct:tcp", "foobar"); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/RawProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/RawProducerTest.java b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/RawProducerTest.java new file mode 100644 index 0000000..b90370d --- /dev/null +++ b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/RawProducerTest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.splunk; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +public class RawProducerTest extends SplunkMockTestSupport { + private static final String PAYLOAD = "{foo:1, bar:2}"; + + @EndpointInject(uri = "splunk://stream") + protected SplunkEndpoint streamEndpoint; + + @EndpointInject(uri = "splunk://submit") + protected SplunkEndpoint submitEndpoint; + + @EndpointInject(uri = "splunk://tcp") + protected SplunkEndpoint tcpEndpoint; + + @Test + public void testStreamWriter() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:stream-result"); + mock.setExpectedMessageCount(1); + mock.expectedBodiesReceived(PAYLOAD); + template.sendBody("direct:stream", PAYLOAD); + assertMockEndpointsSatisfied(); + } + + @Test + public void testSubmitWriter() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:submitresult"); + mock.setExpectedMessageCount(1); + mock.expectedBodiesReceived(PAYLOAD); + template.sendBody("direct:submit", PAYLOAD); + assertMockEndpointsSatisfied(); + } + + @Test + public void testTcpWriter() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:tcpresult"); + mock.setExpectedMessageCount(1); + mock.expectedBodiesReceived(PAYLOAD); + template.sendBody("direct:tcp", PAYLOAD); + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("direct:stream").to("splunk://stream?username=foo&password=bar&index=myindex&sourceType=SourceType&source=Source&raw=true").to("mock:stream-result"); + + from("direct:submit").to("splunk://submit?username=foo&password=bar&index=myindex&sourceType=testSource&source=test&raw=true").to("mock:submitresult"); + + from("direct:tcp").to("splunk://tcp?username=foo&password=bar&tcpReceiverPort=2222&index=myindex&sourceType=testSource&source=test&raw=true").to("mock:tcpresult"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/b170152d/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java index 06e9bd9..e36c197 100644 --- a/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java +++ b/components/camel-splunk/src/test/java/org/apache/camel/component/splunk/SplunkComponentConfigurationTest.java @@ -35,6 +35,7 @@ public class SplunkComponentConfigurationTest extends CamelTestSupport { assertEquals(Service.DEFAULT_SCHEME, endpoint.getConfiguration().getScheme()); assertEquals(5000, endpoint.getConfiguration().getConnectionTimeout()); assertFalse(endpoint.getConfiguration().isUseSunHttpsHandler()); + assertFalse(endpoint.getConfiguration().isRaw()); } @Test(expected = IllegalArgumentException.class) @@ -47,9 +48,9 @@ public class SplunkComponentConfigurationTest extends CamelTestSupport { public void createProducerEndpointWithMaximalConfiguration() throws Exception { SplunkComponent component = context.getComponent("splunk", SplunkComponent.class); - SplunkEndpoint endpoint = (SplunkEndpoint)component.createEndpoint("splunk://tcp?username=test&password=pw&host=myhost&port=3333&" - + "tcpReceiverPort=4444&index=myindex&sourceType=testSource&" - + "source=test&owner=me&app=fantasticapp&useSunHttpsHandler=true"); + SplunkEndpoint endpoint = (SplunkEndpoint)component + .createEndpoint("splunk://tcp?username=test&password=pw&host=myhost&port=3333&" + "tcpReceiverPort=4444&index=myindex&sourceType=testSource&" + + "source=test&owner=me&app=fantasticapp&useSunHttpsHandler=true&raw=true"); assertEquals("myhost", endpoint.getConfiguration().getHost()); assertEquals(3333, endpoint.getConfiguration().getPort()); assertEquals("test", endpoint.getConfiguration().getUsername()); @@ -61,6 +62,7 @@ public class SplunkComponentConfigurationTest extends CamelTestSupport { assertEquals("me", endpoint.getConfiguration().getOwner()); assertEquals("fantasticapp", endpoint.getConfiguration().getApp()); assertTrue(endpoint.getConfiguration().isUseSunHttpsHandler()); + assertTrue(endpoint.getConfiguration().isRaw()); } @Test @@ -81,9 +83,9 @@ public class SplunkComponentConfigurationTest extends CamelTestSupport { public void createConsumerEndpointWithMaximalConfiguration() throws Exception { SplunkComponent component = context.getComponent("splunk", SplunkComponent.class); - SplunkEndpoint endpoint = (SplunkEndpoint)component.createEndpoint("splunk://normal?username=test&password=pw&host=myhost&port=3333&delay=10s&" - + "search=Splunk search query goes here&initEarliestTime=-1d" + "&latestTime=now&count=10&" - + "owner=me&app=fantasticapp"); + SplunkEndpoint endpoint = (SplunkEndpoint)component + .createEndpoint("splunk://normal?username=test&password=pw&host=myhost&port=3333&delay=10s&" + "search=Splunk search query goes here&initEarliestTime=-1d" + + "&latestTime=now&count=10&" + "owner=me&app=fantasticapp"); assertEquals("myhost", endpoint.getConfiguration().getHost()); assertEquals(3333, endpoint.getConfiguration().getPort()); assertEquals("test", endpoint.getConfiguration().getUsername());