Added basic DynamoDb Stream component.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78fd81e5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78fd81e5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78fd81e5 Branch: refs/heads/master Commit: 78fd81e5f7861e6fbde3fb9d63519bc1e775c93c Parents: 3da84a6 Author: Candle <can...@candle.me.uk> Authored: Mon Dec 7 09:33:10 2015 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Dec 16 14:19:11 2015 +0100 ---------------------------------------------------------------------- .../aws/ddbstream/DdbStreamComponent.java | 44 +++++++ .../aws/ddbstream/DdbStreamConsumer.java | 128 +++++++++++++++++++ .../aws/ddbstream/DdbStreamEndpoint.java | 117 +++++++++++++++++ .../org/apache/camel/component/aws-ddbstream | 18 +++ 4 files changed, 307 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java new file mode 100644 index 0000000..559597b --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.ddbstream; + +import java.util.Map; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.UriEndpointComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DdbStreamComponent extends UriEndpointComponent { + private static final Logger LOG = LoggerFactory.getLogger(DdbStreamComponent.class); + + public DdbStreamComponent() { + super(DdbStreamEndpoint.class); + } + + public DdbStreamComponent(CamelContext context) { + super(context, DdbStreamEndpoint.class); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + DdbStreamEndpoint endpoint = new DdbStreamEndpoint(uri, remaining, this); + + LOG.debug("Created endpoint: {}", endpoint.toString()); + return endpoint; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java new file mode 100644 index 0000000..88d2ba5 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.ddbstream; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; +import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest; +import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult; +import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest; +import com.amazonaws.services.dynamodbv2.model.GetRecordsResult; +import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest; +import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult; +import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest; +import com.amazonaws.services.dynamodbv2.model.ListStreamsResult; +import com.amazonaws.services.dynamodbv2.model.Record; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Queue; +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; +import org.apache.camel.util.CastUtils; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { + private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class); + + private String currentShardIterator = null; + + public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + /* + * Returns the number of messages polled. + */ + @Override + protected int poll() throws Exception { + GetRecordsRequest req = new GetRecordsRequest() + .withShardIterator(getShardItertor()) + .withLimit(getEndpoint().getMaxResultsPerRequest()) + ; + GetRecordsResult result = getClient().getRecords(req); + + Queue<Exchange> exchanges = createExchanges(result.getRecords()); + int processedExchangeCount = processBatch(CastUtils.cast(exchanges)); + + currentShardIterator = result.getNextShardIterator(); + + return processedExchangeCount; + } + + @Override + public int processBatch(Queue<Object> exchanges) throws Exception { + int processedExchanges = 0; + while (!exchanges.isEmpty()) { + final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); + + LOG.trace("Processing exchange [{}] started.", exchange); + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + LOG.trace("Processing exchange [{}] done.", exchange); + } + }); + processedExchanges++; + } + return processedExchanges; + } + + private AmazonDynamoDBStreams getClient() { + return getEndpoint().getClient(); + } + + @Override + public DdbStreamEndpoint getEndpoint() { + return (DdbStreamEndpoint) super.getEndpoint(); + } + + private String getShardItertor() { + // either return a cached one or get a new one via a GetShardIterator request. + if (currentShardIterator == null) { + ListStreamsRequest req0 = new ListStreamsRequest() + .withTableName(getEndpoint().getTableName()) + ; + ListStreamsResult res0 = getClient().listStreams(req0); + final String streamArn = res0.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream + DescribeStreamRequest req1 = new DescribeStreamRequest() + .withStreamArn(streamArn) + ; + DescribeStreamResult res1 = getClient().describeStream(req1); + + GetShardIteratorRequest req = new GetShardIteratorRequest() + .withStreamArn(streamArn) + .withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // XXX only uses the first shard + .withShardIteratorType(getEndpoint().getIteratorType()) + ; + GetShardIteratorResult result = getClient().getShardIterator(req); + currentShardIterator = result.getShardIterator(); + } + LOG.trace("Shard Iterator is: {}", currentShardIterator); + return currentShardIterator; + } + + private Queue<Exchange> createExchanges(List<Record> records) { + Queue<Exchange> exchanges = new ArrayDeque<>(); + for (Record record : records) { + exchanges.add(getEndpoint().createExchange(record)); + } + return exchanges; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java new file mode 100644 index 0000000..18c042d --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.ddbstream; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; +import com.amazonaws.services.dynamodbv2.model.Record; +import com.amazonaws.services.dynamodbv2.model.ShardIteratorType; +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.ScheduledPollEndpoint; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; + +@UriEndpoint(scheme = "aws-ddbstream", title = "AWS Kinesis", syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams") +public class DdbStreamEndpoint extends ScheduledPollEndpoint { + + @UriPath(label = "consumer,producer", description = "Name of the dynamodb table") + @Metadata(required = "true") + private String tableName; + + // For now, always assume that we've been supplied a client in the Camel registry. + @UriParam(label = "consumer", description = "Amazon DynamoDB client to use for all requests for this endpoint") + @Metadata(required = "true") + private AmazonDynamoDBStreams amazonDynamoDbStreamsClient; + + @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll") + private int maxResultsPerRequest = 1; + + @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream to start getting records") + private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON; + + public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) { + super(uri, component); + this.tableName = tableName; + } + + @Override + public Producer createProducer() throws Exception { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + DdbStreamConsumer consumer = new DdbStreamConsumer(this, processor); + consumer.setSchedulerProperties(consumer.getEndpoint().getSchedulerProperties()); + return consumer; + } + + Exchange createExchange(Record record) { + Exchange ex = super.createExchange(); + ex.getIn().setBody(record, Record.class); + + return ex; + } + + @Override + public boolean isSingleton() { + // probably right. + return true; + } + + AmazonDynamoDBStreams getClient() { + return amazonDynamoDbStreamsClient; + } + + // required for injection. + public AmazonDynamoDBStreams getAmazonDynamoDBStreamsClient() { + return amazonDynamoDbStreamsClient; + } + + public void setAmazonDynamoDbStreamsClient(AmazonDynamoDBStreams amazonDynamoDbStreamsClient) { + this.amazonDynamoDbStreamsClient = amazonDynamoDbStreamsClient; + } + + public int getMaxResultsPerRequest() { + return maxResultsPerRequest; + } + + public void setMaxResultsPerRequest(int maxResultsPerRequest) { + this.maxResultsPerRequest = maxResultsPerRequest; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public ShardIteratorType getIteratorType() { + return iteratorType; + } + + public void setIteratorType(ShardIteratorType iteratorType) { + this.iteratorType = iteratorType; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream new file mode 100644 index 0000000..48a8509 --- /dev/null +++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream @@ -0,0 +1,18 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +class=org.apache.camel.component.aws.ddbstream.DdbStreamComponent