Repository: camel Updated Branches: refs/heads/camel-2.17.x a4543cffb -> 52c30bf28
CAMEL-9818: Camel kafka consumer adds legacy (deprecated properties) Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/52c30bf2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/52c30bf2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/52c30bf2 Branch: refs/heads/camel-2.17.x Commit: 52c30bf28404b328583f81aa06ccf54db9d3be98 Parents: a4543cf Author: Andrea Cosentino <anco...@gmail.com> Authored: Fri Apr 8 15:03:50 2016 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Fri Apr 8 15:18:33 2016 +0200 ---------------------------------------------------------------------- .../component/kafka/KafkaConfiguration.java | 54 +------------------- .../camel/component/kafka/KafkaEndpoint.java | 24 --------- .../component/kafka/KafkaComponentTest.java | 6 --- 3 files changed, 1 insertion(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/52c30bf2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 6132841..6b67fc5 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -182,18 +182,9 @@ public class KafkaConfiguration { //ssl.truststore.type @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE) private String sslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE; - //timeout.ms - @UriParam(label = "producer", defaultValue = "30000") - private Integer timeoutMs = 30000; - //block.on.buffer.full - @UriParam(label = "producer", defaultValue = "false") - private Boolean blockOnBufferFull = false; //max.in.flight.requests.per.connection @UriParam(label = "producer", defaultValue = "5") private Integer maxInFlightRequest = 5; - //metadata.fetch.timeout.ms - @UriParam(label = "producer", defaultValue = "60000") - private Integer metadataFetchTimeoutMs = 600 * 1000; //metadata.max.age.ms @UriParam(label = "producer", defaultValue = "300000") private Integer metadataMaxAgeMs = 300000; @@ -274,10 +265,7 @@ public class KafkaConfiguration { addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol()); addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider()); addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType()); - addPropertyIfNotNull(props, ProducerConfig.TIMEOUT_CONFIG, getTimeoutMs()); - addPropertyIfNotNull(props, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, getBlockOnBufferFull()); addPropertyIfNotNull(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest()); - addPropertyIfNotNull(props, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, getMetadataFetchTimeoutMs()); addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs()); addPropertyIfNotNull(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters()); addPropertyIfNotNull(props, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample()); @@ -387,7 +375,7 @@ public class KafkaConfiguration { } /** - * Name of the topic to use. When used on a consumer endpoint the topic can be a comma separated list of topics. + * Name of the topic to use. */ public void setTopic(String topic) { this.topic = topic; @@ -868,20 +856,6 @@ public class KafkaConfiguration { this.bufferMemorySize = bufferMemorySize; } - public Boolean getBlockOnBufferFull() { - return blockOnBufferFull; - } - - /** - * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. - * By default this setting is true and we block, however in some scenarios blocking is not desirable and it - * is better to immediately give an error. Setting this to false will accomplish that: the producer will throw - * a BufferExhaustedException if a recrord is sent and the buffer space is full. - */ - public void setBlockOnBufferFull(Boolean blockOnBufferFull) { - this.blockOnBufferFull = blockOnBufferFull; - } - public Integer getRequestRequiredAcks() { return requestRequiredAcks; } @@ -1001,20 +975,6 @@ public class KafkaConfiguration { this.receiveBufferBytes = receiveBufferBytes; } - public Integer getTimeoutMs() { - return timeoutMs; - } - - /** - * The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the - * acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments - * are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include - * the network latency of the request. - */ - public void setTimeoutMs(Integer timeoutMs) { - this.timeoutMs = timeoutMs; - } - public Integer getMaxInFlightRequest() { return maxInFlightRequest; } @@ -1027,18 +987,6 @@ public class KafkaConfiguration { this.maxInFlightRequest = maxInFlightRequest; } - public Integer getMetadataFetchTimeoutMs() { - return metadataFetchTimeoutMs; - } - - /** - * The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. - * This fetch to succeed before throwing an exception back to the client. - */ - public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) { - this.metadataFetchTimeoutMs = metadataFetchTimeoutMs; - } - public Integer getMetadataMaxAgeMs() { return metadataMaxAgeMs; } http://git-wip-us.apache.org/repos/asf/camel/blob/52c30bf2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 5a56c39..369537a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -283,10 +283,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS configuration.setBrokers(brokers); } - public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) { - configuration.setMetadataFetchTimeoutMs(metadataFetchTimeoutMs); - } - public String getValueDeserializer() { return configuration.getValueDeserializer(); } @@ -375,10 +371,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS configuration.setSslKeystorePassword(sslKeystorePassword); } - public Boolean getBlockOnBufferFull() { - return configuration.getBlockOnBufferFull(); - } - public void setCheckCrcs(Boolean checkCrcs) { configuration.setCheckCrcs(checkCrcs); } @@ -415,10 +407,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS configuration.setSslKeyPassword(sslKeyPassword); } - public void setBlockOnBufferFull(Boolean blockOnBufferFull) { - configuration.setBlockOnBufferFull(blockOnBufferFull); - } - public Integer getRequestRequiredAcks() { return configuration.getRequestRequiredAcks(); } @@ -495,10 +483,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS return configuration.getSslTruststorePassword(); } - public void setTimeoutMs(Integer timeoutMs) { - configuration.setTimeoutMs(timeoutMs); - } - public void setConsumerStreams(int consumerStreams) { configuration.setConsumerStreams(consumerStreams); } @@ -551,10 +535,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS return configuration.getPartitionAssignor(); } - public Integer getMetadataFetchTimeoutMs() { - return configuration.getMetadataFetchTimeoutMs(); - } - public void setSecurityProtocol(String securityProtocol) { configuration.setSecurityProtocol(securityProtocol); } @@ -655,10 +635,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS return configuration.getSendBufferBytes(); } - public Integer getTimeoutMs() { - return configuration.getTimeoutMs(); - } - public String getSslProtocol() { return configuration.getSslProtocol(); } http://git-wip-us.apache.org/repos/asf/camel/blob/52c30bf2/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java index 31c2dd6..1c2c564 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java @@ -64,21 +64,18 @@ public class KafkaComponentTest { assertEquals(new Integer(10), endpoint.getProducerBatchSize()); assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs()); assertEquals(new Integer(1), endpoint.getMaxBlockMs()); - assertEquals(false, endpoint.getBlockOnBufferFull()); assertEquals(new Integer(1), endpoint.getBufferMemorySize()); assertEquals("testing", endpoint.getClientId()); assertEquals("none", endpoint.getCompressionCodec()); assertEquals(new Integer(1), endpoint.getLingerMs()); assertEquals(new Integer(100), endpoint.getMaxRequestSize()); assertEquals(100, endpoint.getRequestTimeoutMs().intValue()); - assertEquals(new Integer(9043), endpoint.getMetadataFetchTimeoutMs()); assertEquals(new Integer(1029), endpoint.getMetadataMaxAgeMs()); assertEquals(new Integer(23), endpoint.getReceiveBufferBytes()); assertEquals(new Integer(234), endpoint.getReconnectBackoffMs()); assertEquals(new Integer(0), endpoint.getRetries()); assertEquals(3782, endpoint.getRetryBackoffMs().intValue()); assertEquals(765, endpoint.getSendBufferBytes().intValue()); - assertEquals(new Integer(2045), endpoint.getTimeoutMs()); assertEquals(new Integer(1), endpoint.getMaxInFlightRequest()); assertEquals("org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport", endpoint.getMetricReporters()); assertEquals(new Integer(3), endpoint.getNoOfMetricsSample()); @@ -134,10 +131,7 @@ public class KafkaComponentTest { props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768"); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); props.put(ProducerConfig.SEND_BUFFER_CONFIG, "131072"); - props.put(ProducerConfig.TIMEOUT_CONFIG, "30000"); - props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false"); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); - props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "60000"); props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000"); props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, "2"); props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000");