Repository: camel
Updated Branches:
  refs/heads/master fd659c108 -> d84cc7005


CAMEL-9818: Camel Kafka consumer adds legacy (deprecated) properties


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d84cc700
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d84cc700
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d84cc700

Branch: refs/heads/master
Commit: d84cc70056160fdf98b286c52a9d558663d8e8e1
Parents: fd659c1
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Fri Apr 8 15:03:50 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Fri Apr 8 15:05:11 2016 +0200

----------------------------------------------------------------------
 components/camel-kafka/src/main/docs/kafka.adoc |  9 ++--
 .../component/kafka/KafkaConfiguration.java     | 54 +-------------------
 .../camel/component/kafka/KafkaEndpoint.java    | 24 ---------
 .../component/kafka/KafkaComponentTest.java     |  6 ---
 4 files changed, 5 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
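
For context: the removed options (timeoutMs, blockOnBufferFull, metadataFetchTimeoutMs) map to the producer configs timeout.ms, block.on.buffer.full and metadata.fetch.timeout.ms, which the newer Kafka clients deprecate. A minimal sketch of a producer route after this commit, using only options that remain; the broker address, topic and values are illustrative, not taken from this commit:

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaProducerRoute extends RouteBuilder {
        @Override
        public void configure() {
            from("direct:start")
                // timeoutMs, blockOnBufferFull and metadataFetchTimeoutMs no longer
                // exist; requestTimeoutMs and maxBlockMs are the surviving producer
                // timeout knobs per the endpoint-options table in this commit.
                .to("kafka:localhost:9092?topic=test"
                    + "&requestTimeoutMs=30000&maxBlockMs=60000");
        }
    }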


http://git-wip-us.apache.org/repos/asf/camel/blob/d84cc700/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc
index 7528eb9..0f23849 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -65,8 +65,9 @@ The Kafka component has no options.
 
 
 
+
 // endpoint options: START
-The Kafka component supports 74 endpoint options which are listed below:
+The Kafka component supports 71 endpoint options which are listed below:
 
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
 |=======================================================================
@@ -76,7 +77,7 @@ The Kafka component supports 74 endpoint options which are listed below:
 | clientId | common |  | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
 | groupId | common |  | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
 | partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.
-| topic | common |  | String | *Required* Name of the topic to use. When used on a consumer endpoint the topic can be a comma separated list of topics.
+| topic | common |  | String | *Required* Name of the topic to use.
 | autoCommitEnable | consumer | true | Boolean | If true periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin.
 | autoCommitIntervalMs | consumer | 5000 | Integer | The frequency in ms that the consumer offsets are committed to zookeeper.
 | autoOffsetReset | consumer | latest | String | What to do when there is no initial offset in ZooKeeper or if an offset is out of range: smallest : automatically reset the offset to the smallest offset largest : automatically reset the offset to the largest offset fail: throw exception to the consumer
@@ -98,7 +99,6 @@ The Kafka component supports 74 endpoint options which are listed below:
 | sessionTimeoutMs | consumer | 30000 | Integer | The timeout used to detect failures when using Kafka's group management facilities.
 | valueDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for value that implements the Deserializer interface.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored.
-| blockOnBufferFull | producer | false | Boolean | When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default this setting is true and we block however in some scenarios blocking is not desirable and it is better to immediately give an error. Setting this to false will accomplish that: the producer will throw a BufferExhaustedException if a record is sent and the buffer space is full.
 | bufferMemorySize | producer | 33554432 | Integer | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full. This setting should correspond roughly to the total memory the producer will use but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.
 | compressionCodec | producer | none | String | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none gzip and snappy.
 | connectionMaxIdleMs | producer | 540000 | Integer | Close idle connections after the number of milliseconds specified by this config.
@@ -111,7 +111,6 @@ The Kafka component supports 74 endpoint options which are listed below:
 | maxBlockMs | producer | 60000 | Integer | The configuration controls how long sending to kafka will block. These methods can be blocked for multiple reasons. E.g.: buffer full, metadata unavailable. This configuration imposes a maximum limit on the total time spent in fetching metadata serialization of key and value partitioning and allocation of buffer memory when doing a send(). In case of partitionsFor() this configuration imposes a maximum time threshold on waiting for metadata
 | maxInFlightRequest | producer | 5 | Integer | The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends there is a risk of message re-ordering due to retries (i.e. if retries are enabled).
 | maxRequestSize | producer | 1048576 | Integer | The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.
-| metadataFetchTimeoutMs | producer | 60000 | Integer | The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This configuration bounds how long that fetch may take to succeed before an exception is thrown back to the client.
 | metadataMaxAgeMs | producer | 300000 | Integer | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
 | metricReporters | producer |  | String | A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.
 | metricsSampleWindowMs | producer | 30000 | Integer | The number of samples maintained to compute metrics.
@@ -142,13 +141,13 @@ The Kafka component supports 74 endpoint options which are listed below:
 | sslTruststoreLocation | producer |  | String | The location of the trust store file.
 | sslTruststorePassword | producer |  | String | The password for the trust store file.
 | sslTruststoreType | producer | JKS | String | The file format of the trust store file. Default value is JKS.
-| timeoutMs | producer | 30000 | Integer | The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.
 | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
 |=======================================================================
 // endpoint options: END
 
 
+
 For more information about Producer/Consumer configuration:
 
 
http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]
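
As a usage note for the table above, a minimal consumer sketch wired through a few of the remaining options; the broker address, topic, group id and values are placeholders, not taken from this commit:

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaConsumerRoute extends RouteBuilder {
        @Override
        public void configure() {
            // groupId, autoOffsetReset and sessionTimeoutMs come straight
            // from the consumer rows of the endpoint-options table.
            from("kafka:localhost:9092?topic=test&groupId=group1"
                    + "&autoOffsetReset=latest&sessionTimeoutMs=30000")
                .to("log:consumed");
        }
    }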

http://git-wip-us.apache.org/repos/asf/camel/blob/d84cc700/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 49d7fe2..e0580e9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -184,18 +184,9 @@ public class KafkaConfiguration {
     //ssl.truststore.type
     @UriParam(label = "producer", defaultValue = 
SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE)
     private String sslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
-    //timeout.ms
-    @UriParam(label = "producer", defaultValue = "30000")
-    private Integer timeoutMs = 30000;
-    //block.on.buffer.full
-    @UriParam(label = "producer", defaultValue = "false")
-    private Boolean blockOnBufferFull = false;
     //max.in.flight.requests.per.connection
     @UriParam(label = "producer", defaultValue = "5")
     private Integer maxInFlightRequest = 5;
-    //metadata.fetch.timeout.ms
-    @UriParam(label = "producer", defaultValue = "60000")
-    private Integer metadataFetchTimeoutMs = 600 * 1000;
     //metadata.max.age.ms
     @UriParam(label = "producer", defaultValue = "300000")
     private Integer metadataMaxAgeMs = 300000;
@@ -276,10 +267,7 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
         addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
         addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
-        addPropertyIfNotNull(props, ProducerConfig.TIMEOUT_CONFIG, getTimeoutMs());
-        addPropertyIfNotNull(props, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, getBlockOnBufferFull());
         addPropertyIfNotNull(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest());
-        addPropertyIfNotNull(props, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, getMetadataFetchTimeoutMs());
         addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
         addPropertyIfNotNull(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
         addPropertyIfNotNull(props, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
@@ -389,7 +377,7 @@ public class KafkaConfiguration {
     }
 
     /**
-     * Name of the topic to use. When used on a consumer endpoint the topic can be a comma separated list of topics.
+     * Name of the topic to use.
      */
     public void setTopic(String topic) {
         this.topic = topic;
@@ -870,20 +858,6 @@ public class KafkaConfiguration {
         this.bufferMemorySize = bufferMemorySize;
     }
 
-    public Boolean getBlockOnBufferFull() {
-        return blockOnBufferFull;
-    }
-
-    /**
-     * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors.
-     * By default this setting is true and we block, however in some scenarios blocking is not desirable and it
-     * is better to immediately give an error. Setting this to false will accomplish that: the producer will throw
-     * a BufferExhaustedException if a record is sent and the buffer space is full.
-     */
-    public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
-        this.blockOnBufferFull = blockOnBufferFull;
-    }
-
     public Integer getRequestRequiredAcks() {
         return requestRequiredAcks;
     }
@@ -1003,20 +977,6 @@ public class KafkaConfiguration {
         this.receiveBufferBytes = receiveBufferBytes;
     }
 
-    public Integer getTimeoutMs() {
-        return timeoutMs;
-    }
-
-    /**
-     * The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the
-     * acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments
-     * are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include
-     * the network latency of the request.
-     */
-    public void setTimeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-    }
-
     public Integer getMaxInFlightRequest() {
         return maxInFlightRequest;
     }
@@ -1029,18 +989,6 @@ public class KafkaConfiguration {
         this.maxInFlightRequest = maxInFlightRequest;
     }
 
-    public Integer getMetadataFetchTimeoutMs() {
-        return metadataFetchTimeoutMs;
-    }
-
-    /**
-     * The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions.
-     * This configuration bounds how long that fetch may take to succeed before an exception is thrown back to the client.
-     */
-    public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
-        this.metadataFetchTimeoutMs = metadataFetchTimeoutMs;
-    }
-
     public Integer getMetadataMaxAgeMs() {
         return metadataMaxAgeMs;
     }
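
The createProducerProperties() block above relies on addPropertyIfNotNull so that only explicitly configured options reach the Kafka client. A sketch of what such a null-guarded helper typically looks like; the actual helper in KafkaConfiguration may differ in details:

    import java.util.Properties;

    // Skip unset options entirely instead of writing nulls into the
    // Properties handed to the Kafka producer.
    private static <T> void addPropertyIfNotNull(Properties props, String key, T value) {
        if (value != null) {
            // Kafka client configs are string-typed in Properties.
            props.put(key, value.toString());
        }
    }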

http://git-wip-us.apache.org/repos/asf/camel/blob/d84cc700/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 327ecdc..1c239c8 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -283,10 +283,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setBrokers(brokers);
     }
 
-    public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
-        configuration.setMetadataFetchTimeoutMs(metadataFetchTimeoutMs);
-    }
-
     public String getValueDeserializer() {
         return configuration.getValueDeserializer();
     }
@@ -375,10 +371,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setSslKeystorePassword(sslKeystorePassword);
     }
 
-    public Boolean getBlockOnBufferFull() {
-        return configuration.getBlockOnBufferFull();
-    }
-
     public void setCheckCrcs(Boolean checkCrcs) {
         configuration.setCheckCrcs(checkCrcs);
     }
@@ -415,10 +407,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setSslKeyPassword(sslKeyPassword);
     }
 
-    public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
-        configuration.setBlockOnBufferFull(blockOnBufferFull);
-    }
-
     public Integer getRequestRequiredAcks() {
         return configuration.getRequestRequiredAcks();
     }
@@ -495,10 +483,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getSslTruststorePassword();
     }
 
-    public void setTimeoutMs(Integer timeoutMs) {
-        configuration.setTimeoutMs(timeoutMs);
-    }
-
     public void setConsumerStreams(int consumerStreams) {
         configuration.setConsumerStreams(consumerStreams);
     }
@@ -551,10 +535,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getPartitionAssignor();
     }
 
-    public Integer getMetadataFetchTimeoutMs() {
-        return configuration.getMetadataFetchTimeoutMs();
-    }
-
     public void setSecurityProtocol(String securityProtocol) {
         configuration.setSecurityProtocol(securityProtocol);
     }
@@ -655,10 +635,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getSendBufferBytes();
     }
 
-    public Integer getTimeoutMs() {
-        return configuration.getTimeoutMs();
-    }
-
     public String getSslProtocol() {
         return configuration.getSslProtocol();
     }
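
KafkaEndpoint keeps no option state of its own: each accessor removed above was a pure delegate to the shared KafkaConfiguration, which is why dropping an option means deleting both the configuration field and its delegating pair here. A sketch of the pattern, shown for maxInFlightRequest, one of the options that survives this commit:

    public Integer getMaxInFlightRequest() {
        // Delegate to the shared configuration; the endpoint holds no copy.
        return configuration.getMaxInFlightRequest();
    }

    public void setMaxInFlightRequest(Integer maxInFlightRequest) {
        configuration.setMaxInFlightRequest(maxInFlightRequest);
    }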

http://git-wip-us.apache.org/repos/asf/camel/blob/d84cc700/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 31c2dd6..1c2c564 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -64,21 +64,18 @@ public class KafkaComponentTest {
         assertEquals(new Integer(10), endpoint.getProducerBatchSize());
         assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs());
         assertEquals(new Integer(1), endpoint.getMaxBlockMs());
-        assertEquals(false, endpoint.getBlockOnBufferFull());
         assertEquals(new Integer(1), endpoint.getBufferMemorySize());
         assertEquals("testing", endpoint.getClientId());
         assertEquals("none", endpoint.getCompressionCodec());
         assertEquals(new Integer(1), endpoint.getLingerMs());
         assertEquals(new Integer(100), endpoint.getMaxRequestSize());
         assertEquals(100, endpoint.getRequestTimeoutMs().intValue());
-        assertEquals(new Integer(9043), endpoint.getMetadataFetchTimeoutMs());
         assertEquals(new Integer(1029), endpoint.getMetadataMaxAgeMs());
         assertEquals(new Integer(23), endpoint.getReceiveBufferBytes());
         assertEquals(new Integer(234), endpoint.getReconnectBackoffMs());
         assertEquals(new Integer(0), endpoint.getRetries());
         assertEquals(3782, endpoint.getRetryBackoffMs().intValue());
         assertEquals(765, endpoint.getSendBufferBytes().intValue());
-        assertEquals(new Integer(2045), endpoint.getTimeoutMs());
         assertEquals(new Integer(1), endpoint.getMaxInFlightRequest());
         assertEquals("org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport", endpoint.getMetricReporters());
         assertEquals(new Integer(3), endpoint.getNoOfMetricsSample());
@@ -134,10 +131,7 @@ public class KafkaComponentTest {
         props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");
         props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
         props.put(ProducerConfig.SEND_BUFFER_CONFIG, "131072");
-        props.put(ProducerConfig.TIMEOUT_CONFIG, "30000");
-        props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false");
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
-        props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "60000");
         props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");
         props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, "2");
         props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000");
