Repository: camel Updated Branches: refs/heads/master 8d33bc188 -> 98ba70378
CAMEL-10952 Initial work adding Kinesis Firehose Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f3f02f01 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f3f02f01 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f3f02f01 Branch: refs/heads/master Commit: f3f02f018540039ced717e9b84f596472b15f630 Parents: 8d33bc1 Author: Tony Tiger <thatrascalti...@gmail.com> Authored: Mon Mar 6 16:34:03 2017 +0000 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Wed Mar 8 08:40:51 2017 +0100 ---------------------------------------------------------------------- components/camel-aws/pom.xml | 1 + .../docs/aws-kinesis-firehose-component.adoc | 146 +++++++++++++++++++ .../aws/firehose/KinesisFirehoseComponent.java | 41 ++++++ .../aws/firehose/KinesisFirehoseConstants.java | 22 +++ .../aws/firehose/KinesisFirehoseEndpoint.java | 72 +++++++++ .../aws/firehose/KinesisFirehoseProducer.java | 61 ++++++++ .../apache/camel/component/aws-kinesis-firehose | 18 +++ .../firehose/KinesisFirehoseEndpointTest.java | 56 +++++++ .../firehose/KinesisFirehoseProducerTest.java | 82 +++++++++++ ...nesisFirehoseComponentAutoConfiguration.java | 80 ++++++++++ ...dditional-spring-configuration-metadata.json | 6 + .../main/resources/META-INF/spring.factories | 4 +- 12 files changed, 588 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f3f02f01/components/camel-aws/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-aws/pom.xml b/components/camel-aws/pom.xml index 43da218..0fe9938 100644 --- a/components/camel-aws/pom.xml +++ b/components/camel-aws/pom.xml @@ -37,6 +37,7 @@ org.apache.camel.spi.ComponentResolver;component=aws-ddb, org.apache.camel.spi.ComponentResolver;component=aws-ec2, org.apache.camel.spi.ComponentResolver;component=aws-kinesis, + 
org.apache.camel.spi.ComponentResolver;component=aws-kinesis-firehose, org.apache.camel.spi.ComponentResolver;component=aws-s3, org.apache.camel.spi.ComponentResolver;component=aws-sdb, org.apache.camel.spi.ComponentResolver;component=aws-ses, http://git-wip-us.apache.org/repos/asf/camel/blob/f3f02f01/components/camel-aws/src/main/docs/aws-kinesis-firehose-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/docs/aws-kinesis-firehose-component.adoc b/components/camel-aws/src/main/docs/aws-kinesis-firehose-component.adoc new file mode 100644 index 0000000..b691073 --- /dev/null +++ b/components/camel-aws/src/main/docs/aws-kinesis-firehose-component.adoc @@ -0,0 +1,146 @@ +## AWS Kinesis Firehose Component + +*Available as of Camel version 2.19* + +The Kinesis Firehose component supports sending messages to the Amazon Kinesis Firehose service. + +Prerequisites + +You must have a valid Amazon Web Services developer account, and be +signed up to use Amazon Kinesis Firehose. More information is available +at https://aws.amazon.com/kinesis/firehose/[AWS Kinesis Firehose] + +### URI Format + +[source,java] +----------------------------------- +aws-kinesis-firehose://delivery-stream-name[?options] +----------------------------------- + +The stream needs to be created prior to it being used. + + You can append query options to the URI in the following format, +?option=value&option2=value&... + +### URI Options + + +// component options: START +The AWS Kinesis Firehose component has no options. 
+// component options: END + + + + + + + +// endpoint options: START +The AWS Kinesis Firehose component is configured using the URI syntax with the following path and query parameters: + + aws-kinesis-firehose:streamName + +#### 1 path parameters: + +[width="100%",cols="2,1,1m,6",options="header"] +|======================================================================= +| Name | Default | Java Type | Description +| streamName | | String | *Required* Name of the delivery stream +|======================================================================= + +#### 2 query parameters: + +[width="100%",cols="2,1,1m,1m,5",options="header"] +|======================================================================= +| Name | Group | Default | Java Type | Description +| amazonKinesisFirehoseClient | common | | AmazonKinesisFirehose | *Required* Amazon Kinesis Firehose client to use for all requests for this endpoint +| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). +|======================================================================= +// endpoint options: END + + + + + + +Required Kinesis Firehose component options + +You have to provide the amazonKinesisFirehoseClient in the +link:registry.html[Registry] with proxies and relevant credentials +configured. 
+ + +### Usage + +#### Amazon Kinesis Firehose configuration + +You will need to create an instance of AmazonKinesisFirehoseClient and +bind it to the registry. + +[source,java] +-------------------------------------------------------------------------------------------------------------------- +ClientConfiguration clientConfiguration = new ClientConfiguration(); +clientConfiguration.setProxyHost("http://myProxyHost"); +clientConfiguration.setProxyPort(8080); + +Region region = Region.getRegion(Regions.fromName(regionName)); +AmazonKinesisFirehoseClient client = region.createClient(AmazonKinesisFirehoseClient.class, null, clientConfiguration); +// the 'null' here is the AWSCredentialsProvider which defaults to an instance of DefaultAWSCredentialsProviderChain + +registry.bind("kinesisFirehoseClient", client); +-------------------------------------------------------------------------------------------------------------------- + +You then have to reference the AmazonKinesisFirehoseClient in the `amazonKinesisFirehoseClient` URI option. + +[source,java] +-------------------------------------------------------------------------------------------------------------------- +from("direct:start") + .to("aws-kinesis-firehose://mykinesisdeliverystream?amazonKinesisFirehoseClient=#kinesisFirehoseClient"); +-------------------------------------------------------------------------------------------------------------------- + +#### Providing AWS Credentials + +It is recommended that the credentials are obtained by using the +http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html[DefaultAWSCredentialsProviderChain] +that is the default when creating a new ClientConfiguration instance, +however, a +different http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html[AWSCredentialsProvider] +can be specified when calling createClient(...). 
+ +#### Message headers set by the Kinesis Firehose producer on successful storage of a Record + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Header |Type |Description + +|`CamelAwsKinesisFirehoseRecordId` |`String` |The record ID, as defined in +http://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecord.html#API_PutRecord_ResponseSyntax[Response Syntax] + + +|======================================================================= + +### Dependencies + +Maven users will need to add the following dependency to their pom.xml. + +*pom.xml* + +[source,xml] +--------------------------------------- +<dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-aws</artifactId> + <version>${camel-version}</version> +</dependency> +--------------------------------------- + +where `${camel-version}` must be replaced by the actual version of Camel +(2.19 or higher). + +### See Also + +* link:configuring-camel.html[Configuring Camel] +* link:component.html[Component] +* link:endpoint.html[Endpoint] +* link:getting-started.html[Getting Started] + +* link:aws.html[AWS Component] http://git-wip-us.apache.org/repos/asf/camel/blob/f3f02f01/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseComponent.java new file mode 100644 index 0000000..0df1504 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseComponent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.firehose; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.DefaultComponent; + +/** + * Camel component that creates {@link KinesisFirehoseEndpoint} instances. + */ +public class KinesisFirehoseComponent extends DefaultComponent { + + public KinesisFirehoseComponent() { + + } + + public KinesisFirehoseComponent(CamelContext context) { + super(context); + } + + /** + * Builds the endpoint from the full URI and the remaining URI part, then + * applies the given parameters to it via setProperties. + */ + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + KinesisFirehoseEndpoint endpoint = new KinesisFirehoseEndpoint(uri, remaining, this); + setProperties(endpoint, parameters); + return endpoint; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/f3f02f01/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConstants.java new file mode 100644 index 0000000..b85822e --- /dev/null +++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConstants.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.firehose; + +/** + * Names of the message headers used by the Kinesis Firehose component. + */ +public interface KinesisFirehoseConstants { + + /** Name of the header that carries the record id of a stored record. */ + String RECORD_ID = "CamelAwsKinesisFirehoseRecordId"; +} http://git-wip-us.apache.org/repos/asf/camel/blob/f3f02f01/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java new file mode 100644 index 0000000..b5ee50c --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.firehose; + +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; + +/** + * Producer-only endpoint for an Amazon Kinesis Firehose delivery stream. + * It holds the delivery stream name taken from the URI path and the + * Firehose client used by the producer. No consumer class is declared + * because this endpoint cannot be consumed from. + */ +@UriEndpoint(firstVersion = "2.19.0", scheme = "aws-kinesis-firehose", title = "AWS Kinesis Firehose", syntax = "aws-kinesis-firehose:streamName", producerOnly = true, + label = "cloud,messaging") +public class KinesisFirehoseEndpoint extends DefaultEndpoint { + + @UriPath(description = "Name of the stream") + @Metadata(required = "true") + private String streamName; + @UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint") + @Metadata(required = "true") + private AmazonKinesisFirehose amazonKinesisFirehoseClient; + + /** + * @param uri the endpoint URI + * @param streamName the delivery stream name + * @param component the component that created this endpoint + */ + public KinesisFirehoseEndpoint(String uri, String streamName, KinesisFirehoseComponent component) { + super(uri, component); + this.streamName = streamName; + } + + /** + * Creates the producer that puts records onto the delivery stream. + */ + @Override + public Producer createProducer() throws Exception { + return new KinesisFirehoseProducer(this); + } + + /** + * Always fails: this endpoint is producer-only. + * + * @throws UnsupportedOperationException on every call + */ + @Override + public
Consumer createConsumer(Processor processor) throws Exception { + throw new UnsupportedOperationException("You cannot consume messages from this endpoint"); + } + + @Override + public boolean isSingleton() { + return true; + } + + /** + * Sets the Firehose client; bound from the amazonKinesisFirehoseClient URI option. + */ + public void setAmazonKinesisFirehoseClient(AmazonKinesisFirehose client) { + this.amazonKinesisFirehoseClient = client; + } + + /** + * Returns the Firehose client set on this endpoint, or null if none was set. + */ + public AmazonKinesisFirehose getClient() { + return amazonKinesisFirehoseClient; + } + + /** + * Returns the delivery stream name this endpoint was created with. + */ + public String getStreamName() { + return streamName; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/f3f02f01/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java new file mode 100644 index 0000000..6eee663 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.aws.firehose; + +import java.nio.ByteBuffer; + +import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordResult; +import com.amazonaws.services.kinesisfirehose.model.Record; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.impl.DefaultProducer; + +import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; + +public class KinesisFirehoseProducer extends DefaultProducer { + + public KinesisFirehoseProducer(KinesisFirehoseEndpoint endpoint) { + super(endpoint); + } + + @Override + public KinesisFirehoseEndpoint getEndpoint() { + return (KinesisFirehoseEndpoint) super.getEndpoint(); + } + + @Override + public void process(Exchange exchange) throws Exception { + PutRecordRequest request = createRequest(exchange); + log.trace("Sending request [{}] from exchange [{}]...", request, exchange); + PutRecordResult putRecordResult = getEndpoint().getClient().putRecord(request); + log.trace("Received result [{}]", putRecordResult); + Message message = getMessageForResponse(exchange); + message.setHeader(KinesisFirehoseConstants.RECORD_ID, putRecordResult.getRecordId()); + } + + private PutRecordRequest createRequest(Exchange exchange) { + ByteBuffer body = exchange.getIn().getBody(ByteBuffer.class); + Record record = new Record(); + record.setData(body); + + PutRecordRequest putRecordRequest = new PutRecordRequest(); + putRecordRequest.setDeliveryStreamName(getEndpoint().getEndpointKey()); + putRecordRequest.setRecord(record); + return putRecordRequest; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/f3f02f01/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-kinesis-firehose ---------------------------------------------------------------------- diff --git 
a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-kinesis-firehose b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-kinesis-firehose new file mode 100644 index 0000000..2f91974 --- /dev/null +++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-kinesis-firehose @@ -0,0 +1,18 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- + +class=org.apache.camel.component.aws.firehose.KinesisFirehoseComponent http://git-wip-us.apache.org/repos/asf/camel/blob/f3f02f01/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java new file mode 100644 index 0000000..2a0ef0f --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.aws.firehose; + +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose; +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * Checks that the URI path and the client option are bound to the endpoint. + */ +@RunWith(MockitoJUnitRunner.class) +public class KinesisFirehoseEndpointTest { + + @Mock + private AmazonKinesisFirehose amazonKinesisFirehoseClient; + + private CamelContext camelContext; + + /** + * Registers the mocked client as "firehoseClient" in a fresh Camel context. + */ + @Before + public void setup() throws Exception { + SimpleRegistry registry = new SimpleRegistry(); + registry.put("firehoseClient", amazonKinesisFirehoseClient); + camelContext = new DefaultCamelContext(registry); + } + + /** + * Resolves an endpoint URI that references the registered client and + * asserts the client and the stream name on the resulting endpoint. + */ + @Test + public void allEndpointParams() throws Exception { + KinesisFirehoseEndpoint endpoint = (KinesisFirehoseEndpoint) camelContext.getEndpoint("aws-kinesis-firehose://some_stream_name" + + "?amazonKinesisFirehoseClient=#firehoseClient" + ); + + assertThat(endpoint.getClient(), is(amazonKinesisFirehoseClient)); + assertThat(endpoint.getStreamName(), is("some_stream_name")); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/f3f02f01/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducerTest.java new file mode 100644 index 0000000..2096714 --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducerTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.firehose; + +import java.nio.ByteBuffer; + +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose; +import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordResult; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit test for {@link KinesisFirehoseProducer} that mocks the Firehose + * client, the endpoint and the exchange. + */ +@RunWith(MockitoJUnitRunner.class) +public class KinesisFirehoseProducerTest { + + private static final String STREAM_NAME = "streams"; + private static final String RECORD_ID = "sample_record_id"; + private static final String SAMPLE_RECORD_BODY = "SAMPLE"; + private static final ByteBuffer SAMPLE_BUFFER = ByteBuffer.wrap(SAMPLE_RECORD_BODY.getBytes()); + + @Mock + private AmazonKinesisFirehose kinesisFirehoseClient; + @Mock + private KinesisFirehoseEndpoint kinesisFirehoseEndpoint; 
+ @Mock + private Message inMessage; + @Mock + private Message outMessage; + @Mock + private PutRecordResult putRecordResult; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private Exchange exchange; + + private KinesisFirehoseProducer kinesisFirehoseProducer; + /** + * Stubs the endpoint, an InOut exchange with a sample body, and a client + * whose putRecord returns a result carrying RECORD_ID. + */ + @Before + public void setup() throws Exception { + when(kinesisFirehoseEndpoint.getClient()).thenReturn(kinesisFirehoseClient); + when(kinesisFirehoseEndpoint.getEndpointUri()).thenReturn("aws-kinesis-firehose://etl"); + when(kinesisFirehoseEndpoint.getStreamName()).thenReturn(STREAM_NAME); + when(exchange.getOut()).thenReturn(outMessage); + when(exchange.getIn()).thenReturn(inMessage); + when(exchange.getPattern()).thenReturn(ExchangePattern.InOut); + + when(inMessage.getBody(ByteBuffer.class)).thenReturn(SAMPLE_BUFFER); + + when(putRecordResult.getRecordId()).thenReturn(RECORD_ID); + when(kinesisFirehoseClient.putRecord(any(PutRecordRequest.class))).thenReturn(putRecordResult); + kinesisFirehoseProducer = new KinesisFirehoseProducer(kinesisFirehoseEndpoint); + } + + /** + * Processes the exchange and verifies that the record id header is set + * on the out message. + */ + @Test + public void shouldPutRecordIntoStreamWhenProcessingExchange() throws Exception { + kinesisFirehoseProducer.process(exchange); + verify(outMessage).setHeader(KinesisFirehoseConstants.RECORD_ID, RECORD_ID); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/f3f02f01/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/firehose/springboot/KinesisFirehoseComponentAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/firehose/springboot/KinesisFirehoseComponentAutoConfiguration.java b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/firehose/springboot/KinesisFirehoseComponentAutoConfiguration.java new file mode 100644 index 0000000..7f60733 --- /dev/null +++ 
b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/firehose/springboot/KinesisFirehoseComponentAutoConfiguration.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.firehose.springboot; + +import org.apache.camel.CamelContext; +import org.apache.camel.component.aws.firehose.KinesisFirehoseComponent; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionMessage; +import org.springframework.boot.autoconfigure.condition.ConditionOutcome; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.SpringBootCondition; +import org.springframework.boot.bind.RelaxedPropertyResolver; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.context.annotation.Conditional; +import 
org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; +import org.springframework.core.type.AnnotatedTypeMetadata; + +/** + * Generated by camel-package-maven-plugin - do not edit this file! + */ +@Configuration +@ConditionalOnBean(type = "org.apache.camel.spring.boot.CamelAutoConfiguration") +@Conditional(KinesisFirehoseComponentAutoConfiguration.Condition.class) +@AutoConfigureAfter(name = "org.apache.camel.spring.boot.CamelAutoConfiguration") +public class KinesisFirehoseComponentAutoConfiguration { + + /** + * Creates the component bean and sets the given Camel context on it. + */ + @Lazy + @Bean(name = "aws-kinesis-firehose-component") + @ConditionalOnClass(CamelContext.class) + @ConditionalOnMissingBean(KinesisFirehoseComponent.class) + public KinesisFirehoseComponent configureKinesisFirehoseComponent( + CamelContext camelContext) throws Exception { + KinesisFirehoseComponent component = new KinesisFirehoseComponent(); + component.setCamelContext(camelContext); + return component; + } + + /** + * Matches when the "enabled" property under "camel.component.aws-kinesis-firehose." + * resolves to true. When it is not set, the "enabled" property under + * "camel.component." is used, which in turn defaults to true. + */ + public static class Condition extends SpringBootCondition { + @Override + public ConditionOutcome getMatchOutcome( + ConditionContext conditionContext, + AnnotatedTypeMetadata annotatedTypeMetadata) { + boolean groupEnabled = isEnabled(conditionContext, + "camel.component.", true); + ConditionMessage.Builder message = ConditionMessage + .forCondition("camel.component.aws-kinesis-firehose"); + if (isEnabled(conditionContext, + "camel.component.aws-kinesis-firehose.", groupEnabled)) { + return ConditionOutcome.match(message.because("enabled")); + } + return ConditionOutcome.noMatch(message.because("not enabled")); + } + + /** + * Reads the "enabled" property under the given prefix, returning + * defaultValue when it is not set. + */ + private boolean isEnabled( + org.springframework.context.annotation.ConditionContext context, + java.lang.String prefix, boolean defaultValue) { + RelaxedPropertyResolver resolver = new RelaxedPropertyResolver( + context.getEnvironment(), prefix); + return resolver.getProperty("enabled", Boolean.class, defaultValue); + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/camel/blob/f3f02f01/platforms/spring-boot/components-starter/camel-aws-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json index d043ada..ebf2a0a 100644 --- a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -65,6 +65,12 @@ "name": "camel.component.aws-ddb.enabled", "description": "Enable aws-ddb component", "type": "java.lang.Boolean" + }, + { + "defaultValue": true, + "name": "camel.component.aws-kinesis-firehose.enabled", + "description": "Enable aws-kinesis-firehose component", + "type": "java.lang.Boolean" } ] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/f3f02f01/platforms/spring-boot/components-starter/camel-aws-starter/src/main/resources/META-INF/spring.factories ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/resources/META-INF/spring.factories b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/resources/META-INF/spring.factories index 729f3b3..49f3988 100644 --- a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/resources/META-INF/spring.factories +++ b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/resources/META-INF/spring.factories @@ -26,7 +26,9 @@ 
org.apache.camel.component.aws.ddbstream.springboot.DdbStreamComponentAutoConfig org.apache.camel.component.aws.sqs.springboot.SqsComponentAutoConfiguration,\ org.apache.camel.component.aws.ec2.springboot.EC2ComponentAutoConfiguration,\ org.apache.camel.component.aws.cw.springboot.CwComponentAutoConfiguration,\ -org.apache.camel.component.aws.ddb.springboot.DdbComponentAutoConfiguration +org.apache.camel.component.aws.ddb.springboot.DdbComponentAutoConfiguration,\ +org.apache.camel.component.aws.firehose.springboot.KinesisFirehoseComponentAutoConfiguration +