Fixed CS and polished. This closes #840. This closes #839.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b0c7e793 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b0c7e793 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b0c7e793 Branch: refs/heads/master Commit: b0c7e793def54732264505c30156d0ffc2bc3467 Parents: 0bf2d09 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Feb 15 09:34:12 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Feb 15 09:37:24 2016 +0100 ---------------------------------------------------------------------- .../component/aws/common/AwsExchangeUtil.java | 32 ++++++++++--------- .../component/aws/kinesis/KinesisComponent.java | 6 +--- .../component/aws/kinesis/KinesisEndpoint.java | 24 +------------- .../component/aws/kinesis/KinesisProducer.java | 33 ++++++++------------ .../aws/kinesis/KinesisProducerTest.java | 15 ++++----- 5 files changed, 39 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b0c7e793/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java index 88abfba..8c9eb95 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/common/AwsExchangeUtil.java @@ -1,25 +1,29 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.camel.component.aws.common; import org.apache.camel.Exchange; import org.apache.camel.Message; -public class AwsExchangeUtil { +public final class AwsExchangeUtil { + + private AwsExchangeUtil() { + } + public static Message getMessageForResponse(final Exchange exchange) { if (exchange.getPattern().isOutCapable()) { Message out = exchange.getOut(); http://git-wip-us.apache.org/repos/asf/camel/blob/b0c7e793/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java index 9740600..270104a 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java @@ -21,11 +21,8 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.impl.UriEndpointComponent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class KinesisComponent extends UriEndpointComponent { - private static final Logger LOG = LoggerFactory.getLogger(KinesisComponent.class); public KinesisComponent() { super(KinesisEndpoint.class); @@ -38,8 +35,7 @@ public class KinesisComponent extends UriEndpointComponent { @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { KinesisEndpoint endpoint = new KinesisEndpoint(uri, remaining, this); - - LOG.debug("Created endpoint: {}", endpoint.toString()); + setProperties(endpoint, parameters); return endpoint; } } http://git-wip-us.apache.org/repos/asf/camel/blob/b0c7e793/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java index fdf1bdd..871c992 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java @@ -24,8 +24,6 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.ScheduledPollEndpoint; -import org.apache.camel.spi.HeaderFilterStrategy; -import org.apache.camel.spi.HeaderFilterStrategyAware; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; @@ -35,7 +33,7 @@ import org.apache.camel.spi.UriPath; * The aws-kinesis component is for consuming records from Amazon Kinesis Streams. */ @UriEndpoint(scheme = "aws-kinesis", title = "AWS Kinesis", syntax = "aws-kinesis:streamName", consumerClass = KinesisConsumer.class, label = "cloud,messaging") -public class KinesisEndpoint extends ScheduledPollEndpoint implements HeaderFilterStrategyAware { +public class KinesisEndpoint extends ScheduledPollEndpoint { @UriPath(label = "consumer", description = "Name of the stream") @Metadata(required = "true") @@ -52,9 +50,6 @@ public class KinesisEndpoint extends ScheduledPollEndpoint implements HeaderFilt @UriParam(label = "consumer", description = "Defines where in the Kinesis stream to start getting records") private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON; - @UriParam - private HeaderFilterStrategy headerFilterStrategy; - public KinesisEndpoint(String uri, String streamName, KinesisComponent component) { super(uri, component); this.streamName = streamName; @@ -124,21 +119,4 @@ public class KinesisEndpoint extends ScheduledPollEndpoint implements HeaderFilt this.iteratorType = iteratorType; } - @Override - public HeaderFilterStrategy getHeaderFilterStrategy() { - return headerFilterStrategy; - } - - @Override - /** - * To use a custom HeaderFilterStrategy to map headers to/from Camel. - */ - public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) { - this.headerFilterStrategy = headerFilterStrategy; - } - - @Override - public String toString() { - return "KinesisEndpoint{amazonKinesisClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest + ", iteratorType=" + iteratorType + ", streamName=" + streamName + '}'; - } } http://git-wip-us.apache.org/repos/asf/camel/blob/b0c7e793/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java index 4ecc1f4..3c48239 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java @@ -16,29 +16,20 @@ */ package org.apache.camel.component.aws.kinesis; +import java.nio.ByteBuffer; + import com.amazonaws.services.kinesis.model.PutRecordRequest; import com.amazonaws.services.kinesis.model.PutRecordResult; import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.NoFactoryAvailableException; import org.apache.camel.impl.DefaultProducer; -import java.nio.ByteBuffer; - import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse; public class KinesisProducer extends DefaultProducer { - public KinesisProducer(KinesisEndpoint endpoint) throws NoFactoryAvailableException { - super(endpoint); - } - @Override - public void process(Exchange exchange) throws Exception { - PutRecordRequest request = createRequest(exchange); - PutRecordResult putRecordResult = getEndpoint().getClient().putRecord(request); - Message message = getMessageForResponse(exchange); - message.setHeader(KinesisConstants.SEQUENCE_NUMBER,putRecordResult.getSequenceNumber()); - message.setHeader(KinesisConstants.SHARD_ID, putRecordResult.getShardId()); + public KinesisProducer(KinesisEndpoint endpoint) { + super(endpoint); } @Override @@ -47,24 +38,26 @@ public class KinesisProducer extends DefaultProducer { } @Override - public String toString() { - final StringBuilder sb = new StringBuilder("KinesisProducer{"); - sb.append(super.getEndpoint().getEndpointUri()); - sb.append('}'); - return sb.toString(); + public void process(Exchange exchange) throws Exception { + PutRecordRequest request = createRequest(exchange); + PutRecordResult putRecordResult = getEndpoint().getClient().putRecord(request); + Message message = getMessageForResponse(exchange); + message.setHeader(KinesisConstants.SEQUENCE_NUMBER, putRecordResult.getSequenceNumber()); + message.setHeader(KinesisConstants.SHARD_ID, putRecordResult.getShardId()); } private PutRecordRequest createRequest(Exchange exchange) { ByteBuffer body = exchange.getIn().getBody(ByteBuffer.class); Object partitionKey = exchange.getIn().getHeader(KinesisConstants.PARTITION_KEY); Object sequenceNumber = exchange.getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER); + PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setData(body); putRecordRequest.setStreamName(getEndpoint().getStreamName()); - if(sequenceNumber != null) { + if (sequenceNumber != null) { putRecordRequest.setSequenceNumberForOrdering(sequenceNumber.toString()); } - if(partitionKey != null) { + if (partitionKey != null) { putRecordRequest.setPartitionKey(partitionKey.toString()); } return putRecordRequest; http://git-wip-us.apache.org/repos/asf/camel/blob/b0c7e793/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java index b636522..3db0023 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,15 +16,14 @@ */ package org.apache.camel.component.aws.kinesis; +import java.nio.ByteBuffer; + import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.model.PutRecordRequest; import com.amazonaws.services.kinesis.model.PutRecordResult; -import com.amazonaws.services.kinesis.model.ShardIteratorType; -import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; -import org.apache.camel.impl.DefaultCamelContext; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -33,8 +32,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import java.nio.ByteBuffer; - import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.verify; @@ -83,7 +80,7 @@ public class KinesisProducerTest { } @Test - public void shouldPutRecordInRightStreamWhenProcessingExchange() throws Exception{ + public void shouldPutRecordInRightStreamWhenProcessingExchange() throws Exception { kinesisProducer.process(exchange); ArgumentCaptor<PutRecordRequest> capture = ArgumentCaptor.forClass(PutRecordRequest.class);