Repository: camel Updated Branches: refs/heads/master bbf15fdd8 -> ef669f95a
CAMEL-9603 Documentation for Kinesis producer. Fixed one of the headers, even though it means something similar, it's a discrete value from Kinesis. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ef669f95 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ef669f95 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ef669f95 Branch: refs/heads/master Commit: ef669f95a7cc95ad069db6e43cab39e294a26a19 Parents: bbf15fd Author: John D. Ament <johndam...@apache.org> Authored: Sat Feb 20 07:39:53 2016 -0500 Committer: John D. Ament <johndam...@apache.org> Committed: Sat Feb 20 07:43:48 2016 -0500 ---------------------------------------------------------------------- .../camel-aws/src/main/docs/aws-kinesis.adoc | 29 ++++++++++++++++++++ .../component/aws/kinesis/KinesisConstants.java | 5 ++-- .../component/aws/kinesis/KinesisProducer.java | 4 +-- .../aws/kinesis/KinesisConsumerTest.java | 11 ++++---- .../aws/kinesis/KinesisProducerTest.java | 6 ++-- 5 files changed, 41 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/main/docs/aws-kinesis.adoc ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/docs/aws-kinesis.adoc b/components/camel-aws/src/main/docs/aws-kinesis.adoc index 5f0c28b..6889008 100644 --- a/components/camel-aws/src/main/docs/aws-kinesis.adoc +++ b/components/camel-aws/src/main/docs/aws-kinesis.adoc @@ -114,6 +114,35 @@ however, a different http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html[AWSCredentialsProvider] can be specified when calling createClient(...). +[[AWS-Kinesis-MessageheaderssetbytheKinesisproducer]] +Message headers used by the Kinesis producer to write to Kinesis. 
The producer expects that the message body is a `ByteBuffer`. ++++++++++++++++++++++++++++++++++++++++ + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Header |Type |Description + +|`CamelAwsKinesisPartitionKey` |`String` |The PartitionKey to pass to Kinesis to store this record. + +|`CamelAwsKinesisSequenceNumber` |`String` |Optional parameter to indicate the sequence number of this record. + +|======================================================================= + +Message headers set by the Kinesis producer on successful storage of a Record ++++++++++++++++++++++++++++++++++++++++ + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Header |Type |Description + +|`CamelAwsKinesisSequenceNumber` |`String` |The sequence number of the record, as defined in +http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_ResponseSyntax[Response Syntax] + +|`CamelAwsKinesisShardId` |`String` |The shard ID of where the Record was stored + + +|======================================================================= + [[AWS-KINESIS-Dependencies]] Dependencies ^^^^^^^^^^^^ http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java index 22da493..23891c6 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java @@ -23,8 +23,7 @@ public interface KinesisConstants { String 
PARTITION_KEY = "CamelAwsKinesisPartitionKey"; /** - * in a Kinesis Record object, the shard ID is obtained from the getPartitionKey method. + * in a Kinesis Record object, the shard ID is used on writes to indicate where the data was stored */ - String SHARD_ID = "CamelAwsKinesisPartitionKey"; - + String SHARD_ID = "CamelAwsKinesisShardId"; } http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java index 3c48239..8e0c0a3 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisProducer.java @@ -54,12 +54,10 @@ public class KinesisProducer extends DefaultProducer { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setData(body); putRecordRequest.setStreamName(getEndpoint().getStreamName()); + putRecordRequest.setPartitionKey(partitionKey.toString()); if (sequenceNumber != null) { putRecordRequest.setSequenceNumberForOrdering(sequenceNumber.toString()); } - if (partitionKey != null) { - putRecordRequest.setPartitionKey(partitionKey.toString()); - } return putRecordRequest; } } http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java index 62f5b14..8478f26 
100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java @@ -144,13 +144,15 @@ public class KinesisConsumerTest { @Test public void exchangePropertiesAreSet() throws Exception { + String partitionKey = "partitionKey"; + String sequenceNumber = "1"; when(kinesisClient.getRecords(any(GetRecordsRequest.class))) .thenReturn(new GetRecordsResult() .withNextShardIterator("nextShardIterator") .withRecords(new Record() - .withSequenceNumber("1") + .withSequenceNumber(sequenceNumber) .withApproximateArrivalTimestamp(new Date(42)) - .withPartitionKey("shardId") + .withPartitionKey(partitionKey) ) ); @@ -160,9 +162,8 @@ public class KinesisConsumerTest { verify(processor).process(exchangeCaptor.capture(), any(AsyncCallback.class)); assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.APPROX_ARRIVAL_TIME, long.class), is(42L)); - assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is("shardId")); - assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is("1")); - assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SHARD_ID, String.class), is("shardId")); + assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is(partitionKey)); + assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is(sequenceNumber)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/ef669f95/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java index 3db0023..d0e3250 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisProducerTest.java @@ -43,6 +43,7 @@ public class KinesisProducerTest { private static final String SEQUENCE_NUMBER = "SEQ123"; private static final String STREAM_NAME = "streams"; private static final String SAMPLE_RECORD_BODY = "SAMPLE"; + private static final String PARTITION_KEY = "partition"; private static final ByteBuffer SAMPLE_BUFFER = ByteBuffer.wrap(SAMPLE_RECORD_BODY.getBytes()); @Mock @@ -70,6 +71,7 @@ public class KinesisProducerTest { when(exchange.getPattern()).thenReturn(ExchangePattern.InOut); when(inMessage.getBody(ByteBuffer.class)).thenReturn(SAMPLE_BUFFER); + when(inMessage.getHeader(KinesisConstants.PARTITION_KEY)).thenReturn(PARTITION_KEY); when(putRecordResult.getSequenceNumber()).thenReturn(SEQUENCE_NUMBER); when(putRecordResult.getShardId()).thenReturn(SHARD_ID); @@ -95,10 +97,8 @@ public class KinesisProducerTest { @Test public void shouldHaveProperHeadersWhenSending() throws Exception { - String partitionKey = "partition"; String seqNoForOrdering = "1851"; when(inMessage.getHeader(KinesisConstants.SEQUENCE_NUMBER)).thenReturn(seqNoForOrdering); - when(inMessage.getHeader(KinesisConstants.PARTITION_KEY)).thenReturn(partitionKey); kinesisProducer.process(exchange); @@ -106,7 +106,7 @@ public class KinesisProducerTest { verify(kinesisClient).putRecord(capture.capture()); PutRecordRequest request = capture.getValue(); - assertEquals(partitionKey, request.getPartitionKey()); + assertEquals(PARTITION_KEY, request.getPartitionKey()); assertEquals(seqNoForOrdering, request.getSequenceNumberForOrdering()); verify(outMessage).setHeader(KinesisConstants.SEQUENCE_NUMBER, SEQUENCE_NUMBER); 
verify(outMessage).setHeader(KinesisConstants.SHARD_ID, SHARD_ID);