Repository: camel Updated Branches: refs/heads/master 8dbe714d3 -> aaa85fc74
CAMEL-7999: More components include documentation Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c8ce8f83 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c8ce8f83 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c8ce8f83 Branch: refs/heads/master Commit: c8ce8f839bca8dbc39cfe39a6cc4a3cec8ad1292 Parents: 8dbe714 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Jan 5 15:21:28 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Jan 5 15:21:28 2015 +0100 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaComponent.java | 10 ++--- .../component/kafka/KafkaConfiguration.java | 47 ++++++++++++++++++++ .../camel/component/kafka/KafkaEndpoint.java | 9 ++-- 3 files changed, 57 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c8ce8f83/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java index 845f060..bf33a03 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java @@ -19,18 +19,16 @@ package org.apache.camel.component.kafka; import java.util.Map; import org.apache.camel.CamelContext; -import org.apache.camel.impl.DefaultComponent; +import org.apache.camel.impl.UriEndpointComponent; -/** - * - */ -public class KafkaComponent extends DefaultComponent { +public class KafkaComponent extends UriEndpointComponent { public KafkaComponent() { + super(KafkaEndpoint.class); } public KafkaComponent(CamelContext context) { - super(context); + super(context, KafkaEndpoint.class); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/c8ce8f83/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 669e1a0..576cb9e 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -19,62 +19,109 @@ package org.apache.camel.component.kafka; import java.util.Properties; import kafka.producer.DefaultPartitioner; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; +@UriParams public class KafkaConfiguration { + + @UriParam private String zookeeperConnect; + @UriParam private String zookeeperHost; + @UriParam(defaultValue = "2181") private int zookeeperPort = 2181; + @UriParam private String topic; + @UriParam private String groupId; + @UriParam(defaultValue = "DefaultPartitioner") private String partitioner = DefaultPartitioner.class.getCanonicalName(); + @UriParam(defaultValue = "10") private int consumerStreams = 10; + @UriParam(defaultValue = "1") private int consumersCount = 1; + @UriParam(defaultValue = "100") private int batchSize = 100; + @UriParam(defaultValue = "10000") private int barrierAwaitTimeoutMs = 10000; //Common configuration properties + @UriParam private String clientId; //Consumer configuration properties + @UriParam private String consumerId; + @UriParam private Integer socketTimeoutMs; + @UriParam private Integer socketReceiveBufferBytes; + @UriParam private Integer fetchMessageMaxBytes; + @UriParam private Boolean autoCommitEnable; + @UriParam private Integer autoCommitIntervalMs; + @UriParam private Integer queuedMaxMessages; + @UriParam private Integer rebalanceMaxRetries; + @UriParam private Integer fetchMinBytes; + @UriParam private Integer fetchWaitMaxMs; + @UriParam private Integer rebalanceBackoffMs; + @UriParam private Integer refreshLeaderBackoffMs; + @UriParam private String autoOffsetReset; + @UriParam private Integer consumerTimeoutMs; //Zookeepr configuration properties + @UriParam private Integer zookeeperSessionTimeoutMs; + @UriParam private Integer zookeeperConnectionTimeoutMs; + @UriParam private Integer zookeeperSyncTimeMs; //Producer configuration properties + @UriParam private String producerType; + @UriParam private String compressionCodec; + @UriParam private String compressedTopics; + @UriParam private Integer messageSendMaxRetries; + @UriParam private Integer retryBackoffMs; + @UriParam private Integer topicMetadataRefreshIntervalMs; //Sync producer config + @UriParam private Integer sendBufferBytes; + @UriParam private short requestRequiredAcks; + @UriParam private Integer requestTimeoutMs; //Async producer config + @UriParam private Integer queueBufferingMaxMs; + @UriParam private Integer queueBufferingMaxMessages; + @UriParam private Integer queueEnqueueTimeoutMs; + @UriParam private Integer batchNumMessages; + @UriParam private String serializerClass; + @UriParam private String keySerializerClass; public KafkaConfiguration() { http://git-wip-us.apache.org/repos/asf/camel/blob/c8ce8f83/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index ce68b47..e0ee9b0 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -28,13 +28,16 @@ import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.DefaultMessage; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; -/** - * - */ +@UriEndpoint(scheme = "kafka", consumerClass = KafkaConsumer.class, label = "messaging") public class KafkaEndpoint extends DefaultEndpoint { + @UriPath private String brokers; + @UriParam private KafkaConfiguration configuration = new KafkaConfiguration(); public KafkaEndpoint() {