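Do not use exchange properties to store message data; set the Kinesis record metadata (approximate arrival time, partition key, sequence number) as headers on the In message instead.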
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e78a0296 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e78a0296 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e78a0296 Branch: refs/heads/master Commit: e78a029601a808c78b4de12a0f495c76f6bec0a4 Parents: 92f6c9b Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Dec 5 10:41:14 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Dec 5 10:41:14 2015 +0100 ---------------------------------------------------------------------- .../camel/component/aws/kinesis/KinesisEndpoint.java | 15 +++++++-------- .../component/aws/kinesis/KinesisConsumerTest.java | 8 ++++---- 2 files changed, 11 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e78a0296/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java index e34da43..5a8b1cd 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java @@ -62,14 +62,13 @@ public class KinesisEndpoint extends ScheduledPollEndpoint { return new KinesisConsumer(this, processor); } - Exchange createExchange(Record record) { - Exchange ex = super.createExchange(); - ex.getIn().setBody(record, Record.class); - ex.setProperty(KinesisConstants.APPROX_ARRIVAL_TIME, record.getApproximateArrivalTimestamp()); - ex.setProperty(KinesisConstants.PARTITION_KEY, record.getPartitionKey()); - ex.setProperty(KinesisConstants.SEQUENCE_NUMBER, 
record.getSequenceNumber()); - - return ex; + public Exchange createExchange(Record record) { + Exchange exchange = super.createExchange(); + exchange.getIn().setBody(record); + exchange.getIn().setHeader(KinesisConstants.APPROX_ARRIVAL_TIME, record.getApproximateArrivalTimestamp()); + exchange.getIn().setHeader(KinesisConstants.PARTITION_KEY, record.getPartitionKey()); + exchange.getIn().setHeader(KinesisConstants.SEQUENCE_NUMBER, record.getSequenceNumber()); + return exchange; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/e78a0296/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java index 3f31986..62f5b14 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java @@ -159,10 +159,10 @@ public class KinesisConsumerTest { final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor).process(exchangeCaptor.capture(), any(AsyncCallback.class)); - assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.APPROX_ARRIVAL_TIME, long.class), is(42L)); - assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.PARTITION_KEY, String.class), is("shardId")); - assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.SEQUENCE_NUMBER, String.class), is("1")); - assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.SHARD_ID, String.class), is("shardId")); + assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.APPROX_ARRIVAL_TIME, long.class), is(42L)); + 
assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is("shardId")); + assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is("1")); + assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SHARD_ID, String.class), is("shardId")); } } \ No newline at end of file