Repository: camel Updated Branches: refs/heads/master 92013909b -> ad64a54ec
CAMEL-10963 Correcting getEndpointKey call This also includes an additional integration test to verify the change. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ad64a54e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ad64a54e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ad64a54e Branch: refs/heads/master Commit: ad64a54ecf5e2d41ad7c09cbf6242172cc964b6c Parents: 9201390 Author: Tony Tiger <thatrascalti...@gmail.com> Authored: Wed Mar 8 16:15:00 2017 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Mar 8 20:27:44 2017 +0100 ---------------------------------------------------------------------- .../aws/firehose/KinesisFirehoseProducer.java | 2 +- ...KinesisFirehoseComponentIntegrationTest.java | 60 ++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ad64a54e/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java index 6eee663..ce24736 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java @@ -54,7 +54,7 @@ public class KinesisFirehoseProducer extends DefaultProducer { record.setData(body); PutRecordRequest putRecordRequest = new PutRecordRequest(); - putRecordRequest.setDeliveryStreamName(getEndpoint().getEndpointKey()); + putRecordRequest.setDeliveryStreamName(getEndpoint().getStreamName()); putRecordRequest.setRecord(record); return putRecordRequest; } http://git-wip-us.apache.org/repos/asf/camel/blob/ad64a54e/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/integration/KinesisFirehoseComponentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/integration/KinesisFirehoseComponentIntegrationTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/integration/KinesisFirehoseComponentIntegrationTest.java new file mode 100644 index 0000000..c7114e4 --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/integration/KinesisFirehoseComponentIntegrationTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.firehose.integration; + +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsyncClientBuilder; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.aws.firehose.KinesisFirehoseConstants; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class KinesisFirehoseComponentIntegrationTest extends CamelTestSupport { + + @Test + public void testFirehoseRouting() throws Exception { + Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("my message text"); + } + }); + assertNotNull(exchange.getIn().getHeader(KinesisFirehoseConstants.RECORD_ID)); + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + AmazonKinesisFirehose client = AmazonKinesisFirehoseAsyncClientBuilder.defaultClient(); + JndiRegistry registry = super.createRegistry(); + registry.bind("FirehoseClient", client); + return registry; + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .to("aws-kinesis-firehose://mystream?amazonKinesisFirehoseClient=#FirehoseClient"); + } + }; + } +} +